Skip to content

Commit 17d7989

Browse files
lordgarrishAndrei Nadyktov
authored andcommitted
IGNITE-22530 Refactor IgniteToKafkaCdcStreamer for use with CdcRegexManager
1 parent 4b550d7 commit 17d7989

1 file changed

Lines changed: 12 additions & 158 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 12 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,28 @@
1717

1818
package org.apache.ignite.cdc.kafka;
1919

20-
import java.io.IOException;
21-
import java.nio.file.Files;
2220
import java.nio.file.Path;
2321
import java.util.ArrayList;
2422
import java.util.Collection;
2523
import java.util.HashSet;
2624
import java.util.Iterator;
2725
import java.util.List;
28-
import java.util.Optional;
2926
import java.util.Properties;
3027
import java.util.Set;
3128
import java.util.concurrent.ExecutionException;
3229
import java.util.concurrent.Future;
3330
import java.util.concurrent.TimeUnit;
3431
import java.util.concurrent.TimeoutException;
3532
import java.util.function.Function;
36-
import java.util.regex.Pattern;
37-
import java.util.regex.PatternSyntaxException;
3833
import java.util.stream.Collectors;
3934
import java.util.stream.IntStream;
4035

41-
import org.apache.ignite.IgniteException;
4236
import org.apache.ignite.IgniteLogger;
4337
import org.apache.ignite.binary.BinaryType;
4438
import org.apache.ignite.cdc.CdcCacheEvent;
4539
import org.apache.ignite.cdc.CdcConsumerEx;
4640
import org.apache.ignite.cdc.CdcEvent;
41+
import org.apache.ignite.cdc.CdcRegexManager;
4742
import org.apache.ignite.cdc.TypeMapping;
4843
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
4944
import org.apache.ignite.internal.binary.BinaryTypeImpl;
@@ -63,8 +58,6 @@
6358
import org.apache.kafka.common.serialization.ByteArraySerializer;
6459
import org.apache.kafka.common.serialization.IntegerSerializer;
6560

