You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: README.md
+159-9Lines changed: 159 additions & 9 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -12,7 +12,8 @@ For scalability details, see [benchmarks](/benchmarks/README.md).
12
12
13
13
## How it works
14
14
15
-
We just need to have three tables:
15
+
We just need to have three tables (postgres syntax):
16
+
16
17
```sql
17
18
CREATETABLEtopic (
18
19
name TEXTPRIMARY KEY,
@@ -43,15 +44,16 @@ CREATE TABLE consumer (
43
44
```
44
45
45
46
To consume messages, we just need to periodically (every one to a few seconds) do:
47
+
46
48
```sql
47
49
BEGIN;
48
50
49
51
SELECT*FROM consumer
50
-
WHERE topic = :topic AND name = :c_name
52
+
WHERE topic = :topic AND name = :c_name
51
53
FOR UPDATE SKIP LOCKED;
52
54
53
55
SELECT*FROM event
54
-
WHERE (:last_event_id IS NULL)OR id > last_event_id
56
+
WHEREtopic = :topic AND(:last_event_id IS NULLOR id >:last_event_id)
55
57
ORDER BY id LIMIT N;
56
58
57
59
(process events)
@@ -62,10 +64,13 @@ SET last_event_id = :id,
62
64
WHERE topic = :topic AND name = :c_name;
63
65
```
64
66
65
-
Optionally, to increase throughput & concurrency, we might have partitioned topic and consumers (-1 partition standing for not partitioned topic/consumer).
67
+
Optionally, to increase throughput & concurrency, we might have a partitioned topic and consumers (-1 partition standing
68
+
for not partitioned topic/consumer).
69
+
70
+
Distribution of partitioned events is a sole responsibility of publisher - the library provides sensible default (random
71
+
distribution).
72
+
Consumption of such events per partition (0 in an example) might look like this:
66
73
67
-
Distribution of partitioned events is a sole responsibility of publisher - the library provides sensible default (random distribution).
68
-
Consumption of such events per partition (0 in example) might look like this:
69
74
```sql
70
75
BEGIN;
71
76
@@ -74,7 +79,7 @@ WHERE topic = :topic AND name = :c_name AND partition = 0
74
79
FOR UPDATE SKIP LOCKED;
75
80
76
81
SELECT*FROM event
77
-
WHERE (:last_event_id IS NULL)OR id > last_event_idAND partition =0
82
+
WHEREtopic = :topic AND partition =0AND(:last_event_id IS NULLOR id >:last_event_id)
78
83
ORDER BY id LIMIT N;
79
84
80
85
(process events)
@@ -91,9 +96,154 @@ It's a rather acceptable tradeoff and easy to enforce at the library level.
91
96
92
97
## How to use it
93
98
94
-
TODO: for now, check out benchmarks/app being an example app.
99
+
`EventSQL` is an entrypoint to the whole library. It requires standard Java `javax.sql.DataSource` or a list of
100
+
them:
101
+
102
+
```java
103
+
104
+
importcom.binaryigor.eventsql.EventSQL;
105
+
importjavax.sql.DataSource;
106
+
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
107
+
// as of now, only POSTGRES has fully tested support
108
+
importorg.jooq.SQLDialect;
109
+
110
+
var eventSQL =newEventSQL(dataSource, SQLDialect.POSTGRES);
111
+
ver shardedEventSQL =newEventSQL(dataSources, SQLDialect.POSTGRES);
112
+
```
113
+
114
+
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.
115
+
116
+
### Topics and Consumers
117
+
118
+
Having `EventSQL` instance, we can register topics and their consumers:
Under certain circumstances, it will have special treatment.
213
+
214
+
When a consumer throws `EventSQLConsumptionException`, `DefaultDLTEventFactory` takes it over and publishes failed event to the associated dlt if it can find one:
215
+
```java
216
+
...
217
+
218
+
@Override
219
+
public Optional<EventPublication> create(EventSQLConsumptionException exception, String consumer) {
220
+
var event = exception.event();
221
+
222
+
var dltTopic = event.topic() + "_dlt";
223
+
var dltTopicDefinitionOpt = topicDefinitionsCache.getLoadingIf(dltTopic, true);
224
+
if (dltTopicDefinitionOpt.isEmpty()) {
225
+
return Optional.empty();
226
+
}
227
+
228
+
...
229
+
230
+
// creates dlt event
231
+
```
232
+
233
+
This factory can be customized by using another `EventSQL` constructor or by calling `EventSQLConsumers.configureDLTEventFactory` method.
234
+
235
+
What is also worth noting is that any exception thrown by single event consumer is wrapped into `EventSQLConsumptionException` automatically - see *ConsumerWrapper.class*.
236
+
237
+
When you use `consumers.startBatchConsumer` you have to do wrapping yourself.
0 commit comments