Skip to content

Commit 5524744

Browse files
committed
Buffer and lock idea
1 parent ffb6ef8 commit 5524744

23 files changed

Lines changed: 572 additions & 256 deletions

README.md

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
1212

1313
## How it works
1414

15-
We just need to have three tables (postgres syntax):
15+
We just need to have a few tables (postgres syntax):
1616

1717
```sql
1818
CREATE TABLE topic (
@@ -21,28 +21,51 @@ CREATE TABLE topic (
2121
created_at TIMESTAMP NOT NULL DEFAULT NOW()
2222
);
2323

24+
CREATE TABLE consumer (
25+
topic TEXT NOT NULL,
26+
name TEXT NOT NULL,
27+
partition SMALLINT NOT NULL,
28+
first_event_id BIGINT,
29+
last_event_id BIGINT,
30+
last_consumption_at TIMESTAMP,
31+
consumed_events BIGINT NOT NULL,
32+
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
33+
PRIMARY KEY (topic, name, partition)
34+
);
35+
2436
CREATE TABLE event (
2537
topic TEXT NOT NULL,
2638
id BIGSERIAL NOT NULL,
2739
partition SMALLINT NOT NULL,
2840
key TEXT,
2941
value BYTEA NOT NULL,
42+
buffered_at TIMESTAMP NOT NULL,
3043
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
3144
metadata JSON NOT NULL,
3245
PRIMARY KEY (topic, id)
3346
) PARTITION BY LIST (topic);
3447

35-
CREATE TABLE consumer (
48+
-- Same schema as event, just not partitioned. --
49+
-- It is used to handle eventual consistency of auto increment; --
50+
-- there is no guarantee that a record with id 2 becomes visible after the record with id 1. --
51+
-- Events are first inserted to the event_buffer; --
52+
-- they are then moved to event table in bulk, by a single, serialized writer; --
53+
-- because there is only one writer, it fixes the eventual consistency issue --
54+
CREATE TABLE event_buffer (
3655
topic TEXT NOT NULL,
37-
name TEXT NOT NULL,
56+
id BIGSERIAL PRIMARY KEY,
3857
partition SMALLINT NOT NULL,
39-
first_event_id BIGINT,
40-
last_event_id BIGINT,
41-
last_consumption_at TIMESTAMP,
42-
consumed_events BIGINT NOT NULL,
58+
key TEXT,
59+
value BYTEA NOT NULL,
4360
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
44-
PRIMARY KEY (topic, name, partition)
61+
metadata JSON NOT NULL
62+
);
63+
-- Used to serialize the single event_buffer-to-event writer; --
-- 64+
-- this table must never contain more than one record! --
65+
CREATE TABLE event_buffer_lock (
66+
created_at TIMESTAMP NOT NULL DEFAULT NOW()
4567
);
68+
INSERT INTO event_buffer_lock VALUES (DEFAULT);
4669
```
4770

4871
To consume messages, we just need to periodically (every one to a few seconds) do:
@@ -56,9 +79,6 @@ FOR UPDATE SKIP LOCKED;
5679

5780
SELECT * FROM event
5881
WHERE topic = :topic AND (:last_event_id IS NULL OR id > :last_event_id)
59-
-- eventual consistency of auto increment; there is no guarantee that record of id 2 is visible after id 1 record --
60-
-- in the implementation, we set insert statements timeout to '250 ms' so it is safe --
61-
AND created_at < (NOW() - interval '333 ms')
6282
ORDER BY id LIMIT :limit;
6383

6484
(process events)
@@ -74,7 +94,7 @@ COMMIT;
7494
Optionally, to increase throughput & concurrency, we might have a partitioned topic and consumers (-1 partition standing
7595
for not partitioned topic/consumer/event).
7696

77-
Distribution of partitioned events is the sole responsibility of the publisher.
97+
Distribution of partitioned events is the sole responsibility of a publisher.
7898

7999
Consumption of such events per partition (0 in an example) might look like this:
80100

@@ -87,9 +107,6 @@ FOR UPDATE SKIP LOCKED;
87107

88108
SELECT * FROM event
89109
WHERE topic = :topic AND partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
90-
-- eventual consistency of auto increment; there is no guarantee that record of id 2 is visible after id 1 record --
91-
-- in the implementation, we set insert statements timeout to '250 ms' so it is safe --
92-
AND created_at < (NOW() - interval '333 ms')
93110
ORDER BY id LIMIT :limit;
94111

95112
(process events)
@@ -107,20 +124,20 @@ definition has. It's a rather acceptable tradeoff and easy to enforce at the lib
107124

108125
## How to use it
109126

110-
`EventSQL` is an entrypoint to the whole library. It requires single data source properties or a list of
127+
`EventSQL` is an entrypoint to the whole library. It requires a standard Java `javax.sql.DataSource` or a list of
111128
them:
112129

