Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c57140e
refactor: rename add_node/update_node_init_args, fix edge deletion on…
xingjianll Apr 4, 2026
883ee8c
refactor: separate graph topology, channel topology, and wiring
xingjianll Apr 4, 2026
0d8fb68
refactor: three-layer separation of graph topology, channel topology,…
xingjianll Apr 4, 2026
afff429
refactor: extract UI channels into UIChannelBridge, simplify WS contr…
xingjianll Apr 4, 2026
60daaa3
refactor: extract UI channels into UIChannelBridge, simplify WS contr…
xingjianll Apr 4, 2026
8534dc3
style: ruff format
xingjianll Apr 4, 2026
5e3619f
fix: bridge.run() stores event loop, wire() schedules tasks via call_…
xingjianll Apr 5, 2026
ec43e41
fix: handle WS disconnect in recv loop, fix event loop issue in wire()
xingjianll Apr 5, 2026
47897b1
fix: sidebar groups by functionality then IO, fix tag names (vision/m…
xingjianll Apr 5, 2026
c9baf40
fix: pass onEdgesDelete through GraphCanvas to ReactFlow
xingjianll Apr 5, 2026
3f887f1
fix: update profiling scripts for Receiver(channel, stop_event) signa…
xingjianll Apr 5, 2026
4347722
fix: update frontend tests for sidebar grouping and edge deletion cha…
xingjianll Apr 5, 2026
3d19e5b
style: ruff format ttfa_profile.py
xingjianll Apr 5, 2026
d6a17d7
fix: sidebar test - Misc only exists in composites, not primitives
xingjianll Apr 5, 2026
32a7d2d
fix: mock fetchEdges after node deletion in App test
xingjianll Apr 5, 2026
fe0ab56
docs: update DEVELOPER.md for three-layer architecture and UI bridge
xingjianll Apr 5, 2026
0839bf0
fix: dev script ROCm flag, add --amd alias, update README
xingjianll Apr 5, 2026
1fa0660
fix: component stop latency — DoNothing None check, Throttle cooperat…
xingjianll Apr 5, 2026
97614de
perf: batch subtype checking — single request instead of N^2
xingjianll Apr 5, 2026
4c4d3bc
fix: update throttle test for stop_event.wait instead of time.sleep
xingjianll Apr 5, 2026
70929d9
fix: update typecheck tests to use batch fetchSubtypePairs mock
xingjianll Apr 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 39 additions & 22 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,30 +168,36 @@ graph LR
- `_stopped` flag makes `send()` a no-op after pipeline stop

**`Receiver[T]`** — reads from one channel as an iterator:
- Registers with the channel on construction (`__init__` takes `channel` and `stop_event`)
- **Blocking mode** (default): blocks on `__next__()` until data arrives or `stop_event` fires
- **Non-blocking mode**: returns `None` immediately if no data
- **Newest mode**: fast-forwards cursor to latest item (essential for video to prevent lag)
- Tracks `_msg_count`, `_byte_count`, `lag` for metrics
- Handles are **ephemeral** — created fresh on each `run()`, stored on `Node.senders`/`Node.receivers`, discarded on `stop()`

#### Channel Reconciliation
#### Channel Topology

When edges are added/removed, `GraphManager._reconcile()` recomputes the optimal channel layout:
The system separates **graph topology** (nodes + edges), **channel topology** (the wiring plan), and **wiring** (live handle creation) into three layers:

1. **Graph topology** — trivial CRUD on nodes and edges
2. **Channel topology** — `_reconcile()` computes a `(sender_plan, receiver_plan)` from the current edges. It only produces a plan, never creates Sender/Receiver handles.
3. **Wiring** — `run()` reads the plan and creates fresh handles, storing them on `Node.senders` and `Node.receivers`. `stop()` discards them.

```mermaid
graph TD
A[Collect all edges] --> B[Group receivers by identical sender set]
B --> C[Diff against existing channels]
C --> D[Reuse unchanged channels]
C --> E[Create new channels for new groups]
D & E --> F[Rebuild Sender/Receiver handles]
F --> G[Ensure every output slot has a Sender]
D & E --> F["Return (sender_plan, receiver_plan)"]
F --> G["run() creates fresh handles from plan"]
```

Receivers sharing the same set of upstream senders share a single `Channel` instance, minimizing memory and synchronization overhead.

#### UI Channels

UI channels are type-system markers that route data to/from the WebSocket instead of inter-component edges:
UI channels are type-system markers that route data to/from the WebSocket instead of inter-component edges. They are managed by `UIChannelBridge` (in `src/api/ui/bridge.py`), **not** by GraphManager:

| Marker Class | Direction | Use Case |
|---|---|---|
Expand All @@ -200,6 +206,8 @@ UI channels are type-system markers that route data to/from the WebSocket instea
| `UITextReceiver` | frontend → component | Text input from node UI |
| `UIKeystrokeReceiver` | frontend → component | Individual keystrokes from node UI |

The bridge creates UI channels via `wire(manager)`, which returns overrides passed to `GraphManager.run()`. It owns the WebSocket lifecycle via `run(ws)` — spawning outbound tasks per UI output receiver and handling inbound messages in a receive loop.

---

### Frame Types
Expand Down Expand Up @@ -273,22 +281,29 @@ Key design decisions:

### GraphManager

The `GraphManager` is the runtime orchestrator. It owns the graph definition, component instances, and all channel/handle state.
The `GraphManager` is the runtime orchestrator. It owns the graph definition, component instances, and channel topology. Sender/Receiver handles are stored on `Node` objects, not on the manager.

```mermaid
graph TD
subgraph GraphManager
Graph["Graph (nodes + edges)"]
CompMap["Component instances"]
ChanMap["Channel map"]
Senders["Sender handles"]
Receivers["Receiver handles"]
UIChan["UI channels"]
end

subgraph Node
Senders["senders: dict"]
Receivers["receivers: dict"]
end

subgraph UIChannelBridge
UISend["UI senders (server-side)"]
UIRecv["UI receivers (server-side)"]
end

Graph -->|"_reconcile()"| ChanMap
ChanMap --> Senders
ChanMap --> Receivers
ChanMap -->|"run(overrides)"| Node
UIChannelBridge -->|"wire() → overrides"| ChanMap
CompMap -->|"run()"| Threads["Daemon threads"]
```

Expand All @@ -297,34 +312,34 @@ graph TD
```mermaid
sequenceDiagram
participant Client
participant Bridge as UIChannelBridge
participant GM as GraphManager
participant Comp as Components

Client->>GM: run()
Client->>Bridge: wire(manager)
Bridge-->>Client: (recv_overrides, send_overrides)
Client->>GM: run(recv_overrides, send_overrides)
GM->>Comp: stop() all (if running)
GM->>GM: Clear UI channels
GM->>GM: _reconcile() → (sender_plan, receiver_plan)
loop For each node
GM->>GM: Build input/output handles
GM->>GM: Create UI channels
GM->>GM: Wire receivers (register cursors)
GM->>GM: Create fresh Sender/Receiver from plan + overrides
GM->>GM: Store on node.senders / node.receivers
end
GM->>Comp: setup(outputs) — all components, sequential
GM->>Comp: start(inputs, outputs) — spawns daemon threads
GM->>GM: Register threads with log store
GM->>GM: Notify WebSocket watchers

Client->>GM: stop()
GM->>GM: Set _stopped on all senders
GM->>GM: Set _stopped on all node senders
GM->>Comp: stop() — sets stop_event on each thread
```

#### Node CRUD

| Method | Effect |
|---|---|
| `add_node(type, init_args)` | Instantiate component, add to graph, reconcile channels |
| `delete_node(id)` | Stop component + connected neighbors, remove edges, reconcile |
| `update_node_init_args(id, args)` | Recreate component; if graph was running, auto-restart (hot-reload) |
| `add_primitive_node(type_, init_args)` | Instantiate component, add to graph, reconcile channels |
| `delete_node(id)` | Stop component + downstream nodes, remove edges, reconcile |
| `update_primitive_node_init_args(id, args)` | Recreate component; returns `(node, was_running)` — caller handles restart |
| `add_edge(edge)` / `delete_edge(edge)` | Modify graph topology, reconcile channels |
| `reset(graph)` | Replace entire graph — stop everything, re-instantiate all components |

Expand Down Expand Up @@ -416,6 +431,8 @@ graph TD

**State management** is local React hooks — no Redux or Zustand. ReactFlow manages node/edge state via `useNodesState` / `useEdgesState`. A single `UIChannelContext` provides the WebSocket manager.

The frontend calls the backend directly at `http://localhost:8000` via `API_BASE` / `WS_BASE` constants in `src/lib/api.ts` — no Vite proxy.

**Key hooks:**

| Hook | Purpose |
Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ cd frontend && bun install && cd ..

# Start both backend + frontend
bun run dev

# For AMD GPUs (ROCm):
bun run dev -- --amd
```

This runs the backend (FastAPI on `:8000`) and frontend (Vite on `:5173`) concurrently.
Expand Down Expand Up @@ -139,7 +142,18 @@ cd backend
uv sync --group cuda12
```

For **AMD GPUs** (ROCm), PyTorch's ROCm builds are mapped to `cuda` internally — install the appropriate ROCm wheels for your platform.
For **AMD GPUs** (ROCm):

```bash
cd backend
uv sync --group rocm --no-default-groups
```

Or use the dev script flag:

```bash
bun run dev -- --amd
```

### Environment Variables

Expand Down
8 changes: 4 additions & 4 deletions backend/profiling/pipeline_hop_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,20 @@ def run_test(model: str):
# Wire: FileSource -> VAD -> ASR -> Adapter -> LLM -> NullSink
ch1: Channel[AudioFrame] = Channel()
s1 = Sender(ch1)
r1 = Receiver(ch1)
r1 = Receiver(ch1, threading.Event())
ch2: Channel[AudioFrame] = Channel()
s2 = Sender(ch2)
r2 = Receiver(ch2)
r2 = Receiver(ch2, threading.Event())
ch3: Channel[TextFrame] = Channel()
ch4: Channel[list[MessageFrame]] = Channel()
s4 = Sender(ch4)
r4 = Receiver(ch4)
r4 = Receiver(ch4, threading.Event())

# Adapter thread
def adapter():
comp = _Adapter()
comp._stop_event = threading.Event()
for text_frame in Receiver(ch3)(comp):
for text_frame in Receiver(ch3, threading.Event()):
if text_frame is None:
break
if comp.stop_event.is_set():
Expand Down
15 changes: 9 additions & 6 deletions backend/profiling/ttfa_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ def wrap_audio(item):
def adapter3():
comp = _Stub()
comp._stop_event = threading.Event()
for tf in Receiver(ch3)(comp):
for tf in Receiver(ch3, threading.Event()):
if tf is None:
break
msgs = [
Expand All @@ -607,19 +607,22 @@ def adapter3():
vad_2 = VAD(config=VADConfig())
file_source_2 = FileSource(wav_path=wav_path)

null_sink_2.start(NullSinkInputs(audio=Receiver(ch6)), ())
null_sink_2.start(NullSinkInputs(audio=Receiver(ch6, threading.Event())), ())
tts_2.start(
tts_mod.TTSInputs(text=Receiver(ch5)),
tts_mod.TTSInputs(text=Receiver(ch5, threading.Event())),
tts_mod.TTSOutputs(audio=Sender(ch6), text=Sender()),
)
llm_2.start(
llm_mod.LLMInputs(messages=Receiver(ch4)), llm_mod.LLMOutputs(token=Sender(ch5))
llm_mod.LLMInputs(messages=Receiver(ch4, threading.Event())),
llm_mod.LLMOutputs(token=Sender(ch5)),
)
asr_2.start(
asr_mod.ASRInputs(audio=Receiver(ch2)), asr_mod.ASROutputs(text=Sender(ch3))
asr_mod.ASRInputs(audio=Receiver(ch2, threading.Event())),
asr_mod.ASROutputs(text=Sender(ch3)),
)
vad_2.start(
vad_mod.VADInputs(audio=Receiver(ch1)), vad_mod.VADOutputs(audio=Sender(ch2))
vad_mod.VADInputs(audio=Receiver(ch1, threading.Event())),
vad_mod.VADOutputs(audio=Sender(ch2)),
)
file_source_2.start((), FileSourceOutputs(audio=Sender(ch1)))

Expand Down
16 changes: 16 additions & 0 deletions backend/src/api/component/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ def is_subtype(sub: str = Query(), sup: str = Query()) -> bool:
return False


@router.post("/subtype-pairs")
def subtype_pairs(names: list[str]) -> list[list[str]]:
"""Return all [sub, sup] pairs where sub is a subtype of sup."""
result: list[list[str]] = []
for a in names:
for b in names:
if a == b:
continue
try:
if issubclass(_resolve_type(a), _resolve_type(b)):
result.append([a, b])
except ValueError:
pass
return result


@router.post("/{component_name}/options")
def get_options(
component_name: str,
Expand Down
5 changes: 5 additions & 0 deletions backend/src/api/dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

from fastapi import Request

from src.api.ui.bridge import UIChannelBridge
from src.core.graph import GraphManager


def get_manager(request: Request) -> GraphManager:
return request.app.state.manager


def get_ui_bridge(request: Request) -> UIChannelBridge:
return request.app.state.ui_bridge
10 changes: 7 additions & 3 deletions backend/src/api/graph/node/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from fastapi import APIRouter, Depends, HTTPException

from src.api.dep import get_manager
from src.api.dep import get_manager, get_ui_bridge
from src.api.ui.bridge import UIChannelBridge
from src.api.graph.node.dto import (
NodeInitArgsUpdateRequest,
NodeCreateRequest,
Expand Down Expand Up @@ -79,12 +80,15 @@ def update_node(


@router.patch("/nodes/{node_id}/init-args")
def update_node_init_args(
def update_primitive_node_init_args(
node_id: str,
req: NodeInitArgsUpdateRequest,
manager: GraphManager = Depends(get_manager),
ui_bridge: UIChannelBridge = Depends(get_ui_bridge),
) -> NodeResponse:
node = service.update_node_init_args(manager, node_id, req.init_args)
node = service.update_primitive_node_init_args(
manager, ui_bridge, node_id, req.init_args
)
if node is None:
raise HTTPException(status_code=404, detail=f"Node not found: {node_id}")
return _node_response(node_id, node, manager)
Expand Down
16 changes: 11 additions & 5 deletions backend/src/api/graph/node/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

from src.api.graph.node.dto import NodeUpdateRequest
from src.api.ui.bridge import UIChannelBridge
from src.core.config import PROJECTS_DIR
from src.core.graph import Edge, Graph, GraphManager, Node

Expand All @@ -17,14 +18,14 @@ def get_node(manager: GraphManager, node_id: str) -> Node | None:


def create_node(
manager: GraphManager, node_type: str, init_args: dict[str, Any]
manager: GraphManager, type_: str, init_args: dict[str, Any]
) -> tuple[str, Node]:
try:
return manager.add_node(node_type, init_args)
return manager.add_primitive_node(type_, init_args)
except ValueError:
pass
# Fallback: try loading as a project
return create_from_project(manager, node_type)
return create_from_project(manager, type_)


def update_node(
Expand All @@ -37,12 +38,17 @@ def delete_node(manager: GraphManager, node_id: str) -> None:
manager.delete_node(node_id)


def update_node_init_args(
def update_primitive_node_init_args(
manager: GraphManager,
ui_bridge: UIChannelBridge,
node_id: str,
init_args: dict[str, Any],
) -> Node | None:
return manager.update_node_init_args(node_id, init_args)
node, was_running = manager.update_primitive_node_init_args(node_id, init_args)
if was_running and node is not None:
recv_overrides, send_overrides = ui_bridge.wire(manager)
manager.run(recv_overrides, send_overrides)
return node


def create_subgraph(
Expand Down
10 changes: 7 additions & 3 deletions backend/src/api/graph/run/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

from fastapi import APIRouter, Depends

from src.api.dep import get_manager
from src.api.dep import get_manager, get_ui_bridge
from src.api.graph.run import service
from src.api.ui.bridge import UIChannelBridge
from src.core.graph import GraphManager

router = APIRouter(prefix="/graph")


@router.post("/start", status_code=204)
def start_all(manager: GraphManager = Depends(get_manager)) -> None:
service.start_all(manager)
def start_all(
manager: GraphManager = Depends(get_manager),
ui_bridge: UIChannelBridge = Depends(get_ui_bridge),
) -> None:
service.start_all(manager, ui_bridge)


@router.post("/stop", status_code=204)
Expand Down
6 changes: 4 additions & 2 deletions backend/src/api/graph/run/service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

from src.api.ui.bridge import UIChannelBridge
from src.core.graph import GraphManager


def start_all(manager: GraphManager) -> None:
manager.run()
def start_all(manager: GraphManager, ui_bridge: UIChannelBridge) -> None:
recv_overrides, send_overrides = ui_bridge.wire(manager)
manager.run(recv_overrides, send_overrides)


def stop_all(manager: GraphManager) -> None:
Expand Down
3 changes: 2 additions & 1 deletion backend/src/api/metrics/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
ReceiverSnapshot,
SenderSnapshot,
)
from src.core.graph import GraphManager, ReceiverKey, SenderKey
from src.core.graph import GraphManager
from src.core.utils import ReceiverKey, SenderKey


class MetricsCollector:
Expand Down
Loading
Loading