Skip to content

Commit ffb6ef8

Browse files
committed
Read timeout fix
1 parent 95aa04a commit ffb6ef8

24 files changed

Lines changed: 241 additions & 120 deletions

README.md

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ CREATE TABLE consumer (
3636
topic TEXT NOT NULL,
3737
name TEXT NOT NULL,
3838
partition SMALLINT NOT NULL,
39+
first_event_id BIGINT,
3940
last_event_id BIGINT,
4041
last_consumption_at TIMESTAMP,
42+
consumed_events BIGINT NOT NULL,
4143
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
4244
PRIMARY KEY (topic, name, partition)
4345
);
@@ -54,6 +56,9 @@ FOR UPDATE SKIP LOCKED;
5456

5557
SELECT * FROM event
5658
WHERE topic = :topic AND (:last_event_id IS NULL OR id > :last_event_id)
59+
-- eventual consistency of auto increment; there is no guarantee that record of id 2 is visible after id 1 record --
60+
-- in the implementation, we set insert statements timeout to '250 ms' so it is safe --
61+
AND created_at < (NOW() - interval '333 ms')
5762
ORDER BY id LIMIT :limit;
5863

5964
(process events)
@@ -82,6 +87,9 @@ FOR UPDATE SKIP LOCKED;
8287

8388
SELECT * FROM event
8489
WHERE topic = :topic AND partition = 0 AND (:last_event_id IS NULL OR id > :last_event_id)
90+
-- eventual consistency of auto increment; there is no guarantee that record of id 2 is visible after id 1 record --
91+
-- in the implementation, we set insert statements timeout to '250 ms' so it is safe --
92+
AND created_at < (NOW() - interval '333 ms')
8593
ORDER BY id LIMIT :limit;
8694

8795
(process events)
@@ -99,18 +107,20 @@ definition has. It's a rather acceptable tradeoff and easy to enforce at the lib
99107

100108
## How to use it
101109

102-
`EventSQL` is an entrypoint to the whole library. It requires standard Java `javax.sql.DataSource` or a list of
110+
`EventSQL` is an entrypoint to the whole library. It requires single data source properties or a list of
103111
them:
104112

105113
```java
106-
107114
import com.binaryigor.eventsql.EventSQL;
108-
import javax.sql.DataSource;
109-
// dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
115+
// EventSQL.Dialect is a dialect of your events backend - POSTGRES, MYSQL, MARIADB and so on;
110116
// as of now, only POSTGRES has fully tested support;
111117
// should also work with others but some things - event table partition management for example - works only with Postgres, for others it must be managed manually
112-
var eventSQL = new EventSQL(dataSource, EventSQLDialect.POSTGRES);
113-
ver shardedEventSQL = new EventSQL(dataSources, EventSQLDialect.POSTGRES);
118+
var eventSQL = EventSQL.of(new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "dbUrl", "dbUsername", "dbPassword"));
119+
ver shardedEventSQL = EventSQL.of(
120+
List.of(
121+
new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "db0Url", "db0Username", "db0Password"),
122+
new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "db1Url", "db1Username", "db1Password"),
123+
new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES, "db2Url", "db2Username", "db2Password")));
114124
```
115125

116126
Sharded version works in the same vain - it just assumes that topics and consumers are hosted on multiple dbs.

TODO.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,4 @@
77
* JavaDocs
88
* Support schemas init in registry - why require schemas from users, if it is always the same (keeping dbs diffs in mind)?
99
* using JOOQ tradeoffs?
10-
* custom enums for db dialect - hide JOOQ usage!
1110
* full MySQL/MariaDB support
12-
* randomize delay option to increase parallelism
Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
package com.binaryigor.eventsql.benchmarks;
22

33
import com.binaryigor.eventsql.EventSQL;
4-
import com.binaryigor.eventsql.EventSQLDialect;
5-
import com.zaxxer.hikari.HikariConfig;
6-
import com.zaxxer.hikari.HikariDataSource;
74
import org.springframework.boot.SpringApplication;
85
import org.springframework.boot.autoconfigure.SpringBootApplication;
96
import org.springframework.boot.context.properties.EnableConfigurationProperties;
107
import org.springframework.context.annotation.Bean;
118

