Skip to content

Commit 690bb5d

Browse files
committed
Dedupe stream request in jsonrpc.py client
1 parent ae70024 commit 690bb5d

1 file changed

Lines changed: 61 additions & 68 deletions

File tree

src/a2a/client/transports/jsonrpc.py

Lines changed: 61 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -120,46 +120,12 @@ async def send_message_streaming(
120120
modified_kwargs,
121121
context,
122122
)
123-
modified_kwargs.setdefault(
124-
'timeout', self.httpx_client.timeout.as_dict().get('read', None)
125-
)
126-
headers = dict(self.httpx_client.headers.items())
127-
headers.update(modified_kwargs.get('headers', {}))
128-
modified_kwargs['headers'] = headers
129-
130-
async with aconnect_sse(
131-
self.httpx_client,
132-
'POST',
133-
self.url,
134-
json=payload,
135-
**modified_kwargs,
136-
) as event_source:
137-
try:
138-
event_source.response.raise_for_status()
139-
async for sse in event_source.aiter_sse():
140-
if not sse.data:
141-
continue
142-
json_rpc_response = JSONRPC20Response.from_json(sse.data)
143-
if json_rpc_response.error:
144-
self._handle_jsonrpc_error(json_rpc_response.error)
145-
response: StreamResponse = json_format.ParseDict(
146-
json_rpc_response.result, StreamResponse()
147-
)
148-
yield response
149-
except httpx.TimeoutException as e:
150-
raise A2AClientError('Client Request timed out') from e
151-
except httpx.HTTPStatusError as e:
152-
raise A2AClientError(
153-
f'HTTP Error {e.response.status_code}: {e}'
154-
) from e
155-
except SSEError as e:
156-
raise A2AClientError(
157-
f'Invalid SSE response or protocol error: {e}'
158-
) from e
159-
except json.JSONDecodeError as e:
160-
raise A2AClientError(str(e)) from e
161-
except httpx.RequestError as e:
162-
raise A2AClientError(f'Network communication error: {e}') from e
123+
async for event in self._send_stream_request(
124+
payload,
125+
http_kwargs=modified_kwargs,
126+
timeout=self.httpx_client.timeout.as_dict().get('read', None),
127+
):
128+
yield event
163129

164130
async def get_task(
165131
self,
@@ -403,34 +369,11 @@ async def subscribe(
403369
modified_kwargs,
404370
context,
405371
)
406-
modified_kwargs.setdefault('timeout', None)
407-
408-
async with aconnect_sse(
409-
self.httpx_client,
410-
'POST',
411-
self.url,
412-
json=payload,
413-
**modified_kwargs,
414-
) as event_source:
415-
try:
416-
async for sse in event_source.aiter_sse():
417-
json_rpc_response = JSONRPC20Response.from_json(sse.data)
418-
if json_rpc_response.error:
419-
self._handle_jsonrpc_error(json_rpc_response.error)
420-
response: StreamResponse = json_format.ParseDict(
421-
json_rpc_response.result, StreamResponse()
422-
)
423-
yield response
424-
except httpx.TimeoutException as e:
425-
raise A2AClientError('Client Request timed out') from e
426-
except SSEError as e:
427-
raise A2AClientError(
428-
f'Invalid SSE response or protocol error: {e}'
429-
) from e
430-
except json.JSONDecodeError as e:
431-
raise A2AClientError(str(e)) from e
432-
except httpx.RequestError as e:
433-
raise A2AClientError(f'Network communication error: {e}') from e
372+
async for event in self._send_stream_request(
373+
payload,
374+
http_kwargs=modified_kwargs,
375+
):
376+
yield event
434377

435378
async def get_extended_agent_card(
436379
self,
@@ -543,3 +486,53 @@ async def _send_request(
543486
raise A2AClientError(str(e)) from e
544487
except httpx.RequestError as e:
545488
raise A2AClientError(f'Network communication error: {e}') from e
489+
490+
async def _send_stream_request(
491+
self,
492+
rpc_request_payload: dict[str, Any],
493+
http_kwargs: dict[str, Any] | None = None,
494+
**kwargs: Any,
495+
) -> AsyncGenerator[StreamResponse]:
496+
final_kwargs = dict(http_kwargs or {})
497+
final_kwargs.update(kwargs)
498+
final_kwargs.setdefault('timeout', None)
499+
headers = dict(self.httpx_client.headers.items())
500+
headers.update(final_kwargs.get('headers', {}))
501+
final_kwargs['headers'] = headers
502+
503+
try:
504+
async with aconnect_sse(
505+
self.httpx_client,
506+
'POST',
507+
self.url,
508+
json=rpc_request_payload,
509+
**final_kwargs,
510+
) as event_source:
511+
try:
512+
event_source.response.raise_for_status()
513+
async for sse in event_source.aiter_sse():
514+
if not sse.data:
515+
continue
516+
json_rpc_response = JSONRPC20Response.from_json(
517+
sse.data
518+
)
519+
if json_rpc_response.error:
520+
self._handle_jsonrpc_error(json_rpc_response.error)
521+
response: StreamResponse = json_format.ParseDict(
522+
json_rpc_response.result, StreamResponse()
523+
)
524+
yield response
525+
except httpx.HTTPStatusError as e:
526+
raise A2AClientError(
527+
f'HTTP Error {e.response.status_code}: {e}'
528+
) from e
529+
except SSEError as e:
530+
raise A2AClientError(
531+
f'Invalid SSE response or protocol error: {e}'
532+
) from e
533+
except httpx.TimeoutException as e:
534+
raise A2AClientError('Client Request timed out') from e
535+
except httpx.RequestError as e:
536+
raise A2AClientError(f'Network communication error: {e}') from e
537+
except json.JSONDecodeError as e:
538+
raise A2AClientError(str(e)) from e

0 commit comments

Comments
 (0)