Skip to content

Commit 5500f16

Browse files
authored
Merge pull request #6 from BinaryIgor/improvements
Improvements
2 parents e860788 + 5524744 commit 5500f16

35 files changed

Lines changed: 676 additions & 232 deletions

README.md

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
1212

1313
## How it works
1414

15-
We just need to have three tables (postgres syntax):
15+
We just need to have a few tables (postgres syntax):
1616

1717
```sql
1818
CREATE TABLE topic (
@@ -21,26 +21,51 @@ CREATE TABLE topic (
2121
created_at TIMESTAMP NOT NULL DEFAULT NOW()
2222
);
2323

24+
CREATE TABLE consumer (
25+
topic TEXT NOT NULL,
26+
name TEXT NOT NULL,
27+
partition SMALLINT NOT NULL,
28+
first_event_id BIGINT,
29+
last_event_id BIGINT,
30+
last_consumption_at TIMESTAMP,
31+
consumed_events BIGINT NOT NULL,
32+
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
33+
PRIMARY KEY (topic, name, partition)
34+
);
35+
2436
CREATE TABLE event (
2537
topic TEXT NOT NULL,
2638
id BIGSERIAL NOT NULL,
2739
partition SMALLINT NOT NULL,
2840
key TEXT,
2941
value BYTEA NOT NULL,
42+
buffered_at TIMESTAMP NOT NULL,
3043
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
3144
metadata JSON NOT NULL,
3245
PRIMARY KEY (topic, id)
3346
) PARTITION BY LIST (topic);
3447

35-
CREATE TABLE consumer (
48+
-- Same schema as event, just not partitioned. --
49+
-- It is used to handle eventual consistency of auto increment; --
50+
-- there is no guarantee that a record with id 2 becomes visible only after the record with id 1 --
51+
-- Events are first inserted to the event_buffer; --
52+
-- they are then moved to event table in bulk, by a single, serialized writer; --
53+
-- because there is only one writer, it fixes the eventual consistency issue --
54+
CREATE TABLE event_buffer (
3655
topic TEXT NOT NULL,
37-
name TEXT NOT NULL,
56+
id BIGSERIAL PRIMARY KEY,
3857
partition SMALLINT NOT NULL,
39-
last_event_id BIGINT,
40-
last_consumption_at TIMESTAMP,
58+
key TEXT,
59+
value BYTEA NOT NULL,
4160
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
42-
PRIMARY KEY (topic, name, partition)
61+
metadata JSON NOT NULL
62+
);
63+
-- Used to lock single event_buffer to event writer; --
64+
-- there cannot be more than one record in this table! --
65+
CREATE TABLE event_buffer_lock (
66+
created_at TIMESTAMP NOT NULL DEFAULT NOW()
4367
);
68+
INSERT INTO event_buffer_lock VALUES (DEFAULT);
4469
```
4570

4671
To consume messages, we just need to periodically (every one to a few seconds) do:
@@ -69,7 +94,7 @@ COMMIT;
6994
Optionally, to increase throughput & concurrency, we might have a partitioned topic and consumers (-1 partition standing
7095
for a non-partitioned topic/consumer/event).
7196

72-
Distribution of partitioned events is the sole responsibility of the publisher.
97+
Distribution of partitioned events is the sole responsibility of a publisher.
7398

7499
Consumption of such events per partition (0 in an example) might look like this:
75100

@@ -105,14 +130,14 @@ them:
105130
```java
106131

107132
import com.binaryigor.eventsql.EventSQL;
108-
import javax.sql.DataSource;
109133
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
110134
// as of now, only POSTGRES has fully tested support;
111135
// should also work with others but some things - event table partition management for example - work only with Postgres; for others it must be managed manually
112-
import org.jooq.SQLDialect;
136+
import com.binaryigor.eventsql.EventSQLDialect;
137+
import javax.sql.DataSource;
113138

114-
var eventSQL = new EventSQL(dataSource, SQLDialect.POSTGRES);
115-
ver shardedEventSQL = new EventSQL(dataSources, SQLDialect.POSTGRES);
139+
var eventSQL = new EventSQL(dataSource, EventSQLDialect.POSTGRES);
140+
var shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
116141
```
117142