113130
```java
131+
114132
import com.binaryigor.eventsql.EventSQL;
115-
// EventSQL.Dialect is a dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
133+
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
116134
// as of now, only POSTGRES has fully tested support;
117135
// should also work with others but some things - event table partition management for example - work only with Postgres; for others it must be managed manually
118-
var eventSQL = EventSQL.of(new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "dbUrl", "dbUsername", "dbPassword"));
119-
ver shardedEventSQL = EventSQL.of(
120-
List.of(
121-
new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "db0Url", "db0Username", "db0Password"),
122-
new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "db1Url", "db1Username", "db1Password"),
123-
new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "db2Url", "db2Username", "db2Password")));
136+
import com.binaryigor.eventsql.EventSQLDialect;
137+
import javax.sql.DataSource;
138+
139+
var eventSQL = new EventSQL(dataSource, EventSQLDialect.POSTGRES);
140+
var shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
124141
```
125142

126143
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.
@@ -231,7 +248,7 @@ var consumers = eventSQL.consumers();
231248
consumers.startConsumer("txt_topic", "single-consumer", event -> {
232249
// handle single event
233250
});
234-
// with more frequent polling - by default it is 1 second
251+
// with more frequent polling - by default it is 0.5 seconds
235252
consumers.startConsumer("txt_topic", "single-consumer-customized", event -> {
236253
// handle single event
237254
}, Duration.ofMillis(100));

benchmarks/app/build_and_package.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export run_cmd="docker run -d \\
4646
-e DB0_URL=\"$DB0_URL\" -e DB0_ENABLED=\"$DB0_ENABLED\" \\
4747
-e DB1_URL=\"$DB1_URL\" -e DB1_ENABLED=\"$DB1_ENABLED\" \\
4848
-e DB2_URL=\"$DB2_URL\" -e DB2_ENABLED=\"$DB2_ENABLED\" \\
49-
--network host --restart unless-stopped \\
49+
--network host \\
5050
--name $app $tagged_image"
5151

5252
cd ..
Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package com.binaryigor.eventsql.benchmarks;
22

33
import com.binaryigor.eventsql.EventSQL;
4+
import com.binaryigor.eventsql.EventSQLDialect;
5+
import com.zaxxer.hikari.HikariConfig;
6+
import com.zaxxer.hikari.HikariDataSource;
47
import org.springframework.boot.SpringApplication;
58
import org.springframework.boot.autoconfigure.SpringBootApplication;
69
import org.springframework.boot.context.properties.EnableConfigurationProperties;
710
import org.springframework.context.annotation.Bean;
811

12+
import javax.sql.DataSource;
13+
914
@SpringBootApplication
1015
@EnableConfigurationProperties(EventsProperties.class)
1116
public class EventSQLBenchmarksApp {
@@ -16,11 +21,20 @@ public static void main(String[] args) {
1621

1722
@Bean
1823
EventSQL eventSQL(EventsProperties eventsProperties) {
19-
var dataSourceProperties = eventsProperties.dataSources().stream()
24+
var dataSources = eventsProperties.dataSources().stream()
2025
.filter(EventsProperties.DataSourceProperties::enabled)
21-
.map(ps -> new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES,
22-
ps.url(), ps.username(), ps.password()))
26+
.map(this::dataSource)
2327
.toList();
24-
return EventSQL.of(dataSourceProperties);
28+
return new EventSQL(dataSources, EventSQLDialect.POSTGRES);
29+
}
30+
31+
private DataSource dataSource(EventsProperties.DataSourceProperties properties) {
32+
var config = new HikariConfig();
33+
config.setJdbcUrl(properties.url());
34+
config.setUsername(properties.username());
35+
config.setPassword(properties.password());
36+
config.setMinimumIdle(properties.connections());
37+
config.setMaximumPoolSize(properties.connections());
38+
return new HikariDataSource(config);
2539
}
2640
}

benchmarks/events-db/build_and_package.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export tag=$tag
3333
export run_cmd="docker run -d \\
3434
-e \"POSTGRES_USER=postgres\" -e \"POSTGRES_PASSWORD=postgres\" \\
3535
--memory ${memory_limit} --cpus ${cpus_limit} \
36-
--network host ${volume} --restart unless-stopped --name $app $tagged_image"
36+
--network host ${volume} --name $app $tagged_image"
3737

3838
cd ..
3939
envsubst '${app} ${tag}' < scripts/template_load_and_run_app.bash > "$app/dist/load_and_run_app.bash"

benchmarks/events-db/init_db.sql

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,41 @@ CREATE TABLE topic (
1212
created_at TIMESTAMP NOT NULL DEFAULT NOW()
1313
);
1414

15+
CREATE TABLE consumer (
16+
topic TEXT NOT NULL,
17+
name TEXT NOT NULL,
18+
partition SMALLINT NOT NULL,
19+
first_event_id BIGINT,
20+
last_event_id BIGINT,
21+
last_consumption_at TIMESTAMP,
22+
consumed_events BIGINT NOT NULL,
23+
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
24+
PRIMARY KEY (topic, name, partition)
25+
);
26+
1527
CREATE TABLE event (
1628
topic TEXT NOT NULL,
1729
id BIGSERIAL NOT NULL,
1830
partition SMALLINT NOT NULL,
1931
key TEXT,
2032
value BYTEA NOT NULL,
33+
buffered_at TIMESTAMP NOT NULL,
2134
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
2235
metadata JSON NOT NULL,
2336
PRIMARY KEY (topic, id)
2437
) PARTITION BY LIST (topic);
2538

