Skip to content

Commit 2619efc

Browse files
authored
IGNITE-25706 Fail-fast detection for CDC tests (#12151)
1 parent 1d47614 commit 2619efc

3 files changed

Lines changed: 25 additions & 6 deletions

File tree

modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.ignite.internal.util.typedef.F;
4747
import org.apache.ignite.internal.util.typedef.T2;
4848
import org.apache.ignite.lang.IgniteBiTuple;
49+
import org.apache.ignite.lang.IgniteInClosure;
4950
import org.apache.ignite.metric.MetricRegistry;
5051
import org.apache.ignite.spi.metric.HistogramMetric;
5152
import org.apache.ignite.spi.metric.LongMetric;
@@ -167,7 +168,7 @@ protected void addAndWaitForConsumption(
167168
else
168169
cdc = createCdc(cnsmr, cfg);
169170

170-
IgniteInternalFuture<?> fut = runAsync(cdc);
171+
IgniteInternalFuture<?> fut = runCdcAsync(cdc, latch);
171172

172173
addData.apply(cache, from, to);
173174

@@ -279,6 +280,22 @@ protected MetricExporterSpi[] metricExporters() {
279280
return null;
280281
}
281282

283+
/** */
284+
protected IgniteInternalFuture<?> runCdcAsync(CdcMain cdc, CountDownLatch latch) {
285+
IgniteInternalFuture<?> fut = runAsync(cdc);
286+
287+
fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
288+
@Override public void apply(IgniteInternalFuture<?> initFut) {
289+
if (initFut.error() != null)
290+
log.error("The CDC main process has failed", initFut.error());
291+
292+
latch.countDown();
293+
}
294+
});
295+
296+
return fut;
297+
}
298+
282299
/** */
283300
public abstract static class TestCdcConsumer<T> implements CdcConsumer {
284301
/** Keys. */

modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -629,14 +629,15 @@ public void testMultiNodeConsumption() throws Exception {
629629
CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
630630
CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
631631

632-
IgniteInternalFuture<?> fut1 = runAsync(cdc1);
633-
IgniteInternalFuture<?> fut2 = runAsync(cdc2);
632+
IgniteInternalFuture<?> fut1 = runCdcAsync(cdc1, latch);
633+
IgniteInternalFuture<?> fut2 = runCdcAsync(cdc2, latch);
634634

635635
addDataFut.get(getTestTimeout());
636636

637637
runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2)).get(getTestTimeout());
638638

639-
// Wait while predicate will become true and state saved on the disk for both cdc.
639+
// Wait while predicate will become true and state saved on the disk for both cdc or
640+
// while CdcMain futures completes when assertions fail in UserCdcConsumer methods.
640641
assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
641642

642643
checkMetrics(cdc1, keysCnt[0]);

modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void testReadAllSQLRows() throws Exception {
137137

138138
CdcMain cdc = createCdc(cnsmr, cfg, latch, userPredicate, cityPredicate);
139139

140-
IgniteInternalFuture<?> fut = runAsync(cdc);
140+
IgniteInternalFuture<?> fut = runCdcAsync(cdc, latch);
141141

142142
executeSql(
143143
ign,
@@ -167,7 +167,8 @@ public void testReadAllSQLRows() throws Exception {
167167
Integer.toString(127000 + i));
168168
}
169169

170-
// Wait while both predicte will become true and state saved on the disk.
170+
// Wait while both predicate will become true and state saved on the disk or
171+
// while CdcMain future completes when assertions fail in BinaryCdcConsumer methods.
171172
assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
172173

173174
checkMetrics(cdc, KEYS_CNT * 2);

0 commit comments

Comments
 (0)