Skip to content

Commit 3d1a5a4

Browse files
authored
Merge pull request #7 from BinaryIgor/no-partitions-approach
No partitions approach
2 parents 5500f16 + b64d4af commit 3d1a5a4

25 files changed

Lines changed: 289 additions & 213 deletions

README.md

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
1212

1313
## How it works
1414

15-
We just need to have a few tables (postgres syntax):
15+
We just need to have a few tables (postgres syntax, schema managed fully by EventSQL):
1616

1717
```sql
1818
CREATE TABLE topic (
@@ -33,42 +33,39 @@ CREATE TABLE consumer (
3333
PRIMARY KEY (topic, name, partition)
3434
);
3535

36-
CREATE TABLE event (
37-
topic TEXT NOT NULL,
38-
id BIGSERIAL NOT NULL,
36+
CREATE TABLE {topic}_event (
37+
id BIGSERIAL PRIMARY KEY,
3938
partition SMALLINT NOT NULL,
4039
key TEXT,
4140
value BYTEA NOT NULL,
4241
buffered_at TIMESTAMP NOT NULL,
4342
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
44-
metadata JSON NOT NULL,
45-
PRIMARY KEY (topic, id)
46-
) PARTITION BY LIST (topic);
43+
metadata JSON NOT NULL
44+
);
4745

48-
-- Same schema as event, just not partitioned. --
46+
-- Same schema as event, just not partitioned (by topic). --
4947
-- It is used to handle eventual consistency of auto increment; --
5048
-- there is no guarantee that record of id 2 is visible after id 1 record. --
5149
-- Events are first inserted to the event_buffer; --
52-
-- they are then moved to event table in bulk, by a single, serialized writer; --
50+
-- they are then moved to the {topic}_event table in bulk, by a single, serialized writer (per topic); --
5351
-- because there is only one writer, it fixes eventual consistency issue --
5452
CREATE TABLE event_buffer (
55-
topic TEXT NOT NULL,
5653
id BIGSERIAL PRIMARY KEY,
54+
topic TEXT NOT NULL,
5755
partition SMALLINT NOT NULL,
5856
key TEXT,
5957
value BYTEA NOT NULL,
6058
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
6159
metadata JSON NOT NULL
6260
);
63-
-- Used to lock single event_buffer to event writer; --
64-
-- there cannot be more than one record of this table! --
61+
CREATE INDEX event_buffer_topic_id ON event_buffer (topic, id);
62+
-- Used to lock single (per topic) event_buffer to {topic}_event writer --
6563
CREATE TABLE event_buffer_lock (
66-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
64+
topic TEXT PRIMARY KEY
6765
);
68-
INSERT INTO event_buffer_lock VALUES (DEFAULT);
6966
```
7067

71-
To consume messages, we just need to periodically (every one to a few seconds) do:
68+
To consume events, we just need to periodically (every one to a few seconds) do:
7269

7370
```sql
7471
BEGIN;
@@ -77,8 +74,8 @@ SELECT * FROM consumer
7774
WHERE topic = :topic AND name = :c_name
7875
FOR UPDATE SKIP LOCKED;
7976

80-
SELECT * FROM event
81-
WHERE topic = :topic AND (:last_event_id IS NULL OR id > :last_event_id)
77+
SELECT * FROM {topic}_event
78+
WHERE (:last_event_id IS NULL OR id > :last_event_id)
8279
ORDER BY id LIMIT :limit;
8380

8481
(process events)
@@ -105,8 +102,8 @@ SELECT * FROM consumer
105102
WHERE topic = :topic AND name = :c_name AND partition = 0
106103
FOR UPDATE SKIP LOCKED;
107104

108-
SELECT * FROM event
109-
WHERE topic = :topic AND partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
105+
SELECT * FROM {topic}_event
106+
WHERE partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
110107
ORDER BY id LIMIT :limit;
111108

112109
(process events)
@@ -130,9 +127,8 @@ them:
130127
```java
131128

132129
import com.binaryigor.eventsql.EventSQL;
133-
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
134-
// as of now, only POSTGRES has fully tested support;
135-
// should also work with others but some things - event table partition management for example - works only with Postgres, for others it must be managed manually
130+
// dialect of your events backend - POSTGRES, MYSQL, MARIADB;
131+
// as of now, only POSTGRES has fully tested support, but should work on others as well
136132
import com.binaryigor.eventsql.EventSQLDialect;
137133
import javax.sql.DataSource;
138134

@@ -142,6 +138,9 @@ ver shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
142138

143139
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.
144140

141+
Required tables are managed automatically by the library, but if you want to customize their schema a bit, you can provide your own `EventSQLRegistry.TablesManager` implementation.
142+
See `EventSQLRegistry` for details.
143+
145144
### Topics and Consumers
146145