26-
CREATE TABLE consumer (
39+
CREATE TABLE event_buffer (
2740
topic TEXT NOT NULL,
28-
name TEXT NOT NULL,
41+
id BIGSERIAL PRIMARY KEY,
2942
partition SMALLINT NOT NULL,
30-
first_event_id BIGINT,
31-
last_event_id BIGINT,
32-
last_consumption_at TIMESTAMP,
33-
consumed_events BIGINT NOT NULL,
43+
key TEXT,
44+
value BYTEA NOT NULL,
3445
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
35-
PRIMARY KEY (topic, name, partition)
46+
metadata JSON NOT NULL
47+
);
48+
49+
CREATE TABLE event_buffer_lock (
50+
created_at TIMESTAMP NOT NULL DEFAULT NOW()
3651
);
52+
INSERT INTO event_buffer_lock VALUES (DEFAULT);

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ public class EventSQLBenchmarksRunner {
2323
static final String DB_USERNAME = envValueOrDefault("DB_USERNAME", "events");
2424
static final String DB_PASSWORD = envValueOrDefault("DB_PASSWORD", "events");
2525
static final int DATA_SOURCE_POOL_SIZE = envIntValueOrDefault("DATA_SOURCE_POOL_SIZE", 50);
26-
static final EventSQL.Dialect SQL_DIALECT = EventSQL.Dialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
26+
static final EventSQLDialect SQL_DIALECT = EventSQLDialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
2727
static final int RUNNER_INSTANCES = envIntValueOrDefault("RUNNER_INSTANCES", 1);
2828
static final int EVENTS_TO_PUBLISH = envIntValueOrDefault("EVENTS_TO_PUBLISH", 60_000);
29-
static final int EVENTS_RATE = envIntValueOrDefault("EVENTS_RATE", 1000);
29+
static final int EVENTS_RATE = envIntValueOrDefault("EVENTS_RATE", 1_000);
3030
static final String TEST_TOPIC = envValueOrDefault("TEST_TOPIC", "account_created");
3131
static final String TEST_CONSUMER = envValueOrDefault("TEST_CONSUMER", "benchmarks-consumer");
3232

@@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception {
3939

4040
var dataSource = dataSource(DB_URL, DB_USERNAME, DB_PASSWORD);
4141

42-
var eventSQL = EventSQL.of(new EventSQL.DataSourceProperties(SQL_DIALECT, DB_URL, DB_USERNAME, DB_PASSWORD, DATA_SOURCE_POOL_SIZE));
42+
var eventSQL = new EventSQL(dataSource, SQL_DIALECT);
4343

4444
printDelimiter();
4545

@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {
6060
var start = System.currentTimeMillis();
6161

6262
publishEvents(eventSQL.publisher());
63+
waitForPublishBufferToEmpty(dataSource);
6364
var publicationDuration = Duration.ofMillis(System.currentTimeMillis() - start);
6465

6566
printDelimiter();
@@ -230,6 +231,25 @@ static void awaitForFutures(List<Future<?>> futures) {
230231
});
231232
}
232233

234+
static void waitForPublishBufferToEmpty(DataSource dataSource) {
235+
while (true) {
236+
try {
237+
var bufferSize = executeQuery(dataSource, "SELECT COUNT(*) FROM event_buffer", r -> {
238+
if (r.next()) {
239+
return r.getInt(1);
240+
}
241+
return 0;
242+
});
243+
if (bufferSize == 0) {
244+
break;
245+
}
246+
Thread.sleep(250);
247+
} catch (Exception e) {
248+
e.printStackTrace();
249+
}
250+
}
251+
}
252+
233253
static void waitForConsumers(DataSource dataSource, ConsumerDefinition consumerDefinition) throws Exception {
234254
var eventsStats = eventTableStats(dataSource);
235255
while (true) {
@@ -258,7 +278,7 @@ static void waitForConsumers(DataSource dataSource, ConsumerDefinition consumerD
258278
break;
259279
}
260280

261-
Thread.sleep(1000);
281+
Thread.sleep(500);
262282
}
263283
}
264284

pom.xml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,6 @@
3535
<artifactId>jooq</artifactId>
3636
<version>${jooq.version}</version>
3737
</dependency>
38-
<dependency>
39-
<groupId>com.zaxxer</groupId>
40-
<artifactId>HikariCP</artifactId>
41-
<version>${hikari.version}</version>
42-
</dependency>
4338

4439
<dependency>
4540
<groupId>org.junit.jupiter</groupId>
@@ -71,6 +66,12 @@
7166
<version>${testcontainers.version}</version>
7267
<scope>test</scope>
7368
</dependency>
69+
<dependency>
70+
<groupId>com.zaxxer</groupId>
71+
<artifactId>HikariCP</artifactId>
72+
<version>${hikari.version}</version>
73+
<scope>test</scope>
74+
</dependency>
7475
<dependency>
7576
<groupId>org.postgresql</groupId>
7677
<artifactId>postgresql</artifactId>

0 commit comments

Comments
 (0)