Skip to content

Commit fdb156b

Browse files
committed
feat: implement gRPC rich error details for A2A errors, including new error types and client-side parsing.
1 parent 942f4ae commit fdb156b

5 files changed

Lines changed: 197 additions & 18 deletions

File tree

src/a2a/client/transports/grpc.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import logging
22

3-
from collections.abc import AsyncGenerator, Callable
3+
from collections.abc import AsyncGenerator, Callable, Iterable
44
from functools import wraps
5-
from typing import Any, NoReturn
5+
from typing import Any, NoReturn, cast
66

7+
from a2a.client.errors import A2AClientError, A2AClientTimeoutError
78
from a2a.client.middleware import ClientCallContext
8-
from a2a.utils.errors import JSON_RPC_ERROR_CODE_MAP
9+
from a2a.utils.errors import JSON_RPC_ERROR_CODE_MAP, A2AError
910

1011

1112
try:
@@ -18,8 +19,12 @@
1819
) from e
1920

2021

22+
from google.rpc import ( # type: ignore[reportMissingModuleSource]
23+
error_details_pb2,
24+
status_pb2,
25+
)
26+
2127
from a2a.client.client import ClientConfig
22-
from a2a.client.errors import A2AClientError, A2AClientTimeoutError
2328
from a2a.client.middleware import ClientCallInterceptor
2429
from a2a.client.optionals import Channel
2530
from a2a.client.transports.base import ClientTransport
@@ -44,6 +49,7 @@
4449
TaskPushNotificationConfig,
4550
)
4651
from a2a.utils.constants import PROTOCOL_VERSION_CURRENT, VERSION_HEADER
52+
from a2a.utils.errors import A2A_REASON_TO_ERROR
4753
from a2a.utils.telemetry import SpanKind, trace_class
4854

4955

@@ -54,14 +60,44 @@
5460
}
5561

5662

63+
def _parse_rich_grpc_error(
64+
value: bytes, original_error: grpc.aio.AioRpcError
65+
) -> None:
66+
try:
67+
status = status_pb2.Status.FromString(value)
68+
for detail in status.details:
69+
if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
70+
error_info = error_details_pb2.ErrorInfo()
71+
detail.Unpack(error_info)
72+
73+
if error_info.domain == 'a2a-protocol.org':
74+
exception_cls = A2A_REASON_TO_ERROR.get(error_info.reason)
75+
if exception_cls:
76+
raise exception_cls(status.message) from original_error # noqa: TRY301
77+
except Exception as parse_e:
78+
# Don't swallow A2A errors generated above
79+
if isinstance(parse_e, (A2AError, A2AClientError)):
80+
raise parse_e
81+
logger.warning(
82+
'Failed to parse grpc-status-details-bin', exc_info=parse_e
83+
)
84+
85+
5786
def _map_grpc_error(e: grpc.aio.AioRpcError) -> NoReturn:
5887
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
5988
raise A2AClientTimeoutError('Client Request timed out') from e
6089

90+
metadata = e.trailing_metadata()
91+
if metadata:
92+
iterable_metadata = cast('Iterable[tuple[str, str | bytes]]', metadata)
93+
for key, value in iterable_metadata:
94+
if key == 'grpc-status-details-bin' and isinstance(value, bytes):
95+
_parse_rich_grpc_error(value, e)
96+
6197
details = e.details()
6298
if isinstance(details, str) and ': ' in details:
6399
error_type_name, error_message = details.split(': ', 1)
64-
# TODO(#723): Resolving imports by name is temporary until proper error handling structure is added in #723.
100+
# Leaving as fallback for errors that don't use the rich error details.
65101
exception_cls = _A2A_ERROR_NAME_TO_CLS.get(error_type_name)
66102
if exception_cls:
67103
raise exception_cls(error_message) from e

src/a2a/server/request_handlers/grpc_handler.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import logging
44

55
from abc import ABC, abstractmethod
6-
from collections.abc import AsyncIterable, Awaitable
6+
from collections.abc import AsyncIterable, Awaitable, Callable
7+
from typing import cast
78

89

