|
1 | 1 | import asyncio |
2 | | -import collections |
3 | | -import json |
4 | 2 | import socket |
5 | | -import time |
6 | 3 | from typing import Any, Dict, Optional |
7 | 4 |
|
8 | 5 | import httpx |
9 | 6 | from resource_secretary.providers import discover_providers |
10 | 7 | from resource_secretary.providers.mock import discover_mock_providers |
11 | 8 | from rich import print |
12 | 9 |
|
13 | | -import mcpserver.utils as utils |
14 | 10 | from mcpserver.logger import logger |
15 | 11 |
|
| 12 | +from .base import WorkerBase |
16 | 13 |
|
17 | | -class WorkerManager: |
| 14 | + |
| 15 | +class WorkerManager(WorkerBase): |
18 | 16 | """ |
19 | 17 | A generic worker mcpserver that discovers its own capabilities |
20 | 18 | and context using the resource-secretary library. |
@@ -93,92 +91,6 @@ def parse_labels(self, label_list: Optional[list]) -> dict: |
93 | 91 | labels[k.strip()] = v.strip() |
94 | 92 | return labels |
95 | 93 |
|
96 | | - def register_agent_tools(self): |
97 | | - """ |
98 | | - Registers the core negotiation tools with the FastMCP instance. |
99 | | - """ |
100 | | - |
101 | | - @self.mcp.tool(name="get_status") |
102 | | - async def get_status() -> dict: |
103 | | - """ |
104 | | - Returns the Level 1 Static Manifest of this cluster. |
105 | | - Use this to verify hardware, software providers, and site info. |
106 | | - """ |
107 | | - return { |
108 | | - "worker_id": self.worker_id, |
109 | | - "timestamp": time.time(), |
110 | | - "manifest": self.manifest, |
111 | | - } |
112 | | - |
113 | | - @self.mcp.tool(name="ask_secretary") |
114 | | - async def ask_secretary(request: str) -> dict: |
115 | | - """ |
116 | | - Wakes up the local Secretary Agent to perform a Level 2 investigation. |
117 | | - Use this to ask about specific software availability, queue depth, or node health. |
118 | | - """ |
119 | | - from resource_secretary.agents.secretary import SecretaryAgent |
120 | | - |
121 | | - # Flatten the catalog into a list of active provider instances |
122 | | - active_providers = [inst for category in self.catalog.values() for inst in category] |
123 | | - |
124 | | - # Verbose mode returns a second block with CALLS |
125 | | - agent = SecretaryAgent(active_providers, verbose=self.verbose) |
126 | | - proposal = await agent.negotiate(request) |
127 | | - return {"worker_id": self.worker_id, "proposal": proposal} |
128 | | - |
129 | | - @self.mcp.tool(name="submit") |
130 | | - async def receive_job(request: str) -> dict: |
131 | | - """ |
132 | | - Receive a job. Accepts a job request, invokes the local Secretary to |
133 | | - generate a spec, submit it, and verify the job ID. |
134 | | - """ |
135 | | - from resource_secretary.agents.secretary import SecretaryAgent |
136 | | - |
137 | | - active_providers = [inst for cat in self.catalog.values() for inst in cat] |
138 | | - |
139 | | - agent = SecretaryAgent(active_providers) |
140 | | - raw_result = await agent.submit(request) |
141 | | - try: |
142 | | - receipt = json.loads(utils.extract_code_block(raw_result)) |
143 | | - except: |
144 | | - receipt = {"status": "FAILED", "reasoning": raw_result} |
145 | | - |
146 | | - return {"worker_id": self.worker_id, "receipt": receipt} |
147 | | - |
148 | | - @self.mcp.tool(name="export_provider_metadata") |
149 | | - def export_provider_metadata() -> str: |
150 | | - """ |
151 | | - Iterates through all providers and returns their internal 'truth' state. |
152 | | - This tool is 'hidden' from the Secretary Agent but used by the Hub. |
153 | | - """ |
154 | | - truth_map = {} |
155 | | - tool_registry = collections.defaultdict(list) |
156 | | - |
157 | | - # Self.catalog is a dict: {"software": [MockSpackProvider, ...]} |
158 | | - for category, providers in self.catalog.items(): |
159 | | - truth_map[category] = {} |
160 | | - for p in providers: |
161 | | - # We check if the provider has the export_truth method |
162 | | - if hasattr(p, "export_truth"): |
163 | | - truth_map[category][p.name] = p.export_truth() |
164 | | - else: |
165 | | - # Fallback to standard metadata if not a mock |
166 | | - truth_map[category][p.name] = p.metadata |
167 | | - |
168 | | - # Capture all Secretary Tools for this provider |
169 | | - # We can use this for simulations to assess what the agent |
170 | | - # should have called (vs. what it did) |
171 | | - manifest = p.discover_tools(tool_types=["secretary"]) |
172 | | - for tool_name in manifest.keys(): |
173 | | - tool_registry[category].append(f"{p.name}.{tool_name}") |
174 | | - |
175 | | - metadata = {"truth": truth_map, "registry": dict(tool_registry)} |
176 | | - |
177 | | - # If we have an archetype (mocking something) save it |
178 | | - if hasattr(p, "archetype"): |
179 | | - metadata["metadata"] = {"archetype": p.archetype.name} |
180 | | - return json.dumps(metadata, indent=2) |
181 | | - |
182 | 94 | async def run_registration(self): |
183 | 95 | """ |
184 | 96 | Registers the worker with the Hub. |
|
0 commit comments