Skip to content

Commit 39c640a

Browse files
committed
Debezium service
1 parent 342f5cd commit 39c640a

6 files changed

Lines changed: 187 additions & 2 deletions

File tree

asql-core/src/main/java/me/zort/sqllib/internal/fieldResolver/ConstructorParameterResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ public Object obtainValue(
2424
String convertedName,
2525
Type type
2626
) {
27-
if (!(element instanceof Parameter) || !(((Parameter) element).getDeclaringExecutable() instanceof Constructor))
27+
if (!(element instanceof Parameter) || !(((Parameter) element).getDeclaringExecutable() instanceof Constructor)) {
2828
return null;
29+
}
2930

3031
Parameter p = (Parameter) element;
3132
Matcher matcher = argumentPattern.matcher(p.getName());

asql-debezium/.gitignore

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
.gradle
2+
build/
3+
!gradle/wrapper/gradle-wrapper.jar
4+
!**/src/main/**/build/
5+
!**/src/test/**/build/
6+
7+
### IntelliJ IDEA ###
8+
.idea/modules.xml
9+
.idea/jarRepositories.xml
10+
.idea/compiler.xml
11+
.idea/libraries/
12+
*.iws
13+
*.iml
14+
*.ipr
15+
out/
16+
!**/src/main/**/out/
17+
!**/src/test/**/out/
18+
19+
### Eclipse ###
20+
.apt_generated
21+
.classpath
22+
.factorypath
23+
.project
24+
.settings
25+
.springBeans
26+
.sts4-cache
27+
bin/
28+
!**/src/main/**/bin/
29+
!**/src/test/**/bin/
30+
31+
### NetBeans ###
32+
/nbproject/private/
33+
/nbbuild/
34+
/dist/
35+
/nbdist/
36+
/.nb-gradle/
37+
38+
### VS Code ###
39+
.vscode/
40+
41+
### Mac OS ###
42+
.DS_Store

asql-debezium/build.gradle

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
plugins {
2+
id 'java'
3+
}
4+
5+
group = 'me.zort'
6+
version = '1.0-SNAPSHOT'
7+
8+
repositories {
9+
mavenCentral()
10+
}
11+
12+
dependencies {
13+
compileOnly 'org.projectlombok:lombok:1.18.24'
14+
implementation group: 'org.jetbrains', name: 'annotations', version: '20.1.0'
15+
implementation 'io.debezium:debezium-api:2.3.0.Final'
16+
implementation 'io.debezium:debezium-embedded:2.3.0.Final'
17+
implementation 'io.debezium:debezium-connector-mysql:2.3.0.Final'
18+
implementation project(':api')
19+
implementation project(':core')
20+
annotationProcessor 'org.projectlombok:lombok:1.18.24'
21+
}

asql-debezium/settings.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
rootProject.name = 'asql-debezium'
2+
include ":api"
3+
include ":core"
4+
project(":api").projectDir = file("../asql-api")
5+
project(":core").projectDir = file("../asql-core")
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package me.zort.sqllib.debezium;
2+
3+
import io.debezium.config.Configuration;
4+
import io.debezium.engine.ChangeEvent;
5+
import io.debezium.engine.DebeziumEngine;
6+
import io.debezium.engine.format.Json;
7+
import me.zort.sqllib.SQLDatabaseConnection;
8+
import me.zort.sqllib.SQLDatabaseConnectionImpl;
9+
import org.jetbrains.annotations.NotNull;
10+
11+
import java.sql.Connection;
12+
import java.util.List;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
import java.util.function.Function;
16+
17+
public final class ASQLDebeziumService implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
18+
19+
public static @NotNull Builder configure(SQLDatabaseConnection connection) {
20+
if (!(connection instanceof SQLDatabaseConnectionImpl)) {
21+
throw new IllegalArgumentException("Connection does not contain options!");
22+
}
23+
Connection rawConnection = connection.getConnection();
24+
Configuration.Builder configBuilder = Configuration.create()
25+
.with("database.hostname", "TODO");
26+
// TODO: Build configuration builder from raw connection details
27+
return new Builder(configBuilder);
28+
}
29+
30+
private final DebeziumEngine<ChangeEvent<String, String>> engine;
31+
private final ExecutorService executor;
32+
private boolean running = false;
33+
34+
private ASQLDebeziumService(DebeziumEngine.Builder<ChangeEvent<String, String>> builder, ExecutorService executor) {
35+
this.engine = builder.notifying(this).build();
36+
this.executor = executor;
37+
}
38+
39+
@Override
40+
public void handleBatch(
41+
List<ChangeEvent<String, String>> records,
42+
DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer
43+
) throws InterruptedException {
44+
for (ChangeEvent<String, String> record : records) {
45+
// TODO
46+
committer.markProcessed(record);
47+
}
48+
committer.markBatchFinished();
49+
}
50+
51+
public void start() {
52+
if (executor.isShutdown() || executor.isTerminated()) {
53+
throw new IllegalStateException("Executor is not running!");
54+
} else if (running) {
55+
throw new IllegalStateException("Debezium service is already running!");
56+
}
57+
executor.submit(engine);
58+
running = true;
59+
}
60+
61+
public static class Builder {
62+
63+
private static int serviceCount = 0;
64+
private Configuration.Builder config;
65+
private ExecutorService executor;
66+
67+
private Builder(Configuration.Builder initialConfig) {
68+
this.config = initialConfig;
69+
this.executor = null;
70+
edit(builder -> builder
71+
.with("name", "Asql-Debezium-" + (++serviceCount)));
72+
}
73+
74+
public @NotNull Builder edit(
75+
Function<Configuration.Builder, Configuration.Builder> editFunc
76+
) {
77+
this.config = editFunc.apply(this.config);
78+
return this;
79+
}
80+
81+
public @NotNull Builder connector(ConnectorType type) {
82+
return edit(builder -> builder.with("connector.class", type.getClassName()));
83+
}
84+
85+
public @NotNull Builder executor(ExecutorService executor) {
86+
this.executor = executor;
87+
return this;
88+
}
89+
90+
public @NotNull ASQLDebeziumService build() {
91+
DebeziumEngine.Builder<ChangeEvent<String, String>> builder = DebeziumEngine.create(Json.class)
92+
.using(config.build().asProperties());
93+
return new ASQLDebeziumService(
94+
builder,
95+
executor != null ? executor : Executors.newCachedThreadPool()
96+
);
97+
}
98+
}
99+
100+
public enum ConnectorType {
101+
MYSQL("io.debezium.connector.mysql.MySqlConnector");
102+
103+
private final String className;
104+
105+
ConnectorType(String className) {
106+
this.className = className;
107+
}
108+
109+
public String getClassName() {
110+
return this.className;
111+
}
112+
}
113+
114+
}

settings.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ include 'examples'
77
project(":api").projectDir = file("asql-api")
88
project(":core").projectDir = file("asql-core")
99
project(":shared").projectDir = file("asql-shared")
10-
project(":examples").projectDir = file("asql-examples")
10+
project(":examples").projectDir = file("asql-examples")
11+
include 'asql-debezium'
12+

0 commit comments

Comments
 (0)