-
Notifications
You must be signed in to change notification settings - Fork 425
Expand file tree
/
Copy pathbase.py
More file actions
128 lines (113 loc) · 3.64 KB
/
base.py
File metadata and controls
128 lines (113 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Callable
from types import TracebackType
from typing_extensions import Self
from a2a.client.middleware import ClientCallContext
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
Message,
MessageSendParams,
Task,
TaskArtifactUpdateEvent,
TaskIdParams,
TaskPushNotificationConfig,
TaskQueryParams,
TaskStatusUpdateEvent,
)
class ClientTransport(ABC):
    """Interface that every client transport implementation must satisfy.

    Concrete subclasses supply the request methods declared below plus
    ``close()``. The class also works as an async context manager:
    ``async with transport:`` hands back the transport itself and awaits
    ``close()`` when the block is left.

    Every request method takes two optional keyword-only arguments,
    ``context`` (a ``ClientCallContext``) and ``extensions`` (a list of
    strings). This base class does not define how they are used --
    presumably per-call settings and extension identifiers; confirm against
    the concrete transports.
    """

    async def __aenter__(self) -> Self:
        """Begin an ``async with`` block by handing back this transport."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Leave the ``async with`` block and await ``close()``.

        The exception details are accepted but never inspected, and since
        nothing truthy is returned an in-flight exception keeps propagating.
        """
        await self.close()

    @abstractmethod
    async def send_message(
        self,
        request: MessageSendParams,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> Task | Message:
        """Send a message request to the agent without streaming.

        Args:
            request: Parameters describing the message to send.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Returns:
            Either a ``Task`` or a ``Message``.
        """

    @abstractmethod
    async def send_message_streaming(
        self,
        request: MessageSendParams,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> AsyncGenerator[
        Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
    ]:
        """Send a streaming message request and yield responses as they arrive.

        Args:
            request: Parameters describing the message to send.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Yields:
            ``Message``, ``Task``, ``TaskStatusUpdateEvent`` or
            ``TaskArtifactUpdateEvent`` items.
        """
        # The unreachable ``yield`` turns this stub into an async generator
        # function, so it matches the declared AsyncGenerator return type
        # while producing no items of its own.
        return
        yield

    @abstractmethod
    async def get_task(
        self,
        request: TaskQueryParams,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> Task:
        """Fetch the current state and history of a specific task.

        Args:
            request: Query parameters identifying the task.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Returns:
            The requested ``Task``.
        """

    @abstractmethod
    async def cancel_task(
        self,
        request: TaskIdParams,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> Task:
        """Ask the agent to cancel a specific task.

        Args:
            request: Parameters identifying the task.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Returns:
            A ``Task``.
        """

    @abstractmethod
    async def set_task_callback(
        self,
        request: TaskPushNotificationConfig,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> TaskPushNotificationConfig:
        """Create or update a task's push notification configuration.

        Args:
            request: The push notification configuration to apply.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Returns:
            A ``TaskPushNotificationConfig``.
        """

    @abstractmethod
    async def get_task_callback(
        self,
        request: GetTaskPushNotificationConfigParams,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> TaskPushNotificationConfig:
        """Fetch the push notification configuration of a specific task.

        Args:
            request: Parameters selecting the configuration to retrieve.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Returns:
            The task's ``TaskPushNotificationConfig``.
        """

    @abstractmethod
    async def resubscribe(
        self,
        request: TaskIdParams,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
    ) -> AsyncGenerator[
        Task | Message | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
    ]:
        """Reconnect to a task in order to receive its updates.

        Args:
            request: Parameters identifying the task.
            context: Optional call context.
            extensions: Optional list of extension strings.

        Yields:
            ``Task``, ``Message``, ``TaskStatusUpdateEvent`` or
            ``TaskArtifactUpdateEvent`` items.
        """
        # Same stub shape as ``send_message_streaming``: the unreachable
        # ``yield`` makes this an async generator function.
        return
        yield

    @abstractmethod
    async def get_card(
        self,
        *,
        context: ClientCallContext | None = None,
        extensions: list[str] | None = None,
        signature_verifier: Callable[[AgentCard], None] | None = None,
    ) -> AgentCard:
        """Fetch the AgentCard.

        Args:
            context: Optional call context.
            extensions: Optional list of extension strings.
            signature_verifier: Optional callable that takes an
                ``AgentCard`` and returns ``None``. Presumably used to
                check the card's signature -- confirm against the concrete
                transports.

        Returns:
            The ``AgentCard``.
        """

    @abstractmethod
    async def close(self) -> None:
        """Close the transport; ``__aexit__`` awaits this on exit."""