Skip to content

Commit 43428de

Browse files
committed
feat: add appendRecords operation with consistency checks
1 parent 6f085c3 commit 43428de

17 files changed

Lines changed: 2043 additions & 6 deletions

docs/api/appending-events.md

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,15 @@ client.appendToStream("some-stream", options, eventData)
247247
.get();
248248
```
249249

250-
## Append to multiple streams
250+
## Atomic appends
251251

252-
::: note
253-
This feature is only available in KurrentDB 25.1 and later.
254-
:::
252+
KurrentDB provides two operations for appending events to one or more streams in a single atomic transaction: `appendRecords` and `multiStreamAppend`. Both guarantee that either all writes succeed or the entire operation fails, but they differ in how records are organized, ordered, and validated.
255253

256-
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
254+
| | `appendRecords` | `multiStreamAppend` |
255+
|------------------------|-----------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------|
256+
| **Available since** | KurrentDB 26.1 | KurrentDB 25.1 |
257+
| **Record ordering** | Interleaved. Records from different streams can be mixed, and their exact order is preserved in the global log. | Grouped. All records for a stream are sent together; ordering across streams is not guaranteed. |
258+
| **Consistency checks** | Decoupled. Can validate the state of any stream, including streams not being written to. | Coupled. Expected state is specified per stream being written to. |
257259

258260
::: warning
259261
Metadata must be a valid JSON object, using string keys and string values only.
@@ -262,6 +264,88 @@ KurrentDB's metadata handling. This restriction will be lifted in the next major
262264
release.
263265
:::
264266

267+
### AppendRecords
268+
269+
::: note
270+
This feature is only available in KurrentDB 26.1 and later.
271+
:::
272+
273+
`appendRecords` appends events to one or more streams atomically. Each record specifies which stream it targets, and the exact order of records is preserved in the global log across all streams.
274+
275+
#### Single stream
276+
277+
The simplest usage appends events to a single stream:
278+
279+
```java
280+
EventData eventOne = EventData
281+
.builderAsJson("OrderPlaced", "{\"orderId\": \"123\"}".getBytes())
282+
.build();
283+
284+
EventData eventTwo = EventData
285+
.builderAsJson("OrderShipped", "{\"orderId\": \"123\"}".getBytes())
286+
.build();
287+
288+
client.appendRecords("order-123", Arrays.asList(eventOne, eventTwo)).get();
289+
```
290+
291+
When no expected state is provided, no consistency check is performed, which is equivalent to `StreamState.any()`.
292+
293+
You can also pass an expected stream state for optimistic concurrency:
294+
295+
```java
296+
client.appendRecords("order-123", StreamState.noStream(), Arrays.asList(eventOne, eventTwo)).get();
297+
```
298+
299+
#### Multiple streams
300+
301+
Use `AppendRecord` to target different streams. Records can be interleaved freely, and the global log preserves the exact order you specify:
302+
303+
```java
304+
List<AppendRecord> records = Arrays.asList(
305+
new AppendRecord("order-stream", EventData
306+
.builderAsJson("OrderCreated", "{\"orderId\": \"123\"}".getBytes())
307+
.build()),
308+
new AppendRecord("inventory-stream", EventData
309+
.builderAsJson("ItemReserved", "{\"itemId\": \"abc\", \"quantity\": 2}".getBytes())
310+
.build()),
311+
new AppendRecord("order-stream", EventData
312+
.builderAsJson("OrderConfirmed", "{\"orderId\": \"123\"}".getBytes())
313+
.build())
314+
);
315+
316+
client.appendRecords(records).get();
317+
```
318+
319+
#### Consistency checks
320+
321+
Consistency checks let you validate the state of any stream, including streams you are not writing to, before the append is committed. All checks are evaluated atomically: if any check fails, the entire operation is rejected and an `AppendConsistencyViolationException` is thrown with details about every failing check and the actual state observed.
322+
323+
```java
324+
List<AppendRecord> records = Collections.singletonList(
325+
new AppendRecord("order-stream", EventData
326+
.builderAsJson("OrderConfirmed", "{\"orderId\": \"123\"}".getBytes())
327+
.build())
328+
);
329+
330+
// ensure the inventory stream exists before confirming the order,
331+
// even though we are not writing to it
332+
List<ConsistencyCheck> checks = Collections.singletonList(
333+
new ConsistencyCheck.StreamStateCheck("inventory-stream", StreamState.streamExists())
334+
);
335+
336+
client.appendRecords(records, checks).get();
337+
```
338+
339+
Because checks are decoupled from writes, you can validate the state of streams you are not writing to, enabling patterns where a business decision depends on the state of multiple streams but the resulting event is written to only one of them.
340+
341+
### MultiStreamAppend
342+
343+
::: note
344+
This feature is only available in KurrentDB 25.1 and later.
345+
:::
346+
347+
`multiStreamAppend` appends events to one or more streams atomically. Records are grouped per stream using `AppendStreamRequest`, where each request specifies a stream name, an expected state, and the events for that stream.
348+
265349
```java
266350
JsonMapper mapper = new JsonMapper();
267351

