Commit 4967191

Better docs

1 parent 66edf71 commit 4967191

8 files changed, 105 additions & 50 deletions

README.md

Lines changed: 73 additions & 37 deletions
@@ -54,21 +54,23 @@ FOR UPDATE SKIP LOCKED;
 
 SELECT * FROM event
 WHERE topic = :topic AND (:last_event_id IS NULL OR id > :last_event_id)
-ORDER BY id LIMIT N;
+ORDER BY id LIMIT :limit;
 
 (process events)
 
 UPDATE consumer
 SET last_event_id = :id,
     last_consumption_at = :now
 WHERE topic = :topic AND name = :c_name;
+
+COMMIT;
 ```
 
 Optionally, to increase throughput & concurrency, we might have a partitioned topic and consumers (-1 partition standing
-for not partitioned topic/consumer).
+for not partitioned topic/consumer/event).
+
+Distribution of partitioned events is the sole responsibility of the publisher.
 
-Distribution of partitioned events is a sole responsibility of publisher - the library provides sensible default (random
-distribution).
 Consumption of such events per partition (0 in an example) might look like this:
 
 ```sql
@@ -80,19 +82,20 @@ FOR UPDATE SKIP LOCKED;
 
 SELECT * FROM event
 WHERE topic = :topic AND partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
-ORDER BY id LIMIT N;
+ORDER BY id LIMIT :limit;
 
 (process events)
 
 UPDATE consumer
 SET last_event_id = :id,
     last_consumption_at = :now
 WHERE topic = :topic AND name = :c_name AND partition = 0;
+
+COMMIT;
 ```
 
-Limitation being that if consumer is partitioned, it must have the exact same number of partition as in the topic
-definition.
-It's a rather acceptable tradeoff and easy to enforce at the library level.
+Limitation being that if consumer is partitioned, it must have the exact same number of partition as the topic
+definition has. It's a rather acceptable tradeoff and easy to enforce at the library level.
 
 ## How to use it
 
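The two SQL snippets in this hunk describe a poll, process, advance-offset cycle. As a rough illustration of that cycle's semantics only - this is not EventSQL code, all names are made up, and a plain in-memory list stands in for the event table and a map for the consumer table - it can be sketched as:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of the poll -> process -> advance-offset cycle that the
// SQL above runs transactionally; every name here is illustrative, not library API.
public class ConsumptionLoopSketch {

    record Event(long id, String topic, byte[] payload) {}

    // stands in for the consumer table row: last_event_id per (topic, consumer)
    private final Map<String, Long> lastEventIds = new HashMap<>();

    // stands in for: SELECT ... WHERE topic = :topic AND (:last_event_id IS NULL
    // OR id > :last_event_id) ORDER BY id LIMIT :limit
    List<Event> poll(List<Event> eventTable, String topic, String consumer, int limit) {
        var lastId = lastEventIds.get(topic + "/" + consumer);
        return eventTable.stream()
                .filter(e -> e.topic().equals(topic))
                .filter(e -> lastId == null || e.id() > lastId)
                .sorted(Comparator.comparingLong(Event::id))
                .limit(limit)
                .toList();
    }

    // stands in for: UPDATE consumer SET last_event_id = :id ...; COMMIT;
    void commit(String topic, String consumer, List<Event> processed) {
        if (!processed.isEmpty()) {
            lastEventIds.put(topic + "/" + consumer, processed.get(processed.size() - 1).id());
        }
    }

    // polls to exhaustion and returns the consumed batch sizes
    public List<Integer> batchSizes(List<Event> eventTable, String topic, int limit) {
        var sizes = new ArrayList<Integer>();
        while (true) {
            var batch = poll(eventTable, topic, "consumer-1", limit);
            if (batch.isEmpty()) {
                return sizes;
            }
            sizes.add(batch.size());
            commit(topic, "consumer-1", batch);
        }
    }

    public static void main(String[] args) {
        var events = new ArrayList<Event>();
        for (long id = 1; id <= 5; id++) {
            events.add(new Event(id, "txt_topic", ("event " + id).getBytes()));
        }
        // 5 events consumed with limit 3 -> batches of 3 and 2
        System.out.println(new ConsumptionLoopSketch().batchSizes(events, "txt_topic", 3));
    }
}
```

The point of the sketch: because the offset is only advanced after processing, a crash mid-batch re-delivers the batch, which is the at-least-once behavior the `FOR UPDATE SKIP LOCKED` transaction implies.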
@@ -104,7 +107,8 @@ them:
 import com.binaryigor.eventsql.EventSQL;
 import javax.sql.DataSource;
 // dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
-// as of now, only POSTGRES has fully tested support
+// as of now, only POSTGRES has fully tested support;
+// should also work with others but some things - event table partition management for example - works only with Postgres, for others it must be managed manually
 import org.jooq.SQLDialect;
 
 var eventSQL = new EventSQL(dataSource, SQLDialect.POSTGRES);
@@ -128,72 +132,104 @@ eventSQL.registry()
         .registerConsumer(new ConsumerDefinition("invoice_issued", "consumer-2", true));
 ```
 
-Topics and consumers can be both partitioned and not partitioned (-1 stands for not partitioned).
+Topics and consumers can be both partitioned and not partitioned.
 **Partitioned topics allow to have partitioned consumers, increasing parallelism.**
 Parallelism of partitioned consumers is as high as consumed topic number of partitions - events have ordering guarantee within a partition.
 As a consequence, for a given consumer, each partition can be processed only by a single thread at the time.
 
-For a consumer to be partitioned (third argument in the example) its topic must be partitioned as well - it will have the same number of partitions.
-The opposite does not have to be true - consumer might not be partitioned but related topic can;
-it has performance implications though, since as described above, consumer parallelism is capped at its number of partitions.
+For a consumer to be partitioned its topic must be partitioned as well - it will have the same number of partitions.
+The opposite does not have to be true - consumer might not be partitioned but a related topic can; it has performance implications though, since as described above, consumer parallelism is capped at its number of partitions.
 
-**For sharding, partitions are multiplied by the number of shards.**
+**With sharding, partitions are multiplied by the number of shards (db instances).**
 
-For example, if we have *3 shards and a topic with 10 partitions - each shard will host 10 partitions, giving 30 partitions in total*.
+For example, if we have *3 shards (3 dbs) and a topic with 10 partitions - each shard (db) will host 10 partitions, giving 30 partitions in total*.
 Same with consumers of a sharded topic - they will be all multiplied by the number of shards.
 
-For events, it works differently - in the example above, *each shard will host ~ 33% (1/3) of the topic events data*.
+For events, it works differently - in the example above, *each shard will host ~ 33% (1/3) of the topic events data* (assuming even partition distribution).
+To get all events, we must read them from all shards.
 
 There will be *30 consumer instances* in this particular case - `3 shards * 10 partitions`; each consuming from one partition hosted on a given shard.
-Each event will be published to a one partition of a single shard - events are unique globally, across all shards.
+Each event will be published to a one partition of a single shard - as a consequence, events are unique globally, across all shards.
 
 ### Publishing
 
 We can publish single events and batches of arbitrary data and type:
 ```java
 var publisher = eventSQL.publisher();
 
-// with -1 argument, if topic is partitioned, it will be published to a random partition
-publisher.publish(new EventPublication("txt_topic", -1, "txt event".getBytes(StandardCharsets.UTF_8)));
-publisher.publish(new EventPublication("raw_topic", 1, new byte[]{1, 2, 3}));
-publisher.publish(new EventPublication("json_topic", 2,
+publisher.publish(new EventPublication("txt_topic", "txt event".getBytes(StandardCharsets.UTF_8)));
+publisher.publish(new EventPublication("raw_topic", new byte[]{1, 2, 3}));
+publisher.publish(new EventPublication("json_topic",
         """
        {
          "id": 2,
          "name: "some-user"
        }
        """.getBytes(StandardCharsets.UTF_8)));
 
-// events can have keys and metadata as well
-publisher.publish(new EventPublication("txt_topic", -1,
+// events can have keys and metadata as well;
+// key determines event distribution - if it's null, partition is randomly assigned
+publisher.publish(new EventPublication("txt_topic",
         "event-key",
         "txt event".getBytes(StandardCharsets.UTF_8),
         Map.of("some-tag", "some-meta-info")));
 
 
-// events can be published in batches, for improved throughput
+// events can be also published in batches, for improved throughput
 publisher.publishAll(List.of(
-        new EventPublication("txt_topic", -1, "txt event 1".getBytes(StandardCharsets.UTF_8)),
-        new EventPublication("txt_topic", -1, "txt event 2".getBytes(StandardCharsets.UTF_8)),
-        new EventPublication("txt_topic", -1, "txt event 3".getBytes(StandardCharsets.UTF_8))));
+        new EventPublication("txt_topic", "txt event 1".getBytes(StandardCharsets.UTF_8)),
+        new EventPublication("txt_topic", "txt event 2".getBytes(StandardCharsets.UTF_8)),
+        new EventPublication("txt_topic", "txt event 3".getBytes(StandardCharsets.UTF_8))));
 ```
 
+### Partitioner
+
+Event partition is determined by `EventSQLPublisher.Partitioner`. By default, the following implementation is used:
+```java
+public class DefaultPartitioner implements EventSQLPublisher.Partitioner {
+
+    private static final Random RANDOM = new Random();
+
+    @Override
+    public int partition(EventPublication publication, int topicPartitions) {
+        if (topicPartitions == -1) {
+            return -1;
+        }
+        if (publication.key() == null) {
+            return RANDOM.nextInt(topicPartitions);
+        }
+
+        return keyHash(publication.key())
+                .mod(BigInteger.valueOf(topicPartitions))
+                .intValue();
+    }
+
+    ...
+```
+
+For a not partitioned topic, no partition is assigned.
+
+If the topic is partitioned and has a null key - partition is random.
+If a key is defined, the partition is assigned based on key hash. Thanks to this, we have a guarantee that events associated with the same key always land in the same partition.
+
+If you want to change this behavior, you can provide your own implementation and configure it by calling `EventSQLPublisher.configurePartitioner` method.
+
 ### Consuming
 
 We can have both single event and batch consumers:
 ```java
 var consumers = eventSQL.consumers();
 
-consumers.startConsumer("txt_topic", "single-consumer", (Event e) -> {
+consumers.startConsumer("txt_topic", "single-consumer", event -> {
     // handle single event
 });
 // with more frequent polling - by default it is 1 second
-consumers.startConsumer("txt_topic", "single-consumer-customized", (Event e) -> {
+consumers.startConsumer("txt_topic", "single-consumer-customized", event -> {
     // handle single event
 }, Duration.ofMillis(100));
 
-consumers.startBatchConsumer("txt_topic", "batch-consumer", (List<Event> events) -> {
-    // handle events batch
+consumers.startBatchConsumer("txt_topic", "batch-consumer", events -> {
+    // handle events batch for better performance
 }, // customize batch behavior:
    // minEvents, maxEvents,
    // pollingDelay and maxPollingDelay - how long to wait for minEvents
@@ -203,15 +239,15 @@ consumers.startBatchConsumer("txt_topic", "batch-consumer", (List<Event> events)
 
 ### Dead Letter Topics (DLT)
 
-If we register a topic with the DLT as follows:
+If we register a topic with DLT as follows:
 ```java
 eventSQL.registry()
         .registerTopic(new TopicDefinition("account_created", -1))
         .registerTopic(new TopicDefinition("account_created_dlt", -1));
 ```
-Under certain circumstances, it will have special treatment.
+Under certain circumstances, it will get a special treatment.
 
-When a consumer throws `EventSQLConsumptionException`, `DefaultDLTEventFactory` takes it over and publishes failed event to the associated dlt if it can find one:
+When a consumer throws `EventSQLConsumptionException`, `DefaultDLTEventFactory` takes over and publishes failed event to the associated DLT, if it can find one:
 ```java
 ...
 
@@ -230,11 +266,11 @@ public Optional<EventPublication> create(EventSQLConsumptionException exception,
 // creates dlt event
 ```
 
-This factory can be customized by using another `EventSQL` constructor or by calling `EventSQLConsumers.configureDLTEventFactory` method.
+This factory can be customized by calling `EventSQLConsumers.configureDLTEventFactory` method.
 
-What is also worth noting is that any exception thrown by single event consumer is wrapped into `EventSQLConsumptionException` automatically - see *ConsumerWrapper.class*.
+What is also worth noting is that any exception thrown by a single event consumer is wrapped into `EventSQLConsumptionException` automatically - see `ConsumerWrapper` class.
 
-When you use `consumers.startBatchConsumer` you have to do wrapping yourself.
+When you use `EventSQLConsumers.startBatchConsumer` you have to do the wrapping yourself.
 
 
 ## How to get it
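The sharding arithmetic in the README changes above (3 shards x 10 partitions gives 30 partitions and 30 consumer instances) can be checked with a few lines. The key-to-shard routine below is purely an assumption for illustration - the diff does not show how the library picks a shard for an event:

```java
// Back-of-the-envelope check of the sharding math from the README:
// 3 shards x 10 partitions => 30 partitions and 30 partitioned-consumer
// instances in total; each shard hosts ~1/3 of the topic's events.
public class ShardingMathSketch {

    // with sharding, topic partitions are multiplied by the number of shards
    static int totalPartitions(int shards, int topicPartitions) {
        return shards * topicPartitions;
    }

    // hypothetical shard routing: hash the event key, mod by shard count;
    // the library's actual strategy may differ
    static int shardOf(String eventKey, int shards) {
        return Math.floorMod(eventKey.hashCode(), shards);
    }

    public static void main(String[] args) {
        int shards = 3;
        int topicPartitions = 10;
        System.out.println(totalPartitions(shards, topicPartitions)); // 30
        // a stable routing function sends the same key to the same shard
        System.out.println(shardOf("key-42", shards) == shardOf("key-42", shards)); // true
    }
}
```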

TODO.md

Lines changed: 2 additions & 2 deletions
@@ -1,10 +1,10 @@
 * usage examples
-* just pub/sub
 * giving access to event tables as a means of a simple export - since they are all there
 * expiring events/TTL?
 * compact topics - unique key
 * join, aka streams
 * more elaborate definitions change support - especially around partitions growth & shrinkage
 * JavaDocs
-* Support schemas init in registry - why require schemas from users, if it is always the same?
+* Support schemas init in registry - why require schemas from users, if it is always the same (keeping dbs diffs in mind)?
 * using JOOQ tradeoffs?
+* full MySQL/MariaDB support

src/main/java/com/binaryigor/eventsql/EventSQLPublisher.java

Lines changed: 4 additions & 0 deletions
@@ -11,4 +11,8 @@ public interface EventSQLPublisher {
     void configurePartitioner(Partitioner partitioner);
 
     Partitioner partitioner();
+
+    interface Partitioner {
+        int partition(EventPublication publication, int topicPartitions);
+    }
 }
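With `Partitioner` now nested in `EventSQLPublisher`, a custom strategy plugged in via `configurePartitioner` could look like the sketch below. `EventPublication` and the interface are stubbed locally so the snippet is self-contained, and the round-robin strategy itself is only an illustrative example, not part of the library:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a custom partitioner; in real code you would implement
// com.binaryigor.eventsql.EventSQLPublisher.Partitioner instead of the
// local stand-ins defined here.
public class RoundRobinPartitionerSketch {

    // minimal stand-in for the library's EventPublication
    record EventPublication(String topic, String key, byte[] value) {}

    // stand-in mirroring the interface shape from the diff above
    interface Partitioner {
        int partition(EventPublication publication, int topicPartitions);
    }

    static class RoundRobinPartitioner implements Partitioner {
        private final AtomicLong counter = new AtomicLong();

        @Override
        public int partition(EventPublication publication, int topicPartitions) {
            if (topicPartitions == -1) {
                return -1; // not partitioned topic: no partition assigned
            }
            // ignore the key, spread events evenly in publish order
            return (int) (counter.getAndIncrement() % topicPartitions);
        }
    }

    public static void main(String[] args) {
        var partitioner = new RoundRobinPartitioner();
        var publication = new EventPublication("txt_topic", null, new byte[0]);
        for (int i = 0; i < 4; i++) {
            System.out.print(partitioner.partition(publication, 3) + " "); // 0 1 2 0
        }
    }
}
```

Note the tradeoff: a round-robin strategy drops the same-key-same-partition guarantee that the default key-hash partitioner provides.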

src/main/java/com/binaryigor/eventsql/Partitioner.java

Lines changed: 0 additions & 5 deletions
This file was deleted.

src/main/java/com/binaryigor/eventsql/internal/DefaultPartitioner.java

Lines changed: 2 additions & 2 deletions
@@ -1,14 +1,14 @@
 package com.binaryigor.eventsql.internal;
 
 import com.binaryigor.eventsql.EventPublication;
-import com.binaryigor.eventsql.Partitioner;
+import com.binaryigor.eventsql.EventSQLPublisher;
 
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.util.Random;
 
-public class DefaultPartitioner implements Partitioner {
+public class DefaultPartitioner implements EventSQLPublisher.Partitioner {
 
     private static final Random RANDOM = new Random();
 
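The key-hash branch of `DefaultPartitioner` can be reproduced standalone as below. The MD5 digest is an assumption - the diff only shows that `DefaultPartitioner` imports `MessageDigest` and that `keyHash` returns a `BigInteger`, not which algorithm it uses - while the mod-by-partition-count step mirrors the code shown earlier:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Standalone illustration of the key -> partition mapping style used by
// DefaultPartitioner: hash the key, then mod by the topic's partition count.
public class KeyHashPartitionSketch {

    // hypothetical keyHash: MD5 of the UTF-8 key bytes as a non-negative BigInteger
    static BigInteger keyHash(String key) {
        try {
            var digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // 1 = positive signum
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // mirrors: keyHash(key).mod(BigInteger.valueOf(topicPartitions)).intValue()
    static int partition(String key, int topicPartitions) {
        return keyHash(key).mod(BigInteger.valueOf(topicPartitions)).intValue();
    }

    public static void main(String[] args) {
        int partitions = 3;
        // the same key always lands in the same partition...
        System.out.println(partition("user-1", partitions) == partition("user-1", partitions)); // true
        // ...and every assignment stays within [0, partitions)
        int p = partition("user-2", partitions);
        System.out.println(p >= 0 && p < partitions); // true
    }
}
```

This determinism is what gives per-key ordering: all events for one key land in one partition, and partitions are consumed single-threaded.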

src/main/java/com/binaryigor/eventsql/internal/sharded/ShardedEventSQLPublisher.java

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 
 import com.binaryigor.eventsql.EventPublication;
 import com.binaryigor.eventsql.EventSQLPublisher;
-import com.binaryigor.eventsql.Partitioner;
 
 import java.util.Collection;
 import java.util.List;

src/test/java/com/binaryigor/eventsql/EventSQLPublisherTest.java

Lines changed: 22 additions & 1 deletion
@@ -13,19 +13,40 @@
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.tuple;
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
 public class EventSQLPublisherTest extends IntegrationTest {
 
     private static final String PARTITIONED_TOPIC = "partitioned_topic";
+    private static final int TOPIC_PARTITIONS = 3;
     private static final String NOT_PARTITIONED_TOPIC = "not_partitioned_topic";
 
     @BeforeEach
     void setup() {
-        registry.registerTopic(new TopicDefinition(PARTITIONED_TOPIC, 3))
+        registry.registerTopic(new TopicDefinition(PARTITIONED_TOPIC, TOPIC_PARTITIONS))
                 .registerTopic(new TopicDefinition(NOT_PARTITIONED_TOPIC, -1));
     }
 
+    @Test
+    void publishesToAssignedByPartitionerPartitions() {
+        // given
+        var events = IntStream.range(0, 5)
+                .mapToObj(idx -> TestObjects.randomEventPublication(PARTITIONED_TOPIC, "key" + idx))
+                .toList();
+
+        // when
+        events.forEach(publisher::publish);
+
+        // then
+        var expectedKeyPartitions = events.stream()
+                .map(e -> tuple(e.key(), publisher.partitioner().partition(e, TOPIC_PARTITIONS)))
+                .toList();
+        assertThat(publishedEvents())
+                .extracting("key", "partition")
+                .containsExactlyElementsOf(expectedKeyPartitions);
+    }
+
     @Test
     void publishesToVariousPartitions() {
         // when

src/test/java/com/binaryigor/eventsql/test/TestPartitioner.java

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 package com.binaryigor.eventsql.test;
 
 import com.binaryigor.eventsql.EventPublication;
-import com.binaryigor.eventsql.Partitioner;
+import com.binaryigor.eventsql.EventSQLPublisher;
 
-public class TestPartitioner implements Partitioner {
+public class TestPartitioner implements EventSQLPublisher.Partitioner {
 
     private final int nextPartition;
 
