-
Notifications
You must be signed in to change notification settings - Fork 147
fix: resolve intermittent stream closure failures in EventConsumer #777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
+89
−7
Closed
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ public class EventConsumer { | |
| private volatile boolean agentCompleted = false; | ||
| private volatile int pollTimeoutsAfterAgentCompleted = 0; | ||
| private volatile @Nullable TaskState lastSeenTaskState = null; | ||
| private volatile int pollTimeoutsWhileAwaitingFinal = 0; | ||
|
|
||
| private static final String ERROR_MSG = "Agent did not return any response"; | ||
| private static final int NO_WAIT = -1; | ||
|
|
@@ -32,6 +33,10 @@ public class EventConsumer { | |
| // Grace period allows Kafka replication to deliver late-arriving events | ||
| // 3 timeouts * 500ms = 1500ms grace period for replication delays | ||
| private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3; | ||
| // Maximum time to wait for final event when awaitingFinalEvent is set | ||
| // If event doesn't arrive after this many timeouts, assume it won't arrive | ||
| // 6 timeouts * 500ms = 3000ms maximum wait for final event arrival | ||
| private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6; | ||
|
|
||
| public EventConsumer(EventQueue queue) { | ||
| this.queue = queue; | ||
|
|
@@ -82,8 +87,9 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |
| item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS); | ||
| if (item == null) { | ||
| int queueSize = queue.size(); | ||
| LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}", | ||
| agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted); | ||
| boolean awaitingFinal = queue.isAwaitingFinalEvent(); | ||
| LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}", | ||
| agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal); | ||
| // If agent completed, a poll timeout means no more events are coming | ||
| // MainEventBusProcessor has 500ms to distribute events from MainEventBus | ||
| // If we timeout with agentCompleted=true, all events have been distributed | ||
|
|
@@ -94,8 +100,31 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |
| // CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED) | ||
| // Per A2A spec, interrupted states are NOT terminal - the stream must stay open | ||
| // for future state updates even after agent completes (agent will be re-invoked later). | ||
| // | ||
| // CRITICAL: Don't start timeout counter if we're awaiting a final event. | ||
| // The awaitingFinalEvent flag is set when MainQueue enqueues a final event | ||
| // but it hasn't been distributed to this ChildQueue yet. | ||
| // HOWEVER: If we've been waiting too long for the final event (>3s), give up and | ||
| // proceed with normal timeout logic to prevent infinite waiting. | ||
| boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted(); | ||
| if (agentCompleted && queueSize == 0 && !isInterruptedState) { | ||
|
|
||
| // Track how long we've been waiting for the final event | ||
| if (awaitingFinal && queueSize == 0) { | ||
| pollTimeoutsWhileAwaitingFinal++; | ||
| if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) { | ||
| LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})", | ||
| pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue)); | ||
| // Clear the flag on the queue itself, not just the local variable | ||
| if (queue instanceof EventQueue.ChildQueue) { | ||
| ((EventQueue.ChildQueue) queue).clearAwaitingFinalEvent(); | ||
| } | ||
| awaitingFinal = false; // Also update local variable for this iteration | ||
| } | ||
| } else { | ||
| pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting | ||
| } | ||
|
|
||
| if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) { | ||
| pollTimeoutsAfterAgentCompleted++; | ||
| if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) { | ||
| LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})", | ||
|
|
@@ -116,11 +145,16 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |
| LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})", | ||
| queueSize, System.identityHashCode(queue)); | ||
| pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive | ||
| } else if (agentCompleted && awaitingFinal) { | ||
| LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})", | ||
| pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue)); | ||
| pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final | ||
| } | ||
| continue; | ||
| } | ||
| // Event received - reset timeout counter | ||
| // Event received - reset timeout counters | ||
| pollTimeoutsAfterAgentCompleted = 0; | ||
| pollTimeoutsWhileAwaitingFinal = 0; | ||
| event = item.getEvent(); | ||
| LOGGER.debug("EventConsumer received event: {} (queue={})", | ||
| event.getClass().getSimpleName(), System.identityHashCode(queue)); | ||
|
|
@@ -179,10 +213,11 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |
| // the stream-end signal can reach the client BEFORE the buffered final event, | ||
| // causing the client to close the connection and never receive the final event. | ||
| // This is especially important in replicated scenarios where events arrive via Kafka | ||
| // and timing is less deterministic. A small delay ensures the buffer flushes. | ||
| // and timing is less deterministic. A delay ensures the buffer flushes. | ||
| // Increased to 150ms to account for CI environment latency and JVM scheduling delays. | ||
| if (isFinalSent) { | ||
| try { | ||
| Thread.sleep(50); // 50ms to allow SSE buffer flush | ||
| Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe move that to a constant? |
||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.