@@ -293,6 +377,10 @@ List<AppendStreamRequest> requests = Arrays.asList(
293377
)
294378
);
295379

296-
MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get();
380+
MultiStreamAppendResponse result = client.multiStreamAppend(requests.iterator()).get();
297381
```
298382

383+
Each stream can only appear once in the request. The expected state is validated per stream before the transaction is committed.
384+
385+
The result returns the position of the last appended record in the transaction and a collection of responses for each stream.
386+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.kurrent.dbclient;
2+
3+
import java.util.List;
4+
import java.util.stream.Collectors;
5+
6+
/**
7+
* Exception thrown when one or more consistency checks fail during an AppendRecords operation.
8+
* The entire transaction is aborted and no records are written.
9+
*/
10+
public class AppendConsistencyViolationException extends RuntimeException {
11+
private final List<ConsistencyViolation> violations;
12+
13+
/**
14+
* Creates a new AppendConsistencyViolationException.
15+
*
16+
* @param violations the consistency violations that caused the transaction to be aborted.
17+
*/
18+
public AppendConsistencyViolationException(List<ConsistencyViolation> violations) {
19+
super(formatMessage(violations));
20+
this.violations = violations;
21+
}
22+
23+
/**
24+
* Returns the consistency violations that caused the transaction to be aborted.
25+
*/
26+
public List<ConsistencyViolation> getViolations() {
27+
return violations;
28+
}
29+
30+
private static String formatMessage(List<ConsistencyViolation> violations) {
31+
String details = violations.stream()
32+
.map(v -> String.format("[Check %d: Stream '%s' expected state %s, actual state %s]",
33+
v.getCheckIndex(), v.getStream(), v.getExpectedState(), v.getActualState()))
34+
.collect(Collectors.joining(", "));
35+
return "Append failed due to consistency violation(s): " + details;
36+
}
37+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.kurrent.dbclient;
2+
3+
/**
4+
* Represents a record to be appended to a specific stream in an
5+
* {@link KurrentDBClient#appendRecords} operation.
6+
* Each record specifies its own target stream, allowing interleaved writes across multiple streams.
7+
*/
8+
public class AppendRecord {
9+
private final String stream;
10+
private final EventData record;
11+
12+
/**
13+
* Creates a new AppendRecord targeting the specified stream.
14+
*
15+
* @param stream the name of the target stream for this record.
16+
* @param record the event data to append.
17+
*/
18+
public AppendRecord(String stream, EventData record) {
19+
this.stream = stream;
20+
this.record = record;
21+
}
22+
23+
/**
24+
* Returns the name of the target stream.
25+
*/
26+
public String getStream() {
27+
return stream;
28+
}
29+
30+
/**
31+
* Returns the event data to append.
32+
*/
33+
public EventData getRecord() {
34+
return record;
35+
}
36+
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package io.kurrent.dbclient;
2+
3+
import com.google.protobuf.Any;
4+
import com.google.protobuf.ByteString;
5+
import com.google.protobuf.Value;
6+
import io.grpc.StatusRuntimeException;
7+
import io.grpc.protobuf.StatusProto;
8+
import io.grpc.stub.ClientCallStreamObserver;
9+
import io.grpc.stub.ClientResponseObserver;
10+
import io.kurrentdb.protocol.v2.streams.AppendRecordsRequest;
11+
import io.kurrentdb.protocol.v2.streams.SchemaInfo;
12+
import io.kurrentdb.protocol.v2.streams.StreamsServiceGrpc;
13+
14+
import io.kurrentdb.protocol.v2.streams.errors.AppendConsistencyViolationErrorDetails;
15+
import io.kurrentdb.protocol.v2.streams.errors.AppendRecordSizeExceededErrorDetails;
16+
import io.kurrentdb.protocol.v2.streams.errors.AppendTransactionSizeExceededErrorDetails;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.concurrent.CompletableFuture;
22+
23+
class AppendRecords {
24+
private final GrpcClient client;
25+
private final List<AppendRecord> records;
26+
private final List<ConsistencyCheck> checks;
27+
28+
public AppendRecords(GrpcClient client, List<AppendRecord> records, List<ConsistencyCheck> checks) {
29+
this.client = client;
30+
this.records = records;
31+
this.checks = checks;
32+
}
33+
34+
public CompletableFuture<AppendRecordsResponse> execute() {
35+
if (records.isEmpty()) {
36+
CompletableFuture<AppendRecordsResponse> result = new CompletableFuture<>();
37+
result.completeExceptionally(new IllegalArgumentException("At least one record is required."));
38+
return result;
39+
}
40+
41+
return this.client.runWithArgs(args -> ClientTelemetry.traceAppendRecords(
42+
this::appendRecords,
43+
args,
44+
this.records,
45+
this.client.getSettings()));
46+
}
47+
48+
private CompletableFuture<AppendRecordsResponse> appendRecords(WorkItemArgs args, List<AppendRecord> records) {
49+
CompletableFuture<AppendRecordsResponse> result = new CompletableFuture<>();
50+
51+
if (!args.supportFeature(FeatureFlags.APPEND_RECORDS)) {
52+
result.completeExceptionally(new UnsupportedOperationException(
53+
"AppendRecords is not supported by the server. Requires KurrentDB 26.1 or later."));
54+
return result;
55+
}
56+
57+
StreamsServiceGrpc.StreamsServiceStub stub = GrpcUtils.configureStub(
58+
StreamsServiceGrpc.newStub(args.getChannel()),
59+
this.client.getSettings(),
60+
new OptionsBase<>(),
61+
null,
62+
false);
63+
64+
try {
65+
AppendRecordsRequest.Builder requestBuilder = AppendRecordsRequest.newBuilder();
66+
67+
for (AppendRecord record : records) {
68+
EventData event = record.getRecord();
69+
70+
io.kurrentdb.protocol.v2.streams.AppendRecord.Builder recordBuilder =
71+
io.kurrentdb.protocol.v2.streams.AppendRecord.newBuilder()
72+
.setStream(record.getStream())
73+
.setData(ByteString.copyFrom(event.getEventData()))
74+
.setRecordId(event.getEventId().toString())
75+
.setSchema(SchemaInfo.newBuilder()
76+
.setFormat(ContentTypeMapper.toSchemaDataFormat(event.getContentType()))
77+
.setName(event.getEventType()));
78+
79+
if (event.getUserMetadata() != null) {
80+
Map<String, Value> properties = DynamicValueMapper.mapJsonToValueMap(event.getUserMetadata());
81+
recordBuilder.putAllProperties(properties);
82+
}
83+
84+
requestBuilder.addRecords(recordBuilder.build());
85+
}
86+
87+
if (checks != null) {
88+
for (ConsistencyCheck check : checks) {
89+
if (check instanceof ConsistencyCheck.StreamStateCheck) {
90+
ConsistencyCheck.StreamStateCheck stateCheck = (ConsistencyCheck.StreamStateCheck) check;
91+
requestBuilder.addChecks(
92+
io.kurrentdb.protocol.v2.streams.ConsistencyCheck.newBuilder()
93+
.setStreamState(
94+
io.kurrentdb.protocol.v2.streams.ConsistencyCheck.StreamStateCheck.newBuilder()
95+
.setStream(stateCheck.getStream())
96+
.setExpectedState(stateCheck.getExpectedState().toRawLong())
97+
.build())
98+
.build());
99+
}
100+
}
101+
}
102+
103+
stub.appendRecords(requestBuilder.build(), new AppendRecordsObserver(result));
104+
} catch (RuntimeException e) {
105+
result.completeExceptionally(e);
106+
}
107+
108+
return result;
109+
}
110+
111+
private AppendRecordsResponse onResponse(io.kurrentdb.protocol.v2.streams.AppendRecordsResponse response) {
112+
List<AppendResponse> results = new ArrayList<>(response.getRevisionsCount());
113+
114+
for (io.kurrentdb.protocol.v2.streams.StreamRevision revision : response.getRevisionsList()) {
115+
results.add(new AppendResponse(revision.getStream(), revision.getRevision()));
116+
}
117+
118+
return new AppendRecordsResponse(response.getPosition(), results);
119+
}
120+
121+
private class AppendRecordsObserver implements ClientResponseObserver<AppendRecordsRequest, io.kurrentdb.protocol.v2.streams.AppendRecordsResponse> {
122+
private final CompletableFuture<AppendRecordsResponse> result;
123+
124+
public AppendRecordsObserver(CompletableFuture<AppendRecordsResponse> result) {
125+
this.result = result;
126+
}
127+
128+
@Override
129+
public void beforeStart(ClientCallStreamObserver<AppendRecordsRequest> requestStream) {
130+
}
131+
132+
@Override
133+
public void onNext(io.kurrentdb.protocol.v2.streams.AppendRecordsResponse response) {
134+
try {
135+
AppendRecordsResponse converted = onResponse(response);
136+
result.complete(converted);
137+
} catch (Throwable e) {
138+
result.completeExceptionally(e);
139+
}
140+
}
141+
142+
@Override
143+
public void onError(Throwable t) {
144+
if (GrpcUtils.handleNotLeaderError(t, result)) return;
145+
146+
if (t instanceof StatusRuntimeException) {
147+
StatusRuntimeException e = (StatusRuntimeException) t;
148+
com.google.rpc.Status status = StatusProto.fromThrowable(e);
149+
150+
if (status != null && status.getDetailsCount() > 0) {
151+
for (Any d : status.getDetailsList()) {
152+
try {
153+
if (d.is(AppendConsistencyViolationErrorDetails.class)) {
154+
AppendConsistencyViolationErrorDetails details =
155+
d.unpack(AppendConsistencyViolationErrorDetails.class);
156+
List<ConsistencyViolation> violations = new ArrayList<>();
157+
158+
for (io.kurrentdb.protocol.v2.streams.errors.ConsistencyViolation v : details.getViolationsList()) {
159+
if (v.hasStreamState()) {
160+
io.kurrentdb.protocol.v2.streams.errors.ConsistencyViolation.StreamStateViolation ss =
161+
v.getStreamState();
162+
violations.add(new ConsistencyViolation(
163+
v.getCheckIndex(),
164+
ss.getStream(),
165+
StreamState.fromRawLong(ss.getExpectedState()),
166+
StreamState.fromRawLong(ss.getActualState())));
167+
}
168+
}
169+
170+
result.completeExceptionally(new AppendConsistencyViolationException(violations));
171+
return;
172+
} else if (d.is(AppendRecordSizeExceededErrorDetails.class)) {
173+
AppendRecordSizeExceededErrorDetails details =
174+
d.unpack(AppendRecordSizeExceededErrorDetails.class);
175+
result.completeExceptionally(new RecordSizeExceededException(
176+
details.getStream(), details.getRecordId(),
177+
details.getSize(), details.getMaxSize()));
178+
return;
179+
} else if (d.is(AppendTransactionSizeExceededErrorDetails.class)) {
180+
AppendTransactionSizeExceededErrorDetails details =
181+
d.unpack(AppendTransactionSizeExceededErrorDetails.class);
182+
result.completeExceptionally(new TransactionMaxSizeExceededException(
183+
details.getSize(), details.getMaxSize()));
184+
return;
185+
}
186+
} catch (com.google.protobuf.InvalidProtocolBufferException ex) {
187+
result.completeExceptionally(ex);
188+
return;
189+
}
190+
}
191+
}
192+
}
193+
194+
result.completeExceptionally(t);
195+
}
196+
197+
@Override
198+
public void onCompleted() {
199+
}
200+
}
201+
}

0 commit comments

Comments
 (0)