Skip to content

Commit e71ac62

Browse files
feat: send task as a first subscribe event (#716)
Following > The operation MUST return a Task object as the first event in the stream introduced in 1.0: https://a2a-protocol.org/latest/specification/#316-subscribe-to-task It also correctly uses `UnsupportedOperationError` as per the "Errors" section from the documentation linked above. Fixes #675 --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 427a75b commit e71ac62

3 files changed

Lines changed: 13 additions & 4 deletions

File tree

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,11 +555,15 @@ async def on_subscribe_to_task(
555555

556556
if task.status.state in TERMINAL_TASK_STATES:
557557
raise ServerError(
558-
error=InvalidParamsError(
558+
error=UnsupportedOperationError(
559559
message=f'Task {task.id} is in terminal state: {task.status.state}'
560560
)
561561
)
562562

563+
# The operation MUST return a Task object as the first event in the stream
564+
# https://a2a-protocol.org/latest/specification/#316-subscribe-to-task
565+
yield task
566+
563567
task_manager = TaskManager(
564568
task_id=task.id,
565569
context_id=task.context_id,

tests/server/request_handlers/test_default_request_handler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,7 +1499,10 @@ async def exec_side_effect(_request, queue: EventQueue):
14991499
# Allow producer to emit the next event
15001500
allow_second_event.set()
15011501

1502-
received = await resub_gen.__anext__()
1502+
first_subscribe_event = await anext(resub_gen)
1503+
assert first_subscribe_event == task_for_resub
1504+
1505+
received = await anext(resub_gen)
15031506
assert received == second_event
15041507

15051508
# Finish producer to allow cleanup paths to complete
@@ -2706,7 +2709,7 @@ async def test_on_subscribe_to_task_in_terminal_state(terminal_state):
27062709
async for _ in request_handler.on_subscribe_to_task(params, context):
27072710
pass # pragma: no cover
27082711

2709-
assert isinstance(exc_info.value.error, InvalidParamsError)
2712+
assert isinstance(exc_info.value.error, UnsupportedOperationError)
27102713
assert exc_info.value.error.message
27112714
assert (
27122715
f'Task {task_id} is in terminal state: {terminal_state}'

tests/server/request_handlers/test_jsonrpc_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,9 @@ async def streaming_coro():
703703
collected_events: list[Any] = []
704704
async for event in response:
705705
collected_events.append(event)
706-
assert len(collected_events) == len(events)
706+
assert (
707+
len(collected_events) == len(events) + 1
708+
) # First event is task itself
707709
assert mock_task.history is not None and len(mock_task.history) == 0
708710

709711
async def test_on_subscribe_no_existing_task_error(self) -> None:

0 commit comments

Comments
 (0)