Skip to content

Commit 4164914

Browse files
committed
implemented multi-agent support in effectful using a basic choreographic programming primitive
1 parent 8f58b07 commit 4164914

4 files changed

Lines changed: 2074 additions & 0 deletions

File tree

docs/source/multi_agent_example.py

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
"""Multi-agent system using choreographic endpoint projection.
2+
3+
Demonstrates:
4+
- Choreographic programming: one function describes the entire workflow
5+
- Automatic endpoint projection: each agent gets its own thread
6+
- Crash tolerance: Ctrl-C and restart, agents resume where they left off
7+
- Scatter: two coder agents share the implementation work via claim-based pull
8+
- PersistentAgent for automatic checkpointing and context compaction
9+
10+
The scenario: a team of agents collaboratively builds a small Python library.
11+
An architect agent breaks the project into module specs, two coder agents
12+
implement the modules in parallel (via scatter), and two reviewer agents
13+
review modules in parallel and request fixes if needed.
14+
15+
Usage::
16+
17+
# First run — agents start working
18+
python docs/source/multi_agent_example.py
19+
20+
# Ctrl-C mid-run, then restart — agents pick up where they left off
21+
python docs/source/multi_agent_example.py
22+
23+
Requirements:
24+
pip install "effectful[llm]"  # quoted so shells such as zsh do not glob the brackets
25+
export OPENAI_API_KEY=... # or any LiteLLM-supported provider
26+
27+
"""
28+
29+
import json
30+
import logging
31+
from pathlib import Path
32+
from typing import Literal, TypedDict
33+
34+
from effectful.handlers.llm import Template, Tool
35+
from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler
36+
from effectful.handlers.llm.multi import Choreography, ChoreographyError, scatter
37+
from effectful.handlers.llm.persistence import PersistenceHandler, PersistentAgent
38+
from effectful.ops.types import NotHandled
39+
40+
# Configure the root logger once at import time. Each record carries the
# emitting thread's name so interleaved output from different threads can be
# told apart, and timestamps are shortened to HH:MM:SS.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(message)s",
    datefmt="%H:%M:%S",
)
# Module-level logger; main() uses it for the start banner, failures and the
# final summary.
log = logging.getLogger(__name__)
46+
47+
# ---------------------------------------------------------------------------
48+
# Configuration
49+
# ---------------------------------------------------------------------------
50+
51+
# Root directory for everything this example reads or writes (relative to the
# current working directory).
WORKSPACE = Path("./multi_agent_workspace")
# Handed to Choreography and PersistenceHandler in main() as their state location.
STATE_DIR = WORKSPACE.joinpath(".state")
# Where the agents' file tools read and write generated sources.
OUTPUT_DIR = WORKSPACE.joinpath("output")
# Model identifier passed to LiteLLMProvider in main().
MODEL = "gpt-4o-mini"

# The project to build — passed unchanged to the architect as `project_spec`.
PROJECT_SPEC = """\
Build a small Python utility library called 'textkit' with these modules:
1. textkit/slugify.py — convert strings to URL-safe slugs
2. textkit/wrap.py — word-wrap text to a given width
3. textkit/redact.py — redact email addresses and phone numbers from text
Each module should have a clear public API, docstrings, and at least 3
test cases written as a separate test_<module>.py file.
"""
65+
66+
67+
# ---------------------------------------------------------------------------
68+
# Structured types — constrained decoding for LLM output
69+
# ---------------------------------------------------------------------------
70+
71+
72+
class ModuleSpec(TypedDict):
    """Schema for architect planning output — constrained decoding ensures valid shape."""

    # One entry of the architect's plan. build_project serializes the whole
    # dict with json.dumps and passes it to a coder's implement_module.
    # NOTE(review): the docstring above may be surfaced to the model as part
    # of the output schema — confirm before rewording it.

    # Passed to ReviewerAgent.review_module as the module to review.
    module_path: str
    description: str
    public_api: str
    # Passed to ReviewerAgent.review_module alongside module_path.
    test_path: str
79+
80+
81+
class PlanResult(TypedDict):
    """Wrapper for list output — LiteLLM requires a root object, not bare array."""

    # Return type of ArchitectAgent.plan_modules. build_project reads
    # plan["modules"] for both the implementation and the review scatter.
    modules: list[ModuleSpec]
85+
86+
87+
class ReviewResult(TypedDict):
    """Schema for reviewer output — verdict constrained to PASS or NEEDS_FIXES."""

    # Return type of ReviewerAgent.review_module.

    # "NEEDS_FIXES" makes build_project send the module back to a coder;
    # main() counts the "PASS" entries for its summary line.
    verdict: Literal["PASS", "NEEDS_FIXES"]
    # Forwarded to the coder under the "fix_feedback" key when a fix is needed.
    feedback: str
