Skip to content

Commit 42b6e82

Browse files
committed
Write SSE comment before closing stream
1 parent c1d9185 commit 42b6e82

3 files changed

Lines changed: 32 additions & 8 deletions

File tree

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -824,8 +824,23 @@ public void onComplete() {
824824
if (headers.get(CONTENT_TYPE) == null) {
825825
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
826826
}
827+
response.end();
828+
} else {
829+
// Events were written - send final SSE comment to ensure flush before closing
830+
// Since SSE writes are sequential, when this comment's write completes, we know
831+
// all previous events (including the final event) have been written to the socket.
832+
// This provides proper synchronization without arbitrary delays.
833+
response.write(": stream closing\n\n", ar -> {
834+
if (ar.succeeded()) {
835+
logger.debug("Final SSE comment written, closing stream");
836+
response.end();
837+
} else {
838+
// Write failed (likely client disconnect) - still try to close cleanly
839+
logger.debug("Final SSE comment write failed, closing stream anyway", ar.cause());
840+
response.end();
841+
}
842+
});
827843
}
828-
response.end();
829844
}
830845
});
831846
}

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,8 +951,23 @@ public void onComplete() {
951951
if (headers.get(CONTENT_TYPE) == null) {
952952
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
953953
}
954+
response.end();
955+
} else {
956+
// Events were written - send final SSE comment to ensure flush before closing
957+
// Since SSE writes are sequential, when this comment's write completes, we know
958+
// all previous events (including the final event) have been written to the socket.
959+
// This provides proper synchronization without arbitrary delays.
960+
response.write(": stream closing\n\n", ar -> {
961+
if (ar.succeeded()) {
962+
logger.debug("Final SSE comment written, closing stream");
963+
response.end();
964+
} else {
965+
// Write failed (likely client disconnect) - still try to close cleanly
966+
logger.debug("Final SSE comment write failed, closing stream anyway", ar.cause());
967+
response.end();
968+
}
969+
});
954970
}
955-
response.end();
956971
}
957972
});
958973
}

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,6 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
242242
boolean writeCompleted = finalEventWriteLatch.await(2, TimeUnit.SECONDS);
243243
if (writeCompleted) {
244244
LOGGER.debug("Final event write confirmed via backpressure signal");
245-
// Additional safety delay: backpressure callback is shared across all events,
246-
// so it might fire for a previous event after we set the flag. Also, even
247-
// with setWriteQueueMaxSize(1), Vert.x may have additional buffering.
248-
// 50ms ensures the final event is truly transmitted to the network.
249-
Thread.sleep(50);
250-
LOGGER.debug("Safety buffer flush delay completed");
251245
} else {
252246
LOGGER.warn("Timeout waiting for final event write confirmation - proceeding anyway");
253247
}

0 commit comments

Comments
 (0)