Skip to content

Commit 5b46c0a

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Make caches set in KafkaToIgniteCdcStreamerApplier mutable
1 parent daf62b7 commit 5b46c0a

1 file changed

Lines changed: 4 additions & 4 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.Collection;
22+
import java.util.HashSet;
2223
import java.util.List;
2324
import java.util.Properties;
2425
import java.util.Set;
2526
import java.util.concurrent.atomic.AtomicBoolean;
26-
import java.util.stream.Collectors;
2727
import org.apache.ignite.IgniteException;
2828
import org.apache.ignite.IgniteLogger;
2929
import org.apache.ignite.cdc.AbstractCdcEventsApplier;
@@ -152,13 +152,13 @@ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcS
152152
protected void runAppliers() {
153153
AtomicBoolean stopped = new AtomicBoolean();
154154

155-
Set<Integer> caches = null;
155+
Set<Integer> caches = new HashSet<>();
156156

157157
if (!F.isEmpty(streamerCfg.getCaches())) {
158158
checkCaches(streamerCfg.getCaches());
159159

160-
caches = streamerCfg.getCaches().stream()
161-
.map(CU::cacheId).collect(Collectors.toSet());
160+
streamerCfg.getCaches().stream()
161+
.map(CU::cacheId).forEach(caches::add);
162162
}
163163

164164
KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(

0 commit comments

Comments
 (0)