Skip to content

Commit db4e84e

Browse files
committed
feat: update README and refactor module imports for worker pool
- Added documentation for the multi-process pool feature in the README, detailing usage with Rust workers and local pool management. - Refactored import functions in main.py to load worker pool submodules from the `devsper.pool` namespace, improving clarity and organization. - Updated Redis URL resolution and pool configuration loading to align with the new import structure.
1 parent 7995c61 commit db4e84e

25 files changed

Lines changed: 1258 additions & 18 deletions

Dockerfile.local-pool

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Local pool supervisor: runs `devsper.pool.local_pool`, which spawns N `devsper-worker` processes.
# Build: docker build -f Dockerfile.local-pool -t devsper-local-pool .
# Context must be the `runtime/` directory.

# Stage 1: compile the Rust `devsper-worker` binary with the `subprocess-executor` feature.
FROM rust:1.82-slim AS rust-builder
# pkg-config + libssl-dev are installed for the build (presumably for an OpenSSL-linked crate; confirm against Cargo.toml).
RUN apt-get update && apt-get install -y --no-install-recommends pkg-config libssl-dev \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /build
# Only the manifest, lockfile and sources of the worker crate are copied into the builder.
COPY worker/Cargo.toml worker/Cargo.lock ./
COPY worker/src ./src
RUN cargo build --release --features subprocess-executor

