Skip to content

Commit 82e0aa1

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Add removal of destroyed caches from cacheList file
1 parent 5b46c0a commit 82e0aa1

2 files changed

Lines changed: 82 additions & 6 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashSet;
2626
import java.util.Iterator;
2727
import java.util.List;
28+
import java.util.Optional;
2829
import java.util.Set;
2930
import java.util.regex.Pattern;
3031
import java.util.regex.PatternSyntaxException;
@@ -283,9 +284,46 @@ private boolean matchesFilters(String cacheName) {
283284

284285
/** {@inheritDoc} */
285286
@Override public void onCacheDestroy(Iterator<Integer> caches) {
286-
caches.forEachRemaining(e -> {
287-
// Just skip. Handle of cache events not supported.
288-
});
287+
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
288+
}
289+
290+
/**
291+
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
292+
*
293+
* @param cacheId Cache id.
294+
*/
295+
private void deleteRegexpCacheIfPresent(Integer cacheId) {
296+
try {
297+
List<String> caches = loadCaches();
298+
299+
Optional<String> cacheName = caches.stream()
300+
.filter(name -> CU.cacheId(name) == cacheId)
301+
.findAny();
302+
303+
if (cacheName.isPresent()) {
304+
String name = cacheName.get();
305+
306+
caches.remove(name);
307+
308+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
309+
310+
StringBuilder cacheList = new StringBuilder();
311+
312+
for (String cache : caches) {
313+
cacheList.append(cache);
314+
315+
cacheList.append('\n');
316+
}
317+
318+
Files.write(savedCachesPath, cacheList.toString().getBytes());
319+
320+
if (log.isInfoEnabled())
321+
log.info("Cache has been removed from replication [cacheName=" + name + ']');
322+
}
323+
}
324+
catch (IOException e) {
325+
throw new IgniteException(e);
326+
}
289327
}
290328

291329
/** {@inheritDoc} */

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.HashSet;
2828
import java.util.Iterator;
2929
import java.util.List;
30+
import java.util.Optional;
3031
import java.util.Properties;
3132
import java.util.Set;
3233
import java.util.concurrent.ExecutionException;
@@ -282,9 +283,46 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
282283

283284
/** {@inheritDoc} */
284285
@Override public void onCacheDestroy(Iterator<Integer> caches) {
285-
caches.forEachRemaining(e -> {
286-
// Just skip. Handle of cache events not supported.
287-
});
286+
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
287+
}
288+
289+
/**
290+
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
291+
*
292+
* @param cacheId Cache id.
293+
*/
294+
private void deleteRegexpCacheIfPresent(Integer cacheId) {
295+
try {
296+
List<String> caches = loadCaches();
297+
298+
Optional<String> cacheName = caches.stream()
299+
.filter(name -> CU.cacheId(name) == cacheId)
300+
.findAny();
301+
302+
if (cacheName.isPresent()) {
303+
String name = cacheName.get();
304+
305+
caches.remove(name);
306+
307+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
308+
309+
StringBuilder cacheList = new StringBuilder();
310+
311+
for (String cache : caches) {
312+
cacheList.append(cache);
313+
314+
cacheList.append('\n');
315+
}
316+
317+
Files.write(savedCachesPath, cacheList.toString().getBytes());
318+
319+
if (log.isInfoEnabled())
320+
log.info("Cache has been removed from replication [cacheName=" + name + ']');
321+
}
322+
}
323+
catch (IOException e) {
324+
throw new IgniteException(e);
325+
}
288326
}
289327

290328
/** Send marker(meta need to be updated) record to each partition of events topic. */

0 commit comments

Comments
 (0)