Skip to content

Commit 49042b2

Browse files
committed
chore: add sample client and server
1 parent a0827d0 commit 49042b2

3 files changed

Lines changed: 371 additions & 0 deletions

File tree

samples/__init__.py

Whitespace-only changes.

samples/cli.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import argparse
2+
import asyncio
3+
import contextlib
4+
import uuid
5+
6+
import httpx
7+
8+
from a2a.client import A2ACardResolver, ClientConfig, ClientFactory
9+
from a2a.types import Message, Part, Role, SendMessageRequest, TaskState
10+
11+
12+
async def main() -> None:
13+
"""Run the A2A terminal client."""
14+
parser = argparse.ArgumentParser(description='A2A Terminal Client')
15+
parser.add_argument(
16+
'--url', default='http://127.0.0.1:41241', help='Agent base URL'
17+
)
18+
parser.add_argument(
19+
'--transport',
20+
default=None,
21+
help='Preferred transport (JSONRPC, HTTP+JSON, GRPC)',
22+
)
23+
args = parser.parse_args()
24+
25+
config = ClientConfig()
26+
if args.transport:
27+
config.supported_protocol_bindings = [args.transport]
28+
29+
print(
30+
f'Connecting to {args.url} (preferred transport: {args.transport or "Any"})'
31+
)
32+
33+
async with httpx.AsyncClient() as httpx_client:
34+
resolver = A2ACardResolver(httpx_client, args.url)
35+
card = await resolver.get_agent_card()
36+
print('\n✓ Agent Card Found:')
37+
print(f' Name: {card.name}')
38+
39+
client = await ClientFactory.connect(card, client_config=config)
40+
41+
actual_transport = getattr(client, '_transport', client)
42+
print(f' Picked Transport: {actual_transport.__class__.__name__}')
43+
44+
print('\nConnected! Send a message or type /quit to exit.')
45+
46+
current_task_id = None
47+
current_context_id = str(uuid.uuid4())
48+
49+
while True:
50+
try:
51+
user_input = input('You: ')
52+
except KeyboardInterrupt:
53+
break
54+
55+
if user_input.lower() in ('/quit', '/exit'):
56+
break
57+
if not user_input.strip():
58+
continue
59+
60+
message = Message(
61+
role=Role.ROLE_USER,
62+
message_id=str(uuid.uuid4()),
63+
parts=[Part(text=user_input)],
64+
task_id=current_task_id,
65+
context_id=current_context_id,
66+
)
67+
68+
request = SendMessageRequest(message=message)
69+
70+
try:
71+
stream = client.send_message(request)
72+
async for event, task in stream:
73+
if not task:
74+
continue
75+
if not current_task_id:
76+
current_task_id = task.id
77+
78+
if event:
79+
if event.HasField('status_update'):
80+
state_name = TaskState.Name(
81+
event.status_update.status.state
82+
)
83+
print(f'TaskStatusUpdate [{state_name}]:', end=' ')
84+
if event.status_update.status.HasField('message'):
85+
for (
86+
part
87+
) in event.status_update.status.message.parts:
88+
if part.text:
89+
print(part.text, end=' ')
90+
print()
91+
92+
if (
93+
event.status_update.status.state
94+
== TaskState.TASK_STATE_COMPLETED
95+
):
96+
current_task_id = None
97+
print('--- Task Completed ---')
98+
99+
elif event.HasField('artifact_update'):
100+
print(
101+
f'TaskArtifactUpdate [{event.artifact_update.artifact.name}]:',
102+
end=' ',
103+
)
104+
for part in event.artifact_update.artifact.parts:
105+
if part.text:
106+
print(part.text, end=' ')
107+
print()
108+
109+
except Exception as e:
110+
print(f'Error communicating with agent: {e}')
111+
112+
await client.close()
113+
114+
115+
if __name__ == '__main__':
116+
with contextlib.suppress(KeyboardInterrupt, asyncio.CancelledError):
117+
asyncio.run(main())