# Stage 2: Python runtime image that carries the compiled worker binary and the `devsper` package.
FROM python:3.12-slim
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates libssl3 curl \
    && rm -rf /var/lib/apt/lists/*
# Put the worker on PATH so the supervisor can launch it by name.
COPY --from=rust-builder /build/target/release/devsper-worker /usr/local/bin/
WORKDIR /app
COPY pyproject.toml README.md LICENSE ./
COPY devsper ./devsper
# Editable install of the package with its `distributed` extra.
RUN pip install --no-cache-dir -e ".[distributed]"
# Selects the `local` pool profile by default.
ENV DEVSPER_PROFILE=local
# Default command: supervise two workers.
CMD ["python", "-m", "devsper.pool.local_pool", "--workers", "2"]

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,8 @@ uv run python examples/distributed/run_controller.py "Your task" --parallel
271271

272272
Rust workers: set `DEVSPER_WORKER_MODEL=github:gpt-4o` (or your model), `DEVSPER_PYTHON_BIN=.venv/bin/python`, `DEVSPER_RPC_PORT=0` for multiple workers on one host. Credentials load from keychain in the subprocess.
273273

274+
**Multi-process pool (Rust default):** with `devsper[distributed]` installed, `python -m devsper.pool.local_pool --workers N` supervises *N* `devsper-worker` processes against Redis (`REDIS_URL`). Override the worker command in `devsper/pool/profiles/local.toml` (`local_worker_cmd`) if you need the Python `devsper.pool.worker_runner` for debugging. The monorepo builds a combined image via `runtime/Dockerfile.local-pool` (used as `pool-manager` in the root `docker-compose.yml`).
275+
274276
---
275277

276278
## Examples

devsper/cli/main.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,9 @@ def _project_root() -> Path:
4040
return Path(__file__).resolve().parent.parent.parent
4141

4242

43-
def _import_platform_pool(submodule: str):
44-
"""Import ``platform.pool.<submodule>`` with repo root on sys.path; avoid stdlib ``platform``."""
45-
root = _project_root().parent
46-
if str(root) not in sys.path:
47-
sys.path.insert(0, str(root))
48-
if "platform" in sys.modules:
49-
m = sys.modules["platform"]
50-
if not getattr(m, "__path__", None):
51-
del sys.modules["platform"]
52-
return importlib.import_module(f"platform.pool.{submodule}")
43+
def _import_pool_submodule(submodule: str):
44+
"""Import ``devsper.pool.<submodule>`` (distributed worker pool; lives in the runtime package)."""
45+
return importlib.import_module(f"devsper.pool.{submodule}")
5346

5447

5548
def _resolve_pool_redis_url(cli_override: str | None = None, profile: str | None = None) -> str:
@@ -61,7 +54,7 @@ def _resolve_pool_redis_url(cli_override: str | None = None, profile: str | None
6154
prof = (profile or os.environ.get("DEVSPER_PROFILE") or "").strip().lower()
6255
if prof == "local":
6356
try:
64-
config_mod = _import_platform_pool("config")
57+
config_mod = _import_pool_submodule("config")
6558
return config_mod.load_pool_config("local").redis_url
6659
except Exception:
6760
return "redis://127.0.0.1:6379"
@@ -544,12 +537,12 @@ def _run_swarm_via_local_pool(args: object) -> int:
544537
user_id = os.environ.get("DEVSPER_USER_ID", "local-user")
545538
redis_url = _resolve_pool_redis_url(profile="local")
546539

547-
pool_cfg = _import_platform_pool("config").load_pool_config("local")
540+
pool_cfg = _import_pool_submodule("config").load_pool_config("local")
548541

549-
crypto_mod = _import_platform_pool("crypto")
550-
manager_mod = _import_platform_pool("manager")
551-
models_mod = _import_platform_pool("models")
552-
store_mod = _import_platform_pool("store")
542+
crypto_mod = _import_pool_submodule("crypto")
543+
manager_mod = _import_pool_submodule("manager")
544+
models_mod = _import_pool_submodule("models")
545+
store_mod = _import_pool_submodule("store")
553546
encrypt_payload = crypto_mod.encrypt_payload
554547
generate_org_keypair = crypto_mod.generate_org_keypair
555548
PoolManager = manager_mod.PoolManager
@@ -1704,14 +1697,14 @@ def _run_pool_start(args) -> int:
17041697
env["PYTHONPATH"] = root + os.pathsep + env.get("PYTHONPATH", "")
17051698
if not env.get("REDIS_URL"):
17061699
env["REDIS_URL"] = _resolve_pool_redis_url(profile="local")
1707-
cmd = [sys.executable, "-m", "platform.pool.local_pool", "--workers", str(workers)]
1700+
cmd = [sys.executable, "-m", "devsper.pool.local_pool", "--workers", str(workers)]
17081701
return subprocess.call(cmd, env=env)
17091702

17101703

17111704
def _run_org_keygen(args) -> int:
17121705
"""Generate org E2EE keypair and store private key in keyring."""
17131706
try:
1714-
crypto_mod = _import_platform_pool("crypto")
1707+
crypto_mod = _import_pool_submodule("crypto")
17151708
generate_org_keypair = crypto_mod.generate_org_keypair
17161709
from devsper.credentials.store import CredentialStore
17171710
except Exception as e:

devsper/pool/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"""Worker pool manager (platform-side)."""
2+

devsper/pool/config.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
import os
5+
from pathlib import Path
6+
import tomllib
7+
8+
9+
@dataclass
class PoolConfig:
    """Settings for the worker pool; every field has a built-in default."""

    # Profile name and Redis endpoint.
    profile: str = "prod"
    redis_url: str = "redis://localhost:6379"

    # Scheduling and liveness knobs.
    max_tasks_per_minute: int = 60
    heartbeat_interval: int = 30
    worker_timeout_secs: int = 90

    # Limits (1_048_576 bytes == 1 MiB).
    max_payload_bytes: int = 1_048_576
    max_queue_depth: int = 100

    # Local supervisor: how many workers to spawn and the command used for each.
    local_workers: int = 0
    local_worker_cmd: str = "devsper-worker"
20+
21+
22+
def load_pool_config(profile_override: str | None = None) -> PoolConfig:
    """Load the pool configuration for a profile from ``profiles/<profile>.toml``.

    The profile name is taken from ``profile_override``, then the
    ``DEVSPER_PROFILE`` environment variable, and finally defaults to
    ``"prod"``. If no TOML file exists for the requested profile, the
    ``prod`` profile is loaded instead.

    Args:
        profile_override: Profile name that takes precedence over the environment.

    Returns:
        A ``PoolConfig`` built from the ``[pool]`` and ``[pool.limits]`` tables,
        with per-key fallbacks for anything the file does not set.

    Raises:
        OSError: If the selected profile file cannot be read (e.g. ``prod.toml``
            is missing as well).
        tomllib.TOMLDecodeError: If the profile file is not valid TOML.
    """
    profiles_dir = Path(__file__).resolve().parent / "profiles"
    profile = (profile_override or os.getenv("DEVSPER_PROFILE") or "").strip().lower() or "prod"
    profile_path = profiles_dir / f"{profile}.toml"
    if not profile_path.exists():
        profile = "prod"
        profile_path = profiles_dir / "prod.toml"

    data = tomllib.loads(profile_path.read_text(encoding="utf-8"))

    # tomllib nests [pool.limits] inside the "pool" dict under the key "limits".
    # Tolerate a malformed file where either entry is a scalar instead of a table.
    pool = data.get("pool", {})
    if not isinstance(pool, dict):
        pool = {}
    limits = pool.get("limits", {})
    if not isinstance(limits, dict):
        limits = {}

    return PoolConfig(
        profile=pool.get("profile", profile),
        redis_url=pool.get("redis_url", "redis://localhost:6379"),
        max_tasks_per_minute=int(pool.get("max_tasks_per_minute", 60)),
        heartbeat_interval=int(pool.get("heartbeat_interval", 30)),
        worker_timeout_secs=int(pool.get("worker_timeout_secs", 90)),
        max_payload_bytes=int(limits.get("max_payload_bytes", 1_048_576)),
        max_queue_depth=int(limits.get("max_queue_depth", 100)),
        local_workers=int(pool.get("local_workers", 0)),
        # NOTE(review): this fallback differs from the PoolConfig dataclass default
        # ("devsper-worker"); confirm which one is intended when a profile omits the key.
        local_worker_cmd=str(pool.get("local_worker_cmd", "python -m devsper.agents.run_agent")),
    )
46+

devsper/pool/crypto.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""
2+
E2EE for task payloads.
3+
4+
Encryption scheme:
5+
- Per-org X25519 key pair (private key stored in org keyring, public key registered with platform).
6+
- For each task: ephemeral X25519 keypair -> shared secret -> HKDF-SHA256 -> AES-256-GCM.
7+
- Wire format: ephemeral_pub (32B) || nonce (12B) || ciphertext+tag (len(pt)+16B)
8+
9+
The pool stores and forwards ciphertext only; it never decrypts.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import os
15+
16+
from cryptography.hazmat.primitives import hashes, serialization
17+
from cryptography.hazmat.primitives.asymmetric.x25519 import (
18+
X25519PrivateKey,
19+
X25519PublicKey,
20+
)
21+
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
22+
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
23+
24+
25+
def generate_org_keypair() -> tuple[bytes, bytes]:
    """Create a fresh X25519 key pair.

    Returns:
        ``(private_key_bytes, public_key_bytes)``, both in raw encoding.
    """
    private_key = X25519PrivateKey.generate()
    raw_encoding = serialization.Encoding.Raw

    private_raw = private_key.private_bytes(
        encoding=raw_encoding,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_raw = private_key.public_key().public_bytes(
        encoding=raw_encoding,
        format=serialization.PublicFormat.Raw,
    )
    return private_raw, public_raw
37+
38+
39+
def encrypt_payload(plaintext: bytes, org_public_key_bytes: bytes) -> bytes:
    """Encrypt ``plaintext`` for the holder of the org private key.

    A one-time X25519 key pair is generated per call; the shared secret with
    the org public key is run through HKDF-SHA256 to obtain a 32-byte key used
    with AES-GCM and a random 12-byte nonce.

    Args:
        plaintext: Data to encrypt.
        org_public_key_bytes: Raw X25519 public key of the org.

    Returns:
        ``ephemeral_public_key || nonce || ciphertext_with_tag``.
    """
    ephemeral_key = X25519PrivateKey.generate()
    recipient_key = X25519PublicKey.from_public_bytes(org_public_key_bytes)
    shared_secret = ephemeral_key.exchange(recipient_key)

    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=b"devsper-task-v1")
    aes_key = kdf.derive(shared_secret)

    nonce = os.urandom(12)
    sealed = AESGCM(aes_key).encrypt(nonce, plaintext, None)

    # The ephemeral public key travels in front so the receiver can redo the exchange.
    header = ephemeral_key.public_key().public_bytes(
        serialization.Encoding.Raw,
        serialization.PublicFormat.Raw,
    )
    return b"".join((header, nonce, sealed))
57+
58+
59+
def decrypt_payload(ciphertext: bytes, org_private_key_bytes: bytes) -> bytes:
    """Decrypt a blob laid out as ``ephemeral_pub (32B) || nonce (12B) || ciphertext+tag``.

    Args:
        ciphertext: The full wire blob.
        org_private_key_bytes: Raw X25519 private key of the org.

    Returns:
        The recovered plaintext.

    Raises:
        ValueError: If the blob is shorter than 32 + 12 + 16 bytes.
        Errors raised by key parsing or by AES-GCM decryption propagate unchanged.
    """
    pub_len, nonce_len, tag_len = 32, 12, 16
    header_len = pub_len + nonce_len
    if len(ciphertext) < header_len + tag_len:
        raise ValueError("ciphertext too short")

    ephemeral_raw = ciphertext[:pub_len]
    nonce = ciphertext[pub_len:header_len]
    sealed = ciphertext[header_len:]

    private_key = X25519PrivateKey.from_private_bytes(org_private_key_bytes)
    shared_secret = private_key.exchange(X25519PublicKey.from_public_bytes(ephemeral_raw))

    # Same KDF parameters as the encrypting side, so both ends derive one key.
    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=b"devsper-task-v1")
    aes_key = kdf.derive(shared_secret)
    return AESGCM(aes_key).decrypt(nonce, sealed, None)
75+

devsper/pool/local_pool.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import os
5+
import subprocess
6+
import uuid
7+
import logging
8+
import shlex
9+
import sys
10+
11+
from .models import PoolTier, WorkerRecord, WorkerStatus
12+
from .manager import PoolManager
13+
from .store import RedisPoolStore, InMemoryPoolStore
14+
from .config import load_pool_config
15+
16+
log = logging.getLogger("devsper.local_pool")
17+
18+
19+
class LocalWorkerPool:
20+
def __init__(self, pool, config, redis_url: str):
21+
self.pool = pool
22+
self.config = config
23+
self._redis_url = redis_url
24+
self._procs: dict[str, subprocess.Popen] = {}
25+
26+
async def start(self, n: int | None = None):
27+
n = n or getattr(self.config, "local_workers", 0) or 0
28+
for _ in range(n):
29+
await self._spawn_worker()
30+
31+
async def _spawn_worker(self):
32+
worker_id = str(uuid.uuid4())
33+
env = {
34+
**os.environ,
35+
"DEVSPER_WORKER_ID": worker_id,
36+
"DEVSPER_PROFILE": "local",
37+
# Workers must use the same Redis as registration / `devsper run` (defaults differ per module).
38+
"REDIS_URL": self._redis_url,
39+
}
40+
cmd = shlex.split(str(getattr(self.config, "local_worker_cmd", "") or ""))
41+
if not cmd:
42+
cmd = ["python", "-c", "import time; time.sleep(3600)"]
43+
if cmd and cmd[0] in ("python", "python3"):
44+
cmd[0] = sys.executable
45+
proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
46+
self._procs[worker_id] = proc
47+
48+
org_id = os.environ.get("DEVSPER_ORG_ID") or getattr(self.config, "org_id", None)
49+
worker = WorkerRecord(
50+
worker_id=worker_id,
51+
node_id="local",
52+
org_id=org_id,
53+
tier=PoolTier.LOCAL,
54+
status=WorkerStatus.IDLE,
55+
profile="local",
56+
)
57+
await self.pool.register_worker(worker)
58+
asyncio.create_task(self._monitor(worker_id, proc))
59+
60+
async def _monitor(self, worker_id: str, proc: subprocess.Popen):
61+
while True:
62+
await asyncio.sleep(5)
63+
if proc.poll() is not None:
64+
await self.pool.deregister_worker(worker_id)
65+
self._procs.pop(worker_id, None)
66+
await self._spawn_worker()
67+
return
68+
await self.pool.store.heartbeat(worker_id, ttl_secs=getattr(self.config, "worker_timeout_secs", 90))
69+
70+
async def stop(self):
71+
for wid, proc in list(self._procs.items()):
72+
try:
73+
proc.terminate()
74+
except Exception:
75+
pass
76+
await self.pool.deregister_worker(wid)
77+
self._procs.clear()
78+
79+
80+
def main():
    """CLI entry point: start a local worker pool and supervise it until interrupted.

    Reads ``--workers`` from the command line, resolves the pool config and
    Redis URL, builds the pool manager, and runs the supervisor. On SIGINT or
    SIGTERM (or any error during startup) the spawned workers are terminated
    and deregistered before the process exits.
    """
    import argparse
    import signal

    parser = argparse.ArgumentParser(description="Start local worker pool (dev/test).")
    parser.add_argument("--workers", type=int, default=None, help="Number of local workers to spawn")
    args = parser.parse_args()

    if not os.getenv("DEVSPER_PROFILE"):
        os.environ["DEVSPER_PROFILE"] = "local"
    cfg = load_pool_config()
    # Prefer REDIS_URL env override for local dev/compose.
    redis_url = os.getenv("REDIS_URL") or cfg.redis_url
    os.environ.setdefault("REDIS_URL", redis_url)

    use_memory = (os.getenv("DEVSPER_POOL_BACKEND", "").strip().lower() == "memory")
    if use_memory:
        store: RedisPoolStore | InMemoryPoolStore = InMemoryPoolStore()
        log.warning("DEVSPER_POOL_BACKEND=memory: workers are in-process only; use TCP Redis for real E2E.")
    else:
        store = RedisPoolStore(redis_url)

    class _Bus:
        """Minimal publisher that JSON-encodes payloads onto a Redis channel."""

        def __init__(self, url: str):
            import redis as sync_redis

            self._r = sync_redis.Redis.from_url(url, decode_responses=True)

        def publish(self, channel: str, payload: dict):
            import json as _json

            self._r.publish(channel, _json.dumps(payload))

    pool = PoolManager(store=store, bus=_Bus(redis_url), config=cfg)
    lp = LocalWorkerPool(pool, cfg, redis_url=redis_url)

    async def run():
        loop = asyncio.get_running_loop()
        shutdown = asyncio.Event()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, shutdown.set)
            except (NotImplementedError, RuntimeError, ValueError):
                # Signal handlers are unavailable on this platform/thread;
                # Ctrl-C still cancels run() and reaches the finally block.
                pass
        try:
            await lp.start(args.workers)
            # Block until a shutdown signal arrives.
            await shutdown.wait()
        finally:
            # Without this the worker processes outlive the supervisor and
            # stay registered in the pool.
            await lp.stop()

    asyncio.run(run())
122+
123+
124+
if __name__ == "__main__":
125+
main()
126+

0 commit comments

Comments
 (0)