Skip to content

Commit 4e5e6df

Browse files
committed
Undo background task for close.
1 parent 81358a3 commit 4e5e6df

4 files changed

Lines changed: 18 additions & 51 deletions

File tree

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def __init__(self, queue: EventQueue):
3232
self.queue = queue
3333
self._timeout = 0.5
3434
self._exception: BaseException | None = None
35-
self._close_task: asyncio.Task[None] | None = None
3635
logger.debug('EventConsumer initialized')
3736

3837
async def consume_one(self) -> Event:
@@ -107,25 +106,13 @@ async def consume_all(self) -> AsyncGenerator[Event]:
107106
)
108107
)
109108

110-
# Initiate a graceful close in the background.
111-
# We use immediate=False to ensure tapped child queues can finish
112-
# draining their events (preventing data loss for slow consumers).
113-
# We use clear_parent_events=True to prevent deadlocks since this
114-
# consumer is stopping and won't dequeue any remaining trailing events.
115-
# We use create_task instead of awaiting it directly because awaiting
116-
# immediate=False would block this consumer until all child queues
117-
# are also drained.
118109
# Make sure the yield is after the close events, otherwise
119110
# the caller may end up in a blocked state where this
120111
# generator isn't called again to close things out and the
121112
# other part is waiting for an event or a closed queue.
122113
if is_final_event:
123114
logger.debug('Stopping event consumption in consume_all.')
124-
self._close_task = asyncio.create_task(
125-
self.queue.close(
126-
immediate=False, clear_parent_events=True
127-
)
128-
)
115+
await self.queue.close(True)
129116
yield event
130117
break
131118
yield event

src/a2a/server/events/event_queue.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,7 @@ def tap(self) -> 'EventQueue':
172172
self._children.append(queue)
173173
return queue
174174

175-
async def close(
176-
self, immediate: bool = False, clear_parent_events: bool = False
177-
) -> None:
175+
async def close(self, immediate: bool = False) -> None:
178176
"""Closes the queue for future push events and also closes all child queues.
179177
180178
Args:
@@ -183,9 +181,6 @@ async def close(
183181
`QueueShutDown`. If False (default), the queue is marked as closed to new
184182
events, but existing events can still be dequeued and processed until the
185183
queue is fully drained.
186-
clear_parent_events: If True, completely clears all pending events from this
187-
specific parent queue without processing them. This parameter is ignored if
188-
`immediate=True`.
189184
"""
190185
logger.debug('Closing EventQueue.')
191186
async with self._lock:
@@ -195,9 +190,6 @@ async def close(
195190

196191
self.queue.shutdown(immediate)
197192

198-
if clear_parent_events and not immediate:
199-
await self.clear_events(clear_child_queues=False)
200-
201193
await asyncio.gather(
202194
*(child.close(immediate) for child in self._children)
203195
)

tests/server/events/test_event_consumer.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ async def test_consume_all_handles_validation_error(
461461
)
462462

463463

464+
@pytest.mark.xfail(reason='https://github.com/a2aproject/a2a-python/issues/869')
464465
@pytest.mark.asyncio
465466
async def test_graceful_close_allows_tapped_queues_to_drain() -> None:
466467

@@ -515,30 +516,25 @@ async def slow_consume() -> list:
515516
assert len(slow_events) == 3
516517

517518

519+
@pytest.mark.xfail(reason='https://github.com/a2aproject/a2a-python/issues/869', raises=asyncio.TimeoutError)
518520
@pytest.mark.asyncio
519521
async def test_background_close_deadlocks_on_trailing_events() -> None:
520522
queue = EventQueue()
521-
consumer = EventConsumer(queue)
522523

523524
# Producer enqueues a final event, but then enqueues another event
524525
# (e.g., simulating a delayed log message, race condition, or multiple messages).
525526
await queue.enqueue_event(Message(message_id='final'))
526527
await queue.enqueue_event(Message(message_id='trailing_log'))
527528

528-
# Consume events. The consumer will break its internal loop on the 'final' message,
529-
# leaving 'trailing_log' in the queue forever.
530-
events = [event async for event in consumer.consume_all()]
529+
# Consumer dequeues 'final' but stops there (e.g. because it is a final event).
530+
event = await queue.dequeue_event()
531+
assert isinstance(event, Message) and event.message_id == 'final'
532+
queue.task_done()
531533

532-
assert len(events) == 1
533-
assert consumer._close_task is not None
534-
535-
# The background close task awaits queue.close(immediate=False), which waits for
536-
# queue.join(). Since we passed clear_parent_events=True, it clears trailing events
537-
# and successfully completes without deadlocking.
538-
try:
539-
await asyncio.wait_for(consumer._close_task, timeout=0.1)
540-
except asyncio.TimeoutError:
541-
pytest.fail('Background close task deadlocked on trailing events!')
534+
# Now attempt a graceful close. This demonstrates the deadlock that
535+
# the previous implementation (with background task and clear_parent_events)
536+
# was trying to solve.
537+
await asyncio.wait_for(queue.close(immediate=False), timeout=0.1)
542538

543539

544540
@pytest.mark.asyncio

tests/server/events/test_event_queue.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -482,15 +482,14 @@ async def test_close_propagates_to_children(event_queue: EventQueue) -> None:
482482
assert child_queue2.is_closed()
483483

484484

485+
@pytest.mark.xfail(reason='https://github.com/a2aproject/a2a-python/issues/869')
485486
@pytest.mark.asyncio
486487
async def test_enqueue_close_race_condition() -> None:
487488
queue = EventQueue()
488489
event = create_sample_message()
489490

490491
enqueue_task = asyncio.create_task(queue.enqueue_event(event))
491-
close_task = asyncio.create_task(
492-
queue.close(immediate=False, clear_parent_events=True)
493-
)
492+
close_task = asyncio.create_task(queue.close(immediate=False))
494493

495494
try:
496495
results = await asyncio.wait_for(
@@ -582,19 +581,16 @@ async def getter():
582581

583582

584583
@pytest.mark.parametrize(
585-
'immediate, clear_parent_events, expected_events, close_blocks',
584+
'immediate, expected_events, close_blocks',
586585
[
587-
(False, False, (1, 1), True),
588-
(False, True, (0, 1), True),
589-
(True, False, (0, 0), False),
590-
(True, True, (0, 0), False),
586+
(False, (1, 1), True),
587+
(True, (0, 0), False),
591588
],
592589
)
593590
@pytest.mark.asyncio
594591
async def test_event_queue_close_behaviors(
595592
event_queue: EventQueue,
596593
immediate: bool,
597-
clear_parent_events: bool,
598594
expected_events: tuple[int, int],
599595
close_blocks: bool,
600596
) -> None:
@@ -611,11 +607,7 @@ async def test_event_queue_close_behaviors(
611607
event_queue.queue = QueueJoinWrapper(event_queue.queue, join_reached)
612608
child_queue.queue = QueueJoinWrapper(child_queue.queue, join_reached)
613609

614-
close_task = asyncio.create_task(
615-
event_queue.close(
616-
immediate=immediate, clear_parent_events=clear_parent_events
617-
)
618-
)
610+
close_task = asyncio.create_task(event_queue.close(immediate=immediate))
619611

620612
if close_blocks:
621613
await join_reached.wait()

0 commit comments

Comments
 (0)