Skip to content

Commit b64d4af

Browse files
committed
Better schema
1 parent 89340e7 commit b64d4af

16 files changed

Lines changed: 112 additions & 109 deletions

README.md

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ CREATE TABLE {topic}_event (
4343
metadata JSON NOT NULL
4444
);
4545

46-
-- Same schema as event, just not partitioned. --
46+
-- Same schema as event, just not partitioned (by topic). --
4747
-- It is used to handle eventual consistency of auto increment; --
4848
-- there is no guarantee that record of id 2 is visible after id 1 record. --
4949
-- Events are first inserted to the event_buffer; --
50-
-- they are then moved to event table in bulk, by a single, serialized writer; --
50+
-- they are then moved to the {topic}_event table in bulk, by a single, serialized writer (per topic); --
5151
-- because there is only one writer, it fixes eventual consistency issue --
5252
CREATE TABLE event_buffer (
5353
id BIGSERIAL PRIMARY KEY,
@@ -58,12 +58,11 @@ CREATE TABLE event_buffer (
5858
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
5959
metadata JSON NOT NULL
6060
);
61-
-- Used to lock single event_buffer to event writer; --
62-
-- there cannot be more than one record of this table! --
61+
CREATE INDEX event_buffer_topic_id ON event_buffer (topic, id);
62+
-- Used to lock single (per topic) event_buffer to {topic}_event writer --
6363
CREATE TABLE event_buffer_lock (
64-
id TEXT PRIMARY KEY
64+
topic TEXT PRIMARY KEY
6565
);
66-
INSERT INTO event_buffer_lock VALUES ('singleton-lock');
6766
```
6867

6968
To consume events, we just need to periodically (every one to a few seconds) do:
@@ -128,9 +127,8 @@ them:
128127
```java
129128

130129
import com.binaryigor.eventsql.EventSQL;
131-
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
132-
// as of now, only POSTGRES has fully tested support;
133-
// should also work with others but some things - event table partition management for example - works only with Postgres, for others it must be managed manually
130+
// dialect of your events backend - POSTGRES, MYSQL, MARIADB;
131+
// as of now, only POSTGRES has fully tested support, but should work on others as well
134132
import com.binaryigor.eventsql.EventSQLDialect;
135133
import javax.sql.DataSource;
136134

@@ -140,7 +138,7 @@ ver shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
140138

141139
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.
142140

143-
Required tables are managed automatically by the library, but if you want to customize their schema a bit, you can provide your own `EventSQLRegistry.TableManager` implementation.
141+
Required tables are managed automatically by the library, but if you want to customize their schema a bit, you can provide your own `EventSQLRegistry.TablesManager` implementation.
144142
See `EventSQLRegistry` for details.
145143

146144
### Topics and Consumers

src/main/java/com/binaryigor/eventsql/EventSQLRegistry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public interface EventSQLRegistry {
1616

1717
List<ConsumerDefinition> listConsumers();
1818

19-
void configureTableManager(TableManager tableManager);
19+
void configureTablesManager(TablesManager tablesManager);
2020

21-
TableManager tableManager();
21+
TablesManager tablesManager();
2222

2323
// all methods must be idempotent
24-
interface TableManager {
24+
interface TablesManager {
2525

2626
void prepareTopicTable();
2727

src/main/java/com/binaryigor/eventsql/internal/DefaultEventSQLPublisher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
import java.time.Duration;
1010
import java.util.Collection;
11+
import java.util.Collections;
1112
import java.util.List;
13+
import java.util.Set;
14+
import java.util.concurrent.ConcurrentHashMap;
1215
import java.util.concurrent.CountDownLatch;
1316
import java.util.concurrent.TimeUnit;
1417
import java.util.concurrent.atomic.AtomicBoolean;
@@ -24,6 +27,7 @@ public class DefaultEventSQLPublisher implements EventSQLPublisher {
2427
private final EventRepository eventRepository;
2528
private Partitioner partitioner;
2629
private final AtomicBoolean published = new AtomicBoolean(false);
30+
private final Set<String> publishedTopics = Collections.newSetFromMap(new ConcurrentHashMap<>());
2731
private final AtomicBoolean flushPublishBufferThreadSet = new AtomicBoolean(false);
2832
private volatile Thread flushPublishBufferThread;
2933
private final int flushPublishBufferSize;
@@ -67,6 +71,8 @@ private void publish(String topicName, Collection<EventPublication> publications
6771
eventRepository.createAll(toCreateEvents);
6872

6973
published.set(true);
74+
publishedTopics.add(topicName);
75+
7076
var shouldSetFlushPublishBufferThread = !flushPublishBufferThreadSet.compareAndExchange(false, true);
7177
if (shouldSetFlushPublishBufferThread) {
7278
startFlushPublishBufferThread();
@@ -82,7 +88,7 @@ private void startFlushPublishBufferThread() {
8288
Thread.sleep(flushPublishBufferDelay);
8389
}
8490
if (moreToFlush || published.getAndSet(false)) {
85-
var flushed = eventRepository.flushBuffer(flushPublishBufferSize);
91+
var flushed = eventRepository.flushBuffer(publishedTopics, flushPublishBufferSize);
8692
moreToFlush = flushed > flushPublishBufferSize;
8793
}
8894
} catch (Exception e) {

src/main/java/com/binaryigor/eventsql/internal/DefaultEventSQLRegistry.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class DefaultEventSQLRegistry implements EventSQLRegistry {
1717
private final EventRepository eventRepository;
1818
private final ConsumerRepository consumerRepository;
1919
private final Transactions transactions;
20-
private TableManager tableManager;
20+
private TablesManager tablesManager;
2121

2222
public DefaultEventSQLRegistry(TopicRepository topicRepository,
2323
EventRepository eventRepository,
@@ -27,14 +27,14 @@ public DefaultEventSQLRegistry(TopicRepository topicRepository,
2727
this.eventRepository = eventRepository;
2828
this.consumerRepository = consumerRepository;
2929
this.transactions = transactions;
30-
this.tableManager = new DefaultTableManager(topicRepository, consumerRepository, eventRepository);
30+
this.tablesManager = new DefaultTablesManager(topicRepository, consumerRepository, eventRepository);
3131

3232
registerTables();
3333
}
3434

3535
private void registerTables() {
36-
tableManager.prepareTopicTable();
37-
tableManager.prepareConsumerTable();
36+
tablesManager.prepareTopicTable();
37+
tablesManager.prepareConsumerTable();
3838
}
3939

4040
// TODO: support more complex modifications
@@ -49,7 +49,7 @@ public EventSQLRegistry registerTopic(TopicDefinition topic) {
4949
if (currentTopicDefinitionOpt.isEmpty()) {
5050
transactions.execute(() -> {
5151
topicRepository.save(topic);
52-
tableManager.prepareEventTable(topic.name());
52+
tablesManager.prepareEventTable(topic.name());
5353
});
5454
return this;
5555
}
@@ -81,7 +81,7 @@ public EventSQLRegistry unregisterTopic(String topic) {
8181
}
8282

8383
topicRepository.delete(topic);
84-
tableManager.dropEventTable(topic);
84+
tablesManager.dropEventTable(topic);
8585
});
8686

8787
return this;
@@ -101,7 +101,7 @@ public EventSQLRegistry registerConsumer(ConsumerDefinition consumer) {
101101
.formatted(topic.name(), consumer.name()));
102102
}
103103

104-
tableManager.prepareConsumerTable();
104+
tablesManager.prepareConsumerTable();
105105

106106
var currentConsumers = consumerRepository.allOf(consumer.topic(), consumer.name());
107107
if (consumerDefinitionHaveNotChanged(currentConsumers, topic, consumer)) {
@@ -152,13 +152,13 @@ public List<ConsumerDefinition> listConsumers() {
152152
}
153153

154154
@Override
155-
public void configureTableManager(TableManager tableManager) {
156-
this.tableManager = tableManager;
155+
public void configureTablesManager(TablesManager tablesManager) {
156+
this.tablesManager = tablesManager;
157157
}
158158

159159
@Override
160-
public TableManager tableManager() {
161-
return tableManager;
160+
public TablesManager tablesManager() {
161+
return tablesManager;
162162
}
163163

164164
private TopicDefinition findTopicDefinition(String topic) {

src/main/java/com/binaryigor/eventsql/internal/DefaultTableManager.java renamed to src/main/java/com/binaryigor/eventsql/internal/DefaultTablesManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import com.binaryigor.eventsql.EventSQLRegistry;
44

5-
public class DefaultTableManager implements EventSQLRegistry.TableManager {
5+
public class DefaultTablesManager implements EventSQLRegistry.TablesManager {
66

77
private final TopicRepository topicRepository;
88
private final ConsumerRepository consumerRepository;
99
private final EventRepository eventRepository;
1010

11-
public DefaultTableManager(TopicRepository topicRepository, ConsumerRepository consumerRepository, EventRepository eventRepository) {
11+
public DefaultTablesManager(TopicRepository topicRepository, ConsumerRepository consumerRepository, EventRepository eventRepository) {
1212
this.topicRepository = topicRepository;
1313
this.consumerRepository = consumerRepository;
1414
this.eventRepository = eventRepository;
@@ -26,7 +26,6 @@ public void prepareConsumerTable() {
2626

2727
@Override
2828
public void prepareEventTable(String topic) {
29-
eventRepository.prepareBufferLock();
3029
eventRepository.createBuffer();
3130
eventRepository.createPartition(topic);
3231
}

src/main/java/com/binaryigor/eventsql/internal/EventRepository.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ public interface EventRepository {
99

1010
void createBuffer();
1111

12-
void prepareBufferLock();
13-
1412
void createPartition(String topic);
1513

1614
void dropPartition(String topic);
@@ -19,7 +17,7 @@ public interface EventRepository {
1917

2018
void createAll(Collection<EventInput> events);
2119

22-
int flushBuffer(int toFlush);
20+
int flushBuffer(Collection<String> topics, int toFlush);
2321

2422
List<Event> nextEvents(String topic, Long lastId, int limit);
2523

src/main/java/com/binaryigor/eventsql/internal/sharded/ShardedEventSQLRegistry.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ public List<ConsumerDefinition> listConsumers() {
4949
}
5050

5151
@Override
52-
public void configureTableManager(TableManager tableManager) {
53-
registries.forEach(r -> r.configureTableManager(tableManager));
52+
public void configureTablesManager(TablesManager tablesManager) {
53+
registries.forEach(r -> r.configureTablesManager(tablesManager));
5454
}
5555

5656
@Override
57-
public TableManager tableManager() {
58-
return registries.getFirst().tableManager();
57+
public TablesManager tablesManager() {
58+
return registries.getFirst().tablesManager();
5959
}
6060
}

0 commit comments

Comments
 (0)