Skip to content

Commit 89340e7

Browse files
committed
Managing schema automatically
1 parent 2ccbc50 commit 89340e7

5 files changed

Lines changed: 23 additions & 59 deletions

File tree

README.md

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
1212

1313
## How it works
1414

15-
We just need to have a few tables (postgres syntax, schema managed by EventSQL):
15+
We just need to have a few tables (postgres syntax, schema managed fully by EventSQL):
1616

1717
```sql
1818
CREATE TABLE topic (
@@ -33,17 +33,15 @@ CREATE TABLE consumer (
3333
PRIMARY KEY (topic, name, partition)
3434
);
3535

36-
CREATE TABLE event (
37-
topic TEXT NOT NULL,
38-
id BIGSERIAL NOT NULL,
36+
CREATE TABLE {topic}_event (
37+
id BIGSERIAL PRIMARY KEY,
3938
partition SMALLINT NOT NULL,
4039
key TEXT,
4140
value BYTEA NOT NULL,
4241
buffered_at TIMESTAMP NOT NULL,
4342
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
44-
metadata JSON NOT NULL,
45-
PRIMARY KEY (topic, id)
46-
) PARTITION BY LIST (topic);
43+
metadata JSON NOT NULL
44+
);
4745

4846
-- Same schema as event, just not partitioned. --
4947
-- It is used to handle eventual consistency of auto increment; --
@@ -52,8 +50,8 @@ CREATE TABLE event (
5250
-- they are then moved to event table in bulk, by a single, serialized writer; --
5351
-- because there is only one writer, it fixes eventual consistency issue --
5452
CREATE TABLE event_buffer (
55-
topic TEXT NOT NULL,
5653
id BIGSERIAL PRIMARY KEY,
54+
topic TEXT NOT NULL,
5755
partition SMALLINT NOT NULL,
5856
key TEXT,
5957
value BYTEA NOT NULL,
@@ -63,12 +61,12 @@ CREATE TABLE event_buffer (
6361
-- Used to lock single event_buffer to event writer; --
6462
-- there cannot be more than one record of this table! --
6563
CREATE TABLE event_buffer_lock (
66-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
64+
id TEXT PRIMARY KEY
6765
);
68-
INSERT INTO event_buffer_lock VALUES (DEFAULT);
66+
INSERT INTO event_buffer_lock VALUES ('singleton-lock');
6967
```
7068

71-
To consume messages, we just need to periodically (every one to a few seconds) do:
69+
To consume events, we just need to periodically (every one to a few seconds) do:
7270

