3939import com .mongodb .reactivestreams .client .Success ;
4040
4141import de .bwaldvogel .mongo .oplog .OperationType ;
42+ import io .reactivex .subscribers .TestSubscriber ;
4243
4344public abstract class AbstractOplogTest extends AbstractTest {
4445
@@ -239,24 +240,25 @@ public void testChangeStreamInsertAndUpdateFullDocumentLookup() {
239240 Document .parse ("{'fullDocument.b': 1}" )))
240241 );
241242
242- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor =
243- collection .watch (pipeline ).fullDocument (FullDocument .UPDATE_LOOKUP ).cursor ();
243+ try ( MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor =
244+ collection .watch (pipeline ).fullDocument (FullDocument .UPDATE_LOOKUP ).cursor ()) {
244245
245- for (int i = 1 ; i < numberOfDocs + 1 ; i ++) {
246- Document doc = json (String .format ("a: %d, b: 1" , i ));
247- collection .insertOne (doc );
248- collection .updateOne (eq ("a" , i ), set ("c" , i * 10 ));
246+ for (int i = 1 ; i < numberOfDocs + 1 ; i ++) {
247+ Document doc = json (String .format ("a: %d, b: 1" , i ));
248+ collection .insertOne (doc );
249+ collection .updateOne (eq ("a" , i ), set ("c" , i * 10 ));
249250
250- ChangeStreamDocument <Document > insertDocument = cursor .next ();
251- ChangeStreamDocument <Document > updateDocument = cursor .next ();
251+ ChangeStreamDocument <Document > insertDocument = cursor .next ();
252+ ChangeStreamDocument <Document > updateDocument = cursor .next ();
252253
253- assertThat (insertDocument .getFullDocument ().get ("a" )).isEqualTo (i );
254- insert .add (insertDocument .getFullDocument ());
254+ assertThat (insertDocument .getFullDocument ().get ("a" )).isEqualTo (i );
255+ insert .add (insertDocument .getFullDocument ());
255256
256- assertThat (updateDocument .getFullDocument ().get ("a" )).isEqualTo (i );
257- update .add (updateDocument .getFullDocument ());
257+ assertThat (updateDocument .getFullDocument ().get ("a" )).isEqualTo (i );
258+ update .add (updateDocument .getFullDocument ());
258259
259- changeStreamsResult .addAll (Arrays .asList (insertDocument , updateDocument ));
260+ changeStreamsResult .addAll (Arrays .asList (insertDocument , updateDocument ));
261+ }
260262 }
261263
262264 assertThat (insert .size ()).isEqualTo (numberOfDocs );
@@ -267,94 +269,107 @@ public void testChangeStreamInsertAndUpdateFullDocumentLookup() {
267269 @ Test
268270 public void testChangeStreamUpdateDefault () {
269271 collection .insertOne (json ("a: 1, b: 2, c: 3" ));
270- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ();
271- collection .updateOne (eq ("a" , 1 ), json ("$set: {b: 0, c: 10}" ));
272- ChangeStreamDocument <Document > updateDocument = cursor .next ();
273- Document fullDoc = updateDocument .getFullDocument ();
274- assertThat (fullDoc ).isNotNull ();
275- assertThat (fullDoc .get ("b" )).isEqualTo (0 );
276- assertThat (fullDoc .get ("c" )).isEqualTo (10 );
277-
278- collection .updateOne (eq ("a" , 1 ), unset ("b" ));
279- updateDocument = cursor .next ();
280- fullDoc = updateDocument .getFullDocument ();
281- assertThat (fullDoc ).isNotNull ();
282- assertThat (fullDoc .get ("b" )).isEqualTo ("" );
272+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ()) {
273+ collection .updateOne (eq ("a" , 1 ), json ("$set: {b: 0, c: 10}" ));
274+ ChangeStreamDocument <Document > updateDocument = cursor .next ();
275+ Document fullDoc = updateDocument .getFullDocument ();
276+ assertThat (fullDoc ).isNotNull ();
277+ assertThat (fullDoc .get ("b" )).isEqualTo (0 );
278+ assertThat (fullDoc .get ("c" )).isEqualTo (10 );
279+
280+ collection .updateOne (eq ("a" , 1 ), unset ("b" ));
281+ updateDocument = cursor .next ();
282+ fullDoc = updateDocument .getFullDocument ();
283+ assertThat (fullDoc ).isNotNull ();
284+ assertThat (fullDoc .get ("b" )).isEqualTo ("" );
285+ }
283286 }
284287
285288 @ Test
286289 public void testChangeStreamDelete () {
287290 collection .insertOne (json ("_id: 1" ));
288- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ();
289- collection .deleteOne (json ("_id: 1" ));
290- ChangeStreamDocument <Document > deleteDocument = cursor .next ();
291- assertThat (deleteDocument .getDocumentKey ().get ("_id" )).isEqualTo (new BsonInt32 (1 ));
291+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ()) {
292+ collection .deleteOne (json ("_id: 1" ));
293+ ChangeStreamDocument <Document > deleteDocument = cursor .next ();
294+ assertThat (deleteDocument .getDocumentKey ().get ("_id" )).isEqualTo (new BsonInt32 (1 ));
295+ }
292296 }
293297
294298 @ Test
295299 public void testChangeStreamStartAfter () {
296300 collection .insertOne (json ("a: 1" )); // This is needed to initialize the collection in the server.
297- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ();
298- collection .insertOne (json ("a: 2" ));
299- collection .insertOne (json ("a: 3" ));
300- ChangeStreamDocument <Document > document = cursor .next ();
301- BsonDocument resumeToken = document .getResumeToken ();
302-
303- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor2 = collection .watch ().startAfter (resumeToken ).cursor ();
304- ChangeStreamDocument <Document > document2 = cursor2 .next ();
305- assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (3 );
301+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ()) {
302+ collection .insertOne (json ("a: 2" ));
303+ collection .insertOne (json ("a: 3" ));
304+ ChangeStreamDocument <Document > document = cursor .next ();
305+ BsonDocument resumeToken = document .getResumeToken ();
306+
307+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor2
308+ = collection .watch ().startAfter (resumeToken ).cursor ()) {
309+ ChangeStreamDocument <Document > document2 = cursor2 .next ();
310+ assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (3 );
311+ }
312+ }
306313 }
307314
308315 @ Test
309316 public void testChangeStreamResumeAfter () {
310317 collection .insertOne (json ("a: 1" ));
311- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ();
312- collection .insertOne (json ("a: 2" ));
313- collection .insertOne (json ("a: 3" ));
314- ChangeStreamDocument <Document > document = cursor .next ();
315- BsonDocument resumeToken = document .getResumeToken ();
316-
317- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor2 = collection .watch ().resumeAfter (resumeToken ).cursor ();
318- ChangeStreamDocument <Document > document2 = cursor2 .next ();
319- assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (3 );
318+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ();) {
319+ collection .insertOne (json ("a: 2" ));
320+ collection .insertOne (json ("a: 3" ));
321+ ChangeStreamDocument <Document > document = cursor .next ();
322+ BsonDocument resumeToken = document .getResumeToken ();
323+
324+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor2
325+ = collection .watch ().resumeAfter (resumeToken ).cursor ()) {
326+ ChangeStreamDocument <Document > document2 = cursor2 .next ();
327+ assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (3 );
328+ }
329+ }
320330 }
321331
322332 @ Test
323333 public void testChangeStreamResumeAfterTerminalEvent () {
324334 assertThrows (NoSuchElementException .class , () -> {
325335 MongoCollection <Document > col = db .getCollection ("test-collection" );
326336 ChangeStreamIterable <Document > watch = col .watch ().fullDocument (FullDocument .UPDATE_LOOKUP ).batchSize (1 );
327- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = watch .cursor ();
328- col .insertOne (json ("a: 1" ));
329- cursor .next ();
330-
331- col .drop ();
332-
333- ChangeStreamDocument <Document > document = cursor .next ();
334- BsonDocument resumeToken = document .getResumeToken ();
335- cursor = watch .resumeAfter (resumeToken ).cursor ();
336- document = cursor .next ();
337-
338- assertThat (document ).isNotNull ();
339- assertThat (document .getOperationType ()).isEqualTo (com .mongodb .client .model .changestream .OperationType .INVALIDATE );
340- cursor .next ();
337+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = watch .cursor ();) {
338+ col .insertOne (json ("a: 1" ));
339+ cursor .next ();
340+
341+ col .drop ();
342+
343+ ChangeStreamDocument <Document > document = cursor .next ();
344+ BsonDocument resumeToken = document .getResumeToken ();
345+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> resumeAfterCursor
346+ = watch .resumeAfter (resumeToken ).cursor ();) {
347+ document = resumeAfterCursor .next ();
348+
349+ assertThat (document ).isNotNull ();
350+ assertThat (document .getOperationType ())
351+ .isEqualTo (com .mongodb .client .model .changestream .OperationType .INVALIDATE );
352+ resumeAfterCursor .next ();
353+ }
354+ }
341355 });
342356 }
343357
344358 @ Test
345359 public void testChangeStreamStartAtOperationTime () {
346360 collection .insertOne (json ("a: 1" ));
347- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ();
348- collection .insertOne (json ("a: 2" ));
349- collection .insertOne (json ("a: 3" ));
350- ChangeStreamDocument <Document > document = cursor .next ();
351- BsonTimestamp startAtOperationTime = document .getClusterTime ();
352-
353- MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor2 = collection .watch ().startAtOperationTime (startAtOperationTime ).cursor ();
354- ChangeStreamDocument <Document > document2 = cursor2 .next ();
355- assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (2 );
356- document2 = cursor2 .next ();
357- assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (3 );
361+ try (MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor = collection .watch ().cursor ()) {
362+ collection .insertOne (json ("a: 2" ));
363+ collection .insertOne (json ("a: 3" ));
364+ ChangeStreamDocument <Document > document = cursor .next ();
365+ BsonTimestamp startAtOperationTime = document .getClusterTime ();
366+
367+ MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor2 = collection .watch ().startAtOperationTime (startAtOperationTime ).cursor ();
368+ ChangeStreamDocument <Document > document2 = cursor2 .next ();
369+ assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (2 );
370+ document2 = cursor2 .next ();
371+ assertThat (document2 .getFullDocument ().get ("a" )).isEqualTo (3 );
372+ }
358373 }
359374
360375 @ Test
@@ -372,9 +387,9 @@ public void testChangeStreamAndReplaceOneWithUpsertTrue() throws Exception {
372387 TestSubscriber <Document > findSubscriber = new TestSubscriber <>();
373388 asyncCollection .find (json ("a:1" )).subscribe (findSubscriber );
374389 findSubscriber .awaitTerminalEvent ();
375- assertThat (findSubscriber .getSingleValue ( ).get ("a" )).isEqualTo (1 );
390+ assertThat (findSubscriber .values (). get ( 0 ).get ("a" )).isEqualTo (1 );
376391
377- streamSubscriber .awaitCount (1 );
392+ streamSubscriber .awaitCount (1 ). assertValueCount ( 1 ). cancel () ;
378393 assertThat (streamSubscriber .values ().get (0 ).getOperationType ().getValue ()).isEqualTo ("insert" );
379394 assertThat (streamSubscriber .values ().get (0 ).getFullDocument ()).isEqualTo (findSubscriber .values ().get (0 ));
380395 }
@@ -399,8 +414,8 @@ public void testSimpleChangeStreamWithFilter() throws Exception {
399414 insertSubscriber1 .awaitTerminalEvent ();
400415 insertSubscriber2 .awaitTerminalEvent ();
401416
402- streamSubscriber .awaitCount (1 );
403- assertThat (streamSubscriber .getSingleValue ( ).getFullDocument ().get ("bu" )).isEqualTo ("abc" );
417+ streamSubscriber .awaitCount (1 ). assertValueCount ( 1 ). cancel () ;
418+ assertThat (streamSubscriber .values (). get ( 0 ).getFullDocument ().get ("bu" )).isEqualTo ("abc" );
404419 }
405420
406421}
0 commit comments