Skip to content

Commit 104a1a6

Browse files
committed
split ChangeStreamCursor from OplogCursor
1 parent 77d5589 commit 104a1a6

8 files changed

Lines changed: 196 additions & 116 deletions

File tree

core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ private Document commandChangeStreamPipeline(Document query, Oplog oplog, String
644644
int batchSize = (int) cursorDocument.getOrDefault("batchSize", 0);
645645

646646
String namespace = getFullCollectionNamespace(collectionName);
647-
Cursor cursor = oplog.createCursor(changeStreamDocument, namespace, aggregation);
647+
Cursor cursor = oplog.createChangeStreamCursor(changeStreamDocument, namespace, aggregation);
648648
return Utils.firstBatchCursorResponse(namespace, cursor.takeDocuments(batchSize), cursor);
649649
}
650650

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package de.bwaldvogel.mongo.oplog;
2+
3+
import java.util.List;
4+
import java.util.stream.Collectors;
5+
6+
import de.bwaldvogel.mongo.MongoBackend;
7+
import de.bwaldvogel.mongo.backend.TailableCursor;
8+
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
9+
import de.bwaldvogel.mongo.bson.BsonTimestamp;
10+
import de.bwaldvogel.mongo.bson.Document;
11+
import de.bwaldvogel.mongo.exception.MongoServerException;
12+
13+
public class ChangeStreamCursor implements TailableCursor {
14+
15+
private static final String FULL_DOCUMENT = "fullDocument";
16+
17+
private final MongoBackend mongoBackend;
18+
private final Document changeStreamDocument;
19+
private final Aggregation aggregation;
20+
private final OplogCursor oplogCursor;
21+
22+
ChangeStreamCursor(
23+
MongoBackend mongoBackend,
24+
Document changeStreamDocument,
25+
Aggregation aggregation,
26+
OplogCursor oplogCursor
27+
) {
28+
this.mongoBackend = mongoBackend;
29+
this.changeStreamDocument = changeStreamDocument;
30+
this.aggregation = aggregation;
31+
this.oplogCursor = oplogCursor;
32+
}
33+
34+
@Override
35+
public long getId() {
36+
return oplogCursor.getId();
37+
}
38+
39+
@Override
40+
public boolean isEmpty() {
41+
return oplogCursor.isEmpty();
42+
}
43+
44+
@Override
45+
public List<Document> takeDocuments(int numberToReturn) {
46+
return aggregation.runStagesAsStream(
47+
oplogCursor.takeDocuments(numberToReturn).stream()
48+
.map(document -> toChangeStreamResponseDocument(document))
49+
).collect(Collectors.toList());
50+
}
51+
52+
@Override
53+
public OplogPosition getPosition() {
54+
return oplogCursor.getPosition();
55+
}
56+
57+
private Document toChangeStreamResponseDocument(Document oplogDocument) {
58+
OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString());
59+
Document documentKey = new Document();
60+
Document document = getUpdateDocument(oplogDocument);
61+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(oplogDocument);
62+
OplogPosition oplogPosition = new OplogPosition(timestamp);
63+
switch (operationType) {
64+
case UPDATE:
65+
case DELETE:
66+
documentKey = document;
67+
break;
68+
case INSERT:
69+
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
70+
break;
71+
case COMMAND:
72+
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
73+
default:
74+
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
75+
}
76+
77+
return new Document()
78+
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
79+
.append("operationType", operationType.getDescription())
80+
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
81+
.append("documentKey", documentKey)
82+
.append("clusterTime", timestamp);
83+
}
84+
85+
private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
86+
Document document = getUpdateDocument(oplogDocument);
87+
String operationType = document.keySet().stream().findFirst().orElseThrow(
88+
() -> new MongoServerException("Unspecified command operation type")
89+
);
90+
91+
return new Document()
92+
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
93+
.append("operationType", operationType)
94+
.append("clusterTime", timestamp);
95+
}
96+
97+
private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
98+
switch (operationType) {
99+
case INSERT:
100+
return getUpdateDocument(document);
101+
case DELETE:
102+
return null;
103+
case UPDATE:
104+
return lookUpUpdateDocument(changeStreamDocument, document);
105+
}
106+
throw new IllegalArgumentException("Invalid operation type");
107+
}
108+
109+
private Document getUpdateDocument(Document document) {
110+
return (Document) document.get(OplogDocumentFields.O);
111+
}
112+
113+
private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
114+
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
115+
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
116+
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
117+
String databaseName = namespace.split("\\.")[0];
118+
String collectionName = namespace.split("\\.")[1];
119+
return mongoBackend.resolveDatabase(databaseName)
120+
.resolveCollection(collectionName, true)
121+
.queryAllAsStream()
122+
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
123+
.findFirst()
124+
.orElse(deltaUpdate);
125+
}
126+
return deltaUpdate;
127+
}
128+
129+
private Document getDeltaUpdate(Document updateDocument) {
130+
Document delta = new Document();
131+
if (updateDocument.containsKey("$set")) {
132+
delta.appendAll((Document) updateDocument.get("$set"));
133+
}
134+
if (updateDocument.containsKey("$unset")) {
135+
delta.appendAll((Document) updateDocument.get("$unset"));
136+
}
137+
return delta;
138+
}
139+
140+
}

