Skip to content

Commit 29d1c3d

Browse files
committed
fix: prevent premature stream closure in EventConsumer grace period
The EventConsumer grace period logic could close streams prematurely when final events were still in-transit through MainEventBusProcessor. This manifested as intermittent test failures where the stream would close before all events were delivered. Root Cause: - When agent execution completes, EventConsumer enters a grace period - It polls the ChildQueue with 500ms timeout, allowing 3 consecutive timeouts (1.5s total) before closing the stream - The original logic only checked queue.size() == 0 - However, final events can be in-transit: MainQueue → MainEventBus → MainEventBusProcessor → ChildQueue - This timing window (typically <500ms) could result in premature closure when the local queue was empty but the final event hadn't arrived yet Solution: - Added EventQueue.isAwaitingFinalEvent() method - MainQueue calls child.expectFinalEvent() when enqueueing final events - EventConsumer checks awaitingFinalEvent flag before starting timeout counter: agentCompleted && queueSize == 0 && !awaitingFinal - ChildQueue clears the flag only when a FINAL event is dequeued (not on any event, to avoid clearing it too early when non-final events arrive) - This ensures the grace period doesn't start counting down while a final event is still being distributed The fix handles both local execution (events available immediately) and replicated scenarios (events may arrive via Kafka with delays).
1 parent fd33b3d commit 29d1c3d

2 files changed

Lines changed: 42 additions & 7 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,20 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
8181
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
8282
if (item == null) {
8383
int queueSize = queue.size();
84-
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}",
85-
agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted);
84+
boolean awaitingFinal = queue.isAwaitingFinalEvent();
85+
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}",
86+
agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted);
8687
// If agent completed, a poll timeout means no more events are coming
8788
// MainEventBusProcessor has 500ms to distribute events from MainEventBus
8889
// If we timeout with agentCompleted=true, all events have been distributed
8990
//
9091
// IMPORTANT: In replicated scenarios, remote events may arrive AFTER local agent completes!
9192
// Use grace period to allow for Kafka replication delays (can be 400-500ms)
92-
if (agentCompleted && queueSize == 0) {
93+
//
94+
// CRITICAL: Don't start timeout counter if we're awaiting a final event.
95+
// The awaitingFinalEvent flag is set when MainQueue enqueues a final event
96+
// but it hasn't been distributed to this ChildQueue yet.
97+
if (agentCompleted && queueSize == 0 && !awaitingFinal) {
9398
pollTimeoutsAfterAgentCompleted++;
9499
if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
95100
LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
@@ -102,10 +107,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
102107
LOGGER.debug("Agent completed but grace period active ({}/{} timeouts), continuing to poll (queue={})",
103108
pollTimeoutsAfterAgentCompleted, MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED, System.identityHashCode(queue));
104109
}
105-
} else if (agentCompleted && queueSize > 0) {
106-
LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
107-
queueSize, System.identityHashCode(queue));
108-
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive
110+
} else if (agentCompleted && (queueSize > 0 || awaitingFinal)) {
111+
LOGGER.debug("Agent completed but queue has {} pending events or awaitingFinalEvent={}, resetting timeout counter and continuing to poll (queue={})",
112+
queueSize, awaitingFinal, System.identityHashCode(queue));
113+
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive or awaiting final
109114
}
110115
continue;
111116
}

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,25 @@ public void taskDone() {
304304
*/
305305
public abstract int size();
306306

307+
/**
308+
* Returns whether this queue is awaiting a final event to be delivered.
309+
* <p>
310+
* This is used by EventConsumer to determine if it should keep polling even when
311+
* the queue is empty. A final event may still be in-transit through MainEventBusProcessor.
312+
* </p>
313+
* <p>
314+
* For MainQueue: always returns false (MainQueue cannot be consumed).
315+
* For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called
316+
* but the final event hasn't been received yet.
317+
* </p>
318+
*
319+
* @return true if awaiting a final event, false otherwise
320+
*/
321+
public boolean isAwaitingFinalEvent() {
322+
// Default implementation - overridden by ChildQueue
323+
return false;
324+
}
325+
307326
/**
308327
* Closes this event queue gracefully, allowing pending events to be consumed.
309328
*/
@@ -735,6 +754,12 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
735754
if (item != null) {
736755
Event event = item.getEvent();
737756
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
757+
// Clear the awaiting flag only if this is a final event
758+
// This allows EventConsumer grace period logic to proceed correctly
759+
if (awaitingFinalEvent && isFinalEvent(event)) {
760+
awaitingFinalEvent = false;
761+
LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this));
762+
}
738763
} else {
739764
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
740765
}
@@ -757,6 +782,11 @@ public int size() {
757782
return queue.size();
758783
}
759784

785+
@Override
786+
public boolean isAwaitingFinalEvent() {
787+
return awaitingFinalEvent;
788+
}
789+
760790
@Override
761791
public void awaitQueuePollerStart() throws InterruptedException {
762792
parent.awaitQueuePollerStart();

0 commit comments

Comments
 (0)