Skip to content

Commit 66edf71

Browse files
committed
API improvements
1 parent 3c84a41 commit 66edf71

28 files changed

Lines changed: 423 additions & 224 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ CREATE TABLE event (
2828
key TEXT,
2929
value BYTEA NOT NULL,
3030
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
31-
metadata JSONB NOT NULL,
31+
metadata JSON NOT NULL,
3232
PRIMARY KEY (topic, id)
3333
) PARTITION BY LIST (topic);
3434

benchmarks/events-db/init_db.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ CREATE TABLE event (
1919
key TEXT,
2020
value BYTEA NOT NULL,
2121
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
22-
metadata JSONB NOT NULL,
22+
metadata JSON NOT NULL,
2323
PRIMARY KEY (topic, id)
2424
) PARTITION BY LIST (topic);
2525

benchmarks/runner/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
FROM eclipse-temurin:21-alpine
22

3-
COPY target/eventsql-benchmarks-runer-jar-with-dependencies.jar /app.jar
3+
COPY target/eventsql-benchmarks-runner-jar-with-dependencies.jar /app.jar
44

55
ENTRYPOINT ["java", "-jar", "/app.jar"]

benchmarks/runner/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
<goal>single</goal>
5252
</goals>
5353
<configuration>
54-
<finalName>eventsql-benchmarks-runer</finalName>
54+
<finalName>eventsql-benchmarks-runner</finalName>
5555
<archive>
5656
<manifest>
5757
<mainClass>com.binaryigor.eventsql.benchmarks.EventSQLBenchmarksRunner</mainClass>

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static void main(String[] args) throws Exception {
6161

6262
var start = System.currentTimeMillis();
6363

64-
publishEvents(eventSQL.publisher(), topicDefinition);
64+
publishEvents(eventSQL.publisher());
6565
var publicationDuration = Duration.ofMillis(System.currentTimeMillis() - start);
6666

6767
printDelimiter();
@@ -94,11 +94,6 @@ static String envValueOrDefault(String key, String defaultValue) {
9494
return System.getenv().getOrDefault(key, defaultValue);
9595
}
9696

97-
static String envValueOrThrow(String key) {
98-
return Optional.ofNullable(System.getenv().get(key))
99-
.orElseThrow(() -> new RuntimeException("%s env variable is required but was not supplied!".formatted(key)));
100-
}
101-
10297
static int envIntValueOrDefault(String key, int defaultValue) {
10398
return Integer.parseInt(envValueOrDefault(key, String.valueOf(defaultValue)));
10499
}
@@ -173,12 +168,12 @@ static <T> T executeQuery(DataSource source, String query, ResultSetMapper<T> re
173168
}
174169
}
175170

