Skip to content

Commit 55dc2fa

Browse files
committed
design: common base for worker and hub
We should have a common base with worker functions to make it cleaner for a hub to serve as a worker. Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 3640aac commit 55dc2fa

4 files changed

Lines changed: 139 additions & 104 deletions

File tree

mcpserver/cli/start.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from mcpserver.app import init_mcp
1616
from mcpserver.cli.manager import get_manager
1717
from mcpserver.core.config import MCPConfig
18-
from mcpserver.core.hub import HubManager
18+
from mcpserver.core.hub import DualHubManager, HubManager
1919
from mcpserver.core.worker import WorkerManager
2020
from mcpserver.logger import logger
2121

@@ -64,7 +64,9 @@ def main(args, extra, **kwargs):
6464
app = FastAPI(title="MCP Server", lifespan=mcp_app.lifespan)
6565

6666
# Setup Hub (parent role)
67-
if args.hub:
67+
if args.dual:
68+
mcp.hub_manager = DualHubManager.from_args(mcp, args)
69+
elif args.hub:
6870
mcp.hub_manager = HubManager.from_args(mcp, args)
6971

7072
# Setup Worker (child role) - triggered by --join. We require join secret.
@@ -86,8 +88,8 @@ async def lifespan(app: FastAPI):
8688

8789
app = FastAPI(title="MCP Server", lifespan=lifespan)
8890

89-
# Bind the /register endpoint if we are a Hub
90-
if args.hub:
91+
# Bind the /register endpoint if we are a Hub (or Hub and Worker)
92+
if args.hub or args.dual:
9193
mcp.hub_manager.bind_to_app(app)
9294

9395
# Mount the MCP server. Note from V: we can use mount with antother FastMCP

mcpserver/core/base.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import collections
2+
import json
3+
import time
4+
5+
import mcpserver.utils as utils
6+
7+
8+
class WorkerBase:
9+
"""
10+
A WorkerBase provides worker interaction functions, e.g., negotiate, status,
11+
ask secretary. We provide it here so that a hub can use it to generate
12+
its dual mode (acting as worker AND hub.)
13+
"""
14+
15+
def register_agent_tools(self):
16+
"""
17+
Registers the core negotiation tools with the FastMCP instance.
18+
"""
19+
20+
@self.mcp.tool(name="get_status")
21+
async def get_status() -> dict:
22+
"""
23+
Returns the Level 1 Static Manifest of this cluster.
24+
Use this to verify hardware, software providers, and site info.
25+
"""
26+
return {
27+
"worker_id": self.worker_id,
28+
"timestamp": time.time(),
29+
"manifest": self.manifest,
30+
}
31+
32+
@self.mcp.tool(name="ask_secretary")
33+
async def ask_secretary(request: str) -> dict:
34+
"""
35+
Wakes up the local Secretary Agent to perform a Level 2 investigation.
36+
Use this to ask about specific software availability, queue depth, or node health.
37+
"""
38+
from resource_secretary.agents.secretary import SecretaryAgent
39+
40+
# Flatten the catalog into a list of active provider instances
41+
active_providers = [inst for category in self.catalog.values() for inst in category]
42+
43+
# Verbose mode returns a second block with CALLS
44+
agent = SecretaryAgent(active_providers, verbose=self.verbose)
45+
proposal = await agent.negotiate(request)
46+
return {"worker_id": self.worker_id, "proposal": proposal}
47+
48+
@self.mcp.tool(name="submit")
49+
async def receive_job(request: str) -> dict:
50+
"""
51+
Receive a job. Accepts a job request, invokes the local Secretary to
52+
generate a spec, submit it, and verify the job ID.
53+
"""
54+
from resource_secretary.agents.secretary import SecretaryAgent
55+
56+
active_providers = [inst for cat in self.catalog.values() for inst in cat]
57+
58+
agent = SecretaryAgent(active_providers)
59+
raw_result = await agent.submit(request)
60+
try:
61+
receipt = json.loads(utils.extract_code_block(raw_result))
62+
except:
63+
receipt = {"status": "FAILED", "reasoning": raw_result}
64+
65+
return {"worker_id": self.worker_id, "receipt": receipt}
66+
67+
@self.mcp.tool(name="export_provider_metadata")
68+
def export_provider_metadata() -> str:
69+
"""
70+
Iterates through all providers and returns their internal 'truth' state.
71+
This tool is 'hidden' from the Secretary Agent but used by the Hub.
72+
"""
73+
truth_map = {}
74+
tool_registry = collections.defaultdict(list)
75+
76+
# Self.catalog is a dict: {"software": [MockSpackProvider, ...]}
77+
for category, providers in self.catalog.items():
78+
truth_map[category] = {}
79+
for p in providers:
80+
# We check if the provider has the export_truth method
81+
if hasattr(p, "export_truth"):
82+
truth_map[category][p.name] = p.export_truth()
83+
else:
84+
# Fallback to standard metadata if not a mock
85+
truth_map[category][p.name] = p.metadata
86+
87+
# Capture all Secretary Tools for this provider
88+
# We can use this for simulations to assess what the agent
89+
# should have called (vs. what it did)
90+
manifest = p.discover_tools(tool_types=["secretary"])
91+
for tool_name in manifest.keys():
92+
tool_registry[category].append(f"{p.name}.{tool_name}")
93+
94+
metadata = {"truth": truth_map, "registry": dict(tool_registry)}
95+
96+
# If we have an archetype (mocking something) save it
97+
if hasattr(p, "archetype"):
98+
metadata["metadata"] = {"archetype": p.archetype.name}
99+
return json.dumps(metadata, indent=2)

