Skip to content

Commit 25e79b9

Browse files
committed
Rework reference counting
1 parent 7f22b8f commit 25e79b9

1 file changed

Lines changed: 11 additions & 7 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,18 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
7474
// Track event count to ensure we only count down when the FINAL event's write completes,
7575
// not when earlier events' pending callbacks fire
7676
tube.whenRequested(requested -> {
77-
int ackCount = eventsAcknowledged.incrementAndGet();
77+
// Only count acknowledgments for actual events, not initial subscription requests
78+
// The initial onSubscribe() triggers 2 callbacks before any events are sent
7879
int sent = eventsSent.get();
79-
boolean waiting = waitingForFinalEventWrite.get();
80-
LOGGER.debug("whenRequested callback: requested={}, ack={}, sent={}, waiting={}",
81-
requested, ackCount, sent, waiting);
82-
if (waiting && ackCount >= sent) {
83-
LOGGER.debug("Subscriber acknowledged final event ({}/{}) - async write completed", ackCount, sent);
84-
finalEventWriteLatch.countDown();
80+
if (sent > 0) {
81+
int ackCount = eventsAcknowledged.incrementAndGet();
82+
boolean waiting = waitingForFinalEventWrite.get();
83+
LOGGER.debug("whenRequested callback: requested={}, ack={}, sent={}, waiting={}",
84+
requested, ackCount, sent, waiting);
85+
if (waiting && ackCount >= sent) {
86+
LOGGER.debug("Subscriber acknowledged final event ({}/{}) - async write completed", ackCount, sent);
87+
finalEventWriteLatch.countDown();
88+
}
8589
}
8690
});
8791

0 commit comments

Comments
 (0)