Skip to content

Commit 2aeec6b

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 WIP
1 parent d81bd13 commit 2aeec6b

5 files changed

Lines changed: 39 additions & 23 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.nio.file.Path;
2121
import java.util.Iterator;
22+
import java.util.List;
2223
import java.util.Set;
2324
import java.util.stream.Collectors;
2425

@@ -119,7 +120,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
119120
}
120121

121122
/** {@inheritDoc} */
122-
@Override public void start(MetricRegistry reg, Path cdcDir) {
123+
@Override public void start(MetricRegistry reg, Path cdcDir, List<String> cacheNames) {
123124
A.notEmpty(caches, "caches");
124125

125126
regexManager = new CdcRegexManager(cdcDir, log);
@@ -131,6 +132,8 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
131132

132133
regexManager.compileRegexp(includeTemplate, excludeTemplate);
133134

135+
regexManager.match(cacheNames);
136+
134137
regexManager.getSavedCaches().stream()
135138
.map(CU::cacheId)
136139
.forEach(cachesIds::add);

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Optional;
2526
import java.util.regex.Pattern;
@@ -73,7 +74,11 @@ public CdcRegexManager(Path cdcDir, IgniteLogger log) {
7374
* @return True if cache name matches user's regexp patterns.
7475
*/
7576
public boolean match(String cacheName) {
76-
return matchAndSave(cacheName);
77+
return matchAndSave(Collections.singletonList(cacheName));
78+
}
79+
80+
public boolean match(List<String> cacheNames) {
81+
return matchAndSave(cacheNames);
7782
}
7883

7984
/**
@@ -98,28 +103,32 @@ public List<String> getSavedCaches() {
98103
* Finds match between cache name and user's regex templates.
99104
* If match is found, saves cache name to file.
100105
*
101-
* @param cacheName Cache name.
106+
* @param cacheNames Cache name.
102107
* @return True if cache name matches user's regexp patterns.
103108
*/
104-
private boolean matchAndSave(String cacheName) {
105-
if (matchesFilters(cacheName)) {
106-
try {
107-
List<String> caches = loadCaches();
109+
private boolean matchAndSave(List<String> cacheNames) {
110+
try {
111+
List<String> caches = loadCaches();
108112

109-
caches.add(cacheName);
113+
caches.addAll(cacheNames);
110114

111-
save(caches);
112-
}
113-
catch (IOException e) {
114-
throw new IgniteException(e);
115-
}
115+
save(caches);
116+
}
117+
catch (IOException e) {
118+
throw new IgniteException(e);
119+
}
116120

117-
if (log.isInfoEnabled())
118-
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
121+
List<String> matchingCaches = cacheNames.stream()
122+
.filter(this::matchesFilters)
123+
.collect(Collectors.toList());
119124

120-
return true;
121-
}
122-
return false;
125+
if (matchingCaches.isEmpty())
126+
return false;
127+
128+
if (log.isInfoEnabled())
129+
log.info("Cache(s) has been added to replication [cacheNames=" + matchingCaches + "]");
130+
131+
return true;
123132
}
124133

125134
/**

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.nio.file.Path;
21+
import java.util.List;
2122

2223
import org.apache.ignite.IgniteException;
2324
import org.apache.ignite.Ignition;
@@ -61,8 +62,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme
6162
private volatile boolean alive = true;
6263

6364
/** {@inheritDoc} */
64-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
65-
super.start(mreg, cdcDir);
65+
@Override public void start(MetricRegistry mreg, Path cdcDir, List<String> cacheNames) {
66+
super.start(mreg, cdcDir, cacheNames);
6667

6768
if (log.isInfoEnabled())
6869
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ private <T> void sendOneBatch(
336336
}
337337

338338
/** {@inheritDoc} */
339-
@Override public void start(MetricRegistry reg, Path cdcDir) {
339+
@Override public void start(MetricRegistry reg, Path cdcDir, List<String> cacheNames) {
340340
A.notNull(kafkaProps, "Kafka properties");
341341
A.notNull(evtTopic, "Kafka topic");
342342
A.notNull(metadataTopic, "Kafka metadata topic");
@@ -355,6 +355,8 @@ private <T> void sendOneBatch(
355355

356356
regexManager.compileRegexp(includeTemplate, excludeTemplate);
357357

358+
regexManager.match(cacheNames);
359+
358360
regexManager.getSavedCaches().stream()
359361
.map(CU::cacheId)
360362
.forEach(cachesIds::add);

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc.thin;
1919

2020
import java.nio.file.Path;
21+
import java.util.List;
2122

2223
import org.apache.ignite.Ignition;
2324
import org.apache.ignite.cdc.AbstractIgniteCdcStreamer;
@@ -68,8 +69,8 @@ public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer {
6869
private long aliveCheckTimeout = DFLT_ALIVE_CHECK_TIMEOUT;
6970

7071
/** {@inheritDoc} */
71-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
72-
super.start(mreg, cdcDir);
72+
@Override public void start(MetricRegistry mreg, Path cdcDir, List<String> cacheNames) {
73+
super.start(mreg, cdcDir, cacheNames);
7374

7475
if (log.isInfoEnabled())
7576
log.info("Ignite To Ignite Client Streamer [cacheIds=" + cachesIds + ']');

0 commit comments

Comments
 (0)