Skip to content

Commit f6421e7

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 CDC: Add regex filters for cache names
1 parent 28ba739 commit f6421e7

12 files changed

Lines changed: 31 additions & 20 deletions

File tree

modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22+
import java.nio.file.Path;
2223
import java.util.Arrays;
2324
import java.util.List;
2425
import java.util.concurrent.CountDownLatch;
@@ -194,7 +195,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception {
194195
CdcConfiguration cfg = new CdcConfiguration();
195196

196197
cfg.setConsumer(new UserCdcConsumer() {
197-
@Override public void start(MetricRegistry mreg) {
198+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
198199
appStarted.countDown();
199200
}
200201
});

modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.util;
1919

20+
import java.nio.file.Path;
2021
import java.util.ArrayList;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
@@ -217,7 +218,7 @@ synchronized List<CdcEvent> events() {
217218
}
218219

219220
/** {@inheritDoc} */
220-
@Override public void start(MetricRegistry mreg) {
221+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
221222
// No-op
222223
}
223224

modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Iterator;
2122
import org.apache.ignite.Ignite;
2223
import org.apache.ignite.IgniteBinary;
@@ -33,7 +34,7 @@
3334
* This consumer will receive data change events during ignite-cdc process invocation.
3435
* The lifecycle of the consumer is the following:
3536
* <ul>
36-
* <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
37+
* <li>Start of the consumer {@link #start(MetricRegistry, Path)}.</li>
3738
* <li>Notification of the consumer by the {@link #onEvents(Iterator)} call.</li>
3839
* <li>Stop of the consumer {@link #stop()}.</li>
3940
* </ul>
@@ -66,8 +67,9 @@ public interface CdcConsumer {
6667
/**
6768
* Starts the consumer.
6869
* @param mreg Metric registry for consumer specific metrics.
70+
* @param cdcDir Path to Change Data Capture Directory.
6971
*/
70-
public void start(MetricRegistry mreg);
72+
public void start(MetricRegistry mreg, Path cdcDir);
7173

7274
/**
7375
* Handles entry changes events.
@@ -131,7 +133,7 @@ public interface CdcConsumer {
131133

132134
/**
133135
* Stops the consumer.
134-
* This method can be invoked only after {@link #start(MetricRegistry)}.
136+
* This method can be invoked only after {@link #start(MetricRegistry, Path)}.
135137
*/
136138
public void stop();
137139

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public void runX() throws Exception {
339339
committedSegmentOffset.value(walState.get1().fileOffset());
340340
}
341341

342-
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));
342+
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir);
343343

344344
started = true;
345345

modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.EnumSet;
2122
import java.util.Iterator;
2223
import java.util.NoSuchElementException;
@@ -188,8 +189,8 @@ public void onCacheDestroyEvents(Iterator<Integer> caches) {
188189
* @param cdcConsumerReg CDC consumer metric registry.
189190
* @throws IgniteCheckedException If failed.
190191
*/
191-
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) throws IgniteCheckedException {
192-
consumer.start(cdcConsumerReg);
192+
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException {
193+
consumer.start(cdcConsumerReg, cdcDir);
193194

194195
evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer");
195196
lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process");
@@ -200,7 +201,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg)
200201

201202
/**
202203
* Stops the consumer.
203-
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl)}.
204+
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}.
204205
*/
205206
public void stop() {
206207
consumer.stop();

modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public abstract static class TestCdcConsumer<T> implements CdcConsumer {
308308
private volatile boolean stopped;
309309

310310
/** {@inheritDoc} */
311-
@Override public void start(MetricRegistry mreg) {
311+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
312312
stopped = false;
313313
}
314314

@@ -462,7 +462,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer {
462462
}
463463

464464
/** {@inheritDoc} */
465-
@Override public void start(MetricRegistry mreg) {
465+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
466466
// No-op.
467467
}
468468

modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.io.File;
21+
import java.nio.file.Path;
2122
import java.util.concurrent.CountDownLatch;
2223
import org.apache.ignite.IgniteCheckedException;
2324
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -92,8 +93,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception {
9293
CdcConfiguration cdcCfg = new CdcConfiguration();
9394

9495
cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
95-
@Override public void start(MetricRegistry mreg) {
96-
super.start(mreg);
96+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
97+
super.start(mreg, cdcDir);
9798

9899
started.countDown();
99100
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Collections;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
@@ -201,10 +202,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) {
201202
}
202203

203204
/** {@inheritDoc} */
204-
@Override public void start(MetricRegistry mreg) {
205+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
205206
Ignite ignite = Ignition.start(destClusterCliCfg);
206207

207-
super.start(mreg);
208+
super.start(mreg, cdcDir);
208209

209210
ignite.log().info("TestIgniteToIgniteConsumer started.");
210211
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22+
import java.nio.file.Path;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collection;
@@ -418,7 +419,7 @@ public void testReadOneByOneForBackup() throws Exception {
418419
// No-op.
419420
}
420421

421-
@Override public void start(MetricRegistry mreg) {
422+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
422423
// No-op.
423424
}
424425
};
@@ -512,7 +513,7 @@ public void testReadFromNextEntry() throws Exception {
512513
// No-op.
513514
}
514515

515-
@Override public void start(MetricRegistry mreg) {
516+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
516517
// No-op.
517518
}
518519
}, cfg));

modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.ducktest.tests.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Iterator;
2122
import java.util.concurrent.atomic.AtomicLong;
2223
import org.apache.ignite.IgniteLogger;
@@ -42,7 +43,7 @@ public class CountingCdcConsumer implements CdcConsumer {
4243
private final AtomicLong objectsConsumed = new AtomicLong();
4344

4445
/** {@inheritDoc} */
45-
@Override public void start(MetricRegistry mreg) {
46+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
4647
log.info("CountingCdcConsumer started");
4748
}
4849

0 commit comments

Comments
 (0)