118143
The sharded version works in the same vein - it just assumes that topics and consumers are hosted on multiple dbs.
@@ -223,7 +248,7 @@ var consumers = eventSQL.consumers();
223248
consumers.startConsumer("txt_topic", "single-consumer", event -> {
224249
// handle single event
225250
});
226-
// with more frequent polling - by default it is 1 second
251+
// with more frequent polling - by default it is 0.5 seconds
227252
consumers.startConsumer("txt_topic", "single-consumer-customized", event -> {
228253
// handle single event
229254
}, Duration.ofMillis(100));

benchmarks/app/build_and_package.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export run_cmd="docker run -d \\
4646
-e DB0_URL=\"$DB0_URL\" -e DB0_ENABLED=\"$DB0_ENABLED\" \\
4747
-e DB1_URL=\"$DB1_URL\" -e DB1_ENABLED=\"$DB1_ENABLED\" \\
4848
-e DB2_URL=\"$DB2_URL\" -e DB2_ENABLED=\"$DB2_ENABLED\" \\
49-
--network host --restart unless-stopped \\
49+
--network host \\
5050
--name $app $tagged_image"
5151

5252
cd ..

benchmarks/app/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.binaryigor.eventsql.benchmarks;
22

33
import com.binaryigor.eventsql.EventSQL;
4+
import com.binaryigor.eventsql.EventSQLDialect;
45
import com.zaxxer.hikari.HikariConfig;
56
import com.zaxxer.hikari.HikariDataSource;
6-
import org.jooq.SQLDialect;
77
import org.springframework.boot.SpringApplication;
88
import org.springframework.boot.autoconfigure.SpringBootApplication;
99
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,7 +25,7 @@ EventSQL eventSQL(EventsProperties eventsProperties) {
2525
.filter(EventsProperties.DataSourceProperties::enabled)
2626
.map(this::dataSource)
2727
.toList();
28-
return new EventSQL(dataSources, SQLDialect.POSTGRES);
28+
return new EventSQL(dataSources, EventSQLDialect.POSTGRES);
2929
}
3030

