Skip to content

Commit 323e828

Browse files
committed
add failing test case for starting multiple ChangeStreams
1 parent f8615ad commit 323e828

1 file changed

Lines changed: 71 additions & 0 deletions

File tree

test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractOplogTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,23 @@
77
import static de.bwaldvogel.mongo.backend.TestUtils.json;
88
import static de.bwaldvogel.mongo.backend.TestUtils.toArray;
99
import static java.util.Collections.singletonList;
10+
import static org.assertj.core.groups.Tuple.tuple;
1011
import static org.junit.jupiter.api.Assertions.assertThrows;
1112

1213
import java.time.Duration;
1314
import java.time.Instant;
15+
import java.util.AbstractMap;
1416
import java.util.ArrayList;
1517
import java.util.Arrays;
1618
import java.util.Date;
1719
import java.util.List;
20+
import java.util.Map;
1821
import java.util.NoSuchElementException;
1922
import java.util.UUID;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.stream.IntStream;
2025

26+
import org.assertj.core.api.Assertions;
2127
import org.bson.BsonDocument;
2228
import org.bson.BsonInt32;
2329
import org.bson.BsonTimestamp;
@@ -39,6 +45,8 @@
3945
import com.mongodb.reactivestreams.client.Success;
4046

4147
import de.bwaldvogel.mongo.oplog.OperationType;
48+
import io.reactivex.Flowable;
49+
import io.reactivex.schedulers.Schedulers;
4250
import io.reactivex.subscribers.TestSubscriber;
4351

4452
public abstract class AbstractOplogTest extends AbstractTest {
@@ -425,4 +433,67 @@ public void testSimpleChangeStreamWithFilter() throws Exception {
425433
assertThat(streamSubscriber.values().get(0).getFullDocument().get("bu")).isEqualTo("abc");
426434
}
427435

436+
@Test
437+
@Disabled
438+
public void testMultipleChangeStreams() throws InterruptedException {
439+
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 1")))
440+
.test().awaitDone(5, TimeUnit.SECONDS).assertComplete();
441+
442+
final int changeStreamCount = 32;
443+
444+
List<Bson> pipeline = singletonList(match(Filters.eq("fullDocument.bu", "abc")));
445+
446+
final TestSubscriber<Map<Integer, List<ChangeStreamDocument<Document>>>> streamSubscriber
447+
= new TestSubscriber<>();
448+
449+
Flowable.range(1, changeStreamCount)
450+
.flatMapSingle(index -> {
451+
return Flowable.fromPublisher(asyncCollection.watch(pipeline))
452+
.take(2)
453+
.toList()
454+
.map(changeStreamDocuments -> {
455+
return new AbstractMap.SimpleEntry<>(index, changeStreamDocuments);
456+
})
457+
.subscribeOn(Schedulers.io()); // subscribe to change streams concurrently
458+
})
459+
.toMap(Map.Entry::getKey, Map.Entry::getValue)
460+
.toFlowable()
461+
.subscribe(streamSubscriber);
462+
463+
// give time for all ChangeStream Publishers to be subscribed to
464+
// todo: expose API to get cursors from Backend and wait until 'changeStreamCount' cursors
465+
TimeUnit.SECONDS.sleep(5);
466+
467+
Flowable.concat(
468+
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 2, bu: 'abc"))),
469+
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 3, bu: 'xyz"))),
470+
Flowable.fromPublisher(asyncCollection.insertOne(json("_id: 4, bu: 'abc")))
471+
).test().awaitDone(5, TimeUnit.SECONDS).assertComplete();
472+
473+
final Map<Integer, List<ChangeStreamDocument<Document>>> results = streamSubscriber
474+
.awaitDone(15, TimeUnit.SECONDS)
475+
.assertComplete()
476+
.assertValueCount(1)
477+
.values().get(0);
478+
479+
Assertions.assertThat(IntStream.rangeClosed(1, changeStreamCount))
480+
.allSatisfy(index -> {
481+
Assertions.assertThat(results).containsKey(index);
482+
483+
final List<ChangeStreamDocument<Document>> emits = results.get(index);
484+
Assertions.assertThat(emits).isNotNull()
485+
.extracting(
486+
document -> {
487+
return document.getDocumentKey().getInt32("_id").getValue();
488+
},
489+
document -> {
490+
return document.getFullDocument() != null
491+
? document.getFullDocument().getString("bu")
492+
: null;
493+
}
494+
)
495+
.containsExactly(tuple(2, "abc"), tuple(4, "abc"));
496+
});
497+
}
498+
428499
}

0 commit comments

Comments
 (0)