Skip to content

Commit 41cdccf

Browse files
authored
Merge branch 'main' into issue_765
2 parents d57e2a8 + d3f88cb commit 41cdccf

12 files changed

Lines changed: 422 additions & 257 deletions

File tree

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,15 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
299299
.putHeader(CONTENT_TYPE, APPLICATION_JSON)
300300
.end(serializeResponse(error));
301301
} else if (streaming) {
302-
final Multi<? extends A2AResponse<?>> finalStreamingResponse = streamingResponse;
303-
executor.execute(() -> {
304-
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
305-
AtomicLong eventIdCounter = new AtomicLong(0);
306-
Multi<String> sseEvents = finalStreamingResponse
307-
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
308-
// Write SSE-formatted strings to HTTP response
309-
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
310-
});
302+
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
303+
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
304+
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
305+
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
306+
AtomicLong eventIdCounter = new AtomicLong(0);
307+
Multi<String> sseEvents = streamingResponse
308+
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
309+
// Write SSE-formatted strings to HTTP response
310+
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
311311

312312
} else {
313313
rc.response()
@@ -783,7 +783,17 @@ public void onNext(String sseEvent) {
783783
if (headers.get(CONTENT_TYPE) == null) {
784784
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
785785
}
786+
// Additional SSE headers to prevent buffering
787+
headers.set("Cache-Control", "no-cache");
788+
headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
786789
response.setChunked(true);
790+
791+
// CRITICAL: Disable write queue max size to prevent buffering
792+
// Vert.x buffers writes by default - we need immediate flushing for SSE
793+
response.setWriteQueueMaxSize(1);
794+
795+
// Send initial SSE comment to kickstart the stream
796+
response.write(": SSE stream started\n\n");
787797
}
788798

789799
// Write SSE-formatted string to response

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -221,15 +221,15 @@ public void sendMessageStreaming(@Body String body, RoutingContext rc) {
221221
if (error != null) {
222222
sendResponse(rc, error);
223223
} else if (streamingResponse != null) {
224-
final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse;
225-
executor.execute(() -> {
226-
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
227-
AtomicLong eventIdCounter = new AtomicLong(0);
228-
Multi<String> sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher())
229-
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
230-
// Write SSE-formatted strings to HTTP response
231-
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
232-
});
224+
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
225+
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
226+
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
227+
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
228+
AtomicLong eventIdCounter = new AtomicLong(0);
229+
Multi<String> sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher())
230+
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
231+
// Write SSE-formatted strings to HTTP response
232+
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
233233
}
234234
}
235235
}
@@ -431,15 +431,15 @@ public void subscribeToTask(RoutingContext rc) {
431431
if (error != null) {
432432
sendResponse(rc, error);
433433
} else if (streamingResponse != null) {
434-
final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse;
435-
executor.execute(() -> {
436-
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
437-
AtomicLong eventIdCounter = new AtomicLong(0);
438-
Multi<String> sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher())
439-
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
440-
// Write SSE-formatted strings to HTTP response
441-
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
442-
});
434+
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
435+
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
436+
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
437+
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
438+
AtomicLong eventIdCounter = new AtomicLong(0);
439+
Multi<String> sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher())
440+
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
441+
// Write SSE-formatted strings to HTTP response
442+
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
443443
}
444444
}
445445
}

0 commit comments

Comments
 (0)