Skip to content

Commit 96f0bbf

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Add removal of destroyed caches from cacheList file
1 parent 0289edb commit 96f0bbf

2 files changed

Lines changed: 82 additions & 6 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashSet;
2626
import java.util.Iterator;
2727
import java.util.List;
28+
import java.util.Optional;
2829
import java.util.Set;
2930
import java.util.regex.Pattern;
3031
import java.util.regex.PatternSyntaxException;
@@ -283,9 +284,46 @@ private boolean matchesFilters(String cacheName) {
283284

284285
/** {@inheritDoc} */
285286
@Override public void onCacheDestroy(Iterator<Integer> caches) {
286-
caches.forEachRemaining(e -> {
287-
// Just skip. Handle of cache events not supported.
288-
});
287+
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
288+
}
289+
290+
/**
291+
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
292+
*
293+
* @param cacheId Cache id.
294+
*/
295+
private void deleteRegexpCacheIfPresent(Integer cacheId) {
296+
try {
297+
List<String> caches = loadCaches();
298+
299+
Optional<String> cacheName = caches.stream()
300+
.filter(name -> CU.cacheId(name) == cacheId)
301+
.findAny();
302+
303+
if (cacheName.isPresent()) {
304+
String name = cacheName.get();
305+
306+
caches.remove(name);
307+
308+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
309+
310+
StringBuilder cacheList = new StringBuilder();
311+
312+
for (String cache : caches) {
313+
cacheList.append(cache);
314+
315+
cacheList.append('\n');
316+
}
317+
318+
Files.write(savedCachesPath, cacheList.toString().getBytes());
319+
320+
if (log.isInfoEnabled())
321+
log.info("Cache has been removed from replication [cacheName=" + name + ']');
322+
}
323+
}
324+
catch (IOException e) {
325+
throw new IgniteException(e);
326+
}
289327
}
290328

291329
/** {@inheritDoc} */

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.HashSet;
2828
import java.util.Iterator;
2929
import java.util.List;
30+
import java.util.Optional;
3031
import java.util.Properties;
3132
import java.util.Set;
3233
import java.util.concurrent.ExecutionException;
@@ -280,9 +281,46 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
280281

281282
/** {@inheritDoc} */
282283
@Override public void onCacheDestroy(Iterator<Integer> caches) {
283-
caches.forEachRemaining(e -> {
284-
// Just skip. Handle of cache events not supported.
285-
});
284+
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
285+
}
286+
287+
/**
288+
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
289+
*
290+
* @param cacheId Cache id.
291+
*/
292+
private void deleteRegexpCacheIfPresent(Integer cacheId) {
293+
try {
294+
List<String> caches = loadCaches();
295+
296+
Optional<String> cacheName = caches.stream()
297+
.filter(name -> CU.cacheId(name) == cacheId)
298+
.findAny();
299+
300+
if (cacheName.isPresent()) {
301+
String name = cacheName.get();
302+
303+
caches.remove(name);
304+
305+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
306+
307+
StringBuilder cacheList = new StringBuilder();
308+
309+
for (String cache : caches) {
310+
cacheList.append(cache);
311+
312+
cacheList.append('\n');
313+
}
314+
315+
Files.write(savedCachesPath, cacheList.toString().getBytes());
316+
317+
if (log.isInfoEnabled())
318+
log.info("Cache has been removed from replication [cacheName=" + name + ']');
319+
}
320+
}
321+
catch (IOException e) {
322+
throw new IgniteException(e);
323+
}
286324
}
287325

288326
/** Send marker(meta need to be updated) record to each partition of events topic. */

0 commit comments

Comments
 (0)