910
try:
@@ -16,9 +17,8 @@
1617
"'pip install a2a-sdk[grpc]'"
1718
) from e
1819

19-
from collections.abc import Callable
20-
21-
from google.protobuf import empty_pb2, message
20+
from google.protobuf import any_pb2, empty_pb2, message
21+
from google.rpc import error_details_pb2, status_pb2
2222

2323
import a2a.types.a2a_pb2_grpc as a2a_grpc
2424

@@ -33,7 +33,7 @@
3333
from a2a.types import a2a_pb2
3434
from a2a.types.a2a_pb2 import AgentCard
3535
from a2a.utils import proto_utils
36-
from a2a.utils.errors import A2AError, TaskNotFoundError
36+
from a2a.utils.errors import A2A_ERROR_REASONS, A2AError, TaskNotFoundError
3737
from a2a.utils.helpers import maybe_await, validate, validate_async_generator
3838

3939

@@ -419,10 +419,37 @@ async def abort_context(
419419
) -> None:
420420
"""Sets the grpc errors appropriately in the context."""
421421
code = _ERROR_CODE_MAP.get(type(error))
422+
423+
status_value = code.value if code else grpc.StatusCode.UNKNOWN.value
424+
status_code = (
425+
status_value[0] if isinstance(status_value, tuple) else status_value
426+
)
427+
error_msg = error.message if hasattr(error, 'message') else str(error)
428+
status = status_pb2.Status(code=status_code, message=error_msg)
429+
430+
if code:
431+
reason = A2A_ERROR_REASONS.get(type(error), 'UNKNOWN_ERROR')
432+
433+
error_info = error_details_pb2.ErrorInfo(
434+
reason=reason,
435+
domain='a2a-protocol.org',
436+
)
437+
438+
detail = any_pb2.Any()
439+
detail.Pack(error_info)
440+
status.details.append(detail)
441+
442+
context.set_trailing_metadata(
443+
cast(
444+
'tuple[tuple[str, str | bytes], ...]',
445+
(('grpc-status-details-bin', status.SerializeToString()),),
446+
)
447+
)
448+
422449
if code:
423450
await context.abort(
424451
code,
425-
f'{type(error).__name__}: {error.message}',
452+
status.message,
426453
)
427454
else:
428455
await context.abort(

src/a2a/utils/errors.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,26 @@ class MethodNotFoundError(A2AError):
8282
message = 'Method not found'
8383

8484

85+
class ExtensionSupportRequiredError(A2AError):
86+
"""Exception raised when extension support is required but not present."""
87+
88+
message = 'Extension support required'
89+
90+
91+
class VersionNotSupportedError(A2AError):
92+
"""Exception raised when the requested version is not supported."""
93+
94+
message = 'Version not supported'
95+
96+
8597
# For backward compatibility if needed, or just aliases for clean refactor
8698
# We remove the Pydantic models here.
8799

88100
__all__ = [
101+
'A2A_ERROR_REASONS',
102+
'A2A_REASON_TO_ERROR',
89103
'JSON_RPC_ERROR_CODE_MAP',
104+
'ExtensionSupportRequiredError',
90105
'InternalError',
91106
'InvalidAgentResponseError',
92107
'InvalidParamsError',
@@ -96,6 +111,7 @@ class MethodNotFoundError(A2AError):
96111
'TaskNotCancelableError',
97112
'TaskNotFoundError',
98113
'UnsupportedOperationError',
114+
'VersionNotSupportedError',
99115
]
100116

101117

@@ -112,3 +128,22 @@ class MethodNotFoundError(A2AError):
112128
MethodNotFoundError: -32601,
113129
InternalError: -32603,
114130
}
131+
132+
133+
A2A_ERROR_REASONS = {
134+
TaskNotFoundError: 'TASK_NOT_FOUND',
135+
TaskNotCancelableError: 'TASK_NOT_CANCELABLE',
136+
PushNotificationNotSupportedError: 'PUSH_NOTIFICATION_NOT_SUPPORTED',
137+
UnsupportedOperationError: 'UNSUPPORTED_OPERATION',
138+
ContentTypeNotSupportedError: 'CONTENT_TYPE_NOT_SUPPORTED',
139+
InvalidAgentResponseError: 'INVALID_AGENT_RESPONSE',
140+
AuthenticatedExtendedCardNotConfiguredError: 'EXTENDED_AGENT_CARD_NOT_CONFIGURED',
141+
ExtensionSupportRequiredError: 'EXTENSION_SUPPORT_REQUIRED',
142+
VersionNotSupportedError: 'VERSION_NOT_SUPPORTED',
143+
InvalidParamsError: 'INVALID_PARAMS',
144+
InvalidRequestError: 'INVALID_REQUEST',
145+
MethodNotFoundError: 'METHOD_NOT_FOUND',
146+
InternalError: 'INTERNAL_ERROR',
147+
}
148+
149+
A2A_REASON_TO_ERROR = {reason: cls for cls, reason in A2A_ERROR_REASONS.items()}