core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java

Lines changed: 23 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,17 @@
22

33
import java.util.List;
44
import java.util.UUID;
5-
import java.util.function.Function;
65
import java.util.stream.Collectors;
76
import java.util.stream.Stream;
87

98
import de.bwaldvogel.mongo.MongoBackend;
109
import de.bwaldvogel.mongo.MongoCollection;
11-
import de.bwaldvogel.mongo.backend.Cursor;
1210
import de.bwaldvogel.mongo.backend.CursorRegistry;
11+
import de.bwaldvogel.mongo.backend.TailableCursor;
1312
import de.bwaldvogel.mongo.backend.Utils;
1413
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
1514
import de.bwaldvogel.mongo.bson.BsonTimestamp;
1615
import de.bwaldvogel.mongo.bson.Document;
17-
import de.bwaldvogel.mongo.exception.MongoServerException;
1816

1917
public class CollectionBackedOplog implements Oplog {
2018

@@ -78,23 +76,13 @@ public void handleDropCollection(String namespace) {
7876
collection.addDocument(toOplogDropCollection(databaseName, collectionName));
7977
}
8078

81-
private Stream<Document> streamOplog(Document changeStreamDocument, OplogPosition position, Aggregation aggregation) {
82-
return aggregation.runStagesAsStream(collection.queryAllAsStream()
83-
.filter(document -> {
84-
BsonTimestamp timestamp = getOplogTimestamp(document);
85-
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
86-
return documentOplogPosition.isAfter(position);
87-
})
88-
.sorted((o1, o2) -> {
89-
BsonTimestamp timestamp1 = getOplogTimestamp(o1);
90-
BsonTimestamp timestamp2 = getOplogTimestamp(o2);
91-
return timestamp1.compareTo(timestamp2);
92-
})
93-
.map(document -> toChangeStreamResponseDocument(document, changeStreamDocument)));
79+
@Override
80+
public OplogCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
81+
return new OplogCursor(cursorRegistry.generateCursorId(), this::streamOplog, initialOplogPosition);
9482
}
9583

9684
@Override
97-
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
85+
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
9886
Document startAfter = (Document) changeStreamDocument.get("startAfter");
9987
Document resumeAfter = (Document) changeStreamDocument.get("resumeAfter");
10088
BsonTimestamp startAtOperationTime = (BsonTimestamp) changeStreamDocument.get(START_AT_OPERATION_TIME);
@@ -107,7 +95,7 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
10795
String collectionName = Utils.getCollectionNameFromFullName(namespace);
10896
boolean resumeAfterTerminalEvent = collection.queryAllAsStream()
10997
.filter(document -> {
110-
BsonTimestamp timestamp = getOplogTimestamp(document);
98+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
11199
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
112100
return initialOplogPosition.isAfter(documentOplogPosition.inclusive());
113101
})
@@ -125,12 +113,27 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
125113
initialOplogPosition = new OplogPosition(oplogClock.now());
126114
}
127115

128-
Function<OplogPosition, Stream<Document>> streamSupplier = position -> streamOplog(changeStreamDocument, position, aggregation);
129-
OplogCursor cursor = new OplogCursor(cursorRegistry.generateCursorId(), streamSupplier, initialOplogPosition);
116+
OplogCursor oplogCursor = createCursor(namespace, initialOplogPosition);
117+
ChangeStreamCursor cursor
118+
= new ChangeStreamCursor(backend, changeStreamDocument, aggregation, oplogCursor);
130119
cursorRegistry.add(cursor);
131120
return cursor;
132121
}
133122

