Skip to content

Commit 2ccbc50

Browse files
committed
No partitions approach
1 parent 5500f16 commit 2ccbc50

17 files changed

Lines changed: 232 additions & 123 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
1212

1313
## How it works
1414

15-
We just need to have a few tables (postgres syntax):
15+
We just need to have a few tables (postgres syntax, schema managed by EventSQL):
1616

1717
```sql
1818
CREATE TABLE topic (

benchmarks/events-db/init_db.sql

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,6 @@ CREATE TABLE consumer (
2424
PRIMARY KEY (topic, name, partition)
2525
);
2626

27-
CREATE TABLE event (
28-
topic TEXT NOT NULL,
29-
id BIGSERIAL NOT NULL,
30-
partition SMALLINT NOT NULL,
31-
key TEXT,
32-
value BYTEA NOT NULL,
33-
buffered_at TIMESTAMP NOT NULL,
34-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
35-
metadata JSON NOT NULL,
36-
PRIMARY KEY (topic, id)
37-
) PARTITION BY LIST (topic);
38-
3927
CREATE TABLE event_buffer (
4028
topic TEXT NOT NULL,
4129
id BIGSERIAL PRIMARY KEY,

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ static ConsumerDefinition consumerDefinition(EventSQL eventSQL) {
128128
}
129129

130130
static EventTableStats eventTableStats(DataSource source) {
131-
return executeQuery(source, "SELECT partition, MAX(id) FROM event WHERE topic = '%s' GROUP BY partition"
131+
return executeQuery(source, "SELECT partition, MAX(id) FROM %s_event GROUP BY partition"
132132
.formatted(TEST_TOPIC), r -> {
133133
var lastIdsPerPartition = new HashMap<Integer, Long>();
134134
while (r.next()) {

src/main/java/com/binaryigor/eventsql/EventSQL.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect,
6969

7070
var topicRepository = new SQLTopicRepository(transactions);
7171
var consumerRepository = new SQLConsumerRepository(transactions);
72-
var eventRepository = new SQLEventRepository(transactions, transactions, sqlDialect);
72+
var eventRepository = new SQLEventRepository(transactions, transactions);
7373

7474
var registry = new DefaultEventSQLRegistry(topicRepository, eventRepository, consumerRepository, transactions);
7575

src/main/java/com/binaryigor/eventsql/EventSQLRegistry.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,20 @@ public interface EventSQLRegistry {
1515
EventSQLRegistry unregisterConsumer(String topic, String name);
1616

1717
List<ConsumerDefinition> listConsumers();
18+
19+
void configureTableManager(TableManager tableManager);
20+
21+
TableManager tableManager();
22+
23+
// all methods must be idempotent
24+
interface TableManager {
25+
26+
void prepareTopicTable();
27+
28+
void prepareConsumerTable();
29+
30+
void prepareEventTable(String topic);
31+
32+
void dropEventTable(String topic);
33+
}
1834
}

src/main/java/com/binaryigor/eventsql/internal/ConsumerRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
public interface ConsumerRepository {
88

9+
void createTable();
10+
911
void save(Consumer consumer);
1012

1113
void saveAll(Collection<Consumer> consumers);

src/main/java/com/binaryigor/eventsql/internal/DefaultEventSQLRegistry.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class DefaultEventSQLRegistry implements EventSQLRegistry {
1717
private final EventRepository eventRepository;
1818
private final ConsumerRepository consumerRepository;
1919
private final Transactions transactions;
20+
private TableManager tableManager;
2021

2122
public DefaultEventSQLRegistry(TopicRepository topicRepository,
2223
EventRepository eventRepository,
@@ -26,6 +27,14 @@ public DefaultEventSQLRegistry(TopicRepository topicRepository,
2627
this.eventRepository = eventRepository;
2728
this.consumerRepository = consumerRepository;
2829
this.transactions = transactions;
30+
this.tableManager = new DefaultTableManager(topicRepository, consumerRepository, eventRepository);
31+
32+
registerTables();
33+
}
34+
35+
private void registerTables() {
36+
tableManager.prepareTopicTable();
37+
tableManager.prepareConsumerTable();
2938
}
3039

3140
// TODO: support more complex modifications
@@ -40,7 +49,7 @@ public EventSQLRegistry registerTopic(TopicDefinition topic) {
4049
if (currentTopicDefinitionOpt.isEmpty()) {
4150
transactions.execute(() -> {
4251
topicRepository.save(topic);
43-
eventRepository.createPartition(topic.name());
52+
tableManager.prepareEventTable(topic.name());
4453
});
4554
return this;
4655
}
@@ -72,7 +81,7 @@ public EventSQLRegistry unregisterTopic(String topic) {
7281
}
7382

7483
topicRepository.delete(topic);
75-
eventRepository.deletePartition(topic);
84+
tableManager.dropEventTable(topic);
7685
});
7786

7887
return this;
@@ -92,6 +101,8 @@ public EventSQLRegistry registerConsumer(ConsumerDefinition consumer) {
92101
.formatted(topic.name(), consumer.name()));
93102
}
94103

104+
tableManager.prepareConsumerTable();
105+
95106
var currentConsumers = consumerRepository.allOf(consumer.topic(), consumer.name());
96107
if (consumerDefinitionHaveNotChanged(currentConsumers, topic, consumer)) {
97108
return this;
@@ -140,6 +151,16 @@ public List<ConsumerDefinition> listConsumers() {
140151
.toList();
141152
}
142153

154+
@Override
155+
public void configureTableManager(TableManager tableManager) {
156+
this.tableManager = tableManager;
157+
}
158+
159+
@Override
160+
public TableManager tableManager() {
161+
return tableManager;
162+
}
163+
143164
private TopicDefinition findTopicDefinition(String topic) {
144165
return topicRepository.ofName(topic)
145166
.orElseThrow(() -> new IllegalArgumentException("%s topic doesn't exist".formatted(topic)));
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.binaryigor.eventsql.internal;
2+
3+
import com.binaryigor.eventsql.EventSQLRegistry;
4+
5+
public class DefaultTableManager implements EventSQLRegistry.TableManager {
6+
7+
private final TopicRepository topicRepository;
8+
private final ConsumerRepository consumerRepository;
9+
private final EventRepository eventRepository;
10+
11+
public DefaultTableManager(TopicRepository topicRepository, ConsumerRepository consumerRepository, EventRepository eventRepository) {
12+
this.topicRepository = topicRepository;
13+
this.consumerRepository = consumerRepository;
14+
this.eventRepository = eventRepository;
15+
}
16+
17+
@Override
18+
public void prepareTopicTable() {
19+
topicRepository.createTable();
20+
}
21+
22+
@Override
23+
public void prepareConsumerTable() {
24+
consumerRepository.createTable();
25+
}
26+
27+
@Override
28+
public void prepareEventTable(String topic) {
29+
eventRepository.prepareBufferLock();
30+
eventRepository.createBuffer();
31+
eventRepository.createPartition(topic);
32+
}
33+
34+
@Override
35+
public void dropEventTable(String topic) {
36+
eventRepository.dropPartition(topic);
37+
}
38+
}

src/main/java/com/binaryigor/eventsql/internal/EventRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@
77

88
public interface EventRepository {
99

10+
void createBuffer();
11+
12+
void prepareBufferLock();
13+
1014
void createPartition(String topic);
1115

12-
void deletePartition(String topic);
16+
void dropPartition(String topic);
1317

1418
void create(EventInput event);
1519

src/main/java/com/binaryigor/eventsql/internal/TopicRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
public interface TopicRepository {
99

10+
void createTable();
11+
1012
void save(TopicDefinition topic);
1113

1214
Optional<TopicDefinition> ofName(String name);

0 commit comments

Comments
 (0)