Skip to content

Commit 9f2ccea

Browse files
Andrei Nadyktovlordgarrish
authored andcommitted
IGNITE-22530 Refactor AbstractIgniteCdcStreamer for use with CdcRegexManager
1 parent a2c68e8 commit 9f2ccea

1 file changed

Lines changed: 13 additions & 160 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 13 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,10 @@
1717

1818
package org.apache.ignite.cdc;
1919

20-
import java.io.IOException;
21-
import java.nio.file.Files;
2220
import java.nio.file.Path;
2321
import java.util.HashSet;
2422
import java.util.Iterator;
25-
import java.util.List;
26-
import java.util.Optional;
2723
import java.util.Set;
28-
import java.util.regex.Pattern;
29-
import java.util.regex.PatternSyntaxException;
3024
import java.util.stream.Collectors;
3125

3226
import org.apache.ignite.IgniteCheckedException;
@@ -44,8 +38,6 @@
4438
import org.apache.ignite.metric.MetricRegistry;
4539
import org.apache.ignite.resources.LoggerResource;
4640

47-
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
48-
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
4941
import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
5042

5143
/**
@@ -78,33 +70,21 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
7870
/** */
7971
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";
8072

81-
/** File with saved names of caches added by cache masks. */
82-
private static final String SAVED_CACHES_FILE = "caches";
83-
84-
/** Temporary file with saved names of caches added by cache masks. */
85-
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
86-
87-
/** CDC directory path. */
88-
private Path cdcDir;
89-
9073
/** Handle only primary entry flag. */
9174
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
9275

9376
/** Cache names. */
9477
private Set<String> caches;
9578

79+
/** Regexp manager. */
80+
private CdcRegexManager regexManager;
81+
9682
/** Include regex templates for cache names. */
9783
private Set<String> includeTemplates = new HashSet<>();
9884

99-
/** Compiled include regex patterns for cache names. */
100-
private Set<Pattern> includeFilters;
101-
10285
/** Exclude regex templates for cache names. */
10386
private Set<String> excludeTemplates = new HashSet<>();
10487

105-
/** Compiled exclude regex patterns for cache names. */
106-
private Set<Pattern> excludeFilters;
107-
10888
/** Cache IDs. */
10989
protected Set<Integer> cachesIds;
11090

@@ -139,24 +119,18 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
139119
@Override public void start(MetricRegistry reg, Path cdcDir) {
140120
A.notEmpty(caches, "caches");
141121

142-
this.cdcDir = cdcDir;
122+
regexManager = new CdcRegexManager(cdcDir, log);
143123

144124
cachesIds = caches.stream()
145125
.mapToInt(CU::cacheId)
146126
.boxed()
147127
.collect(Collectors.toSet());
148128

149-
prepareRegexFilters();
129+
regexManager.compileRegexp(includeTemplates, excludeTemplates);
150130

151-
try {
152-
loadCaches().stream()
153-
.filter(this::matchesFilters)
154-
.map(CU::cacheId)
155-
.forEach(cachesIds::add);
156-
}
157-
catch (IOException e) {
158-
throw new IgniteException(e);
159-
}
131+
regexManager.getSavedCaches().stream()
132+
.map(CU::cacheId)
133+
.forEach(cachesIds::add);
160134

161135
MetricRegistryImpl mreg = (MetricRegistryImpl)reg;
162136

@@ -195,147 +169,26 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
195169
/** {@inheritDoc} */
196170
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
197171
cacheEvents.forEachRemaining(e -> {
198-
matchWithRegexTemplates(e.configuration().getName());
172+
matchWithRegex(e.configuration().getName());
199173
});
200174
}
201175