3131
private DataSource dataSource(EventsProperties.DataSourceProperties properties) {

benchmarks/events-db/build_and_package.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export tag=$tag
3333
export run_cmd="docker run -d \\
3434
-e \"POSTGRES_USER=postgres\" -e \"POSTGRES_PASSWORD=postgres\" \\
3535
--memory ${memory_limit} --cpus ${cpus_limit} \
36-
--network host ${volume} --restart unless-stopped --name $app $tagged_image"
36+
--network host ${volume} --name $app $tagged_image"
3737

3838
cd ..
3939
envsubst '${app} ${tag}' < scripts/template_load_and_run_app.bash > "$app/dist/load_and_run_app.bash"

benchmarks/events-db/init_db.sql

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,41 @@ CREATE TABLE topic (
1212
created_at TIMESTAMP NOT NULL DEFAULT NOW()
1313
);
1414

15+
CREATE TABLE consumer (
16+
topic TEXT NOT NULL,
17+
name TEXT NOT NULL,
18+
partition SMALLINT NOT NULL,
19+
first_event_id BIGINT,
20+
last_event_id BIGINT,
21+
last_consumption_at TIMESTAMP,
22+
consumed_events BIGINT NOT NULL,
23+
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
24+
PRIMARY KEY (topic, name, partition)
25+
);
26+
1527
CREATE TABLE event (
1628
topic TEXT NOT NULL,
1729
id BIGSERIAL NOT NULL,
1830
partition SMALLINT NOT NULL,
1931
key TEXT,
2032
value BYTEA NOT NULL,
33+
buffered_at TIMESTAMP NOT NULL,
2134
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
2235
metadata JSON NOT NULL,
2336
PRIMARY KEY (topic, id)
2437
) PARTITION BY LIST (topic);
2538

26-
CREATE TABLE consumer (
39+
CREATE TABLE event_buffer (
2740
topic TEXT NOT NULL,
28-
name TEXT NOT NULL,
41+
id BIGSERIAL PRIMARY KEY,
2942
partition SMALLINT NOT NULL,
30-
last_event_id BIGINT,
31-
last_consumption_at TIMESTAMP,
43+
key TEXT,
44+
value BYTEA NOT NULL,
3245
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
33-
PRIMARY KEY (topic, name, partition)
46+
metadata JSON NOT NULL
47+
);
48+
49+
CREATE TABLE event_buffer_lock (
50+
created_at TIMESTAMP NOT NULL DEFAULT NOW()
3451
);
52+
INSERT INTO event_buffer_lock VALUES (DEFAULT);

benchmarks/runner/build_and_package.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export app=$app
3131
export tag=$tag
3232
export run_cmd="docker run -d \\
3333
-e EVENTS_RATE -e EVENTS_TO_PUBLISH -e RUNNER_INSTANCES \\
34-
--memory 2G --cpus 2 \\
34+
--memory 2G --cpus 4 \\
3535
--network host \\
3636
--name $app $tagged_image"
3737

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@
33
import com.binaryigor.eventsql.*;
44
import com.zaxxer.hikari.HikariConfig;
55
import com.zaxxer.hikari.HikariDataSource;
6-
import org.jooq.SQLDialect;
76

87
import javax.sql.DataSource;
98
import java.math.BigDecimal;
109
import java.math.RoundingMode;
1110
import java.nio.charset.StandardCharsets;
1211
import java.sql.ResultSet;
13-
import java.time.Clock;
1412
import java.time.Duration;
1513
import java.time.Instant;
1614
import java.time.LocalDateTime;
@@ -24,11 +22,11 @@ public class EventSQLBenchmarksRunner {
2422
static final String DB_URL = envValueOrDefault("DB_URL", "jdbc:postgresql://localhost:5432/events");
2523
static final String DB_USERNAME = envValueOrDefault("DB_USERNAME", "events");
2624
static final String DB_PASSWORD = envValueOrDefault("DB_PASSWORD", "events");
27-
static final int DATA_SOURCE_POOL_SIZE = envIntValueOrDefault("DATA_SOURCE_POOL_SIZE", 25);
28-
static final SQLDialect SQL_DIALECT = SQLDialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
25+
static final int DATA_SOURCE_POOL_SIZE = envIntValueOrDefault("DATA_SOURCE_POOL_SIZE", 50);
26+
static final EventSQLDialect SQL_DIALECT = EventSQLDialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
2927
static final int RUNNER_INSTANCES = envIntValueOrDefault("RUNNER_INSTANCES", 1);
30-
static final int EVENTS_TO_PUBLISH = envIntValueOrDefault("EVENTS_TO_PUBLISH", 10_000);
31-
static final int EVENTS_RATE = envIntValueOrDefault("EVENTS_RATE", 1000);
28+
static final int EVENTS_TO_PUBLISH = envIntValueOrDefault("EVENTS_TO_PUBLISH", 60_000);
29+
static final int EVENTS_RATE = envIntValueOrDefault("EVENTS_RATE", 1_000);
3230
static final String TEST_TOPIC = envValueOrDefault("TEST_TOPIC", "account_created");
3331
static final String TEST_CONSUMER = envValueOrDefault("TEST_CONSUMER", "benchmarks-consumer");
3432

@@ -41,7 +39,7 @@ public static void main(String[] args) throws Exception {
4139

4240
var dataSource = dataSource(DB_URL, DB_USERNAME, DB_PASSWORD);
4341

44-
var eventSQL = new EventSQL(dataSource, SQL_DIALECT, Clock.systemUTC());
42+
var eventSQL = new EventSQL(dataSource, SQL_DIALECT);
4543

4644
printDelimiter();
4745

@@ -62,6 +60,7 @@ public static void main(String[] args) throws Exception {
6260
var start = System.currentTimeMillis();
6361

6462
publishEvents(eventSQL.publisher());
63+
waitForPublishBufferToEmpty(dataSource);
6564
var publicationDuration = Duration.ofMillis(System.currentTimeMillis() - start);
6665

6766
printDelimiter();
@@ -232,6 +231,25 @@ static void awaitForFutures(List<Future<?>> futures) {
232231
});
233232
}
234233

234+
static void waitForPublishBufferToEmpty(DataSource dataSource) {
235+
while (true) {
236+
try {
237+
var bufferSize = executeQuery(dataSource, "SELECT COUNT(*) FROM event_buffer", r -> {
238+
if (r.next()) {
239+
return r.getInt(1);
240+
}
241+
return 0;
242+
});
243+
if (bufferSize == 0) {
244+
break;
245+
}
246+
Thread.sleep(250);
247+
} catch (Exception e) {
248+
e.printStackTrace();
249+
}
250+
}
251+
}
252+
235253
static void waitForConsumers(DataSource dataSource, ConsumerDefinition consumerDefinition) throws Exception {
236254
var eventsStats = eventTableStats(dataSource);
237255
while (true) {
@@ -260,7 +278,7 @@ static void waitForConsumers(DataSource dataSource, ConsumerDefinition consumerD
260278
break;
261279
}
262280

263-
Thread.sleep(1000);
281+
Thread.sleep(500);
264282
}
265283
}
266284

@@ -277,7 +295,6 @@ record EventTableStats(long lastId,
277295
throw new IllegalArgumentException("lastIdsPerPartition cannot be null or empty");
278296
}
279297
}
280-
281298
}
282299

283300
record ConsumerTableStats(Map<Integer, Long> lastIdsPerPartition) {

src/main/java/com/binaryigor/eventsql/EventSQL.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package com.binaryigor.eventsql;
22

3+
import com.binaryigor.eventsql.internal.DefaultEventSQLConsumers;
4+
import com.binaryigor.eventsql.internal.DefaultEventSQLPublisher;
35
import com.binaryigor.eventsql.internal.DefaultEventSQLRegistry;
4-
import com.binaryigor.eventsql.internal.EventSQLOps;
56
import com.binaryigor.eventsql.internal.TopicDefinitionsCache;
67
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLConsumers;
78
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLPublisher;
89
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLRegistry;
9-
import com.binaryigor.eventsql.internal.sql.SqlConsumerRepository;
10-
import com.binaryigor.eventsql.internal.sql.SqlEventRepository;
11-
import com.binaryigor.eventsql.internal.sql.SqlTopicRepository;
12-
import com.binaryigor.eventsql.internal.sql.SqlTransactions;
10+
import com.binaryigor.eventsql.internal.sql.SQLConsumerRepository;
11+
import com.binaryigor.eventsql.internal.sql.SQLEventRepository;
12+
import com.binaryigor.eventsql.internal.sql.SQLTopicRepository;
13+
import com.binaryigor.eventsql.internal.sql.SQLTransactions;
1314
import org.jooq.SQLDialect;
1415
import org.jooq.impl.DSL;
1516

1617
import javax.sql.DataSource;
1718
import java.time.Clock;
19+
import java.time.Duration;
1820
import java.util.ArrayList;
1921
import java.util.Collection;
2022
import java.util.List;
@@ -23,25 +25,31 @@
2325

2426
public class EventSQL {
2527

28+
public static final int DEFAULT_FLUSH_PUBLISH_BUFFER_SIZE = 500;
29+
// delay is applied only if there is no more records to flush; see DefaultEventSQLPublisher.startFlushPublishBufferThread()
30+
public static final Duration DEFAULT_FLUSH_PUBLISH_BUFFER_DELAY = Duration.ofMillis(250);
2631
private final EventSQLRegistry registry;
2732
private final EventSQLPublisher publisher;
2833
private final EventSQLConsumers consumers;
2934

30-
public EventSQL(DataSource dataSource, SQLDialect sqlDialect) {
35+
public EventSQL(DataSource dataSource, EventSQLDialect sqlDialect) {
3136
this(dataSource, sqlDialect, Clock.systemUTC());
3237
}
3338

34-
public EventSQL(DataSource dataSource, SQLDialect sqlDialect, Clock clock) {
39+
public EventSQL(DataSource dataSource, EventSQLDialect sqlDialect, Clock clock) {
3540
this(List.of(dataSource), sqlDialect, clock);
3641
}
3742

38-
public EventSQL(Collection<DataSource> dataSources, SQLDialect sqlDialect) {
43+
public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect) {
3944
this(dataSources, sqlDialect, Clock.systemUTC());
4045
}
4146

42-
public EventSQL(Collection<DataSource> dataSources,
43-
SQLDialect sqlDialect,
44-
Clock clock) {
47+
public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect, Clock clock) {
48+
this(dataSources, sqlDialect, clock, DEFAULT_FLUSH_PUBLISH_BUFFER_SIZE, DEFAULT_FLUSH_PUBLISH_BUFFER_DELAY);
49+
}
50+
51+
public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect, Clock clock,
52+
int flushPublishBufferSize, Duration flushPublishBufferDelay) {
4553
if (dataSources.isEmpty()) {
4654
throw new IllegalArgumentException("At least one data source is required");
4755
}
@@ -53,25 +61,29 @@ public EventSQL(Collection<DataSource> dataSources,
5361
System.setProperty("org.jooq.no-logo", "true");
5462
System.setProperty("org.jooq.no-tips", "true");
5563

64+
var jooqDialect = SQLDialect.valueOf(sqlDialect.name());
65+
5666
dataSources.forEach(dataSource -> {
57-
var dslContext = DSL.using(dataSource, sqlDialect);
58-
var transactions = new SqlTransactions(dslContext);
67+
var dslContext = DSL.using(dataSource, jooqDialect);
68+
var transactions = new SQLTransactions(dslContext);
5969

60-
var topicRepository = new SqlTopicRepository(transactions);
61-
var consumerRepository = new SqlConsumerRepository(transactions);
62-
var eventRepository = new SqlEventRepository(transactions, sqlDialect);
70+
var topicRepository = new SQLTopicRepository(transactions);
71+
var consumerRepository = new SQLConsumerRepository(transactions);
72+
var eventRepository = new SQLEventRepository(transactions, transactions, sqlDialect);
6373

6474
var registry = new DefaultEventSQLRegistry(topicRepository, eventRepository, consumerRepository, transactions);
6575

6676
var topicDefinitionsCache = new TopicDefinitionsCache(topicRepository);
67-
var ops = new EventSQLOps(topicDefinitionsCache, transactions, consumerRepository, eventRepository, clock);
77+
var publisher = new DefaultEventSQLPublisher(topicDefinitionsCache, transactions,
78+
eventRepository, flushPublishBufferSize, flushPublishBufferDelay);
79+
var consumers = new DefaultEventSQLConsumers(topicDefinitionsCache, transactions,
80+
consumerRepository, eventRepository, publisher, clock);
6881

6982
registryList.add(registry);
70-
publisherList.add(ops);
71-
consumersList.add(ops);
83+
publisherList.add(publisher);
84+
consumersList.add(consumers);
7285
});
7386

74-
7587
if (dataSources.size() == 1) {
7688
registry = registryList.getFirst();
7789
publisher = publisherList.getFirst();

0 commit comments

Comments
 (0)