Skip to content

Commit daf62b7

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 CDC: Add regex filters for cache names
1 parent 5b4f9de commit daf62b7

16 files changed

Lines changed: 913 additions & 38 deletions

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 159 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,19 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.StandardOpenOption;
24+
import java.util.Collections;
25+
import java.util.HashSet;
2026
import java.util.Iterator;
27+
import java.util.List;
2128
import java.util.Set;
29+
import java.util.regex.Pattern;
30+
import java.util.regex.PatternSyntaxException;
2231
import java.util.stream.Collectors;
32+
2333
import org.apache.ignite.IgniteCheckedException;
2434
import org.apache.ignite.IgniteException;
2535
import org.apache.ignite.IgniteLogger;
@@ -67,12 +77,30 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
6777
/** */
6878
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";
6979

80+
/** File with saved names of caches added by cache masks. */
81+
private static final String SAVED_CACHES_FILE = "caches";
82+
83+
/** CDC directory path. */
84+
private Path cdcDir;
85+
7086
/** Handle only primary entry flag. */
7187
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
7288

7389
/** Cache names. */
7490
private Set<String> caches;
7591

92+
/** Include regex templates for cache names. */
93+
private Set<String> includeTemplates = new HashSet<>();
94+
95+
/** Compiled include regex patterns for cache names. */
96+
private Set<Pattern> includeFilters;
97+
98+
/** Exclude regex templates for cache names. */
99+
private Set<String> excludeTemplates = new HashSet<>();
100+
101+
/** Compiled exclude regex patterns for cache names. */
102+
private Set<Pattern> excludeFilters;
103+
76104
/** Cache IDs. */
77105
protected Set<Integer> cachesIds;
78106

@@ -99,14 +127,28 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
99127
protected IgniteLogger log;
100128

