Skip to content

Commit 95aa04a

Browse files
committed
Improvements
1 parent e860788 commit 95aa04a

17 files changed

Lines changed: 64 additions & 57 deletions

File tree

README.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,8 @@ import javax.sql.DataSource;
109109
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
110110
// as of now, only POSTGRES has fully tested support;
111111
// should also work with others but some things - event table partition management for example - works only with Postgres, for others it must be managed manually
112-
import org.jooq.SQLDialect;
113-
114-
var eventSQL = new EventSQL(dataSource, SQLDialect.POSTGRES);
115-
ver shardedEventSQL = new EventSQL(dataSources, SQLDialect.POSTGRES);
112+
var eventSQL = new EventSQL(dataSource, EventSQLDialect.POSTGRES);
113+
ver shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
116114
```
117115

118116
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.

TODO.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@
77
* JavaDocs
88
* Support schemas init in registry - why require schemas from users, if it is always the same (keeping dbs diffs in mind)?
99
* using JOOQ tradeoffs?
10+
* custom enums for db dialect - hide JOOQ usage!
1011
* full MySQL/MariaDB support
12+
* randomize delay option to increase parallelism

benchmarks/app/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.binaryigor.eventsql.benchmarks;
22

33
import com.binaryigor.eventsql.EventSQL;
4+
import com.binaryigor.eventsql.EventSQLDialect;
45
import com.zaxxer.hikari.HikariConfig;
56
import com.zaxxer.hikari.HikariDataSource;
6-
import org.jooq.SQLDialect;
77
import org.springframework.boot.SpringApplication;
88
import org.springframework.boot.autoconfigure.SpringBootApplication;
99
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,7 +25,7 @@ EventSQL eventSQL(EventsProperties eventsProperties) {
2525
.filter(EventsProperties.DataSourceProperties::enabled)
2626
.map(this::dataSource)
2727
.toList();
28-
return new EventSQL(dataSources, SQLDialect.POSTGRES);
28+
return new EventSQL(dataSources, EventSQLDialect.POSTGRES);
2929
}
3030

3131
private DataSource dataSource(EventsProperties.DataSourceProperties properties) {

benchmarks/app/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ spring:
77
true
88

99
server:
10-
port: 8080
10+
port: 8090
1111

1212
management:
1313
endpoints:

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.binaryigor.eventsql.*;
44
import com.zaxxer.hikari.HikariConfig;
55
import com.zaxxer.hikari.HikariDataSource;
6-
import org.jooq.SQLDialect;
76

87
import javax.sql.DataSource;
98
import java.math.BigDecimal;
@@ -25,7 +24,7 @@ public class EventSQLBenchmarksRunner {
2524
static final String DB_USERNAME = envValueOrDefault("DB_URL", "events");
2625
static final String DB_PASSWORD = envValueOrDefault("DB_PASSWORD", "events");
2726
static final int DATA_SOURCE_POOL_SIZE = envIntValueOrDefault("DATA_SOURCE_POOL_SIZE", 25);
28-
static final SQLDialect SQL_DIALECT = SQLDialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
27+
static final EventSQLDialect SQL_DIALECT = EventSQLDialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
2928
static final int RUNNER_INSTANCES = envIntValueOrDefault("RUNNER_INSTANCES", 1);
3029
static final int EVENTS_TO_PUBLISH = envIntValueOrDefault("EVENTS_TO_PUBLISH", 10_000);
3130
static final int EVENTS_RATE = envIntValueOrDefault("EVENTS_RATE", 1000);

src/main/java/com/binaryigor/eventsql/EventSQL.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLConsumers;
77
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLPublisher;
88
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLRegistry;
9-
import com.binaryigor.eventsql.internal.sql.SqlConsumerRepository;
10-
import com.binaryigor.eventsql.internal.sql.SqlEventRepository;
11-
import com.binaryigor.eventsql.internal.sql.SqlTopicRepository;
12-
import com.binaryigor.eventsql.internal.sql.SqlTransactions;
9+
import com.binaryigor.eventsql.internal.sql.SQLConsumerRepository;
10+
import com.binaryigor.eventsql.internal.sql.SQLEventRepository;
11+
import com.binaryigor.eventsql.internal.sql.SQLTopicRepository;
12+
import com.binaryigor.eventsql.internal.sql.SQLTransactions;
1313
import org.jooq.SQLDialect;
1414
import org.jooq.impl.DSL;
1515

@@ -27,20 +27,20 @@ public class EventSQL {
2727
private final EventSQLPublisher publisher;
2828
private final EventSQLConsumers consumers;
2929

30-
public EventSQL(DataSource dataSource, SQLDialect sqlDialect) {
30+
public EventSQL(DataSource dataSource, EventSQLDialect sqlDialect) {
3131
this(dataSource, sqlDialect, Clock.systemUTC());
3232
}
3333

34-
public EventSQL(DataSource dataSource, SQLDialect sqlDialect, Clock clock) {
34+
public EventSQL(DataSource dataSource, EventSQLDialect sqlDialect, Clock clock) {
3535
this(List.of(dataSource), sqlDialect, clock);
3636
}
3737

38-
public EventSQL(Collection<DataSource> dataSources, SQLDialect sqlDialect) {
38+
public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect) {
3939
this(dataSources, sqlDialect, Clock.systemUTC());
4040
}
4141

4242
public EventSQL(Collection<DataSource> dataSources,
43-
SQLDialect sqlDialect,
43+
EventSQLDialect sqlDialect,
4444
Clock clock) {
4545
if (dataSources.isEmpty()) {
4646
throw new IllegalArgumentException("At least one data source is required");
@@ -53,13 +53,15 @@ public EventSQL(Collection<DataSource> dataSources,
5353
System.setProperty("org.jooq.no-logo", "true");
5454
System.setProperty("org.jooq.no-tips", "true");
5555

56+
var jooqDialect = SQLDialect.valueOf(sqlDialect.name());
57+
5658
dataSources.forEach(dataSource -> {
57-
var dslContext = DSL.using(dataSource, sqlDialect);
58-
var transactions = new SqlTransactions(dslContext);
59+
var dslContext = DSL.using(dataSource, jooqDialect);
60+
var transactions = new SQLTransactions(dslContext);
5961

60-
var topicRepository = new SqlTopicRepository(transactions);
61-
var consumerRepository = new SqlConsumerRepository(transactions);
62-
var eventRepository = new SqlEventRepository(transactions, sqlDialect);
62+
var topicRepository = new SQLTopicRepository(transactions);
63+
var consumerRepository = new SQLConsumerRepository(transactions);
64+
var eventRepository = new SQLEventRepository(transactions, jooqDialect);
6365

6466
var registry = new DefaultEventSQLRegistry(topicRepository, eventRepository, consumerRepository, transactions);
6567

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.binaryigor.eventsql;
2+
3+
public enum EventSQLDialect {
4+
POSTGRES, MYSQL, MARIADB
5+
}

src/main/java/com/binaryigor/eventsql/internal/sql/DslContextProvider.java renamed to src/main/java/com/binaryigor/eventsql/internal/sql/DSLContextProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
import org.jooq.DSLContext;
44

5-
public interface DslContextProvider {
5+
public interface DSLContextProvider {
66
DSLContext get();
77
}

src/main/java/com/binaryigor/eventsql/internal/sql/SqlConsumerRepository.java renamed to src/main/java/com/binaryigor/eventsql/internal/sql/SQLConsumerRepository.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@
1313
import java.util.List;
1414
import java.util.Optional;
1515

16-
public class SqlConsumerRepository implements ConsumerRepository {
16+
public class SQLConsumerRepository implements ConsumerRepository {
1717

1818
private static final Table<?> CONSUMER = DSL.table("consumer");
1919
private static final Field<String> TOPIC = DSL.field("topic", String.class);
2020
private static final Field<String> NAME = DSL.field("name", String.class);
2121
private static final Field<Short> PARTITION = DSL.field("partition", Short.class);
2222
private static final Field<Long> LAST_EVENT_ID = DSL.field("last_event_id", Long.class);
2323
private static final Field<Instant> LAST_CONSUMPTION_AT = DSL.field("last_consumption_at", Instant.class);
24-
private final DslContextProvider contextProvider;
24+
private final DSLContextProvider contextProvider;
2525

26-
public SqlConsumerRepository(DslContextProvider contextProvider) {
26+
public SQLConsumerRepository(DSLContextProvider contextProvider) {
2727
this.contextProvider = contextProvider;
2828
}
2929

src/main/java/com/binaryigor/eventsql/internal/sql/SqlEventRepository.java renamed to src/main/java/com/binaryigor/eventsql/internal/sql/SQLEventRepository.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,20 @@
1414
import java.util.Collection;
1515
import java.util.List;
1616

17-
public class SqlEventRepository implements EventRepository {
17+
public class SQLEventRepository implements EventRepository {
1818

19-
private static final Logger logger = LoggerFactory.getLogger(SqlEventRepository.class);
19+
private static final Logger logger = LoggerFactory.getLogger(SQLEventRepository.class);
2020
private static final Table<?> EVENT = DSL.table("event");
2121
private static final Field<String> TOPIC = DSL.field("topic", String.class);
2222
private static final Field<Long> ID = DSL.field("id", Long.class);
2323
private static final Field<Short> PARTITION = DSL.field("partition", Short.class);
2424
private static final Field<String> KEY = DSL.field("key", String.class);
2525
private static final Field<byte[]> VALUE = DSL.field("value", byte[].class);
2626
private static final Field<JSON> METADATA = DSL.field("metadata", JSON.class);
27-
private final DslContextProvider contextProvider;
27+
private final DSLContextProvider contextProvider;
2828
private final SQLDialect dialect;
2929

30-
public SqlEventRepository(DslContextProvider contextProvider, SQLDialect dialect) {
30+
public SQLEventRepository(DSLContextProvider contextProvider, SQLDialect dialect) {
3131
this.contextProvider = contextProvider;
3232
this.dialect = dialect;
3333
}
@@ -80,7 +80,7 @@ public void createAll(Collection<EventInput> events) {
8080

8181
events.forEach(ei -> {
8282
var e = ei.publication();
83-
var metadataJSON = SimpleJsonMapper.toJson(e.metadata());
83+
var metadataJSON = SimpleJSONMapper.toJSON(e.metadata());
8484
insert.values(e.topic(), ei.partition(), e.key(), e.value(), JSON.valueOf(metadataJSON));
8585
});
8686

0 commit comments

Comments
 (0)