7371
```sql
7472
BEGIN;
@@ -77,8 +75,8 @@ SELECT * FROM consumer
7775
WHERE topic = :topic AND name = :c_name
7876
FOR UPDATE SKIP LOCKED;
7977

80-
SELECT * FROM event
81-
WHERE topic = :topic AND (:last_event_id IS NULL OR id > :last_event_id)
78+
SELECT * FROM {topic}_event
79+
WHERE (:last_event_id IS NULL OR id > :last_event_id)
8280
ORDER BY id LIMIT :limit;
8381

8482
(process events)
@@ -105,8 +103,8 @@ SELECT * FROM consumer
105103
WHERE topic = :topic AND name = :c_name AND partition = 0
106104
FOR UPDATE SKIP LOCKED;
107105

108-
SELECT * FROM event
109-
WHERE topic = :topic AND partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
106+
SELECT * FROM {topic}_event
107+
WHERE partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
110108
ORDER BY id LIMIT :limit;
111109

112110
(process events)
@@ -142,6 +140,9 @@ ver shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
142140

143141
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.
144142

143+
Required tables are managed automatically by the library, but if you want to customize their schema a bit, you can provide your own `EventSQLRegistry.TableManager` implementation.
144+
See `EventSQLRegistry` for details.
145+
145146
### Topics and Consumers
146147

147148
Having `EventSQL` instance, we can register topics and their consumers:
@@ -248,7 +249,7 @@ var consumers = eventSQL.consumers();
248249
consumers.startConsumer("txt_topic", "single-consumer", event -> {
249250
// handle single event
250251
});
251-
// with more frequent polling - by default it is 0.5 second
252+
// with more frequent polling - by default it is 1 second
252253
consumers.startConsumer("txt_topic", "single-consumer-customized", event -> {
253254
// handle single event
254255
}, Duration.ofMillis(100));

TODO.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,4 @@
44
* compact topics - unique key
55
* join, aka streams
66
* more elaborate definitions change support - especially around partitions growth & shrinkage
7-
* JavaDocs
8-
* Support schemas init in registry - why require schemas from users, if it is always the same (keeping dbs diffs in mind)?
9-
* using JOOQ tradeoffs?
10-
* full MySQL/MariaDB support
7+
* JavaDocs

benchmarks/events-db/init_db.sql

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,3 @@ CREATE DATABASE events;
22

33
CREATE USER events WITH password 'events';
44
ALTER DATABASE events OWNER TO events;
5-
6-
\c events;
7-
SET ROLE events;
8-
9-
CREATE TABLE topic (
10-
name TEXT PRIMARY KEY,
11-
partitions SMALLINT NOT NULL,
12-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
13-
);
14-
15-
CREATE TABLE consumer (
16-
topic TEXT NOT NULL,
17-
name TEXT NOT NULL,
18-
partition SMALLINT NOT NULL,
19-
first_event_id BIGINT,
20-
last_event_id BIGINT,
21-
last_consumption_at TIMESTAMP,
22-
consumed_events BIGINT NOT NULL,
23-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
24-
PRIMARY KEY (topic, name, partition)
25-
);
26-
27-
CREATE TABLE event_buffer (
28-
topic TEXT NOT NULL,
29-
id BIGSERIAL PRIMARY KEY,
30-
partition SMALLINT NOT NULL,
31-
key TEXT,
32-
value BYTEA NOT NULL,
33-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
34-
metadata JSON NOT NULL
35-
);
36-
37-
CREATE TABLE event_buffer_lock (
38-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
39-
);
40-
INSERT INTO event_buffer_lock VALUES (DEFAULT);

src/main/java/com/binaryigor/eventsql/EventSQLConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
public interface EventSQLConsumers {
99

10-
Duration DEFAULT_POLLING_DELAY = Duration.ofMillis(500);
10+
Duration DEFAULT_POLLING_DELAY = Duration.ofSeconds(1);
1111
int DEFAULT_IN_MEMORY_EVENTS = 10;
1212

1313
void startConsumer(String topic, String name,

src/main/java/com/binaryigor/eventsql/internal/sql/SQLConsumerRepository.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.jooq.Field;
88
import org.jooq.Table;
99
import org.jooq.impl.DSL;
10+
import org.jooq.impl.SQLDataType;
1011

1112
import java.sql.Timestamp;
1213
import java.time.Instant;
@@ -40,7 +41,7 @@ public void createTable() {
4041
.column(PARTITION, PARTITION.getDataType().notNull())
4142
.column(FIRST_EVENT_ID)
4243
.column(LAST_EVENT_ID)
43-
.column(LAST_CONSUMPTION_AT)
44+
.column(LAST_CONSUMPTION_AT, SQLDataType.TIMESTAMP)
4445
.column(CONSUMED_EVENTS, CONSUMED_EVENTS.getDataType().notNull())
4546
.column(CREATED_AT, CREATED_AT.getDataType().notNull().defaultValue(DSL.now()))
4647
.constraint(DSL.constraint().primaryKey(TOPIC, NAME, PARTITION))
@@ -61,7 +62,8 @@ public void saveAll(Collection<Consumer> consumers) {
6162
.insertInto(CONSUMER)
6263
.columns(TOPIC, NAME, PARTITION, FIRST_EVENT_ID, LAST_EVENT_ID, LAST_CONSUMPTION_AT, CONSUMED_EVENTS);
6364

64-
consumers.forEach(c -> insert.values(c.topic(), c.name(), (short) c.partition(), c.firstEventId(), c.lastEventId(), c.lastConsumptionAt(), c.consumedEvents()));
65+
consumers.forEach(c -> insert.values(c.topic(), c.name(), (short) c.partition(),
66+
c.firstEventId(), c.lastEventId(), c.lastConsumptionAt(), c.consumedEvents()));
6567

6668
insert.onConflict(TOPIC, NAME, PARTITION)
6769
.doUpdate()

0 commit comments

Comments
 (0)