12-
import javax.sql.DataSource;
13-
149
@SpringBootApplication
1510
@EnableConfigurationProperties(EventsProperties.class)
1611
public class EventSQLBenchmarksApp {
@@ -21,20 +16,11 @@ public static void main(String[] args) {
2116

2217
@Bean
2318
EventSQL eventSQL(EventsProperties eventsProperties) {
24-
var dataSources = eventsProperties.dataSources().stream()
19+
var dataSourceProperties = eventsProperties.dataSources().stream()
2520
.filter(EventsProperties.DataSourceProperties::enabled)
26-
.map(this::dataSource)
21+
.map(ps -> new EventSQL.DataSourceProperties(EventSQL.Dialect.POSTGRES,
22+
ps.url(), ps.username(), ps.password()))
2723
.toList();
28-
return new EventSQL(dataSources, EventSQLDialect.POSTGRES);
29-
}
30-
31-
private DataSource dataSource(EventsProperties.DataSourceProperties properties) {
32-
var config = new HikariConfig();
33-
config.setJdbcUrl(properties.url());
34-
config.setUsername(properties.username());
35-
config.setPassword(properties.password());
36-
config.setMinimumIdle(properties.connections());
37-
config.setMaximumPoolSize(properties.connections());
38-
return new HikariDataSource(config);
24+
return EventSQL.of(dataSourceProperties);
3925
}
4026
}

benchmarks/app/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ spring:
77
true
88

99
server:
10-
port: 8090
10+
port: 8080
1111

1212
management:
1313
endpoints:

benchmarks/events-db/init_db.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ CREATE TABLE consumer (
2727
topic TEXT NOT NULL,
2828
name TEXT NOT NULL,
2929
partition SMALLINT NOT NULL,
30+
first_event_id BIGINT,
3031
last_event_id BIGINT,
3132
last_consumption_at TIMESTAMP,
33+
consumed_events BIGINT NOT NULL,
3234
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
3335
PRIMARY KEY (topic, name, partition)
3436
);

benchmarks/runner/build_and_package.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export app=$app
3131
export tag=$tag
3232
export run_cmd="docker run -d \\
3333
-e EVENTS_RATE -e EVENTS_TO_PUBLISH -e RUNNER_INSTANCES \\
34-
--memory 2G --cpus 2 \\
34+
--memory 2G --cpus 4 \\
3535
--network host \\
3636
--name $app $tagged_image"
3737

benchmarks/runner/src/main/java/com/binaryigor/eventsql/benchmarks/EventSQLBenchmarksRunner.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.math.RoundingMode;
1010
import java.nio.charset.StandardCharsets;
1111
import java.sql.ResultSet;
12-
import java.time.Clock;
1312
import java.time.Duration;
1413
import java.time.Instant;
1514
import java.time.LocalDateTime;
@@ -23,10 +22,10 @@ public class EventSQLBenchmarksRunner {
2322
static final String DB_URL = envValueOrDefault("DB_URL", "jdbc:postgresql://localhost:5432/events");
2423
static final String DB_USERNAME = envValueOrDefault("DB_URL", "events");
2524
static final String DB_PASSWORD = envValueOrDefault("DB_PASSWORD", "events");
26-
static final int DATA_SOURCE_POOL_SIZE = envIntValueOrDefault("DATA_SOURCE_POOL_SIZE", 25);
27-
static final EventSQLDialect SQL_DIALECT = EventSQLDialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
25+
static final int DATA_SOURCE_POOL_SIZE = envIntValueOrDefault("DATA_SOURCE_POOL_SIZE", 50);
26+
static final EventSQL.Dialect SQL_DIALECT = EventSQL.Dialect.valueOf(envValueOrDefault("SQL_DIALECT", "POSTGRES"));
2827
static final int RUNNER_INSTANCES = envIntValueOrDefault("RUNNER_INSTANCES", 1);
29-
static final int EVENTS_TO_PUBLISH = envIntValueOrDefault("EVENTS_TO_PUBLISH", 10_000);
28+
static final int EVENTS_TO_PUBLISH = envIntValueOrDefault("EVENTS_TO_PUBLISH", 60_000);
3029
static final int EVENTS_RATE = envIntValueOrDefault("EVENTS_RATE", 1000);
3130
static final String TEST_TOPIC = envValueOrDefault("TEST_TOPIC", "account_created");
3231
static final String TEST_CONSUMER = envValueOrDefault("TEST_CONSUMER", "benchmarks-consumer");
@@ -40,7 +39,7 @@ public static void main(String[] args) throws Exception {
4039

4140
var dataSource = dataSource(DB_URL, DB_USERNAME, DB_PASSWORD);
4241

43-
var eventSQL = new EventSQL(dataSource, SQL_DIALECT, Clock.systemUTC());
42+
var eventSQL = EventSQL.of(new EventSQL.DataSourceProperties(SQL_DIALECT, DB_URL, DB_USERNAME, DB_PASSWORD, DATA_SOURCE_POOL_SIZE));
4443

4544
printDelimiter();
4645

@@ -276,7 +275,6 @@ record EventTableStats(long lastId,
276275
throw new IllegalArgumentException("lastIdsPerPartition cannot be null or empty");
277276
}
278277
}
279-
280278
}
281279

282280
record ConsumerTableStats(Map<Integer, Long> lastIdsPerPartition) {

pom.xml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
<artifactId>jooq</artifactId>
3636
<version>${jooq.version}</version>
3737
</dependency>
38+
<dependency>
39+
<groupId>com.zaxxer</groupId>
40+
<artifactId>HikariCP</artifactId>
41+
<version>${hikari.version}</version>
42+
</dependency>
3843

3944
<dependency>
4045
<groupId>org.junit.jupiter</groupId>
@@ -66,12 +71,6 @@
6671
<version>${testcontainers.version}</version>
6772
<scope>test</scope>
6873
</dependency>
69-
<dependency>
70-
<groupId>com.zaxxer</groupId>
71-
<artifactId>HikariCP</artifactId>
72-
<version>${hikari.version}</version>
73-
<scope>test</scope>
74-
</dependency>
7574
<dependency>
7675
<groupId>org.postgresql</groupId>
7776
<artifactId>postgresql</artifactId>

src/main/java/com/binaryigor/eventsql/EventSQL.java

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
import com.binaryigor.eventsql.internal.sql.SQLEventRepository;
1111
import com.binaryigor.eventsql.internal.sql.SQLTopicRepository;
1212
import com.binaryigor.eventsql.internal.sql.SQLTransactions;
13+
import com.zaxxer.hikari.HikariConfig;
14+
import com.zaxxer.hikari.HikariDataSource;
1315
import org.jooq.SQLDialect;
1416
import org.jooq.impl.DSL;
1517

16-
import javax.sql.DataSource;
1718
import java.time.Clock;
1819
import java.util.ArrayList;
1920
import java.util.Collection;
@@ -23,25 +24,40 @@
2324

2425
public class EventSQL {
2526

27+
public static final int PUBLISH_TIMEOUT = 250;
28+
public static final int NEXT_EVENTS_READ_VISIBILITY_THRESHOLD = 333;
2629
private final EventSQLRegistry registry;
2730
private final EventSQLPublisher publisher;
2831
private final EventSQLConsumers consumers;
2932

30-
public EventSQL(DataSource dataSource, EventSQLDialect sqlDialect) {
31-
this(dataSource, sqlDialect, Clock.systemUTC());
33+
public static EventSQL of(DataSourceProperties dataSourceProperties) {
34+
return of(dataSourceProperties, Clock.systemUTC());
3235
}
3336

34-
public EventSQL(DataSource dataSource, EventSQLDialect sqlDialect, Clock clock) {
35-
this(List.of(dataSource), sqlDialect, clock);
37+
public static EventSQL of(DataSourceProperties dataSourceProperties, Clock clock) {
38+
return of(List.of(dataSourceProperties), clock);
3639
}
3740

38-
public EventSQL(Collection<DataSource> dataSources, EventSQLDialect sqlDialect) {
39-
this(dataSources, sqlDialect, Clock.systemUTC());
41+
public static EventSQL of(Collection<DataSourceProperties> dataSourceProperties) {
42+
return of(dataSourceProperties, Clock.systemUTC());
4043
}
4144

42-
public EventSQL(Collection<DataSource> dataSources,
43-
EventSQLDialect sqlDialect,
44-
Clock clock) {
45+
public static EventSQL of(Collection<DataSourceProperties> dataSourceProperties, Clock clock) {
46+
return new EventSQL(dataSourceProperties.stream().map(props ->
47+
new DataSource(props.dialect(), dataSource(props)))
48+
.toList(), clock);
49+
}
50+
51+
/**
52+
* Primary constructor used mainly for internal purposes.
53+
* It should rarely be used outside library implementation, only if static of factories do not provide what is necessary.
54+
* Additionally, if used, PUBLISH_TIMEOUT must be set as a query/statement timeout on each newly open data source connection.
55+
* If not, there is a risk of loosing some events by consumers - it is being set by static of methods when dataSource() factory is called; check them out for guidance.
56+
*
57+
* @param dataSources list of data sources where events are hosted
58+
* @param clock clock used mainly for consumer delays
59+
*/
60+
public EventSQL(Collection<DataSource> dataSources, Clock clock) {
4561
if (dataSources.isEmpty()) {
4662
throw new IllegalArgumentException("At least one data source is required");
4763
}
@@ -53,15 +69,14 @@ public EventSQL(Collection<DataSource> dataSources,
5369
System.setProperty("org.jooq.no-logo", "true");
5470
System.setProperty("org.jooq.no-tips", "true");
5571

56-
var jooqDialect = SQLDialect.valueOf(sqlDialect.name());
57-
5872
dataSources.forEach(dataSource -> {
59-
var dslContext = DSL.using(dataSource, jooqDialect);
73+
var jooqDialect = SQLDialect.valueOf(dataSource.dialect().name());
74+
var dslContext = DSL.using(dataSource.dataSource(), jooqDialect);
6075
var transactions = new SQLTransactions(dslContext);
6176

6277
var topicRepository = new SQLTopicRepository(transactions);
6378
var consumerRepository = new SQLConsumerRepository(transactions);
64-
var eventRepository = new SQLEventRepository(transactions, jooqDialect);
79+
var eventRepository = new SQLEventRepository(transactions, dataSource.dialect(), NEXT_EVENTS_READ_VISIBILITY_THRESHOLD);
6580

6681
var registry = new DefaultEventSQLRegistry(topicRepository, eventRepository, consumerRepository, transactions);
6782

@@ -73,7 +88,6 @@ public EventSQL(Collection<DataSource> dataSources,
7388
consumersList.add(ops);
7489
});
7590

76-
7791
if (dataSources.size() == 1) {
7892
registry = registryList.getFirst();
7993
publisher = publisherList.getFirst();
@@ -85,6 +99,23 @@ public EventSQL(Collection<DataSource> dataSources,
8599
}
86100
}
87101

102+
private static javax.sql.DataSource dataSource(DataSourceProperties dataSourceProperties) {
103+
var config = new HikariConfig();
104+
config.setJdbcUrl(dataSourceProperties.url());
105+
config.setUsername(dataSourceProperties.username());
106+
config.setPassword(dataSourceProperties.password());
107+
config.setMinimumIdle(dataSourceProperties.minPoolSize());
108+
config.setMaximumPoolSize(dataSourceProperties.maxPoolSize());
109+
110+
switch (dataSourceProperties.dialect()) {
111+
case POSTGRES -> config.setConnectionInitSql("SET statement_timeout=" + PUBLISH_TIMEOUT);
112+
case MYSQL -> config.setConnectionInitSql("SET SESSION max_execution_time=" + PUBLISH_TIMEOUT);
113+
case MARIADB -> config.setConnectionInitSql("SET SESSION max_statement_time=" + (PUBLISH_TIMEOUT / 1000.0));
114+
}
115+
116+
return new HikariDataSource(config);
117+
}
118+
88119
public EventSQLRegistry registry() {
89120
return registry;
90121
}
@@ -96,4 +127,35 @@ public EventSQLPublisher publisher() {
96127
public EventSQLConsumers consumers() {
97128
return consumers;
98129
}
130+
131+
public enum Dialect {
132+
POSTGRES, MYSQL, MARIADB
133+
}
134+
135+
public record DataSourceProperties(Dialect dialect,
136+
String url,
137+
String username,
138+
String password,
139+
int minPoolSize,
140+
int maxPoolSize) {
141+
142+
public DataSourceProperties(Dialect dialect,
143+
String url,
144+
String username,
145+
String password,
146+
int poolSize) {
147+
this(dialect, url, username, password, poolSize, poolSize);
148+
}
149+
150+
public DataSourceProperties(Dialect dialect,
151+
String url,
152+
String username,
153+
String password) {
154+
this(dialect, url, username, password, 10, 20);
155+
}
156+
}
157+
158+
public record DataSource(Dialect dialect, javax.sql.DataSource dataSource) {
159+
160+
}
99161
}

src/main/java/com/binaryigor/eventsql/EventSQLConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
public interface EventSQLConsumers {
99

10-
Duration DEFAULT_POLLING_DELAY = Duration.ofSeconds(1);
10+
Duration DEFAULT_POLLING_DELAY = Duration.ofMillis(500);
1111
int DEFAULT_IN_MEMORY_EVENTS = 10;
1212

1313
void startConsumer(String topic, String name,

0 commit comments

Comments
 (0)