Skip to content

Commit 7f22b8f

Browse files
kabirclaude
andcommitted
fix: add event counting to prevent premature stream closure
Track sent vs acknowledged events to ensure backpressure callback fires only for final event. Add 100ms timeout fallback in onComplete(). Remove REST SSE kickstart comment to match JSONRPC. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 42b6e82 commit 7f22b8f

3 files changed

Lines changed: 55 additions & 24 deletions

File tree

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -827,16 +827,27 @@ public void onComplete() {
827827
response.end();
828828
} else {
829829
// Events were written - send final SSE comment to ensure flush before closing
830-
// Since SSE writes are sequential, when this comment's write completes, we know
831-
// all previous events (including the final event) have been written to the socket.
832-
// This provides proper synchronization without arbitrary delays.
833-
response.write(": stream closing\n\n", ar -> {
834-
if (ar.succeeded()) {
835-
logger.debug("Final SSE comment written, closing stream");
830+
// Use hybrid approach: try to wait for write completion, but timeout if client already closed
831+
java.util.concurrent.atomic.AtomicBoolean ended = new java.util.concurrent.atomic.AtomicBoolean(false);
832+
833+
// Set a timeout to ensure we close within 100ms even if callback never fires
834+
// (client may have already closed due to auto-close on final event)
835+
long timerId = rc.vertx().setTimer(100, id -> {
836+
if (ended.compareAndSet(false, true)) {
837+
logger.debug("Timeout waiting for final SSE comment, closing stream");
836838
response.end();
837-
} else {
838-
// Write failed (likely client disconnect) - still try to close cleanly
839-
logger.debug("Final SSE comment write failed, closing stream anyway", ar.cause());
839+
}
840+
});
841+
842+
// Try to write the final comment - if it succeeds, we close immediately
843+
response.write(": stream closing\n\n", ar -> {
844+
if (ended.compareAndSet(false, true)) {
845+
rc.vertx().cancelTimer(timerId);
846+
if (ar.succeeded()) {
847+
logger.debug("Final SSE comment written, closing stream");
848+
} else {
849+
logger.debug("Final SSE comment write failed, closing stream anyway");
850+
}
840851
response.end();
841852
}
842853
});

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -912,10 +912,6 @@ public void onNext(String sseEvent) {
912912
// CRITICAL: Disable write queue max size to prevent buffering
913913
// Vert.x buffers writes by default - we need immediate flushing for SSE
914914
response.setWriteQueueMaxSize(1); // Force immediate flush
915-
916-
// Send initial SSE comment to kickstart the stream
917-
// This forces Vert.x to send headers and start the stream immediately
918-
response.write(": SSE stream started\n\n");
919915
}
920916

921917
// Write SSE-formatted string to response
@@ -954,16 +950,27 @@ public void onComplete() {
954950
response.end();
955951
} else {
956952
// Events were written - send final SSE comment to ensure flush before closing
957-
// Since SSE writes are sequential, when this comment's write completes, we know
958-
// all previous events (including the final event) have been written to the socket.
959-
// This provides proper synchronization without arbitrary delays.
960-
response.write(": stream closing\n\n", ar -> {
961-
if (ar.succeeded()) {
962-
logger.debug("Final SSE comment written, closing stream");
953+
// Use hybrid approach: try to wait for write completion, but timeout if client already closed
954+
java.util.concurrent.atomic.AtomicBoolean ended = new java.util.concurrent.atomic.AtomicBoolean(false);
955+
956+
// Set a timeout to ensure we close within 100ms even if callback never fires
957+
// (client may have already closed due to auto-close on final event)
958+
long timerId = rc.vertx().setTimer(100, id -> {
959+
if (ended.compareAndSet(false, true)) {
960+
logger.debug("Timeout waiting for final SSE comment, closing stream");
963961
response.end();
964-
} else {
965-
// Write failed (likely client disconnect) - still try to close cleanly
966-
logger.debug("Final SSE comment write failed, closing stream anyway", ar.cause());
962+
}
963+
});
964+
965+
// Try to write the final comment - if it succeeds, we close immediately
966+
response.write(": stream closing\n\n", ar -> {
967+
if (ended.compareAndSet(false, true)) {
968+
rc.vertx().cancelTimer(timerId);
969+
if (ar.succeeded()) {
970+
logger.debug("Final SSE comment written, closing stream");
971+
} else {
972+
logger.debug("Final SSE comment write failed, closing stream anyway");
973+
}
967974
response.end();
968975
}
969976
});

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.concurrent.Flow;
55
import java.util.concurrent.TimeUnit;
66
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.atomic.AtomicInteger;
78

89
import io.a2a.spec.A2AError;
910
import io.a2a.spec.A2AServerException;
@@ -65,12 +66,21 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
6566
// Used to wait for subscriber to finish writing final event before calling tube.complete()
6667
final CountDownLatch finalEventWriteLatch = new CountDownLatch(1);
6768
final AtomicBoolean waitingForFinalEventWrite = new AtomicBoolean(false);
69+
final AtomicInteger eventsSent = new AtomicInteger(0);
70+
final AtomicInteger eventsAcknowledged = new AtomicInteger(0);
6871

6972
// Register callback to detect when subscriber finishes writing events
7073
// The subscriber (A2AServerRoutes) calls upstream.request(1) after async write completes
74+
// Track event count to ensure we only count down when the FINAL event's write completes,
75+
// not when earlier events' pending callbacks fire
7176
tube.whenRequested(requested -> {
72-
if (waitingForFinalEventWrite.get()) {
73-
LOGGER.debug("Subscriber requested {} items after final event - async write completed", requested);
77+
int ackCount = eventsAcknowledged.incrementAndGet();
78+
int sent = eventsSent.get();
79+
boolean waiting = waitingForFinalEventWrite.get();
80+
LOGGER.debug("whenRequested callback: requested={}, ack={}, sent={}, waiting={}",
81+
requested, ackCount, sent, waiting);
82+
if (waiting && ackCount >= sent) {
83+
LOGGER.debug("Subscriber acknowledged final event ({}/{}) - async write completed", ackCount, sent);
7484
finalEventWriteLatch.countDown();
7585
}
7686
});
@@ -220,8 +230,11 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
220230
// Set flag BEFORE sending to ensure whenRequested callback can detect final event
221231
if (isFinalEvent) {
222232
waitingForFinalEventWrite.set(true);
233+
LOGGER.debug("Sending final event, waitingForFinalEventWrite=true");
223234
}
224235
tube.send(item);
236+
int newCount = eventsSent.incrementAndGet();
237+
LOGGER.debug("Event sent: eventsSent={}, isFinal={}", newCount, isFinalEvent);
225238
isFinalSent = isFinalEvent;
226239
}
227240

0 commit comments

Comments
 (0)