176-
static void publishEvents(EventSQLPublisher publisher, TopicDefinition topicDefinition) throws Exception {
171+
static void publishEvents(EventSQLPublisher publisher) throws Exception {
177172
var futures = new LinkedList<Future<?>>();
178173

179174
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
180175
for (var i = 0; i < EVENTS_TO_PUBLISH; i++) {
181-
var result = executor.submit(() -> publishRandomEvent(publisher, topicDefinition));
176+
var result = executor.submit(() -> publishRandomEvent(publisher));
182177
futures.add(result);
183178

184179
var publications = i + 1;
@@ -198,20 +193,18 @@ static void publishEvents(EventSQLPublisher publisher, TopicDefinition topicDefi
198193
}
199194
}
200195

201-
static void publishRandomEvent(EventSQLPublisher publisher, TopicDefinition topicDefinition) {
196+
static void publishRandomEvent(EventSQLPublisher publisher) {
202197
try {
203198
// make publication more evenly distributed in time
204199
Thread.sleep(RANDOM.nextInt(1000));
205-
var partition = RANDOM.nextInt(topicDefinition.partitions());
206-
var event = nextEvent(partition);
207-
publisher.publish(event);
200+
publisher.publish(nextEvent());
208201
} catch (Exception e) {
209202
e.printStackTrace();
210203
}
211204
}
212205

213-
static EventPublication nextEvent(int partition) {
214-
return new EventPublication(TEST_TOPIC, partition, accountCreatedEventJson().getBytes(StandardCharsets.UTF_8));
206+
static EventPublication nextEvent() {
207+
return new EventPublication(TEST_TOPIC, accountCreatedEventJson().getBytes(StandardCharsets.UTF_8));
215208
}
216209

217210
static String accountCreatedEventJson() {

src/main/java/com/binaryigor/eventsql/EventPublication.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,10 @@
33
import java.util.Map;
44

55
public record EventPublication(String topic,
6-
int partition,
76
String key,
87
byte[] value,
98
Map<String, String> metadata) {
109

11-
public EventPublication(String topic, int partition, String key, byte[] value) {
12-
this(topic, partition, key, value, Map.of());
13-
}
14-
15-
public EventPublication(String topic, int partition, byte[] value, Map<String, String> metadata) {
16-
this(topic, partition, null, value, metadata);
17-
}
18-
19-
public EventPublication(String topic, int partition, byte[] value) {
20-
this(topic, partition, value, Map.of());
21-
}
22-
23-
public EventPublication(String topic, String key, byte[] value, Map<String, String> metadata) {
24-
this(topic, -1, key, value, metadata);
25-
}
26-
2710
public EventPublication(String topic, String key, byte[] value) {
2811
this(topic, key, value, Map.of());
2912
}
@@ -35,8 +18,4 @@ public EventPublication(String topic, byte[] value, Map<String, String> metadata
3518
public EventPublication(String topic, byte[] value) {
3619
this(topic, value, Map.of());
3720
}
38-
39-
public EventPublication withPartition(int partition) {
40-
return new EventPublication(topic, partition, key, value, metadata);
41-
}
4221
}

src/main/java/com/binaryigor/eventsql/EventSQL.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.binaryigor.eventsql;
22

3-
import com.binaryigor.eventsql.internal.EventSQLOps;
43
import com.binaryigor.eventsql.internal.DefaultEventSQLRegistry;
4+
import com.binaryigor.eventsql.internal.EventSQLOps;
55
import com.binaryigor.eventsql.internal.TopicDefinitionsCache;
66
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLConsumers;
77
import com.binaryigor.eventsql.internal.sharded.ShardedEventSQLPublisher;
@@ -18,7 +18,8 @@
1818
import java.util.ArrayList;
1919
import java.util.Collection;
2020
import java.util.List;
21-
import java.util.Optional;
21+
22+
import static java.util.Collections.unmodifiableList;
2223

2324
public class EventSQL {
2425

@@ -27,34 +28,20 @@ public class EventSQL {
2728
private final EventSQLConsumers consumers;
2829

2930
public EventSQL(DataSource dataSource, SQLDialect sqlDialect) {
30-
this(dataSource, sqlDialect, Clock.systemUTC(), Optional.empty());
31+
this(dataSource, sqlDialect, Clock.systemUTC());
3132
}
3233

3334
public EventSQL(DataSource dataSource, SQLDialect sqlDialect, Clock clock) {
34-
this(dataSource, sqlDialect, clock, Optional.empty());
35-
}
36-
37-
public EventSQL(DataSource dataSource,
38-
SQLDialect sqlDialect,
39-
Clock clock,
40-
Optional<EventSQLConsumers.DLTEventFactory> dltEventFactory) {
41-
this(List.of(dataSource), sqlDialect, clock, dltEventFactory);
35+
this(List.of(dataSource), sqlDialect, clock);
4236
}
4337

4438
public EventSQL(Collection<DataSource> dataSources, SQLDialect sqlDialect) {
45-
this(dataSources, sqlDialect, Clock.systemUTC(), Optional.empty());
39+
this(dataSources, sqlDialect, Clock.systemUTC());
4640
}
4741

4842
public EventSQL(Collection<DataSource> dataSources,
4943
SQLDialect sqlDialect,
5044
Clock clock) {
51-
this(dataSources, sqlDialect, clock, Optional.empty());
52-
}
53-
54-
public EventSQL(Collection<DataSource> dataSources,
55-
SQLDialect sqlDialect,
56-
Clock clock,
57-
Optional<EventSQLConsumers.DLTEventFactory> dltEventFactory) {
5845
if (dataSources.isEmpty()) {
5946
throw new IllegalArgumentException("At least one data source is required");
6047
}
@@ -72,13 +59,12 @@ public EventSQL(Collection<DataSource> dataSources,
7259

7360
var topicRepository = new SqlTopicRepository(transactions);
7461
var consumerRepository = new SqlConsumerRepository(transactions);
75-
var eventRepository = new SqlEventRepository(transactions);
62+
var eventRepository = new SqlEventRepository(transactions, sqlDialect);
7663

7764
var registry = new DefaultEventSQLRegistry(topicRepository, eventRepository, consumerRepository, transactions);
7865

7966
var topicDefinitionsCache = new TopicDefinitionsCache(topicRepository);
8067
var ops = new EventSQLOps(topicDefinitionsCache, transactions, consumerRepository, eventRepository, clock);
81-
dltEventFactory.ifPresent(ops::configureDLTEventFactory);
8268

8369
registryList.add(registry);
8470
publisherList.add(ops);
@@ -91,9 +77,9 @@ public EventSQL(Collection<DataSource> dataSources,
9177
publisher = publisherList.getFirst();
9278
consumers = consumersList.getFirst();
9379
} else {
94-
registry = new ShardedEventSQLRegistry(registryList);
95-
publisher = new ShardedEventSQLPublisher(publisherList);
96-
consumers = new ShardedEventSQLConsumers(consumersList);
80+
registry = new ShardedEventSQLRegistry(unmodifiableList(registryList));
81+
publisher = new ShardedEventSQLPublisher(unmodifiableList(publisherList));
82+
consumers = new ShardedEventSQLConsumers(unmodifiableList(consumersList));
9783
}
9884
}
9985

src/main/java/com/binaryigor/eventsql/EventSQLPublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ public interface EventSQLPublisher {
77
void publish(EventPublication publication);
88

99
void publishAll(Collection<EventPublication> publications);
10+
11+
void configurePartitioner(Partitioner partitioner);
12+
13+
Partitioner partitioner();
1014
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.binaryigor.eventsql;
2+
3+
public interface Partitioner {
4+
int partition(EventPublication publication, int topicPartitions);
5+
}

src/main/java/com/binaryigor/eventsql/internal/DefaultDLTEventFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ public Optional<EventPublication> create(EventSQLConsumptionException exception,
3333
metadata.put("exceptionType", cause.getClass().getName());
3434
metadata.put("exceptionMessage", Optional.ofNullable(cause.getMessage()).orElse(exception.getMessage()));
3535

36-
// TODO: maybe warning, strange situation
37-
var partition = dltTopicDefinitionOpt.get().partitions() < event.partition() ? -1 : dltTopicDefinitionOpt.get().partitions();
38-
return Optional.of(new EventPublication(dltTopic, partition, event.key(), event.value(), metadata));
36+
return Optional.of(new EventPublication(dltTopic, event.key(), event.value(), metadata));
3937
}
4038
}

0 commit comments

Comments
 (0)