Skip to content

Commit 3f877a7

Browse files
committed
review fixes
1 parent 378970b commit 3f877a7

2 files changed

Lines changed: 14 additions & 10 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,7 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
115115
LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})",
116116
pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue));
117117
// Clear the flag on the queue itself, not just the local variable
118-
if (queue instanceof EventQueue.ChildQueue) {
119-
((EventQueue.ChildQueue) queue).clearAwaitingFinalEvent();
120-
}
118+
queue.clearAwaitingFinalEvent();
121119
awaitingFinal = false; // Also update local variable for this iteration
122120
}
123121
} else {

server-common/src/main/java/io/a2a/server/events/EventQueue.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,17 @@ public boolean isAwaitingFinalEvent() {
323323
return false;
324324
}
325325

326+
/**
327+
* Clears the awaiting final event flag.
328+
* <p>
329+
* Default implementation is a no-op for queues that don't track this state.
330+
* ChildQueue overrides this to actually clear the flag.
331+
* </p>
332+
*/
333+
public void clearAwaitingFinalEvent() {
334+
// Default no-op implementation - overridden by ChildQueue
335+
}
336+
326337
/**
327338
* Closes this event queue gracefully, allowing pending events to be consumed.
328339
*/
@@ -754,12 +765,6 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
754765
if (item != null) {
755766
Event event = item.getEvent();
756767
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
757-
// Clear the awaiting flag only if this is a final event
758-
// This allows EventConsumer grace period logic to proceed correctly
759-
if (awaitingFinalEvent && isFinalEvent(event)) {
760-
awaitingFinalEvent = false;
761-
LOGGER.debug("ChildQueue {} received final event while awaiting - flag cleared", System.identityHashCode(this));
762-
}
763768
} else {
764769
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
765770
}
@@ -824,7 +829,8 @@ void expectFinalEvent() {
824829
* Called by EventConsumer when it has waited too long for the final event.
825830
* This allows normal timeout logic to proceed if the final event never arrives.
826831
*/
827-
void clearAwaitingFinalEvent() {
832+
@Override
833+
public void clearAwaitingFinalEvent() {
828834
awaitingFinalEvent = false;
829835
LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this));
830836
}

0 commit comments

Comments
 (0)