Skip to content

Commit 378970b

Browse files
kabirclaude
andcommitted
fix: resolve intermittent stream closure failures in EventConsumer
Fixes two separate race conditions causing intermittent test failures: 1. Grace period closing streams before final events arrive - EventConsumer now checks awaitingFinalEvent flag before starting timeout counter, preventing premature closure when final events are in-transit through MainEventBusProcessor - Increased grace period timeout to 150ms for CI environment latency 2. Client auto-close IOException not filtered in testAuthRequiredWorkflow - AbstractSSEEventListener cancels SSE streams on final events (by design) - This generates "Stream cancelled" IOException that must be filtered - testAuthRequiredWorkflow was the only test missing this filter Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent c8ba7df commit 378970b

3 files changed

Lines changed: 85 additions & 7 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class EventConsumer {
2424
private volatile boolean agentCompleted = false;
2525
private volatile int pollTimeoutsAfterAgentCompleted = 0;
2626
private volatile @Nullable TaskState lastSeenTaskState = null;
27+
private volatile int pollTimeoutsWhileAwaitingFinal = 0;
2728

2829
private static final String ERROR_MSG = "Agent did not return any response";
2930
private static final int NO_WAIT = -1;
@@ -32,6 +33,10 @@ public class EventConsumer {
3233
// Grace period allows Kafka replication to deliver late-arriving events
3334
// 3 timeouts * 500ms = 1500ms grace period for replication delays
3435
private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3;
36+
// Maximum time to wait for final event when awaitingFinalEvent is set
37+
// If event doesn't arrive after this many timeouts, assume it won't arrive
38+
// 6 timeouts * 500ms = 3000ms maximum wait for final event arrival
39+
private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6;
3540

3641
public EventConsumer(EventQueue queue) {
3742
this.queue = queue;
@@ -82,8 +87,9 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
8287
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
8388
if (item == null) {
8489
int queueSize = queue.size();
85-
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}",
86-
agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted);
90+
boolean awaitingFinal = queue.isAwaitingFinalEvent();
91+
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}",
92+
agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal);
8793
// If agent completed, a poll timeout means no more events are coming
8894
// MainEventBusProcessor has 500ms to distribute events from MainEventBus
8995
// If we timeout with agentCompleted=true, all events have been distributed
@@ -94,8 +100,31 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
94100
// CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED)
95101
// Per A2A spec, interrupted states are NOT terminal - the stream must stay open
96102
// for future state updates even after agent completes (agent will be re-invoked later).
103+
//
104+
// CRITICAL: Don't start timeout counter if we're awaiting a final event.
105+
// The awaitingFinalEvent flag is set when MainQueue enqueues a final event
106+
// but it hasn't been distributed to this ChildQueue yet.
107+
// HOWEVER: If we've been waiting too long for the final event (>3s), give up and
108+
// proceed with normal timeout logic to prevent infinite waiting.
97109
boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
98-
if (agentCompleted && queueSize == 0 && !isInterruptedState) {
110+
111+
// Track how long we've been waiting for the final event
112+
if (awaitingFinal && queueSize == 0) {
113+
pollTimeoutsWhileAwaitingFinal++;
114+
if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) {
115+
LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})",
116+
pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue));
117+
// Clear the flag on the queue itself, not just the local variable
118+
if (queue instanceof EventQueue.ChildQueue) {
119+
((EventQueue.ChildQueue) queue).clearAwaitingFinalEvent();
120+
}
121+
awaitingFinal = false; // Also update local variable for this iteration
122+
}
123+
} else {
124+
pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting
125+
}
126+
127+
if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) {
99128
pollTimeoutsAfterAgentCompleted++;
100129
if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
101130
LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
@@ -116,11 +145,16 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
116145
LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
117146
queueSize, System.identityHashCode(queue));
118147
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive
148+
} else if (agentCompleted && awaitingFinal) {
149+
LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})",
150+
pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue));
151+
pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final
119152
}
120153
continue;
121154
}
122-
// Event received - reset timeout counter
155+
// Event received - reset timeout counters
123156
pollTimeoutsAfterAgentCompleted = 0;
157+
pollTimeoutsWhileAwaitingFinal = 0;
124158
event = item.getEvent();
125159
LOGGER.debug("EventConsumer received event: {} (queue={})",
126160
event.getClass().getSimpleName(), System.identityHashCode(queue));
@@ -179,10 +213,11 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
179213
// the stream-end signal can reach the client BEFORE the buffered final event,
180214
// causing the client to close the connection and never receive the final event.
181215
// This is especially important in replicated scenarios where events arrive via Kafka
182-
// and timing is less deterministic. A small delay ensures the buffer flushes.
216+
// and timing is less deterministic. A delay ensures the buffer flushes.
217+
// Increased to 150ms to account for CI environment latency and JVM scheduling delays.
183218
if (isFinalSent) {
184219
try {
185-
Thread.sleep(50); // 50ms to allow SSE buffer flush
220+
Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
186221
} catch (InterruptedException e) {
187222
Thread.currentThread().interrupt();
188223
}

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,25 @@ public void taskDone() {
304304
*/
305305
public abstract int size();
306306

307+
/**
308+
* Returns whether this queue is awaiting a final event to be delivered.
309+
* <p>
310+
* This is used by EventConsumer to determine if it should keep polling even when
311+
* the queue is empty. A final event may still be in-transit through MainEventBusProcessor.
312+
* </p>
313+
* <p>
314+
* For MainQueue: always returns false (MainQueue cannot be consumed).
315+
* For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called
316+
* but the final event hasn't been received yet.
317+
* </p>
318+
*
319+
* @return true if awaiting a final event, false otherwise
320+
*/
321+
public boolean isAwaitingFinalEvent() {
322+
// Default implementation - overridden by ChildQueue
323+
return false;
324+
}
325+
307326
/**
308327
* Closes this event queue gracefully, allowing pending events to be consumed.
309328
*/
@@ -735,6 +754,12 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
735754
if (item != null) {
736755
Event event = item.getEvent();
737756
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
757+
// Clear the awaiting flag only if this is a final event
758+
// This allows EventConsumer grace period logic to proceed correctly
759+
if (awaitingFinalEvent && isFinalEvent(event)) {
760+
awaitingFinalEvent = false;
761+
LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this));
762+
}
738763
} else {
739764
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
740765
}
@@ -757,6 +782,11 @@ public int size() {
757782
return queue.size();
758783
}
759784

