Commit e860788

Merge pull request #5 from BinaryIgor/better-docs
Better docs & API
2 parents 0ab31a8 + 4967191 commit e860788

28 files changed

Lines changed: 646 additions & 237 deletions

README.md

Lines changed: 201 additions & 15 deletions
@@ -12,7 +12,8 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
 
 ## How it works
 
-We just need to have three tables:
+We just need to have three tables (postgres syntax):
+
 ```sql
 CREATE TABLE topic (
     name TEXT PRIMARY KEY,
@@ -27,7 +28,7 @@ CREATE TABLE event (
     key TEXT,
     value BYTEA NOT NULL,
     created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-    metadata JSONB NOT NULL,
+    metadata JSON NOT NULL,
     PRIMARY KEY (topic, id)
 ) PARTITION BY LIST (topic);
 
@@ -43,29 +44,35 @@ CREATE TABLE consumer (
 ```
 
 To consume messages, we just need to periodically (every one to a few seconds) do:
+
 ```sql
 BEGIN;
 
 SELECT * FROM consumer
-WHERE topic = : topic AND name = :c_name
+WHERE topic = :topic AND name = :c_name
 FOR UPDATE SKIP LOCKED;
 
 SELECT * FROM event
-WHERE (:last_event_id IS NULL) OR id > last_event_id
-ORDER BY id LIMIT N;
+WHERE topic = :topic AND (:last_event_id IS NULL OR id > :last_event_id)
+ORDER BY id LIMIT :limit;
 
 (process events)
 
 UPDATE consumer
 SET last_event_id = :id,
     last_consumption_at = :now
 WHERE topic = :topic AND name = :c_name;
+
+COMMIT;
 ```
 
-Optionally, to increase throughput & concurrency, we might have partitioned topic and consumers (-1 partition standing for not partitioned topic/consumer).
+Optionally, to increase throughput & concurrency, we might have a partitioned topic and consumers (-1 partition standing
+for not partitioned topic/consumer/event).
+
+Distribution of partitioned events is the sole responsibility of the publisher.
+
+Consumption of such events per partition (0 in the example) might look like this:
 
-Distribution of partitioned events is a sole responsibility of publisher - the library provides sensible default (random distribution).
-Consumption of such events per partition (0 in example) might look like this:
 ```sql
 BEGIN;
 
@@ -74,26 +81,205 @@ WHERE topic = :topic AND name = :c_name AND partition = 0
 FOR UPDATE SKIP LOCKED;
 
 SELECT * FROM event
-WHERE (:last_event_id IS NULL) OR id > last_event_id AND partition = 0
-ORDER BY id LIMIT N;
+WHERE topic = :topic AND partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
+ORDER BY id LIMIT :limit;
 
 (process events)
 
 UPDATE consumer
 SET last_event_id = :id,
     last_consumption_at = :now
 WHERE topic = :topic AND name = :c_name AND partition = 0;
+
+COMMIT;
 ```
 
-Limitation being that if consumer is partitioned, it must have the exact same number of partition as in the topic
-definition.
-It's a rather acceptable tradeoff and easy to enforce at the library level.
+The limitation being that if a consumer is partitioned, it must have exactly the same number of partitions as the topic
+definition has. It's a rather acceptable tradeoff and easy to enforce at the library level.
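
An editorial aside on the consumption loop above: because events are fetched strictly by `id > last_event_id` and the offset is only advanced after processing, a crash before the `UPDATE` re-delivers the batch, giving at-least-once semantics. A minimal, hypothetical in-memory sketch of that offset-tracking logic (not library code, names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for the event/consumer tables; ConsumerOffsetSketch is
// an illustrative name, not part of the library.
public class ConsumerOffsetSketch {

    private final TreeMap<Long, String> events = new TreeMap<>();
    // analogous to consumer.last_event_id (NULL is modeled as -1 here)
    private long lastEventId = -1;

    public void publish(long id, String value) {
        events.put(id, value);
    }

    // one polling iteration: SELECT ... WHERE id > :last_event_id ORDER BY id LIMIT :limit
    public List<String> poll(int limit) {
        var batch = events.tailMap(lastEventId, false).entrySet().stream()
                .limit(limit)
                .toList();
        if (!batch.isEmpty()) {
            // UPDATE consumer SET last_event_id = :id (id of the last processed event)
            lastEventId = batch.get(batch.size() - 1).getKey();
        }
        return batch.stream().map(Map.Entry::getValue).toList();
    }
}
```

If `poll` is interrupted before `lastEventId` is advanced, the same events come back on the next call, which is exactly the re-delivery behavior the transactional SQL loop above provides.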
 
 ## How to use it
 
-TODO: for now, check out benchmarks/app being an example app.
+`EventSQL` is an entrypoint to the whole library. It requires a standard Java `javax.sql.DataSource` or a list of
+them:
+
+```java
+
+import com.binaryigor.eventsql.EventSQL;
+import javax.sql.DataSource;
+// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
+// as of now, only POSTGRES has fully tested support;
+// others should also work, but some things - event table partition management, for example - work only with Postgres; for other dbs, it must be managed manually
+import org.jooq.SQLDialect;
+
+var eventSQL = new EventSQL(dataSource, SQLDialect.POSTGRES);
+var shardedEventSQL = new EventSQL(dataSources, SQLDialect.POSTGRES);
+```
+
+The sharded version works in the same vein - it just assumes that topics and consumers are hosted on multiple dbs.
+
+### Topics and Consumers
+
+Having an `EventSQL` instance, we can register topics and their consumers:
+
+```java
+// all operations are idempotent
+eventSQL.registry()
+        // -1 stands for not partitioned topic
+        .registerTopic(new TopicDefinition("account_created", -1))
+        .registerTopic(new TopicDefinition("invoice_issued", 5))
+        // third argument (true/false) determines whether the consumer is partitioned or not
+        .registerConsumer(new ConsumerDefinition("account_created", "consumer-1", false))
+        .registerConsumer(new ConsumerDefinition("invoice_issued", "consumer-2", true));
+```
+
+Topics and consumers can both be partitioned and not partitioned.
+**Partitioned topics allow for partitioned consumers, increasing parallelism.**
+Parallelism of partitioned consumers is as high as the consumed topic's number of partitions - events have an ordering guarantee within a partition.
+As a consequence, for a given consumer, each partition can be processed only by a single thread at a time.
+
+For a consumer to be partitioned, its topic must be partitioned as well - it will have the same number of partitions.
+The opposite does not have to be true - a consumer might not be partitioned while its topic is; this has performance implications though, since, as described above, consumer parallelism is capped at its number of partitions.
+
+**With sharding, partitions are multiplied by the number of shards (db instances).**
+
+For example, if we have *3 shards (3 dbs) and a topic with 10 partitions - each shard (db) will host 10 partitions, giving 30 partitions in total*.
+Same with consumers of a sharded topic - they will all be multiplied by the number of shards.
+
+For events, it works differently - in the example above, *each shard will host ~33% (1/3) of the topic's event data* (assuming even partition distribution).
+To get all events, we must read them from all shards.
+
+There will be *30 consumer instances* in this particular case - `3 shards * 10 partitions`; each consuming from one partition hosted on a given shard.
+Each event will be published to one partition of a single shard - as a consequence, events are globally unique, across all shards.
+
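
An editorial aside on the sharding arithmetic above: how the library actually routes an event to a shard is not shown in this README, so the sketch below only illustrates the described outcome (`3 shards * 10 partitions = 30`, each event landing on one partition of a single shard) under the assumption of a stable hash split; `ShardRoutingSketch` and its methods are hypothetical names:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardRoutingSketch {

    // "partitions are multiplied by the number of shards": 3 * 10 = 30
    public static int totalPartitions(int shards, int partitionsPerTopic) {
        return shards * partitionsPerTopic;
    }

    // assumed routing: one stable hash, split across shards and then partitions
    public static int[] route(String key, int shards, int partitions) {
        var hash = keyHash(key);
        var shard = hash.mod(BigInteger.valueOf(shards)).intValue();
        var partition = hash.mod(BigInteger.valueOf(partitions)).intValue();
        // one partition of a single shard - events stay globally unique
        return new int[]{shard, partition};
    }

    static BigInteger keyHash(String key) {
        try {
            var digest = MessageDigest.getInstance("SHA-256")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }
}
```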
+### Publishing
+
+We can publish single events and batches of arbitrary data and type:
+```java
+var publisher = eventSQL.publisher();
+
+publisher.publish(new EventPublication("txt_topic", "txt event".getBytes(StandardCharsets.UTF_8)));
+publisher.publish(new EventPublication("raw_topic", new byte[]{1, 2, 3}));
+publisher.publish(new EventPublication("json_topic",
+        """
+        {
+          "id": 2,
+          "name": "some-user"
+        }
+        """.getBytes(StandardCharsets.UTF_8)));
+
+// events can have keys and metadata as well;
+// key determines event distribution - if it's null, partition is randomly assigned
+publisher.publish(new EventPublication("txt_topic",
+        "event-key",
+        "txt event".getBytes(StandardCharsets.UTF_8),
+        Map.of("some-tag", "some-meta-info")));
+
+
+// events can also be published in batches, for improved throughput
+publisher.publishAll(List.of(
+        new EventPublication("txt_topic", "txt event 1".getBytes(StandardCharsets.UTF_8)),
+        new EventPublication("txt_topic", "txt event 2".getBytes(StandardCharsets.UTF_8)),
+        new EventPublication("txt_topic", "txt event 3".getBytes(StandardCharsets.UTF_8))));
+```
+
+### Partitioner
+
+Event partition is determined by `EventSQLPublisher.Partitioner`. By default, the following implementation is used:
+```java
+public class DefaultPartitioner implements EventSQLPublisher.Partitioner {
+
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public int partition(EventPublication publication, int topicPartitions) {
+        if (topicPartitions == -1) {
+            return -1;
+        }
+        if (publication.key() == null) {
+            return RANDOM.nextInt(topicPartitions);
+        }
+
+        return keyHash(publication.key())
+                .mod(BigInteger.valueOf(topicPartitions))
+                .intValue();
+    }
+
+    ...
+```
+
+For a not partitioned topic, no partition is assigned.
+
+If the topic is partitioned and the event has a null key - the partition is random.
+If a key is defined, the partition is assigned based on the key hash. Thanks to this, we have a guarantee that events associated with the same key always land in the same partition.
+
+If you want to change this behavior, you can provide your own implementation and configure it by calling the `EventSQLPublisher.configurePartitioner` method.
+
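
An editorial aside: the `keyHash` helper is elided in the snippet above, so the standalone version below assumes a SHA-256 based hash - the library's real function may differ, but any stable hash yields the same "same key, same partition" guarantee:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;

// Standalone sketch of the DefaultPartitioner logic; PartitionerSketch and
// the assumed SHA-256 keyHash are illustrative, not library code.
public class PartitionerSketch {

    private static final Random RANDOM = new Random();

    public static int partition(String key, int topicPartitions) {
        if (topicPartitions == -1) {
            return -1; // not partitioned topic - no partition assigned
        }
        if (key == null) {
            return RANDOM.nextInt(topicPartitions); // null key - random partition
        }
        // keyed events always land in the same partition
        return keyHash(key).mod(BigInteger.valueOf(topicPartitions)).intValue();
    }

    static BigInteger keyHash(String key) {
        try {
            var digest = MessageDigest.getInstance("SHA-256")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }
}
```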
+### Consuming
+
+We can have both single event and batch consumers:
+```java
+var consumers = eventSQL.consumers();
+
+consumers.startConsumer("txt_topic", "single-consumer", event -> {
+    // handle single event
+});
+// with more frequent polling - by default, it is 1 second
+consumers.startConsumer("txt_topic", "single-consumer-customized", event -> {
+    // handle single event
+}, Duration.ofMillis(100));
+
+consumers.startBatchConsumer("txt_topic", "batch-consumer", events -> {
+    // handle events batch for better performance
+}, // customize batch behavior:
+   // minEvents, maxEvents,
+   // pollingDelay and maxPollingDelay - how long to wait for minEvents
+   EventSQLConsumers.ConsumptionConfig.of(5, 100,
+           Duration.ofSeconds(1), Duration.ofSeconds(10)));
+```
+
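
An editorial aside on `ConsumptionConfig.of(minEvents, maxEvents, pollingDelay, maxPollingDelay)`: the semantics below are an assumption inferred from the comments above (deliver once `minEvents` have accumulated, give up waiting after `maxPollingDelay`, never exceed `maxEvents` per batch) - not the library's actual scheduling code; `BatchReadinessSketch` is a hypothetical name:

```java
import java.time.Duration;

public class BatchReadinessSketch {

    // assumed rule: consume early once minEvents arrived, or stop waiting
    // for more events after maxPollingDelay has elapsed
    public static boolean shouldConsume(int buffered, int minEvents,
                                        Duration waited, Duration maxPollingDelay) {
        return buffered >= minEvents
                || (buffered > 0 && waited.compareTo(maxPollingDelay) >= 0);
    }

    // a single delivered batch is capped at maxEvents
    public static int batchSize(int buffered, int maxEvents) {
        return Math.min(buffered, maxEvents);
    }
}
```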
+### Dead Letter Topics (DLT)
+
+If we register a topic with a DLT as follows:
+```java
+eventSQL.registry()
+        .registerTopic(new TopicDefinition("account_created", -1))
+        .registerTopic(new TopicDefinition("account_created_dlt", -1));
+```
+Under certain circumstances, it will get special treatment.
+
+When a consumer throws `EventSQLConsumptionException`, `DefaultDLTEventFactory` takes over and publishes the failed event to the associated DLT, if it can find one:
+```java
+...
+
+@Override
+public Optional<EventPublication> create(EventSQLConsumptionException exception, String consumer) {
+    var event = exception.event();
+
+    var dltTopic = event.topic() + "_dlt";
+    var dltTopicDefinitionOpt = topicDefinitionsCache.getLoadingIf(dltTopic, true);
+    if (dltTopicDefinitionOpt.isEmpty()) {
+        return Optional.empty();
+    }
+
+    ...
+
+    // creates dlt event
+```
+
+This factory can be customized by calling the `EventSQLConsumers.configureDLTEventFactory` method.
+
+What is also worth noting is that any exception thrown by a single event consumer is automatically wrapped into `EventSQLConsumptionException` - see the `ConsumerWrapper` class.
+
+When you use `EventSQLConsumers.startBatchConsumer`, you have to do the wrapping yourself.
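
An editorial aside: the DLT resolution visible in the `DefaultDLTEventFactory` snippet above boils down to a naming convention - the DLT for a topic is `"<topic>_dlt"`, used only if such a topic is registered. A minimal sketch of just that rule (`DLTResolutionSketch` is an illustrative name, not library code):

```java
import java.util.Optional;
import java.util.Set;

public class DLTResolutionSketch {

    public static Optional<String> resolveDLT(String topic, Set<String> registeredTopics) {
        var dltTopic = topic + "_dlt";
        // no registered DLT -> Optional.empty(), the failed event is not redirected
        return registeredTopics.contains(dltTopic) ? Optional.of(dltTopic) : Optional.empty();
    }
}
```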
 
 
 ## How to get it
 
-TODO
+Maven:
+```
+TODO: publish it
+```
+Gradle:
+```
+TODO: publish it
+```

TODO.md

Lines changed: 3 additions & 2 deletions
@@ -1,9 +1,10 @@
 * usage examples
-* just pub/sub
 * giving access to event tables as a means of a simple export - since they are all there
 * expiring events/TTL?
 * compact topics - unique key
 * join, aka streams
 * more elaborate definitions change support - especially around partitions growth & shrinkage
 * JavaDocs
-* Support schemas init in registry - why require schemas from users, if it is always the same?
+* Support schemas init in registry - why require schemas from users, if it is always the same (keeping dbs diffs in mind)?
+* using JOOQ tradeoffs?
+* full MySQL/MariaDB support

benchmarks/events-db/init_db.sql

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ CREATE TABLE event (
     key TEXT,
     value BYTEA NOT NULL,
     created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-    metadata JSONB NOT NULL,
+    metadata JSON NOT NULL,
     PRIMARY KEY (topic, id)
 ) PARTITION BY LIST (topic);
 
benchmarks/runner/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 FROM eclipse-temurin:21-alpine
 
-COPY target/eventsql-benchmarks-runer-jar-with-dependencies.jar /app.jar
+COPY target/eventsql-benchmarks-runner-jar-with-dependencies.jar /app.jar
 
 ENTRYPOINT ["java", "-jar", "/app.jar"]

benchmarks/runner/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@
                 <goal>single</goal>
             </goals>
             <configuration>
-                <finalName>eventsql-benchmarks-runer</finalName>
+                <finalName>eventsql-benchmarks-runner</finalName>
                 <archive>
                     <manifest>
                         <mainClass>com.binaryigor.eventsql.benchmarks.EventSQLBenchmarksRunner</mainClass>

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 7 additions & 14 deletions
@@ -61,7 +61,7 @@ public static void main(String[] args) throws Exception {
 
         var start = System.currentTimeMillis();
 
-        publishEvents(eventSQL.publisher(), topicDefinition);
+        publishEvents(eventSQL.publisher());
         var publicationDuration = Duration.ofMillis(System.currentTimeMillis() - start);
 
         printDelimiter();
@@ -94,11 +94,6 @@ static String envValueOrDefault(String key, String defaultValue) {
         return System.getenv().getOrDefault(key, defaultValue);
     }
 
-    static String envValueOrThrow(String key) {
-        return Optional.ofNullable(System.getenv().get(key))
-                .orElseThrow(() -> new RuntimeException("%s env variable is required but was not supplied!".formatted(key)));
-    }
-
     static int envIntValueOrDefault(String key, int defaultValue) {
         return Integer.parseInt(envValueOrDefault(key, String.valueOf(defaultValue)));
     }
@@ -173,12 +168,12 @@ static <T> T executeQuery(DataSource source, String query, ResultSetMapper<T> re
         }
     }
 
-    static void publishEvents(EventSQLPublisher publisher, TopicDefinition topicDefinition) throws Exception {
+    static void publishEvents(EventSQLPublisher publisher) throws Exception {
         var futures = new LinkedList<Future<?>>();
 
         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
             for (var i = 0; i < EVENTS_TO_PUBLISH; i++) {
-                var result = executor.submit(() -> publishRandomEvent(publisher, topicDefinition));
+                var result = executor.submit(() -> publishRandomEvent(publisher));
                 futures.add(result);
 
                 var publications = i + 1;
@@ -198,20 +193,18 @@ static void publishEvents(EventSQLPublisher publisher, TopicDefi
             }
         }
     }
 
-    static void publishRandomEvent(EventSQLPublisher publisher, TopicDefinition topicDefinition) {
+    static void publishRandomEvent(EventSQLPublisher publisher) {
         try {
             // make publication more evenly distributed in time
             Thread.sleep(RANDOM.nextInt(1000));
-            var partition = RANDOM.nextInt(topicDefinition.partitions());
-            var event = nextEvent(partition);
-            publisher.publish(event);
+            publisher.publish(nextEvent());
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
 
-    static EventPublication nextEvent(int partition) {
-        return new EventPublication(TEST_TOPIC, partition, accountCreatedEventJson().getBytes(StandardCharsets.UTF_8));
+    static EventPublication nextEvent() {
+        return new EventPublication(TEST_TOPIC, accountCreatedEventJson().getBytes(StandardCharsets.UTF_8));
     }
 
     static String accountCreatedEventJson() {
