1111import me .zort .sqllib .debezium .builder .EntityFilterBuilder ;
1212import org .jetbrains .annotations .NotNull ;
1313
14+ import java .io .IOException ;
1415import java .lang .reflect .AnnotatedElement ;
1516import java .net .URI ;
16- import java .sql .Connection ;
17+ import java .net .URISyntaxException ;
18+ import java .sql .DatabaseMetaData ;
1719import java .util .HashMap ;
1820import java .util .List ;
1921import java .util .Map ;
2022import java .util .concurrent .CompletableFuture ;
23+ import java .util .concurrent .ConcurrentHashMap ;
2124import java .util .concurrent .ExecutorService ;
22- import java .util .concurrent .Executors ;
2325import java .util .function .Consumer ;
2426import java .util .function .Function ;
2527
4042 * @author ZorTik
4143 */
4244@ Beta
43- public final class ASQLDebeziumService implements
44- DebeziumEngine .ChangeConsumer <ChangeEvent <String , String >>, Runnable {
45+ public final class ASQLDebeziumWatcher
46+ implements DebeziumEngine .ChangeConsumer <ChangeEvent <String , String >> {
4547
4648 @ SneakyThrows
47- public static @ NotNull Builder configure (@ NotNull SQLDatabaseConnection connection ) {
49+ public static @ NotNull Builder configure (
50+ @ NotNull SQLDatabaseConnection connection , String password
51+ ) {
4852 if (!(connection instanceof SQLDatabaseConnectionImpl )) {
4953 throw new IllegalArgumentException ("Connection does not contain options!" );
5054 } else if (!connection .isConnected ()) {
5155 throw new IllegalArgumentException ("Connection is not connected!" );
5256 }
53- Connection rawConnection = connection .getConnection ();
54- URI uri = new URI (rawConnection .getMetaData ().getURL ());
57+ DatabaseMetaData rawConnectionMeta = connection .getConnection ().getMetaData ();
58+ URI uri = new URI (rawConnectionMeta .getURL ());
59+ return configure (
60+ uri .getHost (), uri .getPort (), rawConnectionMeta .getUserName (), password
61+ );
62+ }
63+
64+ public static @ NotNull Builder configure (
65+ @ NotNull String hostname ,
66+ int port ,
67+ @ NotNull String username ,
68+ @ NotNull String password
69+ ) throws URISyntaxException {
5570 Configuration .Builder configBuilder = Configuration .create ()
56- .with ("database.hostname" , uri .getHost ())
57- .with ("database.port" , uri .getPort ());
58- // TODO: Build configuration builder from raw connection details
71+ .with ("database.hostname" , hostname )
72+ .with ("database.port" , String .valueOf (port ))
73+ .with ("database.user" , username )
74+ .with ("database.password" , password );
5975 return new Builder (configBuilder );
6076 }
6177
6278 private final DebeziumEngine <ChangeEvent <String , String >> engine ;
6379 private final Map <RecordFilter , Consumer <ChangeEvent <String , String >>> handlers ;
80+ private boolean running = false ;
6481
65- private ASQLDebeziumService (DebeziumEngine .Builder <ChangeEvent <String , String >> builder ) {
82+ private ASQLDebeziumWatcher (DebeziumEngine .Builder <ChangeEvent <String , String >> builder ) {
6683 this .engine = builder .notifying (this ).build ();
67- this .handlers = null ;
84+ this .handlers = new ConcurrentHashMap <>() ;
6885 }
6986
7087 @ Override
@@ -85,9 +102,26 @@ public void handleBatch(
85102 committer .markBatchFinished ();
86103 }
87104
88- @ Override
89- public void run () {
90- engine .run ();
105+ public void start (ExecutorService executor ) {
106+ if (running ) {
107+ throw new IllegalStateException ("Service is already running!" );
108+ }
109+ executor .submit (engine );
110+ running = true ;
111+ }
112+
113+ public void stop () {
114+ try {
115+ engine .close ();
116+ } catch (IOException e ) {
117+ throw new RuntimeException (e );
118+ } finally {
119+ running = false ;
120+ }
121+ }
122+
123+ public @ NotNull CompletableFuture <ChangeEvent <String , String >> awaitChange () {
124+ return awaitChange (RecordFilter .any ());
91125 }
92126
93127 /**
@@ -116,7 +150,13 @@ public static class Builder {
116150 private Builder (Configuration .Builder initialConfig ) {
117151 this .config = initialConfig ;
118152 edit (builder -> builder
119- .with ("name" , "Asql-Debezium-" + (++serviceCount )));
153+ .with ("name" , "Asql-Debezium-" + (++serviceCount ))
154+ .with ("offset.storage" , "org.apache.kafka.connect.storage.FileOffsetBackingStore" )
155+ .with ("offset.storage.file.filename" , System .getProperty ("user.dir" ) + "/offsets.dat" )
156+ .with ("server.id" , serviceCount )
157+ .with ("database.history" , "io.debezium.relational.history.FileDatabaseHistory" )
158+ .with ("io.debezium.relational.history.FileDatabaseHistory" , System .getProperty ("user.dir" ) + "/dbhistory.dat" )
159+ .with ("offset.flush.interval.ms" , 1000 ));
120160 }
121161
122162 public @ NotNull Builder edit (
@@ -130,10 +170,19 @@ private Builder(Configuration.Builder initialConfig) {
130170 return edit (builder -> builder .with ("connector.class" , type .getClassName ()));
131171 }
132172
133- public @ NotNull ASQLDebeziumService build () {
173+ public @ NotNull ASQLDebeziumWatcher build () {
174+ Configuration configuration = config .build ();
175+ assertProperty (configuration , "connector.class" );
176+ assertProperty (configuration , "database.hostname" );
134177 DebeziumEngine .Builder <ChangeEvent <String , String >> builder = DebeziumEngine .create (Json .class )
135- .using (config .build ().asProperties ());
136- return new ASQLDebeziumService (builder );
178+ .using (configuration .asProperties ());
179+ return new ASQLDebeziumWatcher (builder );
180+ }
181+
182+ private static void assertProperty (Configuration config , String name ) {
183+ if (!config .hasKey (name )) {
184+ throw new IllegalArgumentException ("Configuration requires property " + name );
185+ }
137186 }
138187 }
139188
0 commit comments