-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathagent.py
More file actions
176 lines (131 loc) · 4.46 KB
/
agent.py
File metadata and controls
176 lines (131 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import json
import os
from enum import Enum
from typing import Any
from ai import get_completion
from chat_proto import chat_proto
from uagents import Agent, Context, Model
from uagents.experimental.quota import QuotaProtocol, RateLimit, AccessControlList
from uagents_core.models import ErrorMessage
# Agent identity; override via environment for real deployments (the default
# seed is a shared test value and should not be used in production).
AGENT_SEED = os.getenv("AGENT_SEED", "gemini-test-agent")
AGENT_NAME = os.getenv("AGENT_NAME", "Google Gemini Agent")

# Comma-separated list of sender addresses exempt from rate limiting.
# Empty entries (e.g. from a trailing comma or unset variable) are dropped.
# Set comprehension instead of set([...]) — same result, no throwaway list.
BYPASS_RATE_LIMIT = {item for item in os.getenv("BYPASS_RATE_LIMIT", "").split(",") if item}
class TextPrompt(Model):
    """Incoming request for a free-form text completion."""

    text: str
class CodePrompt(Model):
    """Incoming request for a code-generation completion."""

    text: str
class TextResponse(Model):
    """Reply carrying the LLM's free-form text answer."""

    text: str
class CodeResponse(Model):
    """Reply carrying the LLM's generated code as text."""

    text: str
class StructuredOutputPrompt(Model):
    """Request for a completion constrained to a JSON schema."""

    # Free-form prompt text.
    prompt: str
    # Schema the model's output must conform to — presumably a JSON Schema
    # dict, as consumed by get_completion; confirm against the ai module.
    output_schema: dict[str, Any]
class StructuredOutputResponse(Model):
    """Reply carrying the parsed structured (JSON) output."""

    output: dict[str, Any]
# Local port the agent's HTTP endpoint listens on.
PORT = 8000

agent = Agent(
    name=AGENT_NAME,
    seed=AGENT_SEED,
    port=PORT,
    endpoint=f"http://localhost:{PORT}/submit",
)

# Shared access-control list for the request protocols below.
# NOTE(review): default=False presumably means deny-by-default — confirm
# against the uagents AccessControlList semantics. Addresses listed in
# BYPASS_RATE_LIMIT are exempt from the rate limit.
acl = AccessControlList(
    default=False,
    bypass_rate_limit=BYPASS_RATE_LIMIT,
)
def _make_protocol(name: str) -> QuotaProtocol:
    """Build a rate-limited protocol sharing the agent's storage and ACL.

    All three request protocols use the identical quota configuration
    (6 requests per 60-minute window); this factory removes the triplicated
    constructor calls.
    """
    return QuotaProtocol(
        storage_reference=agent.storage,
        name=name,
        version="0.1.0",
        default_rate_limit=RateLimit(window_size_minutes=60, max_requests=6),
        default_acl=acl,
    )


text_proto = _make_protocol("LLM-Text-Response")
code_proto = _make_protocol("LLM-Code-Generator")
struct_proto = _make_protocol("LLM-Structured-Response")
@text_proto.on_message(TextPrompt, replies={TextResponse, ErrorMessage})
async def handle_request(ctx: Context, sender: str, msg: TextPrompt):
    """Answer a free-form text prompt.

    Sends a TextResponse on success, or an ErrorMessage when the LLM
    backend returns no result.
    """
    response = get_completion(msg.text, False)
    if response is None:
        await ctx.send(
            sender,
            ErrorMessage(
                error="An error occurred while processing the request. Please try again later."
            ),
        )
        # Bug fix: without this return the handler fell through and also
        # sent TextResponse(text=None) after the error reply.
        return
    await ctx.send(sender, TextResponse(text=response))
@code_proto.on_message(CodePrompt, replies={CodeResponse, ErrorMessage})
async def handle_codegen_request(ctx: Context, sender: str, msg: CodePrompt):
    """Answer a code-generation prompt (get_completion called with code=True).

    Sends a CodeResponse on success, or an ErrorMessage when the LLM
    backend returns no result.
    """
    response = get_completion(msg.text, True)
    if response is None:
        await ctx.send(
            sender,
            ErrorMessage(
                error="An error occurred while processing the request. Please try again later."
            ),
        )
        # Bug fix: without this return the handler fell through and also
        # sent CodeResponse(text=None) after the error reply.
        return
    await ctx.send(sender, CodeResponse(text=response))
@struct_proto.on_message(
    StructuredOutputPrompt, replies={StructuredOutputResponse, ErrorMessage}
)
async def handle_structured_request(
    ctx: Context, sender: str, msg: StructuredOutputPrompt
):
    """Answer a prompt whose output must match the supplied schema.

    The backend's response is expected to be a JSON string; it is parsed
    and returned as a StructuredOutputResponse. Sends an ErrorMessage when
    the backend returns no result.
    """
    response = get_completion(msg.prompt, False, msg.output_schema)
    if response is None:
        await ctx.send(
            sender,
            ErrorMessage(
                error="An error occurred while processing the request. Please try again later."
            ),
        )
        # Bug fix: without this return, execution continued and
        # json.loads(None) raised TypeError after the error reply
        # had already been sent.
        return
    await ctx.send(sender, StructuredOutputResponse(output=json.loads(response)))
# Register the three LLM protocols plus the chat protocol on the agent.
# publish_manifest=True presumably publishes each protocol manifest so the
# agent is discoverable — confirm against uagents docs.
agent.include(text_proto, publish_manifest=True)
agent.include(code_proto, publish_manifest=True)
agent.include(struct_proto, publish_manifest=True)
agent.include(chat_proto, publish_manifest=True)
### Health check related code
def agent_is_healthy() -> bool:
    """Report whether the agent is able to serve requests.

    Placeholder that always reports healthy. Replace with a real probe —
    e.g. third-party API reachability or resource checks.
    """
    healthy = True  # TODO: logic here
    return bool(healthy)
class HealthCheck(Model):
    """Empty probe message requesting the agent's health status."""

    pass
class HealthStatus(str, Enum):
    """Possible outcomes of a health probe (str-valued for serialization)."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
class AgentHealth(Model):
    """Reply to a HealthCheck: which agent answered and its status."""

    agent_name: str
    status: HealthStatus
# Health checks get their own protocol. NOTE(review): unlike the request
# protocols above, this one passes no default_acl or rate limit, so it uses
# QuotaProtocol's defaults — presumably intentional so anyone can probe
# liveness; confirm this is the desired exposure.
health_protocol = QuotaProtocol(
    storage_reference=agent.storage, name="HealthProtocol", version="0.1.0"
)
@health_protocol.on_message(HealthCheck, replies={AgentHealth})
async def handle_health_check(ctx: Context, sender: str, msg: HealthCheck):
    """Reply to a health probe with the agent's current status.

    Defaults to UNHEALTHY; a failing or raising probe is logged, and a
    reply is always sent (the send sits in `finally`).
    """
    status = HealthStatus.UNHEALTHY
    try:
        status = (
            HealthStatus.HEALTHY if agent_is_healthy() else HealthStatus.UNHEALTHY
        )
    except Exception as err:
        ctx.logger.error(err)
    finally:
        await ctx.send(sender, AgentHealth(agent_name=AGENT_NAME, status=status))
# Register the health protocol and start the agent when run as a script.
agent.include(health_protocol, publish_manifest=True)

if __name__ == "__main__":
    agent.run()