Skip to content

Commit af7e663

Browse files
committed
DebeziumService patch
1 parent ac41173 commit af7e663

5 files changed

Lines changed: 235 additions & 31 deletions

File tree

asql-core/src/main/java/me/zort/sqllib/mapping/annotation/Table.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.lang.annotation.Retention;
1111
import java.lang.annotation.RetentionPolicy;
1212
import java.lang.annotation.Target;
13+
import java.lang.reflect.AnnotatedElement;
1314
import java.lang.reflect.Method;
1415

1516
@Retention(RetentionPolicy.RUNTIME)
@@ -19,11 +20,15 @@
1920

2021
class Util {
2122
@Nullable
22-
public static String getFromContext(Method method, @Nullable ParameterPair[] parameters) {
23+
public static String getFromContext(AnnotatedElement element, @Nullable ParameterPair[] parameters) {
2324
PlaceholderMapper mapper = new PlaceholderMapper(parameters != null ? parameters : new ParameterPair[0]);
24-
if (method.isAnnotationPresent(Table.class)) {
25-
return mapper.assignValues(method.getAnnotation(Table.class).value());
26-
} else if (method.getDeclaringClass().isAnnotationPresent(Table.class)) {
25+
if (element.isAnnotationPresent(Table.class)) {
26+
return mapper.assignValues(element.getAnnotation(Table.class).value());
27+
} else if (!(element instanceof Method)) {
28+
throw new SQLMappingException("Element " + element.toString() + " is not suitable for @Table check!", null, null);
29+
}
30+
Method method = (Method) element;
31+
if (method.getDeclaringClass().isAnnotationPresent(Table.class)) {
2732
return mapper.assignValues(method.getDeclaringClass().getAnnotation(Table.class).value());
2833
} else {
2934
throw new SQLMappingException("Method " + method.getName() + " in class " + method.getDeclaringClass().getSimpleName() + " requires @Table annotation", method, null);

asql-debezium/src/main/java/me/zort/sqllib/debezium/ASQLDebeziumService.java

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,66 @@
55
import io.debezium.engine.ChangeEvent;
66
import io.debezium.engine.DebeziumEngine;
77
import io.debezium.engine.format.Json;
8+
import lombok.SneakyThrows;
89
import me.zort.sqllib.SQLDatabaseConnection;
910
import me.zort.sqllib.SQLDatabaseConnectionImpl;
11+
import me.zort.sqllib.debezium.builder.EntityFilterBuilder;
1012
import org.jetbrains.annotations.NotNull;
1113

14+
import java.lang.reflect.AnnotatedElement;
15+
import java.net.URI;
1216
import java.sql.Connection;
17+
import java.util.HashMap;
1318
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.CompletableFuture;
1421
import java.util.concurrent.ExecutorService;
1522
import java.util.concurrent.Executors;
23+
import java.util.function.Consumer;
1624
import java.util.function.Function;
1725

26+
/**
27+
* Service that uses debezium engine to listen for changes of row changes in
28+
* a database.
29+
*
30+
* <p>Example usage:
31+
* <pre>{@code
32+
* ExecutorService executor = Executors.newSingleThreadExecutor();
33+
* ASQLDebeziumService service = ASQLDebeziumService.configure(connection)
34+
* .connector(ASQLDebeziumService.ConnectorType.MYSQL)
35+
* .executor(executor)
36+
* .build();
37+
*
38+
* </pre>
39+
*
40+
* @author ZorTik
41+
*/
1842
@Beta
19-
public final class ASQLDebeziumService implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
43+
public final class ASQLDebeziumService implements
44+
DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>>, Runnable {
2045

21-
public static @NotNull Builder configure(SQLDatabaseConnection connection) {
46+
@SneakyThrows
47+
public static @NotNull Builder configure(@NotNull SQLDatabaseConnection connection) {
2248
if (!(connection instanceof SQLDatabaseConnectionImpl)) {
2349
throw new IllegalArgumentException("Connection does not contain options!");
50+
} else if (!connection.isConnected()) {
51+
throw new IllegalArgumentException("Connection is not connected!");
2452
}
2553
Connection rawConnection = connection.getConnection();
54+
URI uri = new URI(rawConnection.getMetaData().getURL());
2655
Configuration.Builder configBuilder = Configuration.create()
27-
.with("database.hostname", "TODO");
56+
.with("database.hostname", uri.getHost())
57+
.with("database.port", uri.getPort());
2858
// TODO: Build configuration builder from raw connection details
2959
return new Builder(configBuilder);
3060
}
3161

3262
private final DebeziumEngine<ChangeEvent<String, String>> engine;
33-
private final ExecutorService executor;
34-
private boolean running = false;
63+
private final Map<RecordFilter, Consumer<ChangeEvent<String, String>>> handlers;
3564

36-
private ASQLDebeziumService(DebeziumEngine.Builder<ChangeEvent<String, String>> builder, ExecutorService executor) {
65+
private ASQLDebeziumService(DebeziumEngine.Builder<ChangeEvent<String, String>> builder) {
3766
this.engine = builder.notifying(this).build();
38-
this.executor = executor;
67+
this.handlers = null;
3968
}
4069

4170
@Override
@@ -44,31 +73,48 @@ public void handleBatch(
4473
DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer
4574
) throws InterruptedException {
4675
for (ChangeEvent<String, String> record : records) {
47-
// TODO
76+
new HashMap<>(handlers).keySet().forEach(filter -> {
77+
if (filter.expired()) {
78+
handlers.remove(filter);
79+
} else if (filter.test(record)) {
80+
handlers.remove(filter).accept(record);
81+
}
82+
});
4883
committer.markProcessed(record);
4984
}
5085
committer.markBatchFinished();
5186
}
5287

53-
public void start() {
54-
if (executor.isShutdown() || executor.isTerminated()) {
55-
throw new IllegalStateException("Executor is not running!");
56-
} else if (running) {
57-
throw new IllegalStateException("Debezium service is already running!");
58-
}
59-
executor.submit(engine);
60-
running = true;
88+
@Override
89+
public void run() {
90+
engine.run();
91+
}
92+
93+
/**
94+
* This method registers event handler to be executed when provided filter matches
95+
* the event. Note that the handler will be executed only once, then removed.
96+
*
97+
* @param filter Filter to match
98+
* @return Future accepting the event
99+
*/
100+
public @NotNull CompletableFuture<ChangeEvent<String, String>> awaitChange(RecordFilter filter) {
101+
CompletableFuture<ChangeEvent<String, String>> future = new CompletableFuture<>();
102+
filter.markRegistered();
103+
handlers.put(filter, future::complete);
104+
return future;
105+
}
106+
107+
public @NotNull EntityFilterBuilder watch(AnnotatedElement entityElement) {
108+
return new EntityFilterBuilder(this, entityElement);
61109
}
62110

63111
public static class Builder {
64112

65113
private static int serviceCount = 0;
66114
private Configuration.Builder config;
67-
private ExecutorService executor;
68115

69116
private Builder(Configuration.Builder initialConfig) {
70117
this.config = initialConfig;
71-
this.executor = null;
72118
edit(builder -> builder
73119
.with("name", "Asql-Debezium-" + (++serviceCount)));
74120
}
@@ -84,18 +130,10 @@ private Builder(Configuration.Builder initialConfig) {
84130
return edit(builder -> builder.with("connector.class", type.getClassName()));
85131
}
86132

87-
public @NotNull Builder executor(ExecutorService executor) {
88-
this.executor = executor;
89-
return this;
90-
}
91-
92133
public @NotNull ASQLDebeziumService build() {
93134
DebeziumEngine.Builder<ChangeEvent<String, String>> builder = DebeziumEngine.create(Json.class)
94135
.using(config.build().asProperties());
95-
return new ASQLDebeziumService(
96-
builder,
97-
executor != null ? executor : Executors.newCachedThreadPool()
98-
);
136+
return new ASQLDebeziumService(builder);
99137
}
100138
}
101139

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package me.zort.sqllib.debezium;
2+
3+
import io.debezium.engine.ChangeEvent;
4+
import me.zort.sqllib.mapping.annotation.Table;
5+
import me.zort.sqllib.util.ParameterPair;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
import java.lang.reflect.AnnotatedElement;
9+
import java.util.function.Predicate;
10+
11+
/**
12+
* Tests a record if the provided record should be notified
13+
* to the registered consumer in {@link ASQLDebeziumService}.
14+
*
15+
* @author ZorTik
16+
*/
17+
public final class RecordFilter {
18+
19+
private static final long DEFAULT_EXPIRATION = 10000L;
20+
21+
private final Predicate<ChangeEvent<String, String>> testFunction;
22+
private long expireAfter;
23+
private long expireAt = -1;
24+
25+
public RecordFilter(Predicate<ChangeEvent<String, String>> testFunction) {
26+
this.testFunction = testFunction;
27+
setExpireAfter(DEFAULT_EXPIRATION);
28+
}
29+
30+
public boolean test(ChangeEvent<String, String> record) {
31+
return testFunction.test(record);
32+
}
33+
34+
public void setExpireAfter(long expireAfter) {
35+
if (isRegistered()) {
36+
throw new IllegalStateException("Filter expiration cannot be changed after it has been registered!");
37+
}
38+
this.expireAfter = expireAfter;
39+
}
40+
41+
public boolean isRegistered() {
42+
return expireAt != -1;
43+
}
44+
45+
public boolean expired() {
46+
return System.currentTimeMillis() >= expireAt;
47+
}
48+
49+
void markRegistered() {
50+
if (isRegistered()) {
51+
throw new IllegalStateException("Filter has already been registered!");
52+
}
53+
expireAt = System.currentTimeMillis() + expireAfter;
54+
}
55+
56+
public static @NotNull RecordFilter table(AnnotatedElement element, ParameterPair... parameters) {
57+
final String table = Table.Util.getFromContext(element, parameters);
58+
return new RecordFilter(record -> record.destination().equals(table));
59+
}
60+
61+
public static @NotNull RecordFilter column(String column) {
62+
return new RecordFilter(record -> record.key().equals(column));
63+
}
64+
65+
public static @NotNull RecordFilter column(String column, String value) {
66+
return new RecordFilter(record -> record.key().equals(column) && record.value().equals(value));
67+
}
68+
69+
public static @NotNull RecordFilter join(RecordFilter... filters) {
70+
return new RecordFilter(record -> {
71+
for (RecordFilter filter : filters) {
72+
if (!filter.test(record)) {
73+
return false;
74+
}
75+
}
76+
return true;
77+
});
78+
}
79+
80+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package me.zort.sqllib.debezium;
2+
3+
import io.debezium.engine.ChangeEvent;
4+
import org.jetbrains.annotations.NotNull;
5+
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.TimeoutException;
10+
11+
/**
12+
* Provides easy to use API for building and executing filters
13+
* in {@link ASQLDebeziumService}.
14+
*
15+
* @see RecordFilter
16+
* @see ASQLDebeziumService
17+
* @author ZorTik
18+
*/
19+
public class RecordFilterBuilder {
20+
21+
private final ASQLDebeziumService service;
22+
private RecordFilter filter;
23+
24+
public RecordFilterBuilder(ASQLDebeziumService service) {
25+
this.service = service;
26+
this.filter = null;
27+
}
28+
29+
public @NotNull RecordFilterBuilder and(RecordFilter filter) {
30+
this.filter = RecordFilter.join(this.filter, filter);
31+
return this;
32+
}
33+
34+
public @NotNull CompletableFuture<ChangeEvent<String, String>> begin() {
35+
return service.awaitChange(filter);
36+
}
37+
38+
public @NotNull ChangeEvent<String, String> block()
39+
throws InterruptedException, ExecutionException {
40+
return service.awaitChange(filter).get();
41+
}
42+
43+
public @NotNull ChangeEvent<String, String> block(long timeout, TimeUnit millis)
44+
throws InterruptedException, ExecutionException, TimeoutException {
45+
return service.awaitChange(filter).get(timeout, millis);
46+
}
47+
48+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package me.zort.sqllib.debezium.builder;
2+
3+
import me.zort.sqllib.debezium.ASQLDebeziumService;
4+
import me.zort.sqllib.debezium.RecordFilter;
5+
import me.zort.sqllib.debezium.RecordFilterBuilder;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
import java.lang.reflect.AnnotatedElement;
9+
10+
/**
11+
* Provides API for building filters based on provided element that
12+
* is either part of an entity or proxy.
13+
*
14+
* @see RecordFilter
15+
* @see RecordFilterBuilder
16+
* @author ZorTik
17+
*/
18+
public class EntityFilterBuilder extends RecordFilterBuilder {
19+
20+
public EntityFilterBuilder(ASQLDebeziumService service, AnnotatedElement element) {
21+
super(service);
22+
super.and(RecordFilter.table(element));
23+
}
24+
25+
public @NotNull EntityFilterBuilder expectColumnChange(String column) {
26+
return (EntityFilterBuilder) super.and(RecordFilter.column(column));
27+
}
28+
29+
public @NotNull EntityFilterBuilder expectColumnChange(String column, String expectedValue) {
30+
return (EntityFilterBuilder) super.and(RecordFilter.column(column, expectedValue));
31+
}
32+
33+
}

0 commit comments

Comments
 (0)