66-
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
67-
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
6861
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
6962
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
7063
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -158,27 +151,15 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumerEx {
158151
/** Cache names. */
159152
private Collection<String> caches;
160153

161-
/** File with saved names of caches added by cache masks. */
162-
private static final String SAVED_CACHES_FILE = "caches";
163-
164-
/** Temporary file with saved names of caches added by cache masks. */
165-
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
166-
167-
/** CDC directory path. */
168-
private Path cdcDir;
154+
/** Regexp manager. */
155+
private CdcRegexManager regexManager;
169156

170157
/** Include regex templates for cache names. */
171158
private Set<String> includeTemplates = new HashSet<>();
172159

173-
/** Compiled include regex patterns for cache names. */
174-
private Set<Pattern> includeFilters;
175-
176160
/** Exclude regex templates for cache names. */
177161
private Set<String> excludeTemplates = new HashSet<>();
178162

179-
/** Compiled exclude regex patterns for cache names. */
180-
private Set<Pattern> excludeFilters;
181-
182163
/** Max batch size. */
183164
private int maxBatchSz = DFLT_MAX_BATCH_SIZE;
184165

@@ -278,42 +259,13 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumerEx {
278259
/** {@inheritDoc} */
279260
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
280261
cacheEvents.forEachRemaining(e -> {
281-
matchWithRegexTemplates(e.configuration().getName());
262+
matchWithRegex(e.configuration().getName());
282263
});
283264
}
284265

285266
/** {@inheritDoc} */
286267
@Override public void onCacheDestroy(Iterator<Integer> caches) {
287-
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
288-
}
289-
290-
/**
291-
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
292-
*
293-
* @param cacheId Cache id.
294-
*/
295-
private void deleteRegexpCacheIfPresent(Integer cacheId) {
296-
try {
297-
List<String> caches = loadCaches();
298-
299-
Optional<String> cacheName = caches.stream()
300-
.filter(name -> CU.cacheId(name) == cacheId)
301-
.findAny();
302-
303-
if (cacheName.isPresent()) {
304-
String name = cacheName.get();
305-
306-
caches.remove(name);
307-
308-
save(caches);
309-
310-
if (log.isInfoEnabled())
311-
log.info("Cache has been removed from replication [cacheName=" + name + ']');
312-
}
313-
}
314-
catch (IOException e) {
315-
throw new IgniteException(e);
316-
}
268+
caches.forEachRemaining(regexManager::deleteRegexpCacheIfPresent);
317269
}
318270

319271
/** Send marker(meta need to be updated) record to each partition of events topic. */
@@ -393,23 +345,17 @@ private <T> void sendOneBatch(
393345
kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
394346
kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
395347

396-
this.cdcDir = cdcDir;
348+
regexManager = new CdcRegexManager(cdcDir, log);
397349

398350
cachesIds = caches.stream()
399351
.map(CU::cacheId)
400352
.collect(Collectors.toSet());
401353

402-
prepareRegexFilters();
354+
regexManager.compileRegexp(includeTemplates, excludeTemplates);
403355

404-
try {
405-
loadCaches().stream()
406-
.filter(this::matchesFilters)
356+
regexManager.getSavedCaches().stream()
407357
.map(CU::cacheId)
408358
.forEach(cachesIds::add);
409-
}
410-
catch (IOException e) {
411-
throw new IgniteException(e);
412-
}
413359

414360
try {
415361
producer = new KafkaProducer<>(kafkaProps);
@@ -457,108 +403,16 @@ public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
457403
}
458404

459405
/**
460-
* Compiles regex patterns from user templates.
461-
*
462-
* @throws PatternSyntaxException If the template's syntax is invalid
463-
*/
464-
private void prepareRegexFilters() {
465-
includeFilters = includeTemplates.stream()
466-
.map(Pattern::compile)
467-
.collect(Collectors.toSet());
468-
469-
excludeFilters = excludeTemplates.stream()
470-
.map(Pattern::compile)
471-
.collect(Collectors.toSet());
472-
}
473-
474-
/**
475-
* Loads saved caches from file.
476-
*
477-
* @return List of saved caches names.
478-
*/
479-
private List<String> loadCaches() throws IOException {
480-
if (cdcDir == null) {
481-
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
482-
}
483-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
484-
485-
if (Files.notExists(savedCachesPath)) {
486-
Files.createFile(savedCachesPath);
487-
488-
if (log.isInfoEnabled())
489-
log.info("Cache list created: " + savedCachesPath);
490-
}
491-
492-
return Files.readAllLines(savedCachesPath);
493-
}
494-
495-
/**
496-
* Matches cache name with compiled regex patterns.
406+
* Finds a match between the cache name and user regex templates.
407+
* If a match is found, adds the cache to replication.
497408
*
498409
* @param cacheName Cache name.
499-
* @return True if cache name match include patterns and don't match exclude patterns.
500410
*/
501-
private boolean matchesFilters(String cacheName) {
502-
boolean matchesInclude = includeFilters.stream()
503-
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
504-
505-
boolean notMatchesExclude = excludeFilters.stream()
506-
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
507-
508-
return matchesInclude && notMatchesExclude;
509-
}
510-
511-
/**
512-
* Finds match between cache name and user's regex templates.
513-
* If match is found, adds this cache's id to id's list and saves cache name to file.
514-
*
515-
* @param cacheName Cache name.
516-
*/
517-
private void matchWithRegexTemplates(String cacheName) {
411+
private void matchWithRegex(String cacheName) {
518412
int cacheId = CU.cacheId(cacheName);
519413

520-
if (!cachesIds.contains(cacheId) && matchesFilters(cacheName)) {
414+
if (!cachesIds.contains(cacheId) && regexManager.match(cacheName))
521415
cachesIds.add(cacheId);
522-
523-
try {
524-
List<String> caches = loadCaches();
525-
526-
caches.add(cacheName);
527-
528-
save(caches);
529-
}
530-
catch (IOException e) {
531-
throw new IgniteException(e);
532-
}
533-
534-
if (log.isInfoEnabled())
535-
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
536-
}
537-
}
538-
539-
/**
540-
* Writes caches list to file
541-
*
542-
* @param caches Caches list.
543-
*/
544-
private void save(List<String> caches) throws IOException {
545-
if (cdcDir == null) {
546-
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
547-
}
548-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
549-
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
550-
551-
StringBuilder cacheList = new StringBuilder();
552-
553-
for (String cache : caches) {
554-
cacheList.append(cache);
555-
556-
cacheList.append('\n');
557-
}
558-
559-
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
560-
561-
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
562416
}
563417

564418
/**

0 commit comments

Comments
 (0)