@@ -24,6 +24,7 @@ public class EventConsumer {
2424 private volatile boolean agentCompleted = false ;
2525 private volatile int pollTimeoutsAfterAgentCompleted = 0 ;
2626 private volatile @ Nullable TaskState lastSeenTaskState = null ;
27+ private volatile int pollTimeoutsWhileAwaitingFinal = 0 ;
2728
2829 private static final String ERROR_MSG = "Agent did not return any response" ;
2930 private static final int NO_WAIT = -1 ;
@@ -32,6 +33,10 @@ public class EventConsumer {
3233 // Grace period allows Kafka replication to deliver late-arriving events
3334 // 3 timeouts * 500ms = 1500ms grace period for replication delays
3435 private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3 ;
36+ // Maximum number of poll timeouts to wait for the final event while awaitingFinalEvent is set
37+ // If event doesn't arrive after this many timeouts, assume it won't arrive
38+ // 6 timeouts * 500ms = 3000ms maximum wait for final event arrival
39+ private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6 ;
3540
3641 public EventConsumer (EventQueue queue ) {
3742 this .queue = queue ;
@@ -83,8 +88,8 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
8388 if (item == null ) {
8489 int queueSize = queue .size ();
8590 boolean awaitingFinal = queue .isAwaitingFinalEvent ();
86- LOGGER .debug ("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}" ,
87- agentCompleted , queueSize , awaitingFinal , pollTimeoutsAfterAgentCompleted );
91+ LOGGER .debug ("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={} " ,
92+ agentCompleted , queueSize , awaitingFinal , pollTimeoutsAfterAgentCompleted , pollTimeoutsWhileAwaitingFinal );
8893 // If agent completed, a poll timeout means no more events are coming
8994 // MainEventBusProcessor has 500ms to distribute events from MainEventBus
9095 // If we timeout with agentCompleted=true, all events have been distributed
@@ -99,7 +104,22 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
99104 // CRITICAL: Don't start timeout counter if we're awaiting a final event.
100105 // The awaitingFinalEvent flag is set when MainQueue enqueues a final event
101106 // but it hasn't been distributed to this ChildQueue yet.
107+ // HOWEVER: If we've been waiting too long for the final event (>3s), give up and
108+ // proceed with normal timeout logic to prevent infinite waiting.
102109 boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState .isInterrupted ();
110+
111+ // Track how long we've been waiting for the final event
112+ if (awaitingFinal && queueSize == 0 ) {
113+ pollTimeoutsWhileAwaitingFinal ++;
114+ if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL ) {
115+ LOGGER .debug ("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})" ,
116+ pollTimeoutsWhileAwaitingFinal , System .identityHashCode (queue ));
117+ awaitingFinal = false ; // Give up waiting, let normal timeout logic proceed
118+ }
119+ } else {
120+ pollTimeoutsWhileAwaitingFinal = 0 ; // Reset when event arrives or queue not awaiting
121+ }
122+
103123 if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal ) {
104124 pollTimeoutsAfterAgentCompleted ++;
105125 if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED ) {
@@ -117,15 +137,20 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
117137 LOGGER .debug ("Agent completed but task is in interrupted state ({}), stream must remain open (queue={})" ,
118138 lastSeenTaskState , System .identityHashCode (queue ));
119139 pollTimeoutsAfterAgentCompleted = 0 ; // Reset counter
120- } else if (agentCompleted && (queueSize > 0 || awaitingFinal )) {
121- LOGGER .debug ("Agent completed but queue has {} pending events or awaitingFinalEvent={}, resetting timeout counter and continuing to poll (queue={})" ,
122- queueSize , awaitingFinal , System .identityHashCode (queue ));
123- pollTimeoutsAfterAgentCompleted = 0 ; // Reset counter when events arrive or awaiting final
140+ } else if (agentCompleted && queueSize > 0 ) {
141+ LOGGER .debug ("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})" ,
142+ queueSize , System .identityHashCode (queue ));
143+ pollTimeoutsAfterAgentCompleted = 0 ; // Reset counter when events arrive
144+ } else if (agentCompleted && awaitingFinal ) {
145+ LOGGER .debug ("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})" ,
146+ pollTimeoutsWhileAwaitingFinal , MAX_POLL_TIMEOUTS_AWAITING_FINAL , System .identityHashCode (queue ));
147+ pollTimeoutsAfterAgentCompleted = 0 ; // Reset counter while awaiting final
124148 }
125149 continue ;
126150 }
127- // Event received - reset timeout counter
151+ // Event received - reset timeout counters
128152 pollTimeoutsAfterAgentCompleted = 0 ;
153+ pollTimeoutsWhileAwaitingFinal = 0 ;
129154 event = item .getEvent ();
130155 LOGGER .debug ("EventConsumer received event: {} (queue={})" ,
131156 event .getClass ().getSimpleName (), System .identityHashCode (queue ));
0 commit comments