147146
Having `EventSQL` instance, we can register topics and their consumers:
@@ -248,7 +247,7 @@ var consumers = eventSQL.consumers();
248247
consumers.startConsumer("txt_topic", "single-consumer", event -> {
249248
// handle single event
250249
});
251-
// with more frequent polling - by default it is 0.5 second
250+
// with more frequent polling - by default it is 1 second
252251
consumers.startConsumer("txt_topic", "single-consumer-customized", event -> {
253252
// handle single event
254253
}, Duration.ofMillis(100));

TODO.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,4 @@
44
* compact topics - unique key
55
* join, aka streams
66
* more elaborate definitions change support - especially around partitions growth & shrinkage
7-
* JavaDocs
8-
* Support schemas init in registry - why require schemas from users, if it is always the same (keeping dbs diffs in mind)?
9-
* using JOOQ tradeoffs?
10-
* full MySQL/MariaDB support
7+
* JavaDocs

benchmarks/events-db/init_db.sql

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,3 @@ CREATE DATABASE events;
22

33
CREATE USER events WITH password 'events';
44
ALTER DATABASE events OWNER TO events;
5-
6-
\c events;
7-
SET ROLE events;
8-
9-
CREATE TABLE topic (
10-
name TEXT PRIMARY KEY,
11-
partitions SMALLINT NOT NULL,
12-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
13-
);
14-
15-
CREATE TABLE consumer (
16-
topic TEXT NOT NULL,
17-
name TEXT NOT NULL,
18-
partition SMALLINT NOT NULL,
19-
first_event_id BIGINT,
20-
last_event_id BIGINT,
21-
last_consumption_at TIMESTAMP,
22-
consumed_events BIGINT NOT NULL,
23-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
24-
PRIMARY KEY (topic, name, partition)
25-
);
26-
27-
CREATE TABLE event (
28-
topic TEXT NOT NULL,
29-
id BIGSERIAL NOT NULL,
30-
partition SMALLINT NOT NULL,
31-
key TEXT,
32-
value BYTEA NOT NULL,
33-
buffered_at TIMESTAMP NOT NULL,
34-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
35-
metadata JSON NOT NULL,
36-
PRIMARY KEY (topic, id)
37-
) PARTITION BY LIST (topic);
38-
39-
CREATE TABLE event_buffer (
40-
topic TEXT NOT NULL,
41-
id BIGSERIAL PRIMARY KEY,
42-
partition SMALLINT NOT NULL,
43-
key TEXT,
44-
value BYTEA NOT NULL,
45-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
46-
metadata JSON NOT NULL
47-
);
48-
49-
CREATE TABLE event_buffer_lock (
50-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
51-
);
52-
INSERT INTO event_buffer_lock VALUES (DEFAULT);

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ static ConsumerDefinition consumerDefinition(EventSQL eventSQL) {
128128
}
129129

