Skip to content

Commit ba5de70

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Refactor CdcRegexManager
1 parent 717a11c commit ba5de70

3 files changed

Lines changed: 13 additions & 193 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
123123
@Override public void start(MetricRegistry reg, Path cdcDir, List<String> cacheNames) {
124124
A.notEmpty(caches, "caches");
125125

126-
regexManager = new CdcRegexManager(cdcDir, log);
126+
regexManager = new CdcRegexManager();
127127

128128
cachesIds = caches.stream()
129129
.mapToInt(CU::cacheId)
@@ -132,9 +132,8 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
132132

133133
regexManager.compileRegexp(includeTemplate, excludeTemplate);
134134

135-
regexManager.match(cacheNames);
136-
137-
regexManager.getSavedCaches().stream()
135+
cacheNames.stream()
136+
.filter(regexManager::matchesFilters)
138137
.map(CU::cacheId)
139138
.forEach(cachesIds::add);
140139

@@ -188,13 +187,13 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
188187
private void matchWithRegex(String cacheName) {
189188
int cacheId = CU.cacheId(cacheName);
190189

191-
if (!cachesIds.contains(cacheId) && regexManager.match(cacheName))
190+
if (!cachesIds.contains(cacheId) && regexManager.matchesFilters(cacheName))
192191
cachesIds.add(cacheId);
193192
}
194193

195194
/** {@inheritDoc} */
196195
@Override public void onCacheDestroy(Iterator<Integer> caches) {
197-
caches.forEachRemaining(regexManager::deleteRegexpCacheIfPresent);
196+
caches.forEachRemaining(cachesIds::remove);
198197
}
199198

200199
/** {@inheritDoc} */

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java

Lines changed: 1 addition & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -17,132 +17,29 @@
1717

1818
package org.apache.ignite.cdc;
1919

20-
import java.io.IOException;
21-
import java.nio.file.Files;
22-
import java.nio.file.Path;
23-
import java.util.Collections;
24-
import java.util.List;
25-
import java.util.Optional;
2620
import java.util.regex.Pattern;
2721
import java.util.regex.PatternSyntaxException;
28-
import java.util.stream.Collectors;
2922

3023
import org.apache.ignite.IgniteException;
31-
import org.apache.ignite.IgniteLogger;
32-
import org.apache.ignite.internal.util.typedef.internal.CU;
33-
34-
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
35-
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
3624

3725
/**
3826
* Contains logic to process user's regexp patterns for CDC.
3927
*/
4028
public class CdcRegexManager {
4129

42-
/** File with saved names of caches added by cache masks. */
43-
private static final String SAVED_CACHES_FILE = "caches";
44-
45-
/** Temporary file with saved names of caches added by cache masks. */
46-
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
47-
48-
/** CDC directory path. */
49-
private final Path cdcDir;
50-
5130
/** Include regex pattern for cache names. */
5231
private Pattern includeFilter;
5332

5433
/** Exclude regex pattern for cache names. */
5534
private Pattern excludeFilter;
5635

57-
/** Logger. */
58-
private IgniteLogger log;
59-
60-
/**
61-
*
62-
* @param cdcDir Path to Change Data Capture Directory.
63-
* @param log Logger.
64-
*/
65-
public CdcRegexManager(Path cdcDir, IgniteLogger log) {
66-
this.cdcDir = cdcDir;
67-
this.log = log;
68-
}
69-
70-
/**
71-
* Finds and processes match between cache name and user's regexp patterns.
72-
*
73-
* @param cacheName Cache name.
74-
* @return True if cache name matches user's regexp patterns.
75-
*/
76-
public boolean match(String cacheName) {
77-
return matchAndSave(Collections.singletonList(cacheName));
78-
}
79-
80-
/**
81-
* Finds and processes match between a set of cache names and user's regexp patterns.
82-
*
83-
* @param cacheNames Cache names.
84-
* @return True if cache name matches user's regexp patterns.
85-
*/
86-
public boolean match(List<String> cacheNames) {
87-
return matchAndSave(cacheNames);
88-
}
89-
90-
/**
91-
* Get actual list of names of caches added by regex templates from cache list file.<br/>
92-
* All new caches started during the work of CDC application are saved to file so they can be added to CDC later if
93-
* appropriate regex filter is set.
94-
*
95-
* @return Caches names list.
96-
*/
97-
public List<String> getSavedCaches() {
98-
try {
99-
return loadCaches().stream()
100-
.filter(this::matchesFilters)
101-
.collect(Collectors.toList());
102-
}
103-
catch (IOException e) {
104-
throw new IgniteException(e);
105-
}
106-
}
107-
108-
/**
109-
* Finds match between caches names and user's regex templates and saves cache name to a file.
110-
*
111-
* @param cacheNames Cache names.
112-
* @return True if cache name matches user's regexp patterns.
113-
*/
114-
private boolean matchAndSave(List<String> cacheNames) {
115-
try {
116-
List<String> caches = loadCaches();
117-
118-
caches.addAll(cacheNames);
119-
120-
save(caches);
121-
}
122-
catch (IOException e) {
123-
throw new IgniteException(e);
124-
}
125-
126-
List<String> matchingCaches = cacheNames.stream()
127-
.filter(this::matchesFilters)
128-
.collect(Collectors.toList());
129-
130-
if (matchingCaches.isEmpty())
131-
return false;
132-
133-
if (log.isInfoEnabled())
134-
log.info("Cache(s) has been added to replication [cacheNames=" + matchingCaches + "]");
135-
136-
return true;
137-
}
138-
13936
/**
14037
* Matches cache name with compiled regex patterns.
14138
*
14239
* @param cacheName Cache name.
14340
* @return True if cache name matches include pattern and doesn't match exclude pattern.
14441
*/
145-
private boolean matchesFilters(String cacheName) {
42+
public boolean matchesFilters(String cacheName) {
14643
return includeFilter.matcher(cacheName).matches() && !excludeFilter.matcher(cacheName).matches();
14744
}
14845

@@ -163,79 +60,4 @@ public void compileRegexp(String includeTemplate, String excludeTemplate) {
16360
throw new IgniteException("Invalid cache regexp template.", e);
16461
}
16562
}
166-
167-
/**
168-
* Loads saved CDC caches from file. If file not found, creates a new one containing empty list.
169-
*
170-
* @return List of saved caches names.
171-
*/
172-
private List<String> loadCaches() throws IOException {
173-
if (cdcDir == null) {
174-
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
175-
}
176-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
177-
178-
if (Files.notExists(savedCachesPath)) {
179-
Files.createFile(savedCachesPath);
180-
181-
if (log.isInfoEnabled())
182-
log.info("Cache list created: " + savedCachesPath);
183-
}
184-
185-
return Files.readAllLines(savedCachesPath);
186-
}
187-
188-
/**
189-
* Writes caches list to file.
190-
*
191-
* @param caches Caches list.
192-
*/
193-
private void save(List<String> caches) throws IOException {
194-
if (cdcDir == null) {
195-
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
196-
}
197-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
198-
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
199-
200-
StringBuilder cacheList = new StringBuilder();
201-
202-
for (String cache : caches) {
203-
cacheList.append(cache);
204-
205-
cacheList.append('\n');
206-
}
207-
208-
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
209-
210-
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
211-
}
212-
213-
/**
214-
* Removes cache added by regexp from cache list if such cache is present in file to prevent disk space overflow.
215-
*
216-
* @param cacheId Cache id.
217-
*/
218-
public void deleteRegexpCacheIfPresent(Integer cacheId) {
219-
try {
220-
List<String> caches = loadCaches();
221-
222-
Optional<String> cacheName = caches.stream()
223-
.filter(name -> CU.cacheId(name) == cacheId)
224-
.findAny();
225-
226-
if (cacheName.isPresent()) {
227-
String name = cacheName.get();
228-
229-
caches.remove(name);
230-
231-
save(caches);
232-
233-
if (log.isInfoEnabled())
234-
log.info("Cache has been removed from replication [cacheName=" + name + ']');
235-
}
236-
}
237-
catch (IOException e) {
238-
throw new IgniteException(e);
239-
}
240-
}
24163
}

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumerEx {
267267

268268
/** {@inheritDoc} */
269269
@Override public void onCacheDestroy(Iterator<Integer> caches) {
270-
caches.forEachRemaining(regexManager::deleteRegexpCacheIfPresent);
270+
caches.forEachRemaining(cachesIds::remove);
271271
}
272272

273273
/** Send marker(meta need to be updated) record to each partition of events topic. */
@@ -347,19 +347,18 @@ private <T> void sendOneBatch(
347347
kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
348348
kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
349349

350-
regexManager = new CdcRegexManager(cdcDir, log);
350+
regexManager = new CdcRegexManager();
351351

352352
cachesIds = caches.stream()
353353
.map(CU::cacheId)
354354
.collect(Collectors.toSet());
355355

356356
regexManager.compileRegexp(includeTemplate, excludeTemplate);
357357

358-
regexManager.match(cacheNames);
359-
360-
regexManager.getSavedCaches().stream()
361-
.map(CU::cacheId)
362-
.forEach(cachesIds::add);
358+
cacheNames.stream()
359+
.filter(regexManager::matchesFilters)
360+
.map(CU::cacheId)
361+
.forEach(cachesIds::add);
363362

364363
try {
365364
producer = new KafkaProducer<>(kafkaProps);
@@ -415,7 +414,7 @@ public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
415414
private void matchWithRegex(String cacheName) {
416415
int cacheId = CU.cacheId(cacheName);
417416

418-
if (!cachesIds.contains(cacheId) && regexManager.match(cacheName))
417+
if (!cachesIds.contains(cacheId) && regexManager.matchesFilters(cacheName))
419418
cachesIds.add(cacheId);
420419
}
421420

0 commit comments

Comments
 (0)