92+
93+
94+
# ---------------------------------------------------------------------------
95+
# Agents
96+
# ---------------------------------------------------------------------------
97+
98+
99+
class ArchitectAgent(PersistentAgent):
    """You are a software architect. Given a project specification, you break
    it into individual module implementation tasks. Each task should specify
    the module filename, its public API, and what tests to write.
    Be concrete and specific — the coder will follow your spec exactly.
    """

    # NOTE(review): the docstrings in this class read as prompt / tool
    # description text that the framework presumably sends to the model, so
    # they are kept verbatim — confirm before editing them.

    def __init__(self, **kwargs):
        # All keyword arguments (e.g. agent_id) go straight to PersistentAgent.
        super().__init__(**kwargs)
        self._output_dir = OUTPUT_DIR

    @Tool.define
    def read_existing_files(self) -> str:
        """List files already written to the output directory."""
        root = self._output_dir
        if not root.exists():
            return "No files yet."
        # Recursive search; paths are sorted before being made relative so the
        # listing order is stable from call to call.
        names = [str(found.relative_to(root)) for found in sorted(root.rglob("*.py"))]
        if not names:
            return "No Python files yet."
        return "\n".join(names)

    @Template.define
    def plan_modules(self, project_spec: str) -> PlanResult:
        """Given this project specification, output a plan with a "modules" list.
        Each module spec has: module_path, description, public_api, test_path.

        Use `read_existing_files` to check what's already been written
        and skip those.

        Project spec:
        {project_spec}"""
        # No local implementation: raising NotHandled leaves the call to
        # whichever handler is installed (main() installs the LLM handlers).
        raise NotHandled
131+
132+
133+
class CoderAgent(PersistentAgent):
    """You are an expert Python developer. Given a module specification,
    you write clean, well-documented Python code. You also write thorough
    test files. Output ONLY the Python source code, no markdown fences.
    """

    # NOTE(review): the docstrings in this class read as prompt / tool
    # description text that the framework presumably sends to the model, so
    # they are kept verbatim — confirm before editing them.

    def __init__(self, **kwargs):
        # All keyword arguments (e.g. agent_id) go straight to PersistentAgent.
        super().__init__(**kwargs)
        self._output_dir = OUTPUT_DIR

    def _resolve(self, path: str) -> Path:
        # Map a tool-supplied path to an absolute location inside the output
        # directory. The path comes from model output, so it is untrusted: an
        # absolute path or a ".." component would otherwise let the tools read
        # or overwrite arbitrary files. Raises ValueError when the resolved
        # target falls outside the output directory.
        root = self._output_dir.resolve()
        target = (root / path).resolve()
        if not target.is_relative_to(root):
            raise ValueError(f"path escapes output directory: {path}")
        return target

    @Tool.define
    def read_file(self, path: str) -> str:
        """Read a file from the output directory."""
        try:
            target = self._resolve(path)
        except ValueError:
            # Report as text (like the not-found case) so the model can retry.
            return f"Path is outside the output directory: {path}"
        # is_file() rather than exists(): a directory must not reach read_text.
        if not target.is_file():
            return f"File not found: {path}"
        # Explicit encoding so the result does not depend on the platform default.
        return target.read_text(encoding="utf-8")

    @Tool.define
    def write_file(self, path: str, content: str) -> str:
        """Write a file to the output directory."""
        try:
            target = self._resolve(path)
        except ValueError:
            return f"Refused to write outside the output directory: {path}"
        if target.is_dir():
            # Covers "" and "." (the output directory itself) as well.
            return f"Path is a directory: {path}"
        target.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding: generated source may contain non-ASCII characters.
        target.write_text(content, encoding="utf-8")
        return f"Wrote {len(content)} chars to {path}"

    @Template.define
    def implement_module(self, module_spec: str) -> str:
        """Implement the following module specification. Use `write_file`
        to write both the module and its test file. Use `read_file` to
        check existing code if needed.

        Specification:
        {module_spec}"""
        # No local implementation: raising NotHandled leaves the call to
        # whichever handler is installed (main() installs the LLM handlers).
        raise NotHandled
168+
169+
170+
class ReviewerAgent(PersistentAgent):
    """You are a senior code reviewer. You review Python modules for
    correctness, style, edge cases, and test coverage. Be specific
    about issues and provide actionable feedback.
    """

    # NOTE(review): the docstrings in this class read as prompt / tool
    # description text that the framework presumably sends to the model, so
    # they are kept verbatim — confirm before editing them.

    def __init__(self, **kwargs):
        # All keyword arguments (e.g. agent_id) go straight to PersistentAgent.
        super().__init__(**kwargs)
        self._output_dir = OUTPUT_DIR

    @Tool.define
    def read_file(self, path: str) -> str:
        """Read a file from the output directory."""
        # `path` comes from model output, so it is untrusted: resolve it and
        # refuse anything (absolute path, "..") that lands outside the output
        # directory instead of reading arbitrary files.
        root = self._output_dir.resolve()
        target = (root / path).resolve()
        if not target.is_relative_to(root):
            # Report as text (like the not-found case) so the model can retry.
            return f"Path is outside the output directory: {path}"
        # is_file() rather than exists(): a directory must not reach read_text.
        if not target.is_file():
            return f"File not found: {path}"
        # Explicit encoding so the result does not depend on the platform default.
        return target.read_text(encoding="utf-8")

    @Template.define
    def review_module(self, module_path: str, test_path: str) -> ReviewResult:
        """Review the module at {module_path} and its tests at {test_path}.
        Use `read_file` to read them. Return verdict "PASS" or "NEEDS_FIXES"
        and feedback. If NEEDS_FIXES, explain exactly what to change."""
        # No local implementation: raising NotHandled leaves the call to
        # whichever handler is installed (main() installs the LLM handlers).
        raise NotHandled
