File tree Expand file tree Collapse file tree
core/src/main/java/de/bwaldvogel/mongo/oplog
test-common/src/main/java/de/bwaldvogel/mongo/backend Expand file tree Collapse file tree Original file line number Diff line number Diff line change 11package de .bwaldvogel .mongo .oplog ;
22
33import java .util .List ;
4+ import java .util .concurrent .TimeUnit ;
45import java .util .function .Function ;
56import java .util .stream .Collectors ;
67import java .util .stream .Stream ;
@@ -27,6 +28,8 @@ public boolean isEmpty() {
2728
2829 @ Override
2930 public List <Document > takeDocuments (int numberToReturn ) {
31+ emulateWaitingForAllShards ();
32+
3033 Stream <Document > stream = oplogStream .apply (position );
3134
3235 if (numberToReturn > 0 ) {
@@ -38,6 +41,16 @@ public List<Document> takeDocuments(int numberToReturn) {
3841 return documents ;
3942 }
4043
44+ private void emulateWaitingForAllShards () {
45+ try {
46+ // artificial delay to avoid 100% CPU usage when starting multiple ChangeStreams
47+ // emulates real ChangeStream behaviour of waiting for all shards to provide data
48+ TimeUnit .MILLISECONDS .sleep (100 );
49+ } catch (InterruptedException e ) {
50+ // ignore
51+ }
52+ }
53+
4154 OplogPosition getPosition () {
4255 return position ;
4356 }
Original file line number Diff line number Diff line change @@ -434,7 +434,6 @@ public void testSimpleChangeStreamWithFilter() throws Exception {
434434 }
435435
436436 @ Test
437- @ Disabled
438437 public void testMultipleChangeStreams () throws InterruptedException {
439438 Flowable .fromPublisher (asyncCollection .insertOne (json ("_id: 1" )))
440439 .test ().awaitDone (5 , TimeUnit .SECONDS ).assertComplete ();
You can’t perform that action at this time.
0 commit comments