From c46e84fbf6f3066139d231b58857f7f5a48694ad Mon Sep 17 00:00:00 2001 From: hitartrh Date: Tue, 31 Mar 2026 19:05:45 +0200 Subject: [PATCH] Add automatic agent-visibility dashboard integration - visibility_bridge.py: VisibilityLogger subclass that mirrors all workflow events (NODE_START/END, TOOL_CALL, MODEL_CALL, EDGE_PROCESS, WORKFLOW_START/END) to the agent-visibility dashboard at localhost:4242 - WebSocketLogger now extends VisibilityLogger so the web UI path gets automatic visibility without any YAML changes - workflow_run_service: resets dashboard before each run so switching or relaunching workflows always shows a clean slate - execute_sync: GET /api/workflow/preview/{file} endpoint populates the dashboard graph immediately when a workflow is selected (before launch) - LaunchView: calls the preview endpoint on workflow selection - functions/visibility.py + demo_visibility.yaml: opt-in function tools for workflows that want explicit control over dashboard events --- frontend/src/pages/LaunchView.vue | 3 + functions/function_calling/visibility.py | 194 +++++++++++++++ server/routes/execute_sync.py | 73 +++++- server/services/websocket_executor.py | 6 +- server/services/websocket_logger.py | 16 +- server/services/workflow_run_service.py | 3 + utils/visibility_bridge.py | 286 +++++++++++++++++++++++ yaml_instance/demo_visibility.yaml | 172 ++++++++++++++ 8 files changed, 744 insertions(+), 9 deletions(-) create mode 100644 functions/function_calling/visibility.py create mode 100644 utils/visibility_bridge.py create mode 100644 yaml_instance/demo_visibility.yaml diff --git a/frontend/src/pages/LaunchView.vue b/frontend/src/pages/LaunchView.vue index 5f61c1d5d9..1bee3de14c 100755 --- a/frontend/src/pages/LaunchView.vue +++ b/frontend/src/pages/LaunchView.vue @@ -1299,6 +1299,9 @@ const handleYAMLSelection = async (fileName) => { return } + // Preview workflow graph in visibility dashboard (fire-and-forget) + fetch(`/api/workflow/preview/${encodeURIComponent(fileName)}`).catch(() => {}) + // Clear the chat chatMessages.value = [] diff --git a/functions/function_calling/visibility.py b/functions/function_calling/visibility.py new file mode 100644 index 0000000000..92c2863cff --- /dev/null +++ b/functions/function_calling/visibility.py @@ -0,0 +1,194 @@ +""" +Agent-visibility integration for ChatDev workflows. + +Provides function tools that stream agent events to the agent-visibility +dashboard (https://github.com/yourusername/agent-visibility-cli). + +Start the dashboard before running your workflow: + node src/server.js # dashboard on http://localhost:4242 + +Then add any of the functions below to your agent's tooling list. +All calls are fire-and-forget — a missing dashboard never crashes the workflow. +""" + +import json +import os +import urllib.error +import urllib.request +from typing import Annotated, Literal, Optional + +from utils.function_catalog import ParamMeta + +_DASHBOARD_URL = os.environ.get("VISIBILITY_URL", "http://localhost:4242") + + +def _post(tool: str, args: dict) -> str: + """Fire-and-forget POST to the visibility dashboard. Returns status string.""" + body = json.dumps({"tool": tool, "args": args}).encode() + try: + req = urllib.request.Request( + f"{_DASHBOARD_URL}/tool", + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(req, timeout=2) + return "ok" + except Exception as exc: + return f"visibility dashboard unreachable: {exc}" + + +def set_goal( + goal: Annotated[str, ParamMeta(description="Short description of the overall run goal or task")], + run_id: Annotated[ + Optional[str], + ParamMeta(description="Optional unique run identifier; auto-generated if omitted"), + ] = None, +) -> str: + """ + Set the run goal and mark the workflow as started on the visibility dashboard. + + Call this once at the beginning of a workflow run, before registering agents. + The goal text is displayed prominently in the dashboard header. + """ + args: dict = {"goal": goal} + if run_id: + args["run_id"] = run_id + return _post("set_goal", args) + + +def register_agent( + agent_id: Annotated[str, ParamMeta(description="Unique identifier for this agent (e.g. 'pm', 'coder')")], + label: Annotated[str, ParamMeta(description="Human-readable display name shown on the dashboard canvas")], + role: Annotated[ + Literal["orchestrator", "worker", "researcher", "coder", "critic", "synthesiser"], + ParamMeta(description="Agent role, controls the icon and colour on the canvas"), + ], + model: Annotated[Optional[str], ParamMeta(description="Model name, e.g. 'gpt-4o'")] = None, + reports_to: Annotated[ + Optional[str], ParamMeta(description="agent_id of the parent/supervisor agent") + ] = None, + color: Annotated[ + Optional[str], ParamMeta(description="Override hex colour for this agent's node, e.g. '#7c3aed'") + ] = None, +) -> str: + """ + Register an agent with the visibility dashboard. + + Creates a node on the canvas for the agent. Call this for every agent in + the workflow, ideally before the agent starts its first task. + """ + args: dict = {"id": agent_id, "label": label, "role": role} + if model: + args["model"] = model + if reports_to: + args["reports_to"] = reports_to + if color: + args["color"] = color + return _post("register_agent", args) + + +def log_event( + agent: Annotated[str, ParamMeta(description="agent_id of the agent generating this event")], + event_type: Annotated[ + Literal[ + "start", "plan", "route", "reply", "tool", + "result", "pass", "fail", "retry", "warn", "error", "done", + ], + ParamMeta(description="Event category; controls icon and colour in the event log"), + ], + message: Annotated[str, ParamMeta(description="Human-readable description of what happened")], + tokens: Annotated[ + Optional[int], ParamMeta(description="Token count for this step, if known") + ] = None, + latency_ms: Annotated[ + Optional[float], ParamMeta(description="Wall-clock latency of this step in milliseconds") + ] = None, +) -> str: + """ + Log an agent event to the visibility dashboard event feed. + + Use this liberally to narrate what an agent is doing — planning, calling a + tool, producing a result, hitting an error, and so on. + """ + args: dict = {"agent": agent, "event_type": event_type, "message": message} + if tokens is not None: + args["tokens"] = tokens + if latency_ms is not None: + args["latency_ms"] = latency_ms + return _post("log_event", args) + + +def trace_step( + from_agent: Annotated[str, ParamMeta(description="agent_id of the sender")], + to_agent: Annotated[str, ParamMeta(description="agent_id of the receiver")], + label: Annotated[ + Optional[str], ParamMeta(description="Short label shown on the arrow, e.g. 'handoff plan'") + ] = None, + arrow_type: Annotated[ + Literal["msg", "result", "retry", "tool"], + ParamMeta(description="Arrow style: msg=blue, result=green, retry=orange, tool=purple"), + ] = "msg", +) -> str: + """ + Draw a directed arrow between two agents on the visibility canvas. + + Call this whenever one agent hands work to another, so the dashboard shows + the message flow in real time. + """ + args: dict = {"from_agent": from_agent, "to_agent": to_agent, "arrow_type": arrow_type} + if label: + args[label] = label + return _post("trace_step", args) + + +def set_agent_state( + agent_id: Annotated[str, ParamMeta(description="agent_id to update")], + status: Annotated[ + Literal["idle", "running", "active", "done", "error"], + ParamMeta(description="New status; controls the badge colour on the canvas node"), + ], +) -> str: + """ + Update an agent's status badge on the visibility dashboard. + + Use 'active' or 'running' when the agent starts working, and 'done' or + 'error' when it finishes. + """ + return _post("set_agent_state", {"agent_id": agent_id, "status": status}) + + +def set_plan( + tasks: Annotated[ + list, + ParamMeta( + description=( + "List of task objects, each with at minimum an 'id' and 'label' field " + "and optionally 'agent' (agent_id responsible) and 'status'." + ) + ), + ], +) -> str: + """ + Publish the workflow task plan to the Plan tab on the visibility dashboard. + + Pass a list of task dicts, e.g.: + [{"id": "t1", "label": "Write code", "agent": "coder"}, + {"id": "t2", "label": "Review code", "agent": "reviewer"}] + """ + return _post("set_plan", {"tasks": tasks}) + + +def finish_run( + status: Annotated[ + Literal["done", "error"], + ParamMeta(description="Final run status"), + ] = "done", +) -> str: + """ + Mark the current workflow run as complete on the visibility dashboard. + + Call this in the last agent or a teardown step. The dashboard will stop + its live timer and display the final status badge. + """ + return _post("finish_run", {"status": status}) diff --git a/server/routes/execute_sync.py b/server/routes/execute_sync.py index 8426a1a8f6..dd9072422a 100644 --- a/server/routes/execute_sync.py +++ b/server/routes/execute_sync.py @@ -18,6 +18,7 @@ from runtime.bootstrap.schema import ensure_schema_registry_populated from runtime.sdk import OUTPUT_ROOT, run_workflow from server.models import WorkflowRunRequest +from utils.visibility_bridge import VisibilityLogger, _reset_sync, _post, _slug, _infer_role from server.settings import YAML_DIR from utils.attachments import AttachmentStore from utils.exceptions import ValidationError, WorkflowExecutionError @@ -107,7 +108,7 @@ def _run_workflow_with_logger( graph_context = GraphContext(config=graph_config) task_input = _build_task_input(graph_context, task_prompt, attachments) - class _StreamingWorkflowLogger(WorkflowLogger): + class _StreamingWorkflowLogger(VisibilityLogger): def add_log(self, *args, **kwargs): entry = super().add_log(*args, **kwargs) if entry: @@ -124,6 +125,7 @@ def _create_logger(self) -> WorkflowLogger: level, use_structured_logging=True, log_to_console=False, + task_prompt=task_prompt, ) executor = _StreamingExecutor(graph_context, session_id=normalized_session) @@ -250,4 +252,71 @@ async def stream(): if done_event.is_set(): break - return StreamingResponse(stream(), media_type=_SSE_CONTENT_TYPE) \ No newline at end of file + return StreamingResponse(stream(), media_type=_SSE_CONTENT_TYPE) + + +@router.get("/api/workflow/preview/{yaml_file:path}") +async def preview_workflow(yaml_file: str): + """ + Load a workflow's graph definition and send it to the visibility dashboard + so the user sees the agent graph before launching the workflow. + """ + try: + yaml_path = _resolve_yaml_path(yaml_file) + if not yaml_path.exists(): + raise HTTPException(status_code=404, detail=f"Workflow not found: {yaml_file}") + + design = load_config(yaml_path) + graph_config = GraphConfig.from_definition( + design.graph, + name=yaml_path.stem, + output_root=OUTPUT_ROOT, + source_path=str(yaml_path), + vars=design.vars, + ) + + def _send(): + _reset_sync() + + nodes = graph_config.get_node_definitions() + edges = graph_config.get_edge_definitions() + + for node in nodes: + agent_id = _slug(node.id) + role = _infer_role(node.id, getattr(node, "type", "agent")) + _post("register_agent", {"id": agent_id, "label": node.id, "role": role}) + _post("set_agent_state", {"agent_id": agent_id, "status": "idle"}) + + for edge in edges: + source = getattr(edge, "source", None) + target = getattr(edge, "target", None) + if source and target: + _post("trace_step", { + "from_agent": _slug(source), + "to_agent": _slug(target), + "arrow_type": "msg", + }) + + node_index = {node.id: i for i, node in enumerate(nodes)} + plan_tasks = [] + for i, node in enumerate(nodes): + incoming = [ + node_index[e.source] + for e in edges + if getattr(e, "target", None) == node.id and e.source in node_index + ] + plan_tasks.append({ + "agent": _slug(node.id), + "task": node.id, + "depends_on": incoming, + }) + _post("set_plan", {"tasks": plan_tasks}) + _post("set_goal", {"goal": f"Preview: {yaml_path.stem}"}) + + await run_in_threadpool(_send) + return {"status": "ok"} + + except HTTPException: + raise + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) \ No newline at end of file diff --git a/server/services/websocket_executor.py b/server/services/websocket_executor.py index f26c8cfaf7..84b535dffc 100755 --- a/server/services/websocket_executor.py +++ b/server/services/websocket_executor.py @@ -27,6 +27,7 @@ def __init__( websocket_manager, session_store: WorkflowSessionStore, cancel_event=None, + task_prompt: str = None, ): self.session_id = session_id self.session_controller = session_controller @@ -34,6 +35,7 @@ def __init__( self.websocket_manager = websocket_manager self.session_store = session_store self.results = {} + self.task_prompt = task_prompt self.artifact_dispatcher = ArtifactDispatcher(session_id, session_store, websocket_manager) def hook_factory(runtime_context): @@ -60,7 +62,9 @@ def hook_factory(runtime_context): def _create_logger(self) -> WorkflowLogger: from server.services.websocket_logger import WebSocketLogger - return WebSocketLogger(self.websocket_manager, self.session_id, self.graph.name, self.graph.log_level) + return WebSocketLogger(self.websocket_manager, self.session_id, self.graph.name, + self.graph.log_level, task_prompt=self.task_prompt, + graph_config=self.graph.config) async def execute_graph_async(self, task_prompt): await asyncio.get_event_loop().run_in_executor(None, self._execute, task_prompt) diff --git a/server/services/websocket_logger.py b/server/services/websocket_logger.py index 29afe4f6d8..16bd7c8fea 100755 --- a/server/services/websocket_logger.py +++ b/server/services/websocket_logger.py @@ -1,16 +1,20 @@ import asyncio -from typing import Any, Dict +from typing import Any, Dict, Optional from entity.enums import LogLevel, EventType -from utils.logger import WorkflowLogger, LogEntry +from utils.logger import LogEntry +from utils.visibility_bridge import VisibilityLogger from utils.structured_logger import get_workflow_logger -class WebSocketLogger(WorkflowLogger): +class WebSocketLogger(VisibilityLogger): """Workflow logger that also pushes entries via WebSocket.""" - def __init__(self, websocket_manager, session_id: str, workflow_id: str = None, log_level: LogLevel = LogLevel.DEBUG): - super().__init__(workflow_id, log_level, log_to_console=False) + def __init__(self, websocket_manager, session_id: str, workflow_id: str = None, + log_level: LogLevel = LogLevel.DEBUG, task_prompt: Optional[str] = None, + graph_config=None): + super().__init__(workflow_id, log_level, log_to_console=False, + task_prompt=task_prompt, graph_config=graph_config) self.websocket_manager = websocket_manager self.session_id = session_id @@ -26,5 +30,5 @@ def add_log(self, level: LogLevel, message: str = None, node_id: str = None, "type": "log", "data": log_entry.to_dict() }) - + return log_entry diff --git a/server/services/workflow_run_service.py b/server/services/workflow_run_service.py index 9b0689b904..e727c2b3cb 100755 --- a/server/services/workflow_run_service.py +++ b/server/services/workflow_run_service.py @@ -9,6 +9,7 @@ from entity.messages import Message from entity.enums import LogLevel from utils.exceptions import ValidationError, WorkflowCancelledError +from utils.visibility_bridge import _reset_sync from utils.structured_logger import get_server_logger, LogType from utils.task_input import TaskInputBuilder from workflow.graph_context import GraphContext @@ -166,6 +167,7 @@ async def _execute_workflow_async( websocket_manager, self.session_store, cancel_event=cancel_event, + task_prompt=task_prompt, ) if session: @@ -182,6 +184,7 @@ async def _execute_workflow_async( executor.attachment_store, ) + _reset_sync() # clear dashboard before every run await executor.execute_graph_async(task_input) # If cancellation was requested during execution but not raised inside threads, diff --git a/utils/visibility_bridge.py b/utils/visibility_bridge.py new file mode 100644 index 0000000000..b2a4abe7b2 --- /dev/null +++ b/utils/visibility_bridge.py @@ -0,0 +1,286 @@ +""" +Automatic agent-visibility integration for ChatDev workflows. + +Subclasses WorkflowLogger so that every workflow run — regardless of which +YAML is used — streams live events to the agent-visibility dashboard at +http://localhost:4242 (override with VISIBILITY_URL env var). + +Usage: injected automatically via execute_sync.py; no YAML changes needed. +""" + +import json +import os +import re +import threading +import urllib.request +from typing import Any, Dict, List, Optional + +from entity.enums import EventType, LogLevel +from utils.logger import WorkflowLogger + +_DASHBOARD_URL = os.environ.get("VISIBILITY_URL", "http://localhost:4242") + +# Map node types to dashboard roles +_TYPE_TO_ROLE = { + "agent": "worker", + "human": "worker", + "python_runner": "coder", + "orchestrator": "orchestrator", +} + +# Heuristic role upgrade based on node label keywords +_LABEL_ROLE_HINTS = { + "ceo": "orchestrator", + "chief": "orchestrator", + "manager": "orchestrator", + "director": "orchestrator", + "orchestrat": "orchestrator", + "coder": "coder", + "programmer": "coder", + "developer": "coder", + "engineer": "coder", + "reviewer": "critic", + "critic": "critic", + "tester": "critic", + "qa": "critic", + "research": "researcher", + "analyst": "researcher", +} + + +def _post(tool: str, args: dict) -> None: + """Fire-and-forget POST to the visibility dashboard in a background thread.""" + clean = {k: v for k, v in args.items() if v is not None} + + def _do(): + try: + body = json.dumps({"tool": tool, "args": clean}, default=str).encode() + req = urllib.request.Request( + f"{_DASHBOARD_URL}/tool", + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(req, timeout=2) + except Exception: + pass # dashboard being down must never crash a workflow + + threading.Thread(target=_do, daemon=True).start() + + +def _reset_sync() -> None: + """Synchronous reset — call this before queuing any new-run events.""" + try: + req = urllib.request.Request( + f"{_DASHBOARD_URL}/reset", + data=b"", + headers={"Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(req, timeout=2) + except Exception: + pass + + +def _slug(label: str) -> str: + """'Chief Executive Officer' → 'chief_executive_officer'""" + return re.sub(r"[^a-z0-9]+", "_", (label or "").lower()).strip("_") or "unknown" + + +def _infer_role(node_id: str, node_type: str) -> str: + """Infer a dashboard role from node label and type.""" + label_lower = (node_id or "").lower() + for keyword, role in _LABEL_ROLE_HINTS.items(): + if keyword in label_lower: + return role + return _TYPE_TO_ROLE.get(node_type or "agent", "worker") + + +class VisibilityLogger(WorkflowLogger): + """ + Drop-in replacement for WorkflowLogger that automatically mirrors all + workflow events to the agent-visibility dashboard. + + Intercepts add_log() *before* the parent's log-level filter so that + EDGE_PROCESS (DEBUG) events are never silently dropped. + """ + + def __init__( + self, + workflow_id: str = None, + log_level: LogLevel = LogLevel.DEBUG, + use_structured_logging: bool = True, + log_to_console: bool = True, + *, + task_prompt: str = None, + graph_config=None, + ) -> None: + super().__init__(workflow_id, log_level, use_structured_logging, log_to_console) + self._vis_task = task_prompt or "Workflow run" + self._vis_registered: set = set() + self._graph_config = graph_config + + def add_log( + self, + level: LogLevel, + message: str = None, + node_id: str = None, + event_type: EventType = None, + details: Dict[str, Any] = None, + duration: float = None, + ): + # Forward to the dashboard BEFORE the parent's level filter so we + # never miss DEBUG-level events (e.g. EDGE_PROCESS / trace arrows). + self._forward(level, message, node_id, event_type, details or {}, duration) + return super().add_log(level, message, node_id, event_type, details, duration) + + # ------------------------------------------------------------------ + # Internal event mapping + # ------------------------------------------------------------------ + + def _forward( + self, + level: LogLevel, + message: Optional[str], + node_id: Optional[str], + event_type: Optional[EventType], + details: Dict[str, Any], + duration: Optional[float], + ) -> None: + ms = round(duration * 1000) if duration else None + + if event_type == EventType.WORKFLOW_START: + self._vis_registered.clear() + self._send_initial_state(self._vis_task) + + elif event_type == EventType.NODE_START and node_id: + agent_id = _slug(node_id) + # Lazy-register if not already done via graph definition + if agent_id not in self._vis_registered: + _post("register_agent", {"id": agent_id, "label": node_id, "role": "worker"}) + self._vis_registered.add(agent_id) + _post("set_agent_state", {"agent_id": agent_id, "status": "active"}) + _post("log_event", { + "agent": agent_id, + "event_type": "start", + "message": message or f"{node_id} started", + }) + + elif event_type == EventType.TOOL_CALL and node_id: + agent_id = _slug(node_id) + tool_name = details.get("tool_name", "tool") + success = details.get("success", True) + output = details.get("tool_result") + # log_tool_call populates the Tools tab + _post("log_tool_call", { + "agent": agent_id, + "tool_name": tool_name, + "output": str(output)[:2000] if output else None, + "error": None if success is not False else str(output), + "latency_ms": ms, + }) + # Also send a human-readable event to the log feed + _post("log_event", { + "agent": agent_id, + "event_type": "tool" if success is not False else "fail", + "message": f"Called {tool_name}", + "latency_ms": ms, + }) + + elif event_type == EventType.MODEL_CALL and node_id: + agent_id = _slug(node_id) + model = details.get("model_name", "llm") + response = details.get("output") + # log_generation populates the Tools tab with the LLM turn + _post("log_generation", { + "agent": agent_id, + "model": model, + "response": str(response)[:4000] if response else None, + "latency_ms": ms, + }) + _post("log_event", { + "agent": agent_id, + "event_type": "reply", + "message": f"LLM call ({model})", + "latency_ms": ms, + }) + + elif event_type == EventType.EDGE_PROCESS and node_id: + to_node = details.get("to_node") + if to_node: + _post("trace_step", { + "from_agent": _slug(node_id), + "to_agent": _slug(to_node), + "arrow_type": "result", + }) + + elif event_type == EventType.NODE_END and node_id: + agent_id = _slug(node_id) + _post("log_event", { + "agent": agent_id, + "event_type": "done", + "message": message or f"{node_id} finished", + "latency_ms": ms, + }) + _post("set_agent_state", {"agent_id": agent_id, "status": "done"}) + # Store the node output in the Memory tab + output = details.get("output") if details else None + if output: + _post("set_memory", { + "key": node_id, + "value": str(output)[:300], + "op": "write", + }) + + elif event_type == EventType.WORKFLOW_END: + success = details.get("success", True) + _post("finish_run", {"status": "done" if success else "error"}) + + def _send_initial_state(self, task: str) -> None: + """ + Called synchronously after reset. Posts set_goal then registers the + full graph so the canvas is populated before any node runs. + All calls are inline (not threaded) to preserve ordering. + """ + _post("set_goal", {"goal": task}) + + if not self._graph_config: + return + + try: + nodes: List = self._graph_config.get_node_definitions() + edges: List = self._graph_config.get_edge_definitions() + except Exception: + return + + for node in nodes: + agent_id = _slug(node.id) + role = _infer_role(node.id, getattr(node, "type", "agent")) + _post("register_agent", {"id": agent_id, "label": node.id, "role": role}) + self._vis_registered.add(agent_id) + + for edge in edges: + source = getattr(edge, "source", None) + target = getattr(edge, "target", None) + if source and target: + _post("trace_step", { + "from_agent": _slug(source), + "to_agent": _slug(target), + "arrow_type": "msg", + }) + + # Build plan with correct schema: {agent, task, depends_on} + node_index = {node.id: i for i, node in enumerate(nodes)} + plan_tasks = [] + for i, node in enumerate(nodes): + incoming = [ + node_index[e.source] + for e in edges + if getattr(e, "target", None) == node.id and e.source in node_index + ] + plan_tasks.append({ + "agent": _slug(node.id), + "task": node.id, + "depends_on": incoming, + }) + _post("set_plan", {"tasks": plan_tasks}) diff --git a/yaml_instance/demo_visibility.yaml b/yaml_instance/demo_visibility.yaml new file mode 100644 index 0000000000..b5c758b153 --- /dev/null +++ b/yaml_instance/demo_visibility.yaml @@ -0,0 +1,172 @@ +# demo_visibility.yaml — agent-visibility integration demo +# +# Shows how to stream ChatDev multi-agent activity to the agent-visibility +# dashboard using the built-in visibility function tools. +# +# Prerequisites: +# 1. Clone and start the visibility dashboard: +# git clone https://github.com/yourusername/agent-visibility-cli +# node src/server.js +# 2. Open http://localhost:4242 in your browser. +# 3. Submit this workflow via the ChatDev web UI or API. +# +# How it works: +# visibility.py in functions/function_calling/ is auto-discovered by ChatDev. +# Agent roles instruct them to call register_agent, log_event, trace_step, +# etc. alongside their regular tools. No changes to the workflow engine needed. + +version: 0.4.0 + +vars: + COMMON_PROMPT: >- + ChatDev is a software company powered by multiple intelligent agents with + a mission of "changing the digital world through programming". + +graph: + id: visibility_demo + description: >- + Three-agent software development workflow (CEO → Programmer → Reviewer) + streamed live to the agent-visibility dashboard. + log_level: INFO + is_majority_voting: false + + start: + - Chief Executive Officer + + end: + - Code Reviewer + + nodes: + + # ── CEO: plans the project and bootstraps the dashboard ────────────────── + - id: Chief Executive Officer + type: agent + description: Breaks the task into a plan and registers all agents on the dashboard + config: + provider: openai + name: gpt-4o + base_url: ${BASE_URL} + api_key: ${API_KEY} + role: |- + ${COMMON_PROMPT} + You are the Chief Executive Officer at ChatDev. + + VISIBILITY SETUP — do these first, in order: + 1. Call set_goal with a one-sentence description of the user's task. + 2. Call register_agent three times: + register_agent(agent_id="ceo", label="CEO", role="orchestrator") + register_agent(agent_id="coder", label="Programmer", role="coder") + register_agent(agent_id="reviewer", label="Code Reviewer", role="critic") + 3. Call set_plan with the task breakdown, e.g.: + [{"id":"design","label":"Design solution","agent":"ceo"}, + {"id":"code", "label":"Write code", "agent":"coder"}, + {"id":"review","label":"Review code", "agent":"reviewer"}] + 4. Call log_event(agent="ceo", event_type="plan", message="") + + THEN do your actual work: + - Analyse the user's request. + - Produce a clear, concise technical design: architecture, file names, key classes/functions. + - End your response with the full design document so the Programmer can implement it. + + AFTER your response: + 5. Call trace_step(from_agent="ceo", to_agent="coder", label="design handoff") + 6. Call set_agent_state(agent_id="ceo", status="done") + tooling: + - type: function + config: + tools: + - name: set_goal + - name: register_agent + - name: set_plan + - name: log_event + - name: trace_step + - name: set_agent_state + + # ── Programmer: implements the design ──────────────────────────────────── + - id: Programmer + type: agent + description: Implements the CEO's design and writes all code to disk + config: + provider: openai + name: gpt-4o + base_url: ${BASE_URL} + api_key: ${API_KEY} + role: |- + ${COMMON_PROMPT} + You are the Programmer at ChatDev. + + VISIBILITY — call these as you work: + - log_event(agent="coder", event_type="start", message="Starting implementation") + - set_agent_state(agent_id="coder", status="active") + - log_event(agent="coder", event_type="tool", message="") + - log_event(agent="coder", event_type="result", message="Implementation complete") + - trace_step(from_agent="coder", to_agent="reviewer", label="code review", arrow_type="result") + - set_agent_state(agent_id="coder", status="done") + + THEN do your actual work: + - Read the design document from the CEO. + - Set up the Python environment and install any required packages. + - Write all source files to disk using save_file. + - Ensure every file is complete and runnable — no placeholders. + tooling: + - type: function + config: + tools: + - name: log_event + - name: trace_step + - name: set_agent_state + - name: uv_related:All + - name: apply_text_edits + - name: create_folder + - name: describe_available_files + - name: delete_path + - name: read_file_segment + - name: search_in_files + - name: save_file + - name: list_directory + + # ── Code Reviewer: reviews and finishes the run ────────────────────────── + - id: Code Reviewer + type: agent + description: Reviews the code and marks the run as complete on the dashboard + config: + provider: openai + name: gpt-4o + base_url: ${BASE_URL} + api_key: ${API_KEY} + role: |- + ${COMMON_PROMPT} + You are the Code Reviewer at ChatDev. + + VISIBILITY — call these as you work: + - log_event(agent="reviewer", event_type="start", message="Starting code review") + - set_agent_state(agent_id="reviewer", status="active") + - log_event(agent="reviewer", event_type="result", message="") + - set_agent_state(agent_id="reviewer", status="done") + - finish_run(status="done") ← call this last + + THEN do your actual work: + - Load all source files and review them carefully. + - Check for: correctness, missing imports, unimplemented methods, bugs, style. + - Output your review with priority-ordered issues. + - If the code is acceptable, end with " Finished". + tooling: + - type: function + config: + tools: + - name: log_event + - name: set_agent_state + - name: finish_run + - name: describe_available_files + - name: read_file_segment + - name: search_in_files + - name: list_directory + + edges: + - from: Chief Executive Officer + to: Programmer + carry_data: true + + - from: Programmer + to: Code Reviewer + carry_data: true