Skip to content

Commit 3b25dd9

Browse files
lordgarrishAndrei Nadyktov
authored andcommitted
IGNITE-22530 Add atomic write to caches file
1 parent 96f0bbf commit 3b25dd9

3 files changed

Lines changed: 84 additions & 69 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.io.IOException;
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
23-
import java.nio.file.StandardOpenOption;
24-
import java.util.Collections;
2523
import java.util.HashSet;
2624
import java.util.Iterator;
2725
import java.util.List;
@@ -46,6 +44,8 @@
4644
import org.apache.ignite.metric.MetricRegistry;
4745
import org.apache.ignite.resources.LoggerResource;
4846

47+
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
48+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
4949
import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
5050

5151
/**
@@ -81,6 +81,9 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
8181
/** File with saved names of caches added by cache masks. */
8282
private static final String SAVED_CACHES_FILE = "caches";
8383

84+
/** Temporary file with saved names of caches added by cache masks. */
85+
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
86+
8487
/** CDC directory path. */
8588
private Path cdcDir;
8689

@@ -193,7 +196,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
193196

194197
/**
195198
* Finds match between cache name and user's regex templates.
196-
* If match found, adds this cache's id to id's list and saves cache name to file.
199+
* If match is found, adds this cache's id to id's list and saves cache name to file.
197200
*
198201
* @param cacheName Cache name.
199202
*/
@@ -204,7 +207,11 @@ private void matchWithRegexTemplates(String cacheName) {
204207
cachesIds.add(cacheId);
205208

206209
try {
207-
saveCache(cacheName);
210+
List<String> caches = loadCaches();
211+
212+
caches.add(cacheName);
213+
214+
save(caches);
208215
}
209216
catch (IOException e) {
210217
throw new IgniteException(e);
@@ -216,18 +223,28 @@ private void matchWithRegexTemplates(String cacheName) {
216223
}
217224

218225
/**
219-
* Writes cache name to file
226+
* Writes caches list to file
220227
*
221-
* @param cacheName Cache name.
228+
* @param caches Caches list.
222229
*/
223-
private void saveCache(String cacheName) throws IOException {
224-
if (cdcDir != null) {
225-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
230+
private void save(List<String> caches) throws IOException {
231+
if (cdcDir == null) {
232+
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
233+
}
234+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
235+
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
226236

227-
String cn = cacheName + '\n';
237+
StringBuilder cacheList = new StringBuilder();
228238

229-
Files.write(savedCachesPath, cn.getBytes(), StandardOpenOption.APPEND);
239+
for (String cache : caches) {
240+
cacheList.append(cache);
241+
242+
cacheList.append('\n');
230243
}
244+
245+
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
246+
247+
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
231248
}
232249

233250
/**
@@ -236,19 +253,19 @@ private void saveCache(String cacheName) throws IOException {
236253
* @return List of saved caches names.
237254
*/
238255
private List<String> loadCaches() throws IOException {
239-
if (cdcDir != null) {
240-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
241-
242-
if (Files.notExists(savedCachesPath)) {
243-
Files.createFile(savedCachesPath);
256+
if (cdcDir == null) {
257+
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
258+
}
259+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
244260

245-
if (log.isInfoEnabled())
246-
log.info("Cache list created: " + savedCachesPath);
247-
}
261+
if (Files.notExists(savedCachesPath)) {
262+
Files.createFile(savedCachesPath);
248263

249-
return Files.readAllLines(savedCachesPath);
264+
if (log.isInfoEnabled())
265+
log.info("Cache list created: " + savedCachesPath);
250266
}
251-
return Collections.emptyList();
267+
268+
return Files.readAllLines(savedCachesPath);
252269
}
253270

254271
/**
@@ -305,17 +322,7 @@ private void deleteRegexpCacheIfPresent(Integer cacheId) {
305322

306323
caches.remove(name);
307324

308-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
309-
310-
StringBuilder cacheList = new StringBuilder();
311-
312-
for (String cache : caches) {
313-
cacheList.append(cache);
314-
315-
cacheList.append('\n');
316-
}
317-
318-
Files.write(savedCachesPath, cacheList.toString().getBytes());
325+
save(caches);
319326

320327
if (log.isInfoEnabled())
321328
log.info("Cache has been removed from replication [cacheName=" + name + ']');

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import java.io.IOException;
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
23-
import java.nio.file.StandardOpenOption;
2423
import java.util.ArrayList;
2524
import java.util.Collection;
26-
import java.util.Collections;
2725
import java.util.HashSet;
2826
import java.util.Iterator;
2927
import java.util.List;
@@ -65,6 +63,8 @@
6563
import org.apache.kafka.common.serialization.ByteArraySerializer;
6664
import org.apache.kafka.common.serialization.IntegerSerializer;
6765

66+
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
67+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
6868
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
6969
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
7070
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -161,6 +161,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
161161
/** File with saved names of caches added by cache masks. */
162162
private static final String SAVED_CACHES_FILE = "caches";
163163

164+
/** Temporary file with saved names of caches added by cache masks. */
165+
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
166+
164167
/** CDC directory path. */
165168
private Path cdcDir;
166169

@@ -302,17 +305,7 @@ private void deleteRegexpCacheIfPresent(Integer cacheId) {
302305

303306
caches.remove(name);
304307

305-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
306-
307-
StringBuilder cacheList = new StringBuilder();
308-
309-
for (String cache : caches) {
310-
cacheList.append(cache);
311-
312-
cacheList.append('\n');
313-
}
314-
315-
Files.write(savedCachesPath, cacheList.toString().getBytes());
308+
save(caches);
316309

317310
if (log.isInfoEnabled())
318311
log.info("Cache has been removed from replication [cacheName=" + name + ']');
@@ -479,19 +472,19 @@ private void prepareRegexFilters() {
479472
* @return List of saved caches names.
480473
*/
481474
private List<String> loadCaches() throws IOException {
482-
if (cdcDir != null) {
483-
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
484-
485-
if (Files.notExists(savedCachesPath)) {
486-
Files.createFile(savedCachesPath);
475+
if (cdcDir == null) {
476+
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
477+
}
478+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
487479

488-
if (log.isInfoEnabled())
489-
log.info("Cache list created: " + savedCachesPath);
490-
}
480+
if (Files.notExists(savedCachesPath)) {
481+
Files.createFile(savedCachesPath);
491482

492-
return Files.readAllLines(savedCachesPath);
483+
if (log.isInfoEnabled())
484+
log.info("Cache list created: " + savedCachesPath);
493485
}
494-
return Collections.emptyList();
486+
487+
return Files.readAllLines(savedCachesPath);
495488
}
496489

497490
/**
@@ -512,7 +505,7 @@ private boolean matchesFilters(String cacheName) {
512505

513506
/**
514507
* Finds match between cache name and user's regex templates.
515-
* If match found, adds this cache's id to id's list and saves cache name to file.
508+
* If match is found, adds this cache's id to id's list and saves cache name to file.
516509
*
517510
* @param cacheName Cache name.
518511
*/
@@ -523,7 +516,11 @@ private void matchWithRegexTemplates(String cacheName) {
523516
cachesIds.add(cacheId);
524517

525518
try {
526-
saveCache(cacheName);
519+
List<String> caches = loadCaches();
520+
521+
caches.add(cacheName);
522+
523+
save(caches);
527524
}
528525
catch (IOException e) {
529526
throw new IgniteException(e);
@@ -535,18 +532,28 @@ private void matchWithRegexTemplates(String cacheName) {
535532
}
536533

537534
/**
538-
* Writes cache name to file.
535+
* Writes caches list to file
539536
*
540-
* @param cacheName Cache name.
537+
* @param caches Caches list.
541538
*/
542-
private void saveCache(String cacheName) throws IOException {
543-
if (cdcDir != null) {
544-
Path savedCaches = cdcDir.resolve(SAVED_CACHES_FILE);
539+
private void save(List<String> caches) throws IOException {
540+
if (cdcDir == null) {
541+
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
542+
}
543+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
544+
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
545+
546+
StringBuilder cacheList = new StringBuilder();
545547

546-
String cn = cacheName + '\n';
548+
for (String cache : caches) {
549+
cacheList.append(cache);
547550

548-
Files.write(savedCaches, cn.getBytes(), StandardOpenOption.APPEND);
551+
cacheList.append('\n');
549552
}
553+
554+
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
555+
556+
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
550557
}
551558

552559
/**

modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
115115
for (IgniteEx ex : srcCluster) {
116116
int idx = getTestIgniteInstanceIndex(ex.name());
117117

118-
futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplates, excludeTemplates, "ignite-src-to-kafka-" + idx));
118+
futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplates,
119+
excludeTemplates, "ignite-src-to-kafka-" + idx));
119120
}
120121

121122
for (int i = 0; i < destCluster.length; i++) {
@@ -149,15 +150,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
149150
for (IgniteEx ex : srcCluster) {
150151
int idx = getTestIgniteInstanceIndex(ex.name());
151152

152-
futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, includeTemplates,
153-
excludeTemplates, "ignite-src-to-kafka-" + idx));
153+
futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE,
154+
includeTemplates, excludeTemplates, "ignite-src-to-kafka-" + idx));
154155
}
155156

156157
for (IgniteEx ex : destCluster) {
157158
int idx = getTestIgniteInstanceIndex(ex.name());
158159

159-
futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, includeTemplates,
160-
excludeTemplates, "ignite-dest-to-kafka-" + idx));
160+
futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE,
161+
includeTemplates, excludeTemplates, "ignite-dest-to-kafka-" + idx));
161162
}
162163

163164
futs.add(kafkaToIgnite(

0 commit comments

Comments
 (0)