123+
private Stream<Document> streamOplog(OplogPosition position) {
124+
return collection.queryAllAsStream()
125+
.filter(document -> {
126+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
127+
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
128+
return documentOplogPosition.isAfter(position);
129+
})
130+
.sorted((o1, o2) -> {
131+
BsonTimestamp timestamp1 = OplogUtils.getOplogTimestamp(o1);
132+
BsonTimestamp timestamp2 = OplogUtils.getOplogTimestamp(o2);
133+
return timestamp1.compareTo(timestamp2);
134+
});
135+
}
136+
134137
private Document toOplogDocument(OperationType operationType, String namespace) {
135138
return new Document()
136139
.append(OplogDocumentFields.TIMESTAMP, oplogClock.incrementAndGet())
@@ -168,91 +171,4 @@ private boolean isOplogCollection(String namespace) {
168171
return collection.getFullName().equals(namespace);
169172
}
170173

171-
private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
172-
switch (operationType) {
173-
case INSERT:
174-
return getUpdateDocument(document);
175-
case DELETE:
176-
return null;
177-
case UPDATE:
178-
return lookUpUpdateDocument(changeStreamDocument, document);
179-
}
180-
throw new IllegalArgumentException("Invalid operation type");
181-
}
182-
183-
private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
184-
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
185-
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
186-
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
187-
String databaseName = namespace.split("\\.")[0];
188-
String collectionName = namespace.split("\\.")[1];
189-
return backend.resolveDatabase(databaseName)
190-
.resolveCollection(collectionName, true)
191-
.queryAllAsStream()
192-
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
193-
.findFirst()
194-
.orElse(deltaUpdate);
195-
}
196-
return deltaUpdate;
197-
}
198-
199-
private Document getDeltaUpdate(Document updateDocument) {
200-
Document delta = new Document();
201-
if (updateDocument.containsKey("$set")) {
202-
delta.appendAll((Document) updateDocument.get("$set"));
203-
}
204-
if (updateDocument.containsKey("$unset")) {
205-
delta.appendAll((Document) updateDocument.get("$unset"));
206-
}
207-
return delta;
208-
}
209-
210-
private Document toChangeStreamResponseDocument(Document oplogDocument, Document changeStreamDocument) {
211-
OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString());
212-
Document documentKey = new Document();
213-
Document document = getUpdateDocument(oplogDocument);
214-
BsonTimestamp timestamp = getOplogTimestamp(oplogDocument);
215-
OplogPosition oplogPosition = new OplogPosition(timestamp);
216-
switch (operationType) {
217-
case UPDATE:
218-
case DELETE:
219-
documentKey = document;
220-
break;
221-
case INSERT:
222-
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
223-
break;
224-
case COMMAND:
225-
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
226-
default:
227-
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
228-
}
229-
230-
return new Document()
231-
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
232-
.append("operationType", operationType.getDescription())
233-
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
234-
.append("documentKey", documentKey)
235-
.append("clusterTime", timestamp);
236-
}
237-
238-
private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
239-
Document document = getUpdateDocument(oplogDocument);
240-
String operationType = document.keySet().stream().findFirst().orElseThrow(
241-
() -> new MongoServerException("Unspecified command operation type")
242-
);
243-
244-
return new Document()
245-
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
246-
.append("operationType", operationType)
247-
.append("clusterTime", timestamp);
248-
}
249-
250-
private static BsonTimestamp getOplogTimestamp(Document document) {
251-
return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP);
252-
}
253-
254-
private static Document getUpdateDocument(Document document) {
255-
return (Document) document.get(OplogDocumentFields.O);
256-
}
257-
258174
}

core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import java.util.List;
44

5-
import de.bwaldvogel.mongo.backend.Cursor;
65
import de.bwaldvogel.mongo.backend.EmptyCursor;
6+
import de.bwaldvogel.mongo.backend.TailableCursor;
77
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
88
import de.bwaldvogel.mongo.bson.Document;
99

@@ -35,7 +35,12 @@ public void handleDropCollection(String namespace) {
3535
}
3636

3737
@Override
38-
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
38+
public TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
39+
return EmptyCursor.get();
40+
}
41+
42+
@Override
43+
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
3944
return EmptyCursor.get();
4045
}
4146
}

core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.util.List;
44

5-
import de.bwaldvogel.mongo.backend.Cursor;
5+
import de.bwaldvogel.mongo.backend.TailableCursor;
66
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
77
import de.bwaldvogel.mongo.bson.Document;
88

@@ -16,5 +16,7 @@ public interface Oplog {
1616

1717
void handleDropCollection(String namespace);
1818

19-
Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation);
19+
TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition);
20+
21+
TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation);
2022
}

0 commit comments

Comments
 (0)