-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathagent.py
More file actions
124 lines (86 loc) · 2.85 KB
/
agent.py
File metadata and controls
124 lines (86 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import json
import os
from enum import Enum
from typing import Any
from ai import get_completion
from chat_proto import chat_proto
from uagents import Agent, Context, Model
from uagents.experimental.quota import Protocol, QuotaProtocol
from uagents_core.models import ErrorMessage
# Agent identity/config, overridable via environment variables.
# AGENT_SEED deterministically derives the agent's address/keys.
AGENT_SEED = os.getenv("AGENT_SEED", "openai-test-agent")
AGENT_NAME = os.getenv("AGENT_NAME", "OpenAI Agent")
class ContextPrompt(Model):
    """Incoming request: free-form prompt plus background context for the LLM."""
    context: str  # background/context string passed to the completion call
    text: str  # the user's prompt
class Response(Model):
    """Reply carrying the LLM's completion text for a ContextPrompt."""
    text: str  # raw completion text
class StructuredOutputPrompt(Model):
    """Incoming request asking for LLM output constrained to a JSON schema."""
    prompt: str  # the user's prompt
    output_schema: dict[str, Any]  # JSON schema the response must conform to
class StructuredOutputResponse(Model):
    """Reply carrying the parsed JSON object produced for a StructuredOutputPrompt."""
    output: dict[str, Any]  # completion text parsed via json.loads
PORT = 8000  # local HTTP port the agent listens on

# The agent's own endpoint must be reachable by peers; here it points at
# the local server started by agent.run().
agent = Agent(
    name=AGENT_NAME,
    seed=AGENT_SEED,
    port=PORT,
    endpoint=f"http://localhost:{PORT}/submit",
)
# Protocol for plain context+prompt completions (ContextPrompt -> Response).
proto = Protocol(
    name="OpenRouter-LLM-Context-Response",
    version="0.1.0",
)
# Protocol for schema-constrained completions
# (StructuredOutputPrompt -> StructuredOutputResponse).
struct_proto = Protocol(
    name="OpenRouter-LLM-Structured-Response",
    version="0.1.0",
)
@proto.on_message(ContextPrompt, replies={Response, ErrorMessage})
async def handle_request(ctx: Context, sender: str, msg: ContextPrompt):
    """Answer a context+prompt request with an LLM completion.

    Sends a Response with the completion text on success. The handler
    declares ErrorMessage as a reply type, so failures of the upstream
    completion call are reported back to the sender instead of crashing
    the handler and leaving the sender without any reply.
    """
    try:
        response = get_completion(context=msg.context, prompt=msg.text)
    except Exception as err:
        # Surface upstream LLM/API failures to the sender via the declared
        # ErrorMessage reply type rather than dropping the conversation.
        ctx.logger.error(f"Completion failed: {err}")
        await ctx.send(sender, ErrorMessage(error=str(err)))
        return
    await ctx.send(sender, Response(text=response))
@struct_proto.on_message(
    StructuredOutputPrompt, replies={StructuredOutputResponse, ErrorMessage}
)
async def handle_structured_request(
    ctx: Context, sender: str, msg: StructuredOutputPrompt
):
    """Answer a schema-constrained prompt with parsed JSON output.

    Sends a StructuredOutputResponse containing the completion parsed as
    JSON. Both a failed completion call and non-JSON model output are
    reported to the sender as an ErrorMessage (a declared reply type)
    instead of raising out of the handler.
    """
    try:
        response = get_completion(
            context="", prompt=msg.prompt, response_schema=msg.output_schema
        )
        # The model is asked for schema-conforming JSON, but its output is
        # untrusted text: parsing can still fail and must be handled.
        output = json.loads(response)
    except Exception as err:
        ctx.logger.error(f"Structured completion failed: {err}")
        await ctx.send(sender, ErrorMessage(error=str(err)))
        return
    await ctx.send(sender, StructuredOutputResponse(output=output))
# Register the message protocols with the agent; publish_manifest makes
# them discoverable (e.g. on Agentverse).
agent.include(proto, publish_manifest=True)
agent.include(struct_proto, publish_manifest=True)
agent.include(chat_proto, publish_manifest=True)
### Health check related code
def agent_is_healthy() -> bool:
    """Report whether the agent is able to serve requests.

    Placeholder implementation that always reports healthy. Replace with
    real checks (e.g. third-party API reachability, resource limits).
    """
    # TODO: add real health-check logic here
    return True
class HealthCheck(Model):
    """Empty probe message requesting the agent's health status."""
    pass
class HealthStatus(str, Enum):
    """Binary health state reported in AgentHealth replies."""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
class AgentHealth(Model):
    """Reply to a HealthCheck probe."""
    agent_name: str  # human-readable agent name (AGENT_NAME)
    status: HealthStatus  # healthy/unhealthy verdict
# Rate-limited protocol for health probes; quota state is persisted in
# the agent's own storage.
health_protocol = QuotaProtocol(
    storage_reference=agent.storage, name="HealthProtocol", version="0.1.0"
)
@health_protocol.on_message(HealthCheck, replies={AgentHealth})
async def handle_health_check(ctx: Context, sender: str, msg: HealthCheck):
    """Reply to a health probe with this agent's current status.

    A reply is always sent: if the health check itself raises, the error
    is logged and the agent is reported as unhealthy.
    """
    try:
        healthy = agent_is_healthy()
    except Exception as err:
        # A failing health check must not crash the handler — log and
        # report unhealthy instead.
        ctx.logger.error(err)
        healthy = False
    status = HealthStatus.HEALTHY if healthy else HealthStatus.UNHEALTHY
    await ctx.send(sender, AgentHealth(agent_name=AGENT_NAME, status=status))
agent.include(health_protocol, publish_manifest=True)

# Start the agent's event loop and local server when run as a script.
if __name__ == "__main__":
    agent.run()