samples/hello_world_agent.py

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
import asyncio
2+
import contextlib
3+
import logging
4+
5+
from typing import Any
6+
7+
import grpc
8+
import uvicorn
9+
10+
from fastapi import FastAPI
11+
from google.protobuf.json_format import MessageToDict
12+
13+
from a2a.compat.v0_3 import a2a_v0_3_pb2_grpc
14+
from a2a.compat.v0_3.grpc_handler import CompatGrpcHandler
15+
from a2a.server.agent_execution.agent_executor import AgentExecutor
16+
from a2a.server.agent_execution.context import RequestContext
17+
from a2a.server.apps import A2AFastAPIApplication, A2ARESTFastAPIApplication
18+
from a2a.server.events.event_queue import EventQueue
19+
from a2a.server.request_handlers import GrpcHandler
20+
from a2a.server.request_handlers.default_request_handler import (
21+
DefaultRequestHandler,
22+
)
23+
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
24+
from a2a.server.tasks.task_updater import TaskUpdater
25+
from a2a.types import (
26+
AgentCapabilities,
27+
AgentCard,
28+
AgentInterface,
29+
AgentProvider,
30+
AgentSkill,
31+
Part,
32+
a2a_pb2_grpc,
33+
)
34+
35+
36+
logger = logging.getLogger(__name__)
37+
38+
39+
class SampleAgentExecutor(AgentExecutor):
40+
"""Sample agent executor logic similar to the a2a-js sample."""
41+
42+
def __init__(self) -> None:
43+
self.running_tasks: set[str] = set()
44+
45+
async def cancel(
46+
self, context: RequestContext, event_queue: EventQueue
47+
) -> None:
48+
"""Cancels a task."""
49+
task_id = context.task_id
50+
if task_id in self.running_tasks:
51+
self.running_tasks.remove(task_id)
52+
53+
updater = TaskUpdater(
54+
event_queue=event_queue,
55+
task_id=task_id or '',
56+
context_id=context.context_id or '',
57+
)
58+
await updater.cancel()
59+
60+
async def execute(
61+
self, context: RequestContext, event_queue: EventQueue
62+
) -> None:
63+
"""Executes a task inline."""
64+
user_message = context.message
65+
task_id = context.task_id
66+
context_id = context.context_id
67+
68+
if not user_message or not task_id or not context_id:
69+
return
70+
71+
self.running_tasks.add(task_id)
72+
73+
logger.info(
74+
'[SampleAgentExecutor] Processing message %s for task %s (context: %s)',
75+
user_message.message_id,
76+
task_id,
77+
context_id,
78+
)
79+
80+
updater = TaskUpdater(
81+
event_queue=event_queue,
82+
task_id=task_id,
83+
context_id=context_id,
84+
)
85+
86+
working_message = updater.new_agent_message(
87+
parts=[Part(text='Processing your question...')]
88+
)
89+
await updater.start_work(message=working_message)
90+
91+
query = context.get_user_input()
92+
93+
agent_reply_text = self._parse_input(query)
94+
await asyncio.sleep(1)
95+
96+
if task_id not in self.running_tasks:
97+
return
98+
99+
await updater.add_artifact(
100+
parts=[Part(text=agent_reply_text)],
101+
name='response',
102+
last_chunk=True,
103+
)
104+
await updater.complete()
105+
106+
logger.info(
107+
'[SampleAgentExecutor] Task %s finished with state: completed',
108+
task_id,
109+
)
110+
111+
def _parse_input(self, query: str) -> str:
112+
if not query:
113+
return 'Hello! Please provide a message for me to respond to.'
114+
115+
ql = query.lower()
116+
if 'hello' in ql or 'hi' in ql:
117+
return 'Hello World! Nice to meet you!'
118+
if 'how are you' in ql:
119+
return (
120+
"I'm doing great! Thanks for asking. How can I help you today?"
121+
)
122+
if 'goodbye' in ql or 'bye' in ql:
123+
return 'Goodbye! Have a wonderful day!'
124+
return f"Hello World! You said: '{query}'. Thanks for your message!"
125+
126+
127+
async def serve(
128+
host: str = '127.0.0.1',
129+
port: int = 41241,
130+
grpc_port: int = 50051,
131+
compat_grpc_port: int = 50052,
132+
) -> None:
133+
"""Run the Sample Agent server with mounted JSON-RPC, HTTP+JSON and gRPC transports."""
134+
agent_card = AgentCard(
135+
name='Sample Agent',
136+
description='A sample agent to test the stream functionality.',
137+
provider=AgentProvider(
138+
organization='A2A Samples', url='https://example.com'
139+
),
140+
version='1.0.0',
141+
capabilities=AgentCapabilities(
142+
streaming=True, push_notifications=False
143+
),
144+
default_input_modes=['text'],
145+
default_output_modes=['text', 'task-status'],
146+
skills=[
147+
AgentSkill(
148+
id='sample_agent',
149+
name='Sample Agent',
150+
description='Say hi.',
151+
tags=['sample'],
152+
examples=['hi'],
153+
input_modes=['text'],
154+
output_modes=['text', 'task-status'],
155+
)
156+
],
157+
supported_interfaces=[
158+
AgentInterface(
159+
protocol_binding='GRPC',
160+
protocol_version='1.0',
161+
url=f'{host}:{grpc_port}',
162+
),
163+
AgentInterface(
164+
protocol_binding='GRPC',
165+
protocol_version='0.3',
166+
url=f'{host}:{compat_grpc_port}',
167+
),
168+
AgentInterface(
169+
protocol_binding='JSONRPC',
170+
protocol_version='1.0',
171+
url=f'http://{host}:{port}/a2a/jsonrpc/',
172+
),
173+
AgentInterface(
174+
protocol_binding='JSONRPC',
175+
protocol_version='0.3',
176+
url=f'http://{host}:{port}/a2a/jsonrpc/',
177+
),
178+
AgentInterface(
179+
protocol_binding='HTTP+JSON',
180+
protocol_version='1.0',
181+
url=f'http://{host}:{port}/a2a/rest/',
182+
),
183+
AgentInterface(
184+
protocol_binding='HTTP+JSON',
185+
protocol_version='0.3',
186+
url=f'http://{host}:{port}/a2a/rest/',
187+
),
188+
],
189+
)
190+
191+
task_store = InMemoryTaskStore()
192+
request_handler = DefaultRequestHandler(
193+
agent_executor=SampleAgentExecutor(), task_store=task_store
194+
)
195+
196+
rest_app_builder = A2ARESTFastAPIApplication(
197+
agent_card=agent_card,
198+
http_handler=request_handler,
199+
enable_v0_3_compat=True,
200+
)
201+
rest_app = rest_app_builder.build()
202+
203+
jsonrpc_app_builder = A2AFastAPIApplication(
204+
agent_card=agent_card,
205+
http_handler=request_handler,
206+
enable_v0_3_compat=True,
207+
)
208+
jsonrpc_app = jsonrpc_app_builder.build()
209+
210+
main_app = FastAPI()
211+
main_app.mount('/a2a/jsonrpc', jsonrpc_app)
212+
main_app.mount('/a2a/rest', rest_app)
213+
214+
@main_app.get('/.well-known/agent-card.json')
215+
def get_agent_card() -> Any:
216+
"""Return the agent card metadata used for automatic client connection."""
217+
return MessageToDict(agent_card, preserving_proto_field_name=False)
218+
219+
grpc_server = grpc.aio.server()
220+
grpc_server.add_insecure_port(f'{host}:{grpc_port}')
221+
servicer = GrpcHandler(agent_card, request_handler)
222+
a2a_pb2_grpc.add_A2AServiceServicer_to_server(servicer, grpc_server)
223+
224+
compat_grpc_server = grpc.aio.server()
225+
compat_grpc_server.add_insecure_port(f'{host}:{compat_grpc_port}')
226+
compat_servicer = CompatGrpcHandler(agent_card, request_handler)
227+
a2a_v0_3_pb2_grpc.add_A2AServiceServicer_to_server(
228+
compat_servicer, compat_grpc_server
229+
)
230+
231+
config = uvicorn.Config(main_app, host=host, port=port)
232+
uvicorn_server = uvicorn.Server(config)
233+
234+
logger.info('Starting Sample Agent servers:')
235+
logger.info(' - HTTP on http://%s:%s', host, port)
236+
logger.info(' - gRPC on %s:%s', host, grpc_port)
237+
logger.info(' - gRPC (v0.3 compat) on %s:%s', host, compat_grpc_port)
238+
logger.info(
239+
'Agent Card available at http://%s:%s/.well-known/agent-card.json',
240+
host,
241+
port,
242+
)
243+
244+
await asyncio.gather(
245+
grpc_server.start(),
246+
compat_grpc_server.start(),
247+
uvicorn_server.serve(),
248+
)
249+
250+
251+
if __name__ == '__main__':
252+
logging.basicConfig(level=logging.INFO)
253+
with contextlib.suppress(KeyboardInterrupt):
254+
asyncio.run(serve())

0 commit comments

Comments
 (0)