Skip to content

Commit 60ed504

Browse files
committed
fix: prevent premature stream closure in EventConsumer grace period
The EventConsumer grace period logic could close streams prematurely when final events were still in-transit through MainEventBusProcessor. This manifested as intermittent test failures where the stream would close before all events were delivered. Root Cause: - When agent execution completes, EventConsumer enters a grace period - It polls the ChildQueue with 500ms timeout, allowing 3 consecutive timeouts (1.5s total) before closing the stream - The original logic only checked queue.size() == 0 - However, final events can be in-transit: MainQueue → MainEventBus → MainEventBusProcessor → ChildQueue - This timing window (typically <500ms) could result in premature closure when the local queue was empty but the final event hadn't arrived yet Solution: - Added EventQueue.isAwaitingFinalEvent() method - MainQueue calls child.expectFinalEvent() when enqueueing final events - EventConsumer checks awaitingFinalEvent flag before starting timeout counter: agentCompleted && queueSize == 0 && !awaitingFinal - ChildQueue clears the flag only when a FINAL event is dequeued (not on any event, to avoid clearing it too early when non-final events arrive) - This ensures the grace period doesn't start counting down while a final event is still being distributed The fix handles both local execution (events available immediately) and replicated scenarios (events may arrive via Kafka with delays).
1 parent f668b77 commit 60ed504

2 files changed

Lines changed: 42 additions & 7 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
8282
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
8383
if (item == null) {
8484
int queueSize = queue.size();
85-
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}",
86-
agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted);
85+
boolean awaitingFinal = queue.isAwaitingFinalEvent();
86+
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}",
87+
agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted);
8788
// If agent completed, a poll timeout means no more events are coming
8889
// MainEventBusProcessor has 500ms to distribute events from MainEventBus
8990
// If we timeout with agentCompleted=true, all events have been distributed
@@ -94,8 +95,12 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
9495
// CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED)
9596
// Per A2A spec, interrupted states are NOT terminal - the stream must stay open
9697
// for future state updates even after agent completes (agent will be re-invoked later).
98+
//
99+
// CRITICAL: Don't start timeout counter if we're awaiting a final event.
100+
// The awaitingFinalEvent flag is set when MainQueue enqueues a final event
101+
// but it hasn't been distributed to this ChildQueue yet.
97102
boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
98-
if (agentCompleted && queueSize == 0 && !isInterruptedState) {
103+
if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) {
99104
pollTimeoutsAfterAgentCompleted++;
100105
if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
101106
LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
@@ -112,10 +117,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
112117
LOGGER.debug("Agent completed but task is in interrupted state ({}), stream must remain open (queue={})",
113118
lastSeenTaskState, System.identityHashCode(queue));
114119
pollTimeoutsAfterAgentCompleted = 0; // Reset counter
115-
} else if (agentCompleted && queueSize > 0) {
116-
LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
117-
queueSize, System.identityHashCode(queue));
118-
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive
120+
} else if (agentCompleted && (queueSize > 0 || awaitingFinal)) {
121+
LOGGER.debug("Agent completed but queue has {} pending events or awaitingFinalEvent={}, resetting timeout counter and continuing to poll (queue={})",
122+
queueSize, awaitingFinal, System.identityHashCode(queue));
123+
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive or awaiting final
119124
}
120125
continue;
121126
}

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,25 @@ public void taskDone() {
304304
*/
305305
public abstract int size();
306306

307+
/**
308+
* Returns whether this queue is awaiting a final event to be delivered.
309+
* <p>
310+
* This is used by EventConsumer to determine if it should keep polling even when
311+
* the queue is empty. A final event may still be in-transit through MainEventBusProcessor.
312+
* </p>
313+
* <p>
314+
* For MainQueue: always returns false (MainQueue cannot be consumed).
315+
* For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called
316+
* but the final event hasn't been received yet.
317+
* </p>
318+
*
319+
* @return true if awaiting a final event, false otherwise
320+
*/
321+
public boolean isAwaitingFinalEvent() {
322+
// Default implementation - overridden by ChildQueue
323+
return false;
324+
}
325+
307326
/**
308327
* Closes this event queue gracefully, allowing pending events to be consumed.
309328
*/
@@ -735,6 +754,12 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
735754
if (item != null) {
736755
Event event = item.getEvent();
737756
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
757+
// Clear the awaiting flag only if this is a final event
758+
// This allows EventConsumer grace period logic to proceed correctly
759+
if (awaitingFinalEvent && isFinalEvent(event)) {
760+
awaitingFinalEvent = false;
761+
LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this));
762+
}
738763
} else {
739764
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
740765
}
@@ -757,6 +782,11 @@ public int size() {
757782
return queue.size();
758783
}
759784

785+
@Override
786+
public boolean isAwaitingFinalEvent() {
787+
return awaitingFinalEvent;
788+
}
789+
760790
@Override
761791
public void awaitQueuePollerStart() throws InterruptedException {
762792
parent.awaitQueuePollerStart();

0 commit comments

Comments
 (0)