130130
static EventTableStats eventTableStats(DataSource source) {
131-
return executeQuery(source, "SELECT partition, MAX(id) FROM event WHERE topic = '%s' GROUP BY partition"
131+
return executeQuery(source, "SELECT partition, MAX(id) FROM %s_event GROUP BY partition"
132132
.formatted(TEST_TOPIC), r -> {
133133
var lastIdsPerPartition = new HashMap<Integer, Long>();
134134
while (r.next()) {

src/main/java/com/binaryigor/eventsql/EventSQL.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect,
6969

7070
var topicRepository = new SQLTopicRepository(transactions);
7171
var consumerRepository = new SQLConsumerRepository(transactions);
72-
var eventRepository = new SQLEventRepository(transactions, transactions, sqlDialect);
72+
var eventRepository = new SQLEventRepository(transactions, transactions);
7373

7474
var registry = new DefaultEventSQLRegistry(topicRepository, eventRepository, consumerRepository, transactions);
7575

src/main/java/com/binaryigor/eventsql/EventSQLConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
public interface EventSQLConsumers {
99

10-
Duration DEFAULT_POLLING_DELAY = Duration.ofMillis(500);
10+
Duration DEFAULT_POLLING_DELAY = Duration.ofSeconds(1);
1111
int DEFAULT_IN_MEMORY_EVENTS = 10;
1212

1313
void startConsumer(String topic, String name,

src/main/java/com/binaryigor/eventsql/EventSQLRegistry.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,20 @@ public interface EventSQLRegistry {
1515
EventSQLRegistry unregisterConsumer(String topic, String name);
1616

1717
List<ConsumerDefinition> listConsumers();
18+
19+
void configureTablesManager(TablesManager tablesManager);
20+
21+
TablesManager tablesManager();
22+
23+
// all methods must be idempotent
24+
interface TablesManager {
25+
26+
void prepareTopicTable();
27+
28+
void prepareConsumerTable();
29+
30+
void prepareEventTable(String topic);
31+
32+
void dropEventTable(String topic);
33+
}
1834
}

src/main/java/com/binaryigor/eventsql/internal/ConsumerRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
public interface ConsumerRepository {
88

9+
void createTable();
10+
911
void save(Consumer consumer);
1012

1113
void saveAll(Collection<Consumer> consumers);

src/main/java/com/binaryigor/eventsql/internal/DefaultEventSQLPublisher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
import java.time.Duration;
1010
import java.util.Collection;
11+
import java.util.Collections;
1112
import java.util.List;
13+
import java.util.Set;
14+
import java.util.concurrent.ConcurrentHashMap;
1215
import java.util.concurrent.CountDownLatch;
1316
import java.util.concurrent.TimeUnit;
1417
import java.util.concurrent.atomic.AtomicBoolean;
@@ -24,6 +27,7 @@ public class DefaultEventSQLPublisher implements EventSQLPublisher {
2427
private final EventRepository eventRepository;
2528
private Partitioner partitioner;
2629
private final AtomicBoolean published = new AtomicBoolean(false);
30+
private final Set<String> publishedTopics = Collections.newSetFromMap(new ConcurrentHashMap<>());
2731
private final AtomicBoolean flushPublishBufferThreadSet = new AtomicBoolean(false);
2832
private volatile Thread flushPublishBufferThread;
2933
private final int flushPublishBufferSize;
@@ -67,6 +71,8 @@ private void publish(String topicName, Collection<EventPublication> publications
6771
eventRepository.createAll(toCreateEvents);
6872

6973
published.set(true);
74+
publishedTopics.add(topicName);
75+
7076
var shouldSetFlushPublishBufferThread = !flushPublishBufferThreadSet.compareAndExchange(false, true);
7177
if (shouldSetFlushPublishBufferThread) {
7278
startFlushPublishBufferThread();
@@ -82,7 +88,7 @@ private void startFlushPublishBufferThread() {
8288
Thread.sleep(flushPublishBufferDelay);
8389
}
8490
if (moreToFlush || published.getAndSet(false)) {
85-
var flushed = eventRepository.flushBuffer(flushPublishBufferSize);
91+
var flushed = eventRepository.flushBuffer(publishedTopics, flushPublishBufferSize);
8692
moreToFlush = flushed > flushPublishBufferSize;
8793
}
8894
} catch (Exception e) {

src/main/java/com/binaryigor/eventsql/internal/DefaultEventSQLRegistry.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class DefaultEventSQLRegistry implements EventSQLRegistry {
1717
private final EventRepository eventRepository;
1818
private final ConsumerRepository consumerRepository;
1919
private final Transactions transactions;
20+
private TablesManager tablesManager;
2021

2122
public DefaultEventSQLRegistry(TopicRepository topicRepository,
2223
EventRepository eventRepository,
@@ -26,6 +27,14 @@ public DefaultEventSQLRegistry(TopicRepository topicRepository,
2627
this.eventRepository = eventRepository;
2728
this.consumerRepository = consumerRepository;
2829
this.transactions = transactions;
30+
this.tablesManager = new DefaultTablesManager(topicRepository, consumerRepository, eventRepository);
31+
32+
registerTables();
33+
}
34+
35+
private void registerTables() {
36+
tablesManager.prepareTopicTable();
37+
tablesManager.prepareConsumerTable();
2938
}
3039

3140
// TODO: support more complex modifications
@@ -40,7 +49,7 @@ public EventSQLRegistry registerTopic(TopicDefinition topic) {
4049
if (currentTopicDefinitionOpt.isEmpty()) {
4150
transactions.execute(() -> {
4251
topicRepository.save(topic);
43-
eventRepository.createPartition(topic.name());
52+
tablesManager.prepareEventTable(topic.name());
4453
});
4554
return this;
4655
}
@@ -72,7 +81,7 @@ public EventSQLRegistry unregisterTopic(String topic) {
7281
}
7382

7483
topicRepository.delete(topic);
75-
eventRepository.deletePartition(topic);
84+
tablesManager.dropEventTable(topic);
7685
});
7786

7887
return this;
@@ -92,6 +101,8 @@ public EventSQLRegistry registerConsumer(ConsumerDefinition consumer) {
92101
.formatted(topic.name(), consumer.name()));
93102
}
94103

104+
tablesManager.prepareConsumerTable();
105+
95106
var currentConsumers = consumerRepository.allOf(consumer.topic(), consumer.name());
96107
if (consumerDefinitionHaveNotChanged(currentConsumers, topic, consumer)) {
97108
return this;
@@ -140,6 +151,16 @@ public List<ConsumerDefinition> listConsumers() {
140151
.toList();
141152
}
142153

154+
@Override
155+
public void configureTablesManager(TablesManager tablesManager) {
156+
this.tablesManager = tablesManager;
157+
}
158+
159+
@Override
160+
public TablesManager tablesManager() {
161+
return tablesManager;
162+
}
163+
143164
private TopicDefinition findTopicDefinition(String topic) {
144165
return topicRepository.ofName(topic)
145166
.orElseThrow(() -> new IllegalArgumentException("%s topic doesn't exist".formatted(topic)));

0 commit comments

Comments
 (0)