Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions frontend/src/pages/LaunchView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,9 @@ const handleYAMLSelection = async (fileName) => {
return
}

// Preview workflow graph in visibility dashboard (fire-and-forget)
fetch(`/api/workflow/preview/${encodeURIComponent(fileName)}`).catch(() => {})

// Clear the chat
chatMessages.value = []

Expand Down
194 changes: 194 additions & 0 deletions functions/function_calling/visibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""
Agent-visibility integration for ChatDev workflows.

Provides function tools that stream agent events to the agent-visibility
dashboard (https://github.com/yourusername/agent-visibility-cli).

Start the dashboard before running your workflow:
node src/server.js # dashboard on http://localhost:4242

Then add any of the functions below to your agent's tooling list.
All calls are fire-and-forget — a missing dashboard never crashes the workflow.
"""

import json
import os
import urllib.error
import urllib.request
from typing import Annotated, Literal, Optional

from utils.function_catalog import ParamMeta

_DASHBOARD_URL = os.environ.get("VISIBILITY_URL", "http://localhost:4242")


def _post(tool: str, args: dict) -> str:
"""Fire-and-forget POST to the visibility dashboard. Returns status string."""
body = json.dumps({"tool": tool, "args": args}).encode()
try:
req = urllib.request.Request(
f"{_DASHBOARD_URL}/tool",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=2)
return "ok"
except Exception as exc:
return f"visibility dashboard unreachable: {exc}"


def set_goal(
goal: Annotated[str, ParamMeta(description="Short description of the overall run goal or task")],
run_id: Annotated[
Optional[str],
ParamMeta(description="Optional unique run identifier; auto-generated if omitted"),
] = None,
) -> str:
"""
Set the run goal and mark the workflow as started on the visibility dashboard.

Call this once at the beginning of a workflow run, before registering agents.
The goal text is displayed prominently in the dashboard header.
"""
args: dict = {"goal": goal}
if run_id:
args["run_id"] = run_id
return _post("set_goal", args)


def register_agent(
agent_id: Annotated[str, ParamMeta(description="Unique identifier for this agent (e.g. 'pm', 'coder')")],
label: Annotated[str, ParamMeta(description="Human-readable display name shown on the dashboard canvas")],
role: Annotated[
Literal["orchestrator", "worker", "researcher", "coder", "critic", "synthesiser"],
ParamMeta(description="Agent role, controls the icon and colour on the canvas"),
],
model: Annotated[Optional[str], ParamMeta(description="Model name, e.g. 'gpt-4o'")] = None,
reports_to: Annotated[
Optional[str], ParamMeta(description="agent_id of the parent/supervisor agent")
] = None,
color: Annotated[
Optional[str], ParamMeta(description="Override hex colour for this agent's node, e.g. '#7c3aed'")
] = None,
) -> str:
"""
Register an agent with the visibility dashboard.

Creates a node on the canvas for the agent. Call this for every agent in
the workflow, ideally before the agent starts its first task.
"""
args: dict = {"id": agent_id, "label": label, "role": role}
if model:
args["model"] = model
if reports_to:
args["reports_to"] = reports_to
if color:
args["color"] = color
return _post("register_agent", args)


def log_event(
agent: Annotated[str, ParamMeta(description="agent_id of the agent generating this event")],
event_type: Annotated[
Literal[
"start", "plan", "route", "reply", "tool",
"result", "pass", "fail", "retry", "warn", "error", "done",
],
ParamMeta(description="Event category; controls icon and colour in the event log"),
],
message: Annotated[str, ParamMeta(description="Human-readable description of what happened")],
tokens: Annotated[
Optional[int], ParamMeta(description="Token count for this step, if known")
] = None,
latency_ms: Annotated[
Optional[float], ParamMeta(description="Wall-clock latency of this step in milliseconds")
] = None,
) -> str:
"""
Log an agent event to the visibility dashboard event feed.

Use this liberally to narrate what an agent is doing — planning, calling a
tool, producing a result, hitting an error, and so on.
"""
args: dict = {"agent": agent, "event_type": event_type, "message": message}
if tokens is not None:
args["tokens"] = tokens
if latency_ms is not None:
args["latency_ms"] = latency_ms
return _post("log_event", args)


def trace_step(
from_agent: Annotated[str, ParamMeta(description="agent_id of the sender")],
to_agent: Annotated[str, ParamMeta(description="agent_id of the receiver")],
label: Annotated[
Optional[str], ParamMeta(description="Short label shown on the arrow, e.g. 'handoff plan'")
] = None,
arrow_type: Annotated[
Literal["msg", "result", "retry", "tool"],
ParamMeta(description="Arrow style: msg=blue, result=green, retry=orange, tool=purple"),
] = "msg",
) -> str:
"""
Draw a directed arrow between two agents on the visibility canvas.

Call this whenever one agent hands work to another, so the dashboard shows
the message flow in real time.
"""
args: dict = {"from_agent": from_agent, "to_agent": to_agent, "arrow_type": arrow_type}
if label:
args[label] = label
return _post("trace_step", args)


def set_agent_state(
agent_id: Annotated[str, ParamMeta(description="agent_id to update")],
status: Annotated[
Literal["idle", "running", "active", "done", "error"],
ParamMeta(description="New status; controls the badge colour on the canvas node"),
],
) -> str:
"""
Update an agent's status badge on the visibility dashboard.

Use 'active' or 'running' when the agent starts working, and 'done' or
'error' when it finishes.
"""
return _post("set_agent_state", {"agent_id": agent_id, "status": status})


def set_plan(
tasks: Annotated[
list,
ParamMeta(
description=(
"List of task objects, each with at minimum an 'id' and 'label' field "
"and optionally 'agent' (agent_id responsible) and 'status'."
)
),
],
) -> str:
"""
Publish the workflow task plan to the Plan tab on the visibility dashboard.

Pass a list of task dicts, e.g.:
[{"id": "t1", "label": "Write code", "agent": "coder"},
{"id": "t2", "label": "Review code", "agent": "reviewer"}]
"""
return _post("set_plan", {"tasks": tasks})


def finish_run(
status: Annotated[
Literal["done", "error"],
ParamMeta(description="Final run status"),
] = "done",
) -> str:
"""
Mark the current workflow run as complete on the visibility dashboard.

Call this in the last agent or a teardown step. The dashboard will stop
its live timer and display the final status badge.
"""
return _post("finish_run", {"status": status})
73 changes: 71 additions & 2 deletions server/routes/execute_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from runtime.bootstrap.schema import ensure_schema_registry_populated
from runtime.sdk import OUTPUT_ROOT, run_workflow
from server.models import WorkflowRunRequest
from utils.visibility_bridge import VisibilityLogger, _reset_sync, _post, _slug, _infer_role
from server.settings import YAML_DIR
from utils.attachments import AttachmentStore
from utils.exceptions import ValidationError, WorkflowExecutionError
Expand Down Expand Up @@ -107,7 +108,7 @@ def _run_workflow_with_logger(
graph_context = GraphContext(config=graph_config)
task_input = _build_task_input(graph_context, task_prompt, attachments)

class _StreamingWorkflowLogger(WorkflowLogger):
class _StreamingWorkflowLogger(VisibilityLogger):
def add_log(self, *args, **kwargs):
entry = super().add_log(*args, **kwargs)
if entry:
Expand All @@ -124,6 +125,7 @@ def _create_logger(self) -> WorkflowLogger:
level,
use_structured_logging=True,
log_to_console=False,
task_prompt=task_prompt,
)

executor = _StreamingExecutor(graph_context, session_id=normalized_session)
Expand Down Expand Up @@ -250,4 +252,71 @@ async def stream():
if done_event.is_set():
break

return StreamingResponse(stream(), media_type=_SSE_CONTENT_TYPE)
return StreamingResponse(stream(), media_type=_SSE_CONTENT_TYPE)


@router.get("/api/workflow/preview/{yaml_file:path}")
async def preview_workflow(yaml_file: str):
"""
Load a workflow's graph definition and send it to the visibility dashboard
so the user sees the agent graph before launching the workflow.
"""
try:
yaml_path = _resolve_yaml_path(yaml_file)
if not yaml_path.exists():
raise HTTPException(status_code=404, detail=f"Workflow not found: {yaml_file}")

design = load_config(yaml_path)
graph_config = GraphConfig.from_definition(
design.graph,
name=yaml_path.stem,
output_root=OUTPUT_ROOT,
source_path=str(yaml_path),
vars=design.vars,
)

def _send():
_reset_sync()

nodes = graph_config.get_node_definitions()
edges = graph_config.get_edge_definitions()

for node in nodes:
agent_id = _slug(node.id)
role = _infer_role(node.id, getattr(node, "type", "agent"))
_post("register_agent", {"id": agent_id, "label": node.id, "role": role})
_post("set_agent_state", {"agent_id": agent_id, "status": "idle"})

for edge in edges:
source = getattr(edge, "source", None)
target = getattr(edge, "target", None)
if source and target:
_post("trace_step", {
"from_agent": _slug(source),
"to_agent": _slug(target),
"arrow_type": "msg",
})

node_index = {node.id: i for i, node in enumerate(nodes)}
plan_tasks = []
for i, node in enumerate(nodes):
incoming = [
node_index[e.source]
for e in edges
if getattr(e, "target", None) == node.id and e.source in node_index
]
plan_tasks.append({
"agent": _slug(node.id),
"task": node.id,
"depends_on": incoming,
})
_post("set_plan", {"tasks": plan_tasks})
_post("set_goal", {"goal": f"Preview: {yaml_path.stem}"})

await run_in_threadpool(_send)
return {"status": "ok"}

except HTTPException:
raise
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
6 changes: 5 additions & 1 deletion server/services/websocket_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ def __init__(
websocket_manager,
session_store: WorkflowSessionStore,
cancel_event=None,
task_prompt: str = None,
):
self.session_id = session_id
self.session_controller = session_controller
self.attachment_service = attachment_service
self.websocket_manager = websocket_manager
self.session_store = session_store
self.results = {}
self.task_prompt = task_prompt
self.artifact_dispatcher = ArtifactDispatcher(session_id, session_store, websocket_manager)

def hook_factory(runtime_context):
Expand All @@ -60,7 +62,9 @@ def hook_factory(runtime_context):
def _create_logger(self) -> WorkflowLogger:
from server.services.websocket_logger import WebSocketLogger

return WebSocketLogger(self.websocket_manager, self.session_id, self.graph.name, self.graph.log_level)
return WebSocketLogger(self.websocket_manager, self.session_id, self.graph.name,
self.graph.log_level, task_prompt=self.task_prompt,
graph_config=self.graph.config)

async def execute_graph_async(self, task_prompt):
await asyncio.get_event_loop().run_in_executor(None, self._execute, task_prompt)
Expand Down
16 changes: 10 additions & 6 deletions server/services/websocket_logger.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import asyncio
from typing import Any, Dict
from typing import Any, Dict, Optional

from entity.enums import LogLevel, EventType
from utils.logger import WorkflowLogger, LogEntry
from utils.logger import LogEntry
from utils.visibility_bridge import VisibilityLogger
from utils.structured_logger import get_workflow_logger


class WebSocketLogger(WorkflowLogger):
class WebSocketLogger(VisibilityLogger):
"""Workflow logger that also pushes entries via WebSocket."""

def __init__(self, websocket_manager, session_id: str, workflow_id: str = None, log_level: LogLevel = LogLevel.DEBUG):
super().__init__(workflow_id, log_level, log_to_console=False)
def __init__(self, websocket_manager, session_id: str, workflow_id: str = None,
log_level: LogLevel = LogLevel.DEBUG, task_prompt: Optional[str] = None,
graph_config=None):
super().__init__(workflow_id, log_level, log_to_console=False,
task_prompt=task_prompt, graph_config=graph_config)
self.websocket_manager = websocket_manager
self.session_id = session_id

Expand All @@ -26,5 +30,5 @@ def add_log(self, level: LogLevel, message: str = None, node_id: str = None,
"type": "log",
"data": log_entry.to_dict()
})

return log_entry
3 changes: 3 additions & 0 deletions server/services/workflow_run_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from entity.messages import Message
from entity.enums import LogLevel
from utils.exceptions import ValidationError, WorkflowCancelledError
from utils.visibility_bridge import _reset_sync
from utils.structured_logger import get_server_logger, LogType
from utils.task_input import TaskInputBuilder
from workflow.graph_context import GraphContext
Expand Down Expand Up @@ -166,6 +167,7 @@ async def _execute_workflow_async(
websocket_manager,
self.session_store,
cancel_event=cancel_event,
task_prompt=task_prompt,
)

if session:
Expand All @@ -182,6 +184,7 @@ async def _execute_workflow_async(
executor.attachment_store,
)

_reset_sync() # clear dashboard before every run
await executor.execute_graph_async(task_input)

# If cancellation was requested during execution but not raised inside threads,
Expand Down
Loading