Skip to content

Commit 6133cfc

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 CDC: Add regex filters for cache names
1 parent df0fee1 commit 6133cfc

12 files changed

Lines changed: 31 additions & 20 deletions

File tree

modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22+
import java.nio.file.Path;
2223
import java.util.Arrays;
2324
import java.util.List;
2425
import java.util.concurrent.CountDownLatch;
@@ -194,7 +195,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception {
194195
CdcConfiguration cfg = new CdcConfiguration();
195196

196197
cfg.setConsumer(new UserCdcConsumer() {
197-
@Override public void start(MetricRegistry mreg) {
198+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
198199
appStarted.countDown();
199200
}
200201
});

modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.util;
1919

20+
import java.nio.file.Path;
2021
import java.util.ArrayList;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
@@ -217,7 +218,7 @@ synchronized List<CdcEvent> events() {
217218
}
218219

219220
/** {@inheritDoc} */
220-
@Override public void start(MetricRegistry mreg) {
221+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
221222
// No-op
222223
}
223224

modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Iterator;
2122
import org.apache.ignite.Ignite;
2223
import org.apache.ignite.IgniteBinary;
@@ -35,7 +36,7 @@
3536
* This consumer will receive data change events during {@link CdcMain} application invocation.
3637
* The lifecycle of the consumer is the following:
3738
* <ul>
38-
* <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
39+
* <li>Start of the consumer {@link #start(MetricRegistry, Path)}.</li>
3940
* <li>Notification of the consumer by the {@link #onEvents(Iterator)} call.</li>
4041
* <li>Stop of the consumer {@link #stop()}.</li>
4142
* </ul>
@@ -70,8 +71,9 @@ public interface CdcConsumer {
7071
/**
7172
* Starts the consumer.
7273
* @param mreg Metric registry for consumer specific metrics.
74+
* @param cdcDir Path to Change Data Capture Directory.
7375
*/
74-
public void start(MetricRegistry mreg);
76+
public void start(MetricRegistry mreg, Path cdcDir);
7577

7678
/**
7779
* Handles entry changes events.
@@ -135,7 +137,7 @@ public interface CdcConsumer {
135137

136138
/**
137139
* Stops the consumer.
138-
* This method can be invoked only after {@link #start(MetricRegistry)}.
140+
* This method can be invoked only after {@link #start(MetricRegistry, Path)}.
139141
*/
140142
public void stop();
141143

modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ public void runX() throws Exception {
358358
committedSegmentOffset.value(walState.get1().fileOffset());
359359
}
360360

361-
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")));
361+
consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir);
362362

363363
started = true;
364364

modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.EnumSet;
2122
import java.util.Iterator;
2223
import java.util.NoSuchElementException;
@@ -188,8 +189,8 @@ public void onCacheDestroyEvents(Iterator<Integer> caches) {
188189
* @param cdcConsumerReg CDC consumer metric registry.
189190
* @throws IgniteCheckedException If failed.
190191
*/
191-
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) throws IgniteCheckedException {
192-
consumer.start(cdcConsumerReg);
192+
public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException {
193+
consumer.start(cdcConsumerReg, cdcDir);
193194

194195
evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer");
195196
lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process");
@@ -200,7 +201,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg)
200201

201202
/**
202203
* Stops the consumer.
203-
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl)}.
204+
* This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}.
204205
*/
205206
public void stop() {
206207
consumer.stop();

modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public abstract static class TestCdcConsumer<T> implements CdcConsumer {
291291
private volatile boolean stopped;
292292

293293
/** {@inheritDoc} */
294-
@Override public void start(MetricRegistry mreg) {
294+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
295295
stopped = false;
296296
}
297297

@@ -445,7 +445,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer {
445445
}
446446

447447
/** {@inheritDoc} */
448-
@Override public void start(MetricRegistry mreg) {
448+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
449449
// No-op.
450450
}
451451

modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.io.File;
21+
import java.nio.file.Path;
2122
import java.util.concurrent.CountDownLatch;
2223
import org.apache.ignite.IgniteCheckedException;
2324
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -91,8 +92,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception {
9192
CdcConfiguration cdcCfg = new CdcConfiguration();
9293

9394
cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() {
94-
@Override public void start(MetricRegistry mreg) {
95-
super.start(mreg);
95+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
96+
super.start(mreg, cdcDir);
9697

9798
started.countDown();
9899
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Collections;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
@@ -197,10 +198,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) {
197198
}
198199

199200
/** {@inheritDoc} */
200-
@Override public void start(MetricRegistry mreg) {
201+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
201202
Ignite ignite = Ignition.start(destClusterCliCfg);
202203

203-
super.start(mreg);
204+
super.start(mreg, cdcDir);
204205

205206
ignite.log().info("TestIgniteToIgniteConsumer started.");
206207
}

modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.Serializable;
22+
import java.nio.file.Path;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collection;
@@ -394,7 +395,7 @@ public void testReadOneByOneForBackup() throws Exception {
394395
// No-op.
395396
}
396397

397-
@Override public void start(MetricRegistry mreg) {
398+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
398399
// No-op.
399400
}
400401
};
@@ -488,7 +489,7 @@ public void testReadFromNextEntry() throws Exception {
488489
// No-op.
489490
}
490491

491-
@Override public void start(MetricRegistry mreg) {
492+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
492493
// No-op.
493494
}
494495
}, cfg));

modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.ducktest.tests.cdc;
1919

20+
import java.nio.file.Path;
2021
import java.util.Iterator;
2122
import java.util.concurrent.atomic.AtomicLong;
2223
import org.apache.ignite.IgniteLogger;
@@ -42,7 +43,7 @@ public class CountingCdcConsumer implements CdcConsumer {
4243
private final AtomicLong objectsConsumed = new AtomicLong();
4344

4445
/** {@inheritDoc} */
45-
@Override public void start(MetricRegistry mreg) {
46+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
4647
log.info("CountingCdcConsumer started");
4748
}
4849

0 commit comments

Comments
 (0)