mcpserver/core/hub.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import mcpserver.utils as utils
1414
from mcpserver.logger import logger
1515

16+
from .base import WorkerBase
17+
1618

1719
class HubManager:
1820
"""
@@ -30,10 +32,12 @@ def __init__(
3032
serial=False,
3133
dual=False,
3234
hub_id=None,
35+
path="/mcp",
3336
):
3437
self.mcp = mcp
3538
self.host = host
3639
self.port = port
40+
self.path = path
3741
self.secret = secret or secrets.token_urlsafe(32)
3842
self.workers: Dict[str, Dict[str, Any]] = {}
3943
self.hub_id = hub_id or socket.gethostname()
@@ -81,21 +85,15 @@ def set_running_mode(self, batch_size=None, serial=False, dual=False):
8185
logger.info(f"🚦 Hub initialized with Batch Size: {batch_size}")
8286

8387
# If we are also running as a worker, add ourselves to the fleet
84-
if not dual:
85-
return
86-
87-
hub_id = self.hub_id or socket.gethostname()
88-
self.workers[hub_id] = {
89-
"url": self.registration_url,
90-
"client": Client(self.registration_url),
91-
}
88+
self.dual = dual
9289

9390
@classmethod
9491
def from_args(cls, mcp, args) -> Optional["HubManager"]:
9592
"""
9693
Create a HubManager from CLI arguments.
9794
"""
98-
if not getattr(args, "hub", False):
95+
# Running in hub or dual mode?
96+
if not getattr(args, "hub", False) and not getattr(args, "dual", False):
9997
return None
10098
return cls(
10199
mcp,
@@ -105,6 +103,8 @@ def from_args(cls, mcp, args) -> Optional["HubManager"]:
105103
batch=args.batch,
106104
serial=args.serial,
107105
dual=args.dual,
106+
# server path
107+
path=args.path,
108108
)
109109

110110
def _print_banner(self):
@@ -386,3 +386,25 @@ def _create_proxy(self, worker_id: str, tool: Tool):
386386

387387
except Exception as e:
388388
logger.error(f"❌ Failed to generate dynamic proxy for {tool.name}: {e}")
389+
390+
391+
class DualHubManager(WorkerBase, HubManager):
392+
"""
393+
Combined hub and worker base. Aka, a hub that also serves as a worker
394+
"""
395+
396+
def __init__(self, *args, **kwargs):
397+
# Calls super on the HubManager. WorkerBase has no init
398+
super().__init__(*args, **kwargs)
399+
self.setup_dual()
400+
401+
def setup_dual(self):
402+
"""
403+
Setup dual mode, which means adding ourselves to the fleet.
404+
"""
405+
hub_id = self.hub_id or socket.gethostname()
406+
default_url = f"http://{self.host}:{self.port}{self.path}"
407+
self.workers[hub_id] = {
408+
"url": self.registration_url,
409+
"client": Client(default_url),
410+
}

mcpserver/core/worker.py

Lines changed: 3 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
import asyncio
2-
import collections
3-
import json
42
import socket
5-
import time
63
from typing import Any, Dict, Optional
74

85
import httpx
96
from resource_secretary.providers import discover_providers
107
from resource_secretary.providers.mock import discover_mock_providers
118
from rich import print
129

13-
import mcpserver.utils as utils
1410
from mcpserver.logger import logger
1511

12+
from .base import WorkerBase
1613

17-
class WorkerManager:
14+
15+
class WorkerManager(WorkerBase):
1816
"""
1917
A generic worker mcpserver that discovers its own capabilities
2018
and context using the resource-secretary library.
@@ -93,92 +91,6 @@ def parse_labels(self, label_list: Optional[list]) -> dict:
9391
labels[k.strip()] = v.strip()
9492
return labels
9593