194+
195+
196+
# ---------------------------------------------------------------------------
197+
# Choreographic program — the entire multi-agent workflow in one function
198+
# ---------------------------------------------------------------------------
199+
200+
201+
def build_project(
    project_spec: str,
    architect: ArchitectAgent,
    coder: CoderAgent,
    reviewer: ReviewerAgent,
) -> list[ReviewResult]:
    """Describe the whole build workflow as a single choreographic program.

    Phases, in order:

    1. The architect turns ``project_spec`` into a list of module specs.
    2. Every module spec is scattered to ``coder`` for implementation.
    3. Every module is scattered to ``reviewer``; modules whose verdict is
       ``"NEEDS_FIXES"`` go back to ``coder`` with the reviewer's feedback
       attached, and the full review is repeated.

    Args:
        project_spec: Free-text description of the project to build.
        architect: Agent that produces the plan.
        coder: Agent role that implements and fixes modules (main() binds a
            list of coder agents to this name when running the choreography).
        reviewer: Agent role that reviews modules (bound likewise).

    Returns:
        The reviews from the first round in which no module needed fixes,
        paired positionally with the planned modules.
    """
    # Phase 1: planning.
    plan = architect.plan_modules(project_spec)
    modules = plan["modules"]

    # Phase 2: one implementation task per planned module. The spec is sent
    # as pretty-printed JSON because implement_module takes a string.
    scatter(
        modules,
        coder,
        lambda agent, spec: agent.implement_module(json.dumps(spec, indent=2)),
    )

    # Phase 3: review / fix cycle.
    # NOTE(review): there is no cap on the number of rounds — the loop ends
    # only once a round produces no "NEEDS_FIXES" verdict.
    while True:
        reviews: list[ReviewResult] = scatter(
            modules,
            reviewer,
            lambda agent, spec: agent.review_module(
                spec["module_path"], spec["test_path"]
            ),
        )

        # Pair each module with its review; this relies on scatter returning
        # results in the same order as its input items.
        rejected = []
        for spec, review in zip(modules, reviews):
            if review["verdict"] == "NEEDS_FIXES":
                rejected.append((spec, review))

        if rejected:
            # Re-run implementation for the rejected modules only, with the
            # feedback merged into the spec, then review everything again.
            scatter(
                rejected,
                coder,
                lambda agent, item: agent.implement_module(
                    json.dumps(
                        {**item[0], "fix_feedback": item[1]["feedback"]},
                        indent=2,
                    )
                ),
            )
            continue

        return reviews
252+
253+
254+
# ---------------------------------------------------------------------------
255+
# Main
256+
# ---------------------------------------------------------------------------
257+
258+
259+
def main() -> None:
    """Create the agents, run the choreography and log a summary.

    Ensures the workspace and output directories exist, builds one architect,
    two coders and two reviewers, and runs ``build_project`` through a
    ``Choreography``. A ``ChoreographyError`` is logged and ends the function
    early; otherwise the number of reviews, the number that passed and the
    generated ``*.py`` files are logged.
    """
    for directory in (WORKSPACE, OUTPUT_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    # Create agents (ids are fixed strings, identical on every run).
    architect = ArchitectAgent(agent_id="architect")
    coders = [CoderAgent(agent_id="coder-1"), CoderAgent(agent_id="coder-2")]
    reviewers = [
        ReviewerAgent(agent_id="reviewer-1"),
        ReviewerAgent(agent_id="reviewer-2"),
    ]

    # Build the choreography — all boilerplate (threads, queues, signal
    # handling, crash recovery) is handled automatically.
    handlers = [
        LiteLLMProvider(model=MODEL),
        RetryLLMHandler(),
        PersistenceHandler(STATE_DIR),
    ]
    choreo = Choreography(
        build_project,
        agents=[architect, *coders, *reviewers],
        state_dir=STATE_DIR,
        handlers=handlers,
    )

    log.info("Starting multi-agent build (Ctrl-C to pause, re-run to resume)")

    try:
        # Keyword names match build_project's parameters; the coder and
        # reviewer roles are each bound to a list of agents.
        reviews = choreo.run(
            project_spec=PROJECT_SPEC,
            architect=architect,
            coder=coders,
            reviewer=reviewers,
        )
    except ChoreographyError as e:
        log.error("Choreography failed: %s", e)
        return

    # Summary
    written = list(OUTPUT_DIR.rglob("*.py"))
    pass_count = len([r for r in reviews if r["verdict"] == "PASS"])
    log.info(
        "Done: %d modules reviewed (%d passed), %d output files",
        len(reviews),
        pass_count,
        len(written),
    )
    for path in written:
        log.info(" %s", path.relative_to(WORKSPACE))
307+
308+
309+
# Run the build only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)