101129
/** {@inheritDoc} */
102-
@Override public void start(MetricRegistry reg) {
130+
@Override public void start(MetricRegistry reg, Path cdcDir) {
103131
A.notEmpty(caches, "caches");
104132

133+
this.cdcDir = cdcDir;
134+
105135
cachesIds = caches.stream()
106136
.mapToInt(CU::cacheId)
107137
.boxed()
108138
.collect(Collectors.toSet());
109139

140+
prepareRegexFilters();
141+
142+
try {
143+
loadCaches().stream()
144+
.filter(this::matchesFilters)
145+
.map(CU::cacheId)
146+
.forEach(cachesIds::add);
147+
}
148+
catch (IOException e) {
149+
throw new IgniteException(e);
150+
}
151+
110152
MetricRegistryImpl mreg = (MetricRegistryImpl)reg;
111153

112154
this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
@@ -144,10 +186,101 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
144186
/** {@inheritDoc} */
145187
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
146188
cacheEvents.forEachRemaining(e -> {
147-
// Just skip. Handle of cache events not supported.
189+
matchWithRegexTemplates(e.configuration().getName());
148190
});
149191
}
150192

193+
/**
194+
* Finds match between cache name and user's regex templates.
195+
* If match found, adds this cache's id to id's list and saves cache name to file.
196+
*
197+
* @param cacheName Cache name.
198+
*/
199+
private void matchWithRegexTemplates(String cacheName) {
200+
int cacheId = CU.cacheId(cacheName);
201+
202+
if (!cachesIds.contains(cacheId) && matchesFilters(cacheName)) {
203+
cachesIds.add(cacheId);
204+
205+
try {
206+
saveCache(cacheName);
207+
}
208+
catch (IOException e) {
209+
throw new IgniteException(e);
210+
}
211+
212+
if (log.isInfoEnabled())
213+
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
214+
}
215+
}
216+
217+
/**
218+
* Writes cache name to file
219+
*
220+
* @param cacheName Cache name.
221+
*/
222+
private void saveCache(String cacheName) throws IOException {
223+
if (cdcDir != null) {
224+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
225+
226+
String cn = cacheName + '\n';
227+
228+
Files.write(savedCachesPath, cn.getBytes(), StandardOpenOption.APPEND);
229+
}
230+
}
231+
232+
/**
233+
* Loads saved caches from file.
234+
*
235+
* @return List of saved caches names.
236+
*/
237+
private List<String> loadCaches() throws IOException {
238+
if (cdcDir != null) {
239+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
240+
241+
if (Files.notExists(savedCachesPath)) {
242+
Files.createFile(savedCachesPath);
243+
244+
if (log.isInfoEnabled())
245+
log.info("Cache list created: " + savedCachesPath);
246+
}
247+
248+
return Files.readAllLines(savedCachesPath);
249+
}
250+
return Collections.emptyList();
251+
}
252+
253+
/**
254+
* Compiles regex patterns from user templates.
255+
*
256+
* @throws PatternSyntaxException If the template's syntax is invalid
257+
*/
258+
private void prepareRegexFilters() {
259+
includeFilters = includeTemplates.stream()
260+
.map(Pattern::compile)
261+
.collect(Collectors.toSet());
262+
263+
excludeFilters = excludeTemplates.stream()
264+
.map(Pattern::compile)
265+
.collect(Collectors.toSet());
266+
}
267+
268+
/**
269+
* Matches cache name with compiled regex patterns.
270+
*
271+
* @param cacheName Cache name.
272+
* @return True if cache name match include patterns and don't match exclude patterns.
273+
*/
274+
private boolean matchesFilters(String cacheName) {
275+
boolean matchesInclude = includeFilters.stream()
276+
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
277+
278+
boolean notMatchesExclude = excludeFilters.stream()
279+
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
280+
281+
return matchesInclude && notMatchesExclude;
282+
}
283+
151284
/** {@inheritDoc} */
152285
@Override public void onCacheDestroy(Iterator<Integer> caches) {
153286
caches.forEachRemaining(e -> {
@@ -238,6 +371,30 @@ public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
238371
return this;
239372
}
240373

374+
/**
375+
* Sets include regex patterns that participate in CDC.
376+
*
377+
* @param includeTemplates Include regex templates
378+
* @return {@code this} for chaining.
379+
*/
380+
public AbstractIgniteCdcStreamer setIncludeTemplates(Set<String> includeTemplates) {
381+
this.includeTemplates = includeTemplates;
382+
383+
return this;
384+
}
385+
386+
/**
387+
* Sets exclude regex patterns that participate in CDC.
388+
*
389+
* @param excludeTemplates Exclude regex templates
390+
* @return {@code this} for chaining.
391+
*/
392+
public AbstractIgniteCdcStreamer setExcludeTemplates(Set<String> excludeTemplates) {
393+
this.excludeTemplates = excludeTemplates;
394+
395+
return this;
396+
}
397+
241398
/**
242399
* Sets maximum batch size that will be applied to destination cluster.
243400
*

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.ignite.cdc;
1919

20+
import java.nio.file.Path;
21+
2022
import org.apache.ignite.IgniteException;
2123
import org.apache.ignite.Ignition;
2224
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
@@ -61,8 +63,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme
6163
private volatile boolean alive = true;
6264

6365
/** {@inheritDoc} */
64-
@Override public void start(MetricRegistry mreg) {
65-
super.start(mreg);
66+
@Override public void start(MetricRegistry mreg, Path cdcDir) {
67+
super.start(mreg, cdcDir);
6668

6769
if (log.isInfoEnabled())
6870
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.ignite.cdc.conflictresolve;
1919

2020
import java.io.Serializable;
21+
import java.util.HashSet;
2122
import java.util.Set;
2223
import java.util.UUID;
24+
import java.util.regex.Pattern;
25+
2326
import org.apache.ignite.IgniteLogger;
2427
import org.apache.ignite.cluster.ClusterNode;
2528
import org.apache.ignite.internal.IgniteEx;
@@ -65,6 +68,12 @@ public class CacheVersionConflictResolverPluginProvider<C extends PluginConfigur
6568
/** Custom conflict resolver. */
6669
private CacheVersionConflictResolver resolver;
6770

71+
/** Include regex templates for cache names. */
72+
private Set<String> includeTemplates = new HashSet<>();
73+
74+
/** Exclude regex templates for cache names. */
75+
private Set<String> excludeTemplates = new HashSet<>();
76+
6877
/** Log. */
6978
private IgniteLogger log;
7079

@@ -98,7 +107,7 @@ public CacheVersionConflictResolverPluginProvider() {
98107
@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
99108
String cacheName = ctx.igniteCacheConfiguration().getName();
100109

101-
if (caches.contains(cacheName)) {
110+
if (caches.contains(cacheName) || matchesFilters(cacheName)) {
102111
log.info("ConflictResolver provider set for cache [cacheName=" + cacheName + ']');
103112

104113
return provider;
@@ -144,6 +153,16 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
144153
this.resolver = resolver;
145154
}
146155

156+
/** @param includeTemplates Include regex templates */
157+
public void setIncludeTemplates(Set<String> includeTemplates) {
158+
this.includeTemplates = includeTemplates;
159+
}
160+
161+
/** @param excludeTemplates Exclude regex templates */
162+
public void setExcludeTemplates(Set<String> excludeTemplates) {
163+
this.excludeTemplates = excludeTemplates;
164+
}
165+
147166
/** {@inheritDoc} */
148167
@Override public void start(PluginContext ctx) {
149168
((IgniteEx)ctx.grid()).context().cache().context().versions().dataCenterId(clusterId);
@@ -178,4 +197,21 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
178197
@Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
179198
return null;
180199
}
200+
201+
/**
202+
* Match cache name with regex patterns.
203+
*
204+
* @param cacheName Cache name.
205+
*/
206+
private boolean matchesFilters(String cacheName) {
207+
boolean matchesInclude = includeTemplates.stream()
208+
.map(Pattern::compile)
209+
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
210+
211+
boolean notMatchesExclude = excludeTemplates.stream()
212+
.map(Pattern::compile)
213+
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
214+
215+
return matchesInclude && notMatchesExclude;
216+
}
181217
}

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ protected void runAppliers() {
181181
caches,
182182
metaUpdr,
183183
stopped,
184-
metrics
184+
metrics,
185+
this
185186
);
186187

187188
addAndStart("applier-thread-" + cntr++, applier);
@@ -252,6 +253,13 @@ private <T extends AutoCloseable & Runnable> void addAndStart(String threadName,
252253
/** Checks that configured caches exist in a destination cluster. */
253254
protected abstract void checkCaches(Collection<String> caches);
254255

256+
/**
257+
* Get cache names from client.
258+
*
259+
* @return Cache names.
260+
* */
261+
protected abstract Collection<String> getCaches();
262+
255263
/** */
256264
private void ackAsciiLogo(IgniteLogger log) {
257265
String ver = "ver. " + ACK_VER_STR;

0 commit comments

Comments
 (0)