96-
def register_agent_tools(self):
97-
"""
98-
Registers the core negotiation tools with the FastMCP instance.
99-
"""
100-
101-
@self.mcp.tool(name="get_status")
102-
async def get_status() -> dict:
103-
"""
104-
Returns the Level 1 Static Manifest of this cluster.
105-
Use this to verify hardware, software providers, and site info.
106-
"""
107-
return {
108-
"worker_id": self.worker_id,
109-
"timestamp": time.time(),
110-
"manifest": self.manifest,
111-
}
112-
113-
@self.mcp.tool(name="ask_secretary")
114-
async def ask_secretary(request: str) -> dict:
115-
"""
116-
Wakes up the local Secretary Agent to perform a Level 2 investigation.
117-
Use this to ask about specific software availability, queue depth, or node health.
118-
"""
119-
from resource_secretary.agents.secretary import SecretaryAgent
120-
121-
# Flatten the catalog into a list of active provider instances
122-
active_providers = [inst for category in self.catalog.values() for inst in category]
123-
124-
# Verbose mode returns a second block with CALLS
125-
agent = SecretaryAgent(active_providers, verbose=self.verbose)
126-
proposal = await agent.negotiate(request)
127-
return {"worker_id": self.worker_id, "proposal": proposal}
128-
129-
@self.mcp.tool(name="submit")
130-
async def receive_job(request: str) -> dict:
131-
"""
132-
Receive a job. Accepts a job request, invokes the local Secretary to
133-
generate a spec, submit it, and verify the job ID.
134-
"""
135-
from resource_secretary.agents.secretary import SecretaryAgent
136-
137-
active_providers = [inst for cat in self.catalog.values() for inst in cat]
138-
139-
agent = SecretaryAgent(active_providers)
140-
raw_result = await agent.submit(request)
141-
try:
142-
receipt = json.loads(utils.extract_code_block(raw_result))
143-
except:
144-
receipt = {"status": "FAILED", "reasoning": raw_result}
145-
146-
return {"worker_id": self.worker_id, "receipt": receipt}
147-
148-
@self.mcp.tool(name="export_provider_metadata")
149-
def export_provider_metadata() -> str:
150-
"""
151-
Iterates through all providers and returns their internal 'truth' state.
152-
This tool is 'hidden' from the Secretary Agent but used by the Hub.
153-
"""
154-
truth_map = {}
155-
tool_registry = collections.defaultdict(list)
156-
157-
# Self.catalog is a dict: {"software": [MockSpackProvider, ...]}
158-
for category, providers in self.catalog.items():
159-
truth_map[category] = {}
160-
for p in providers:
161-
# We check if the provider has the export_truth method
162-
if hasattr(p, "export_truth"):
163-
truth_map[category][p.name] = p.export_truth()
164-
else:
165-
# Fallback to standard metadata if not a mock
166-
truth_map[category][p.name] = p.metadata
167-
168-
# Capture all Secretary Tools for this provider
169-
# We can use this for simulations to assess what the agent
170-
# should have called (vs. what it did)
171-
manifest = p.discover_tools(tool_types=["secretary"])
172-
for tool_name in manifest.keys():
173-
tool_registry[category].append(f"{p.name}.{tool_name}")
174-
175-
metadata = {"truth": truth_map, "registry": dict(tool_registry)}
176-
177-
# If we have an archetype (mocking something) save it
178-
if hasattr(p, "archetype"):
179-
metadata["metadata"] = {"archetype": p.archetype.name}
180-
return json.dumps(metadata, indent=2)
181-
18294
async def run_registration(self):
18395
"""
18496
Registers the worker with the Hub.

0 commit comments

Comments
 (0)