Skip to content

Commit 7437b88

Browse files
authored
feat: EventQueue - unify implementation between python versions (#877)
Introduced a compatibility layer using the culsans library to backport asyncio.Queue.shutdown functionality to Python versions older than 3.13. Previous implementation was broken (deadlocks and inconsistent behaviour with 3.13 implementation). Culsans library allowed for unified code between versions. This is one of the steps towards better concurrency model in a2a python sdk. Fixes #869
1 parent 4630efd commit 7437b88

8 files changed

Lines changed: 521 additions & 222 deletions

File tree

.github/actions/spelling/allow.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ cls
3535
coc
3636
codegen
3737
coro
38+
culsans
3839
datamodel
3940
deepwiki
4041
drivername
@@ -127,7 +128,7 @@ taskupdate
127128
testuuid
128129
Tful
129130
tiangolo
131+
TResponse
130132
typ
131133
typeerror
132134
vulnz
133-
TResponse

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ dependencies = [
1515
"google-api-core>=1.26.0",
1616
"json-rpc>=1.15.0",
1717
"googleapis-common-protos>=1.70.0",
18+
"culsans>=0.11.0 ; python_full_version < '3.13'",
1819
]
1920

2021
classifiers = [

src/a2a/server/events/event_consumer.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import asyncio
22
import logging
3-
import sys
43

54
from collections.abc import AsyncGenerator
65

76
from pydantic import ValidationError
87

9-
from a2a.server.events.event_queue import Event, EventQueue
8+
from a2a.server.events.event_queue import Event, EventQueue, QueueShutDown
109
from a2a.types.a2a_pb2 import (
1110
Message,
1211
Task,
@@ -17,13 +16,6 @@
1716
from a2a.utils.telemetry import SpanKind, trace_class
1817

1918

20-
# This is an alias to the exception for closed queue
21-
QueueClosed: type[Exception] = asyncio.QueueEmpty
22-
23-
# When using python 3.13 or higher, the closed queue signal is QueueShutdown
24-
if sys.version_info >= (3, 13):
25-
QueueClosed = asyncio.QueueShutDown
26-
2719
logger = logging.getLogger(__name__)
2820

2921

@@ -130,7 +122,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
130122
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
131123
# This class was made an alias of built-in TimeoutError after 3.11
132124
continue
133-
except (QueueClosed, asyncio.QueueEmpty):
125+
except (QueueShutDown, asyncio.QueueEmpty):
134126
# Confirm that the queue is closed, e.g. we aren't on
135127
# python 3.12 and get a queue empty error on an open queue
136128
if self.queue.is_closed():

src/a2a/server/events/event_queue.py

Lines changed: 49 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,31 @@
33
import sys
44

55
from types import TracebackType
6+
from typing import Any
67

78
from typing_extensions import Self
89

10+
11+
if sys.version_info >= (3, 13):
12+
from asyncio import Queue as AsyncQueue
13+
from asyncio import QueueShutDown
14+
15+
def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
16+
"""Create a backwards-compatible queue object."""
17+
return AsyncQueue(maxsize=maxsize)
18+
else:
19+
import culsans
20+
21+
from culsans import AsyncQueue # type: ignore[no-redef]
22+
from culsans import (
23+
AsyncQueueShutDown as QueueShutDown, # type: ignore[no-redef]
24+
)
25+
26+
def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
27+
"""Create a backwards-compatible queue object."""
28+
return culsans.Queue(maxsize=maxsize).async_q # type: ignore[no-any-return]
29+
30+
931
from a2a.types.a2a_pb2 import (
1032
Message,
1133
Task,
@@ -41,7 +63,9 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
4163
if max_queue_size <= 0:
4264
raise ValueError('max_queue_size must be greater than 0')
4365

44-
self.queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=max_queue_size)
66+
self.queue: AsyncQueue[Event] = _create_async_queue(
67+
maxsize=max_queue_size
68+
)
4569
self._children: list[EventQueue] = []
4670
self._is_closed = False
4771
self._lock = asyncio.Lock()
@@ -73,8 +97,12 @@ async def enqueue_event(self, event: Event) -> None:
7397

7498
logger.debug('Enqueuing event of type: %s', type(event))
7599

76-
# Make sure to use put instead of put_nowait to avoid blocking the event loop.
77-
await self.queue.put(event)
100+
try:
101+
await self.queue.put(event)
102+
except QueueShutDown:
103+
logger.warning('Queue was closed during enqueuing. Event dropped.')
104+
return
105+
78106
for child in self._children:
79107
await child.enqueue_event(event)
80108

@@ -107,14 +135,9 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
107135
asyncio.QueueShutDown: If the queue has been closed and is empty.
108136
"""
109137
async with self._lock:
110-
if (
111-
sys.version_info < (3, 13)
112-
and self._is_closed
113-
and self.queue.empty()
114-
):
115-
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
138+
if self._is_closed and self.queue.empty():
116139
logger.warning('Queue is closed. Event will not be dequeued.')
117-
raise asyncio.QueueEmpty('Queue is closed.')
140+
raise QueueShutDown('Queue is closed.')
118141

119142
if no_wait:
120143
logger.debug('Attempting to dequeue event (no_wait=True).')
@@ -152,56 +175,26 @@ def tap(self) -> 'EventQueue':
152175
async def close(self, immediate: bool = False) -> None:
153176
"""Closes the queue for future push events and also closes all child queues.
154177
155-
Once closed, no new events can be enqueued. Behavior is consistent across
156-
Python versions:
157-
- Python >= 3.13: Uses `asyncio.Queue.shutdown` to stop the queue. With
158-
`immediate=True` the queue is shut down and pending events are cleared; with
159-
`immediate=False` the queue is shut down and we wait for it to drain via
160-
`queue.join()`.
161-
- Python < 3.13: Emulates the same semantics by clearing on `immediate=True`
162-
or awaiting `queue.join()` on `immediate=False`.
163-
164-
Consumers attempting to dequeue after close on an empty queue will observe
165-
`asyncio.QueueShutDown` on Python >= 3.13 and `asyncio.QueueEmpty` on
166-
Python < 3.13.
167-
168178
Args:
169-
immediate (bool):
170-
- True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
171-
- False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.
172-
179+
immediate: If True, immediately flushes the queue, discarding all pending
180+
events, and causes any currently blocked `dequeue_event` calls to raise
181+
`QueueShutDown`. If False (default), the queue is marked as closed to new
182+
events, but existing events can still be dequeued and processed until the
183+
queue is fully drained.
173184
"""
174185
logger.debug('Closing EventQueue.')
175186
async with self._lock:
176-
# If already closed, just return.
177187
if self._is_closed and not immediate:
178188
return
179-
if not self._is_closed:
180-
self._is_closed = True
181-
# If using python 3.13 or higher, use shutdown but match <3.13 semantics
182-
if sys.version_info >= (3, 13):
183-
if immediate:
184-
# Immediate: stop queue and clear any pending events, then close children
185-
self.queue.shutdown(True)
186-
await self.clear_events(True)
187-
for child in self._children:
188-
await child.close(True)
189-
return
190-
# Graceful: prevent further gets/puts via shutdown, then wait for drain and children
191-
self.queue.shutdown(False)
192-
await asyncio.gather(
193-
self.queue.join(), *(child.close() for child in self._children)
194-
)
195-
# Otherwise, join the queue
196-
else:
197-
if immediate:
198-
await self.clear_events(True)
199-
for child in self._children:
200-
await child.close(immediate)
201-
return
202-
await asyncio.gather(
203-
self.queue.join(), *(child.close() for child in self._children)
204-
)
189+
self._is_closed = True
190+
191+
self.queue.shutdown(immediate)
192+
193+
await asyncio.gather(
194+
*(child.close(immediate) for child in self._children)
195+
)
196+
if not immediate:
197+
await self.queue.join()
205198

206199
def is_closed(self) -> bool:
207200
"""Checks if the queue is closed."""
@@ -234,15 +227,8 @@ async def clear_events(self, clear_child_queues: bool = True) -> None:
234227
cleared_count += 1
235228
except asyncio.QueueEmpty:
236229
pass
237-
except Exception as e:
238-
# Handle Python 3.13+ QueueShutDown
239-
if (
240-
sys.version_info >= (3, 13)
241-
and type(e).__name__ == 'QueueShutDown'
242-
):
243-
pass
244-
else:
245-
raise
230+
except QueueShutDown:
231+
pass
246232

247233
if cleared_count > 0:
248234
logger.debug(

tests/server/events/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)