tests/client/transports/test_grpc_client.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
import grpc
44
import pytest
55

6+
from google.protobuf import any_pb2
7+
from google.rpc import error_details_pb2, status_pb2
8+
69
from a2a.client.middleware import ClientCallContext
710
from a2a.client.transports.grpc import GrpcTransport
811
from a2a.extensions.common import HTTP_EXTENSION_HEADER
912
from a2a.utils.constants import VERSION_HEADER, PROTOCOL_VERSION_CURRENT
13+
from a2a.utils.errors import A2A_ERROR_REASONS
1014
from a2a.types import a2a_pb2
1115
from a2a.types.a2a_pb2 import (
1216
AgentCapabilities,
@@ -257,16 +261,15 @@ async def test_send_message_with_timeout_context(
257261

258262
@pytest.mark.parametrize('error_cls', list(JSON_RPC_ERROR_CODE_MAP.keys()))
259263
@pytest.mark.asyncio
260-
async def test_grpc_mapped_errors(
264+
async def test_grpc_mapped_errors_legacy(
261265
grpc_transport: GrpcTransport,
262266
mock_grpc_stub: AsyncMock,
263267
sample_message_send_params: SendMessageRequest,
264268
error_cls,
265269
) -> None:
266-
"""Test handling of mapped gRPC error responses."""
270+
"""Test handling of legacy gRPC error responses."""
267271
error_details = f'{error_cls.__name__}: Mapped Error'
268272

269-
# We must trigger it from a standard transport method call, for example `send_message`.
270273
mock_grpc_stub.SendMessage.side_effect = grpc.aio.AioRpcError(
271274
code=grpc.StatusCode.INTERNAL,
272275
initial_metadata=grpc.aio.Metadata(),
@@ -278,6 +281,46 @@ async def test_grpc_mapped_errors(
278281
await grpc_transport.send_message(sample_message_send_params)
279282

280283

284+
@pytest.mark.parametrize('error_cls', list(JSON_RPC_ERROR_CODE_MAP.keys()))
285+
@pytest.mark.asyncio
286+
async def test_grpc_mapped_errors_rich(
287+
grpc_transport: GrpcTransport,
288+
mock_grpc_stub: AsyncMock,
289+
sample_message_send_params: SendMessageRequest,
290+
error_cls,
291+
) -> None:
292+
"""Test handling of rich gRPC error responses with Status metadata."""
293+
294+
reason = A2A_ERROR_REASONS.get(error_cls, 'UNKNOWN_ERROR')
295+
296+
error_info = error_details_pb2.ErrorInfo(
297+
reason=reason,
298+
domain='a2a-protocol.org',
299+
)
300+
301+
error_details = f'{error_cls.__name__}: Mapped Error'
302+
status = status_pb2.Status(
303+
code=grpc.StatusCode.INTERNAL.value[0], message=error_details
304+
)
305+
detail = any_pb2.Any()
306+
detail.Pack(error_info)
307+
status.details.append(detail)
308+
309+
mock_grpc_stub.SendMessage.side_effect = grpc.aio.AioRpcError(
310+
code=grpc.StatusCode.INTERNAL,
311+
initial_metadata=grpc.aio.Metadata(),
312+
trailing_metadata=grpc.aio.Metadata(
313+
('grpc-status-details-bin', status.SerializeToString()),
314+
),
315+
details='A generic error message',
316+
)
317+
318+
with pytest.raises(error_cls) as excinfo:
319+
await grpc_transport.send_message(sample_message_send_params)
320+
321+
assert str(excinfo.value) == error_details
322+
323+
281324
@pytest.mark.asyncio
282325
async def test_send_message_message_response(
283326
grpc_transport: GrpcTransport,

tests/server/request_handlers/test_grpc_handler.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import grpc.aio
66
import pytest
77

8+
from google.rpc import error_details_pb2, status_pb2
89
from a2a import types
910
from a2a.extensions.common import HTTP_EXTENSION_HEADER
1011
from a2a.server.context import ServerCallContext
@@ -99,7 +100,7 @@ async def test_send_message_server_error(
99100
await grpc_handler.SendMessage(request_proto, mock_grpc_context)
100101

101102
mock_grpc_context.abort.assert_awaited_once_with(
102-
grpc.StatusCode.INVALID_ARGUMENT, 'InvalidParamsError: Bad params'
103+
grpc.StatusCode.INVALID_ARGUMENT, 'Bad params'
103104
)
104105

105106

@@ -138,7 +139,7 @@ async def test_get_task_not_found(
138139
await grpc_handler.GetTask(request_proto, mock_grpc_context)
139140

140141
mock_grpc_context.abort.assert_awaited_once_with(
141-
grpc.StatusCode.NOT_FOUND, 'TaskNotFoundError: Task not found'
142+
grpc.StatusCode.NOT_FOUND, 'Task not found'
142143
)
143144

144145

@@ -157,7 +158,7 @@ async def test_cancel_task_server_error(
157158

158159
mock_grpc_context.abort.assert_awaited_once_with(
159160
grpc.StatusCode.UNIMPLEMENTED,
160-
'TaskNotCancelableError: Task cannot be canceled',
161+
'Task cannot be canceled',
161162
)
162163

163164

@@ -379,7 +380,44 @@ async def test_abort_context_error_mapping( # noqa: PLR0913
379380
mock_grpc_context.abort.assert_awaited_once()
380381
call_args, _ = mock_grpc_context.abort.call_args
381382
assert call_args[0] == grpc_status_code
382-
assert error_message_part in call_args[1]
383+
384+
# We shouldn't rely on the legacy ExceptionName: message string format
385+
# But for backward compatability fallback it shouldn't fail
386+
mock_grpc_context.set_trailing_metadata.assert_called_once()
387+
metadata = mock_grpc_context.set_trailing_metadata.call_args[0][0]
388+
389+
assert any(key == 'grpc-status-details-bin' for key, _ in metadata)
390+
391+
392+
@pytest.mark.asyncio
393+
async def test_abort_context_rich_error_format(
394+
grpc_handler: GrpcHandler,
395+
mock_request_handler: AsyncMock,
396+
mock_grpc_context: AsyncMock,
397+
) -> None:
398+
399+
error = types.TaskNotFoundError('Could not find the task')
400+
mock_request_handler.on_get_task.side_effect = error
401+
request_proto = a2a_pb2.GetTaskRequest(id='any')
402+
await grpc_handler.GetTask(request_proto, mock_grpc_context)
403+
404+
mock_grpc_context.set_trailing_metadata.assert_called_once()
405+
metadata = mock_grpc_context.set_trailing_metadata.call_args[0][0]
406+
407+
bin_values = [v for k, v in metadata if k == 'grpc-status-details-bin']
408+
assert len(bin_values) == 1
409+
410+
status = status_pb2.Status.FromString(bin_values[0])
411+
assert status.code == grpc.StatusCode.NOT_FOUND.value[0]
412+
assert status.message == 'Could not find the task'
413+
414+
assert len(status.details) == 1
415+
416+
error_info = error_details_pb2.ErrorInfo()
417+
status.details[0].Unpack(error_info)
418+
419+
assert error_info.reason == 'TASK_NOT_FOUND'
420+
assert error_info.domain == 'a2a-protocol.org'
383421

384422

385423
@pytest.mark.asyncio

0 commit comments

Comments
 (0)