Skip to content

Commit c152b03

Browse files
committed
Use backpressure to see when final event is emitted
1 parent d823afb commit c152b03

1 file changed

Lines changed: 36 additions & 8 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.a2a.server.events;
22

3+
import java.util.concurrent.CountDownLatch;
34
import java.util.concurrent.Flow;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicBoolean;
47

58
import io.a2a.spec.A2AError;
69
import io.a2a.spec.A2AServerException;
@@ -57,6 +60,21 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
5760
.withBufferSize(256);
5861
return ZeroPublisher.create(conf, tube -> {
5962
boolean completed = false;
63+
64+
// Synchronization mechanism for final event write completion
65+
// Used to wait for subscriber to finish writing final event before calling tube.complete()
66+
final CountDownLatch finalEventWriteLatch = new CountDownLatch(1);
67+
final AtomicBoolean waitingForFinalEventWrite = new AtomicBoolean(false);
68+
69+
// Register callback to detect when subscriber finishes writing events
70+
// The subscriber (A2AServerRoutes) calls upstream.request(1) after async write completes
71+
tube.whenRequested(requested -> {
72+
if (waitingForFinalEventWrite.get()) {
73+
LOGGER.debug("Subscriber requested {} items after final event - async write completed", requested);
74+
finalEventWriteLatch.countDown();
75+
}
76+
});
77+
6078
try {
6179
while (true) {
6280
// Check if cancelled by client disconnect
@@ -199,6 +217,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
199217
// and should not be exposed to API consumers
200218
boolean isFinalSent = false;
201219
if (!(event instanceof QueueClosedEvent)) {
220+
// Set flag BEFORE sending to ensure whenRequested callback can detect final event
221+
if (isFinalEvent) {
222+
waitingForFinalEventWrite.set(true);
223+
}
202224
tube.send(item);
203225
isFinalSent = isFinalEvent;
204226
}
@@ -208,18 +230,24 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
208230
queue.close();
209231
LOGGER.debug("Queue closed, breaking loop for queue {}", System.identityHashCode(queue));
210232

211-
// CRITICAL: Allow tube buffer to flush before calling tube.complete()
212-
// tube.send() buffers events asynchronously. If we call tube.complete() immediately,
213-
// the stream-end signal can reach the client BEFORE the buffered final event,
214-
// causing the client to close the connection and never receive the final event.
215-
// This is especially important in replicated scenarios where events arrive via Kafka
216-
// and timing is less deterministic. A delay ensures the buffer flushes.
217-
// Increased to 150ms to account for CI environment latency and JVM scheduling delays.
233+
// CRITICAL: Wait for subscriber to process final event before calling tube.complete()
234+
// The subscriber (A2AServerRoutes) writes events asynchronously via response.write().
235+
// Only after the async write completes does it call upstream.request(1), which triggers
236+
// our whenRequested callback. By waiting for this callback, we ensure the final event
237+
// has been written to the HTTP socket before we call tube.complete(), preventing the
238+
// "Stream cancelled" error where the stream-end signal reaches the client before the
239+
// buffered final event.
218240
if (isFinalSent) {
219241
try {
220-
Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
242+
boolean writeCompleted = finalEventWriteLatch.await(2, TimeUnit.SECONDS);
243+
if (writeCompleted) {
244+
LOGGER.debug("Final event write confirmed via backpressure signal");
245+
} else {
246+
LOGGER.warn("Timeout waiting for final event write confirmation - proceeding anyway");
247+
}
221248
} catch (InterruptedException e) {
222249
Thread.currentThread().interrupt();
250+
LOGGER.warn("Interrupted while waiting for final event write confirmation");
223251
}
224252
}
225253
break;

0 commit comments

Comments
 (0)