Skip to content

Commit c1d9185

Browse files
committed
Adjust SSE for jsonrpc
1 parent c152b03 commit c1d9185

2 files changed

Lines changed: 13 additions & 0 deletions

File tree

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,14 @@ public void onNext(String sseEvent) {
784784
if (headers.get(CONTENT_TYPE) == null) {
785785
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
786786
}
787+
// Additional SSE headers to prevent buffering
788+
headers.set("Cache-Control", "no-cache");
789+
headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
787790
response.setChunked(true);
791+
792+
// CRITICAL: Disable write queue max size to prevent buffering
793+
// Vert.x buffers writes by default - we need immediate flushing for SSE
794+
response.setWriteQueueMaxSize(1); // Force immediate flush
788795
}
789796

790797
// Write SSE-formatted string to response

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,12 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
242242
boolean writeCompleted = finalEventWriteLatch.await(2, TimeUnit.SECONDS);
243243
if (writeCompleted) {
244244
LOGGER.debug("Final event write confirmed via backpressure signal");
245+
// Additional safety delay: backpressure callback is shared across all events,
246+
// so it might fire for a previous event after we set the flag. Also, even
247+
// with setWriteQueueMaxSize(1), Vert.x may have additional buffering.
248+
// 50ms ensures the final event is truly transmitted to the network.
249+
Thread.sleep(50);
250+
LOGGER.debug("Safety buffer flush delay completed");
245251
} else {
246252
LOGGER.warn("Timeout waiting for final event write confirmation - proceeding anyway");
247253
}

0 commit comments

Comments
 (0)