-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent_manager.py
More file actions
163 lines (138 loc) · 5.96 KB
/
agent_manager.py
File metadata and controls
163 lines (138 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Agent Manager
Singleton class to manage agent state and synchronization.
Replaces global state and locks with a cleaner encapsulated approach.
"""
import threading
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional, List, Set, Any
from collections import OrderedDict
class AgentManager:
def __init__(self, max_completed: int = 100):
self._lock = threading.Lock()
self._running: Dict[str, Dict[str, Any]] = {}
self._completed: OrderedDict[str, Dict[str, Any]] = OrderedDict()
self._threads: Dict[str, threading.Thread] = {}
self._events: Dict[str, threading.Event] = {} # Events for sync
self._max_completed = max_completed
def register_start(self, agent_type: str, model: str, task: str) -> str:
"""Register a new agent start. Returns agent_id."""
agent_id = uuid.uuid4().hex[:8]
now_iso = datetime.now(timezone.utc).isoformat().split('.')[0]
with self._lock:
self._running[agent_id] = {
"agent_id": agent_id,
"agent_type": agent_type,
"model": model,
"started_at": now_iso,
"task": task[:100].replace('\n', ' ')
}
self._events[agent_id] = threading.Event()
return agent_id
def register_thread(self, agent_id: str, thread: threading.Thread):
"""Associate a background thread with an agent."""
with self._lock:
self._threads[agent_id] = thread
def register_complete(self, agent_id: str, result_data: Dict[str, Any]):
"""Move agent from running to completed."""
with self._lock:
if agent_id in self._running:
# Merge basic info with result data
info = self._running.pop(agent_id)
# Keep critical start info if not in result
if "started_at" not in result_data:
result_data["started_at"] = info["started_at"]
if "task" not in result_data:
result_data["task"] = info["task"]
# Ensure ID matches
result_data["agent_id"] = agent_id
result_data["completed_at"] = datetime.now(timezone.utc).isoformat().split('.')[0]
self._completed[agent_id] = result_data
# Prune old history
while len(self._completed) > self._max_completed:
self._completed.popitem(last=False)
# Clean up thread ref
self._threads.pop(agent_id, None)
# Signal completion
if agent_id in self._events:
self._events[agent_id].set()
# Don't delete event yet, waiter might need it
def get_running_list(self) -> List[Dict[str, Any]]:
"""Get snapshot of running agents."""
now = datetime.now(timezone.utc)
results = []
with self._lock:
for aid, info in self._running.items():
elapsed = 0
try:
start = datetime.fromisoformat(info["started_at"].replace("Z", "+00:00"))
elapsed = int((now - start).total_seconds())
except Exception:
pass
results.append({"id": aid, "sec": elapsed, "type": info.get("agent_type")})
return results
def get_recent_completed_ids(self, limit: int = 3) -> List[str]:
with self._lock:
keys = list(self._completed.keys())
return keys[-limit:]
def get_result(self, agent_id: str) -> Optional[Dict[str, Any]]:
with self._lock:
return self._completed.get(agent_id)
def get_running_info(self, agent_id: str) -> Optional[Dict[str, Any]]:
with self._lock:
return self._running.get(agent_id)
def wait_for_all(self, timeout: float = 300) -> Dict[str, Any]:
"""
Efficiently wait for all currently running agents.
Returns result dict suitable for tool output.
"""
# Snapshot current running IDs and their events
with self._lock:
waiting_ids = list(self._running.keys())
events = [self._events[aid] for aid in waiting_ids if aid in self._events]
if not waiting_ids:
# Nothing running, return recent
with self._lock:
recent = list(self._completed.values())[-5:]
return {
"status": "no_agents_running",
"recent_results": recent
}
# Wait for all events
# Note: threading.Event.wait is blocking, but we need to wait for MULTIPLE.
# Simple approach: wait for each sequentially with deadline
start_time = datetime.now()
remaining = timeout
for evt in events:
if remaining <= 0:
break
if evt.wait(timeout=remaining):
# Recalculate remaining time
elapsed = (datetime.now() - start_time).total_seconds()
remaining = timeout - elapsed
else:
# Timeout happened on this event
remaining = 0
# Collection time
with self._lock:
results = []
still_running = []
for aid in waiting_ids:
if aid in self._completed:
results.append(self._completed[aid])
else:
still_running.append(aid)
if still_running:
return {
"status": "timeout",
"still_running": still_running,
"completed_results": results
}
else:
return {
"status": "all_completed",
"results": results
}
# Singleton
agent_manager = AgentManager()