202176
/**
203177
* Finds match between cache name and user's regex templates.
204-
* If match is found, adds this cache's id to id's list and saves cache name to file.
178+
* If match is found, adds this cache's id to id's list.
205179
*
206180
* @param cacheName Cache name.
207181
*/
208-
private void matchWithRegexTemplates(String cacheName) {
182+
private void matchWithRegex(String cacheName) {
209183
int cacheId = CU.cacheId(cacheName);
210184

211-
if (!cachesIds.contains(cacheId) && matchesFilters(cacheName)) {
185+
if (!cachesIds.contains(cacheId) && regexManager.match(cacheName))
212186
cachesIds.add(cacheId);
213-
214-
try {
215-
List<String> caches = loadCaches();
216-
217-
caches.add(cacheName);
218-
219-
save(caches);
220-
}
221-
catch (IOException e) {
222-
throw new IgniteException(e);
223-
}
224-
225-
if (log.isInfoEnabled())
226-
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
227-
}
228-
}
229-
230-
/**
231-
* Writes caches list to file
232-
*
233-
* @param caches Caches list.
234-
*/
235-
private void save(List<String> caches) throws IOException {
236-
if (cdcDir == null) {
237-
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
238-
}
239-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
240-
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
241-
242-
StringBuilder cacheList = new StringBuilder();
243-
244-
for (String cache : caches) {
245-
cacheList.append(cache);
246-
247-
cacheList.append('\n');
248-
}
249-
250-
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
251-
252-
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
253-
}
254-
255-
/**
256-
* Loads saved caches from file.
257-
*
258-
* @return List of saved caches names.
259-
*/
260-
private List<String> loadCaches() throws IOException {
261-
if (cdcDir == null) {
262-
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
263-
}
264-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
265-
266-
if (Files.notExists(savedCachesPath)) {
267-
Files.createFile(savedCachesPath);
268-
269-
if (log.isInfoEnabled())
270-
log.info("Cache list created: " + savedCachesPath);
271-
}
272-
273-
return Files.readAllLines(savedCachesPath);
274-
}
275-
276-
/**
277-
* Compiles regex patterns from user templates.
278-
*
279-
* @throws PatternSyntaxException If the template's syntax is invalid
280-
*/
281-
private void prepareRegexFilters() {
282-
includeFilters = includeTemplates.stream()
283-
.map(Pattern::compile)
284-
.collect(Collectors.toSet());
285-
286-
excludeFilters = excludeTemplates.stream()
287-
.map(Pattern::compile)
288-
.collect(Collectors.toSet());
289-
}
290-
291-
/**
292-
* Matches cache name with compiled regex patterns.
293-
*
294-
* @param cacheName Cache name.
295-
* @return True if cache name match include patterns and don't match exclude patterns.
296-
*/
297-
private boolean matchesFilters(String cacheName) {
298-
boolean matchesInclude = includeFilters.stream()
299-
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
300-
301-
boolean notMatchesExclude = excludeFilters.stream()
302-
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
303-
304-
return matchesInclude && notMatchesExclude;
305187
}
306188

307189
/** {@inheritDoc} */
308190
@Override public void onCacheDestroy(Iterator<Integer> caches) {
309-
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
310-
}
311-
312-
/**
313-
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
314-
*
315-
* @param cacheId Cache id.
316-
*/
317-
private void deleteRegexpCacheIfPresent(Integer cacheId) {
318-
try {
319-
List<String> caches = loadCaches();
320-
321-
Optional<String> cacheName = caches.stream()
322-
.filter(name -> CU.cacheId(name) == cacheId)
323-
.findAny();
324-
325-
if (cacheName.isPresent()) {
326-
String name = cacheName.get();
327-
328-
caches.remove(name);
329-
330-
save(caches);
331-
332-
if (log.isInfoEnabled())
333-
log.info("Cache has been removed from replication [cacheName=" + name + ']');
334-
}
335-
}
336-
catch (IOException e) {
337-
throw new IgniteException(e);
338-
}
191+
caches.forEachRemaining(regexManager::deleteRegexpCacheIfPresent);
339192
}
340193

341194
/** {@inheritDoc} */

0 commit comments

Comments
 (0)