785+
@Override
786+
public boolean isAwaitingFinalEvent() {
787+
return awaitingFinalEvent;
788+
}
789+
760790
@Override
761791
public void awaitQueuePollerStart() throws InterruptedException {
762792
parent.awaitQueuePollerStart();
@@ -790,6 +820,15 @@ void expectFinalEvent() {
790820
LOGGER.debug("ChildQueue {} now awaiting final event", System.identityHashCode(this));
791821
}
792822

823+
/**
824+
* Called by EventConsumer when it has waited too long for the final event.
825+
* This allows normal timeout logic to proceed if the final event never arrives.
826+
*/
827+
void clearAwaitingFinalEvent() {
828+
awaitingFinalEvent = false;
829+
LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this));
830+
}
831+
793832
@Override
794833
public void close() {
795834
close(false);

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1921,7 +1921,11 @@ public void testAuthRequiredWorkflow() throws Exception {
19211921
}
19221922
};
19231923

1924-
Consumer<Throwable> errorHandler = errorRef::set;
1924+
Consumer<Throwable> errorHandler = error -> {
1925+
if (!isStreamClosedError(error)) {
1926+
errorRef.set(error);
1927+
}
1928+
};
19251929

19261930
// Wait for subscription to be established
19271931
CountDownLatch subscriptionLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)