
Commit 2205e3d

feat: voice dashboard, barge-in detection, and kernel integration
Dashboard:
- Browser-based voice/text chat at /dashboard via WebSocket (/ws/chat)
- Silero VAD v5 in-browser, PCM audio streaming, auto-reconnect
- Three inference providers: MlxProvider (local Gemma), OllamaProvider, CogOSProvider
- Agent loop with tool-calling (speak/send_text) — Gemma 4 E4B works natively

Barge-in:
- SuperWhisper recording detection via filesystem watching (bargein-producer.py)
- speak() returns "held" when user is recording — no zombie queued jobs
- Interrupt context written back to signal file (spoken_pct, delivered_text)
- 150ms poll, pure stdlib Python, zero external dependencies

Kernel integration:
- Agent loop pulls CogOS kernel context per turn (identity, state, barge-in history)
- Exchanges logged to CogOS bus for observation by higher-level agents
- CogOSProvider routes through kernel /v1/chat/completions

Reliability:
- Non-blocking speak() in agent loop (fire-and-forget via bus.act)
- Kokoro pre-warm on server startup (eliminates 60s cold start)
- WebSocket cleanup: cancel pending TTS jobs on disconnect
- Graceful /shutdown endpoint with drain + SIGINT
- Standardized /health with uptime, engine status, queue state
- /capabilities endpoint with dynamic voice lists

New files: agent_loop.py, channels.py, providers.py, dashboard/, integrations/bargein-producer.py
Modified: http_api.py, server.py, output_queue.py, requirements.txt

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 99a700a commit 2205e3d

16 files changed

Lines changed: 2663 additions & 23 deletions
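The barge-in design centers on a small JSON signal file: the producer (integrations/bargein-producer.py, not shown in this diff) watches SuperWhisper's recordings folder on a 150 ms poll and flags when the user starts talking, so speak() can return "held" instead of queueing audio over them, and the interrupt context (spoken_pct, delivered_text) is written back into the same file for the next turn. Below is a rough sketch of that producer loop under stated assumptions: the recordings path and the "recording"/"ts" field names are illustrative, not confirmed by this commit; only the /tmp/mod3-barge-in.json default (also read by agent_loop.py below) and the 150 ms interval appear in the diff.

"""Hypothetical sketch of a barge-in producer; not the committed bargein-producer.py."""
import json
import os
import time
from pathlib import Path

SIGNAL_FILE = os.environ.get("BARGEIN_SIGNAL", "/tmp/mod3-barge-in.json")
# Assumed SuperWhisper recordings location; the real path may differ.
RECORDINGS_DIR = Path.home() / "Documents" / "superwhisper" / "recordings"
POLL_S = 0.150  # 150 ms poll, per the commit message


def _newest_mtime() -> float:
    """Most recent modification time in the recordings folder (0.0 if absent)."""
    try:
        return max((p.stat().st_mtime for p in RECORDINGS_DIR.iterdir()), default=0.0)
    except (FileNotFoundError, PermissionError):
        return 0.0


def main() -> None:
    last = _newest_mtime()
    while True:
        now = _newest_mtime()
        if now > last:
            # A new recording appeared: the user started talking.
            # Downstream, speak() checks this flag and returns "held".
            with open(SIGNAL_FILE, "w") as f:
                json.dump({"recording": True, "ts": time.time()}, f)
            last = now
        time.sleep(POLL_S)


if __name__ == "__main__":
    main()

Polling with plain os/pathlib calls keeps the producer dependency-free, which matches the "pure stdlib Python, zero external dependencies" line in the commit message.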

.gitignore

Lines changed: 6 additions & 0 deletions
@@ -3,3 +3,9 @@ __pycache__/
 *.pyc
 .DS_Store
 .claude/
+
+# ONNX Runtime WASM binaries (large, download via setup.sh)
+dashboard/vad/*.wasm
+dashboard/vad/*.mjs
+dashboard/vad/ort.js
+dashboard/vad/ort.min.mjs

agent_loop.py

Lines changed: 258 additions & 0 deletions
@@ -0,0 +1,258 @@
"""Agent loop — receives percepts, calls LLM with tools, dispatches actions.

The agent loop is the bridge between the ModalityBus (perception/action)
and the InferenceProvider (thinking). It maintains conversation history
and routes tool calls through the bus.
"""

from __future__ import annotations

import json as _json
import logging
import os
import time
from typing import TYPE_CHECKING, Any

import httpx

from bus import ModalityBus
from modality import CognitiveEvent, CognitiveIntent, ModalityType
from pipeline_state import PipelineState
from providers import AGENT_TOOLS, InferenceProvider

if TYPE_CHECKING:
    from channels import BrowserChannel

logger = logging.getLogger("mod3.agent_loop")

# Base system prompt — kernel context is appended dynamically
_BASE_SYSTEM_PROMPT = (
    "You are Cog, a voice assistant running on Mod³ (Apple Silicon, fully local). "
    "You respond using tool calls. Use speak() for conversational voice responses — "
    "keep them concise, 1-3 sentences. Use send_text() only when the content is "
    "better read than heard (code, lists, links, structured data). "
    "No markdown in speak() text. Speak naturally. "
    "If the user asks something you can't do, say so briefly."
)

# CogOS kernel endpoint for context enrichment
_COGOS_ENDPOINT = os.environ.get("COGOS_ENDPOINT", "http://localhost:6931")

# Bus endpoint for logging exchanges (observation channel)
_COGOS_BUS_ENDPOINT = f"{_COGOS_ENDPOINT}/v1/bus"


def _fetch_kernel_context() -> str:
    """Pull active context from CogOS kernel to enrich the system prompt.

    Returns a context block string, or empty string if kernel unavailable.
    This is the afferent path: kernel → local model.
    """
    try:
        resp = httpx.get(f"{_COGOS_ENDPOINT}/health", timeout=2.0)
        if resp.status_code != 200:
            return ""
        health = resp.json()

        parts = []
        identity = health.get("identity", "cog")
        state = health.get("state", "unknown")
        parts.append(f"Kernel identity: {identity}, state: {state}")

        # Try to get active session context
        try:
            ctx_resp = httpx.get(f"{_COGOS_ENDPOINT}/v1/context", timeout=2.0)
            if ctx_resp.status_code == 200:
                ctx = ctx_resp.json()
                nucleus = ctx.get("nucleus", "")
                if nucleus:
                    parts.append(f"Active nucleus: {nucleus}")
                process_state = ctx.get("state", "")
                if process_state:
                    parts.append(f"Process state: {process_state}")
        except Exception:
            pass

        # Check for barge-in context (what was Claude saying when interrupted?)
        signal_file = os.environ.get("BARGEIN_SIGNAL", "/tmp/mod3-barge-in.json")
        try:
            if os.path.exists(signal_file):
                with open(signal_file) as f:
                    signal = _json.load(f)
                interrupted = signal.get("interrupted")
                if interrupted:
                    delivered = interrupted.get("delivered_text", "")
                    full = interrupted.get("full_text", "")
                    pct = interrupted.get("spoken_pct", 0)
                    parts.append(
                        f"[barge-in] Claude's speech was interrupted at {pct*100:.0f}%. "
                        f"Delivered: \"{delivered}\". "
                        f"The user interrupted to say something — acknowledge and respond to them."
                    )
        except Exception:
            pass

        if parts:
            return "\n\nKernel context:\n" + "\n".join(f"- {p}" for p in parts)
        return ""
    except Exception:
        return ""
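# Illustrative note, not part of the committed file: when the kernel is reachable,
# the string returned above (and appended to _BASE_SYSTEM_PROMPT) looks roughly like
# the block below. The values are made up; only the labels come from the code.
#
#   Kernel context:
#   - Kernel identity: cog, state: running
#   - Active nucleus: inbox-triage
#   - Process state: focused
#   - [barge-in] Claude's speech was interrupted at 40%. Delivered: "Sure, the next step is..."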

def _log_exchange_to_bus(user_text: str, assistant_text: str, provider_name: str):
    """Log the local model exchange to the CogOS bus (observation channel).

    This is the efferent path: local model → kernel → Claude can observe.
    """
    try:
        payload = {
            "type": "modality.voice.exchange",
            "from": f"mod3-reflex:{provider_name}",
            "payload": {
                "user": user_text,
                "assistant": assistant_text,
                "provider": provider_name,
                "timestamp": time.time(),
            },
        }
        httpx.post(
            _COGOS_BUS_ENDPOINT,
            json=payload,
            timeout=2.0,
        )
    except Exception as e:
        logger.debug("Failed to log exchange to bus: %s", e)


MAX_HISTORY = 50


class AgentLoop:
    """Conversational agent that receives percepts and acts through the bus."""

    def __init__(
        self,
        bus: ModalityBus,
        provider: InferenceProvider,
        pipeline_state: PipelineState,
        channel_id: str = "",
    ):
        self.bus = bus
        self.provider = provider
        self.pipeline_state = pipeline_state
        self.channel_id = channel_id
        self.conversation: list[dict[str, str]] = []
        self._channel_ref: BrowserChannel | None = None
        self._processing = False

    async def handle_event(self, event: CognitiveEvent) -> None:
        """Called when a CognitiveEvent arrives from the channel."""
        if not event.content.strip():
            return

        if self._processing:
            logger.warning("agent busy, dropping: %s", event.content[:50])
            return

        self._processing = True
        try:
            await self._process(event)
        except Exception as e:
            logger.error("agent_loop error: %s", e, exc_info=True)
            try:
                if self._channel_ref:
                    await self._channel_ref.send_response_text(f"[error: {e}]")
                    await self._channel_ref.send_response_complete()
            except Exception:
                pass  # channel may be dead, don't block finally
        finally:
            self._processing = False

    async def _process(self, event: CognitiveEvent) -> None:
        """Core: event → provider → tool dispatch."""
        self.conversation.append({"role": "user", "content": event.content})
        self._trim_history()

        t_start = time.perf_counter()

        # Assemble system prompt with kernel context (afferent path)
        kernel_ctx = _fetch_kernel_context()
        system_prompt = _BASE_SYSTEM_PROMPT + kernel_ctx

        response = await self.provider.chat(
            messages=self.conversation,
            tools=AGENT_TOOLS,
            system=system_prompt,
        )

        t_llm = (time.perf_counter() - t_start) * 1000

        # Dispatch tool calls
        assistant_parts: list[str] = []

        for tc in response.tool_calls:
            if tc.name == "speak":
                text = tc.arguments.get("text", "")
                if text:
                    assistant_parts.append(text)
                    # Show text in chat panel
                    if self._channel_ref:
                        await self._channel_ref.send_response_text(text)
                    # Route through bus → VoiceEncoder → TTS → channel.deliver
                    intent = CognitiveIntent(
                        modality=ModalityType.VOICE,
                        content=text,
                        target_channel=self.channel_id,
                        metadata={
                            "voice": self._channel_ref.config.get("voice", "bm_lewis") if self._channel_ref else "bm_lewis",
                            "speed": self._channel_ref.config.get("speed", 1.25) if self._channel_ref else 1.25,
                        },
                    )
                    # Fire-and-forget: bus.act(blocking=False) returns QueuedJob immediately,
                    # OutputQueue drain thread handles TTS encoding + delivery.
                    self.bus.act(intent, channel=self.channel_id)

            elif tc.name == "send_text":
                text = tc.arguments.get("text", "")
                if text:
                    assistant_parts.append(text)
                    if self._channel_ref:
                        await self._channel_ref.send_response_text(text)

        # Fallback: if provider returned text but no tool calls, auto-speak
        if not response.tool_calls and response.text:
            text = response.text
            assistant_parts.append(text)
            if self._channel_ref:
                await self._channel_ref.send_response_text(text)
            intent = CognitiveIntent(
                modality=ModalityType.VOICE,
                content=text,
                target_channel=self.channel_id,
                metadata={
                    "voice": self._channel_ref.config.get("voice", "bm_lewis") if self._channel_ref else "bm_lewis",
                    "speed": self._channel_ref.config.get("speed", 1.25) if self._channel_ref else 1.25,
                },
            )
            self.bus.act(intent, channel=self.channel_id)

        # Update conversation history
        if assistant_parts:
            assistant_text = " ".join(assistant_parts)
            self.conversation.append({
                "role": "assistant",
                "content": assistant_text,
            })

            # Log exchange to CogOS bus (observation channel — Claude can see this)
            _log_exchange_to_bus(event.content, assistant_text, self.provider.name)

        # Signal completion
        if self._channel_ref:
            await self._channel_ref.send_response_complete(
                metrics={"llm_ms": round(t_llm, 1), "provider": self.provider.name}
            )

    def _trim_history(self) -> None:
        """Keep conversation within MAX_HISTORY messages."""
        if len(self.conversation) > MAX_HISTORY:
            self.conversation = self.conversation[-MAX_HISTORY:]
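For orientation, a minimal sketch of how this loop might be wired up and exercised. Only AgentLoop's constructor and handle_event come from the file above; the no-argument constructors for ModalityBus, PipelineState, and MlxProvider, and the CognitiveEvent fields, are assumptions about the other new modules (providers.py, channels.py, bus.py), not confirmed by this diff.

"""Hypothetical wiring sketch; constructor arguments below are assumed, not taken from the diff."""
import asyncio

from agent_loop import AgentLoop
from bus import ModalityBus
from modality import CognitiveEvent, ModalityType
from pipeline_state import PipelineState
from providers import MlxProvider


async def demo() -> None:
    bus = ModalityBus()            # assumed: no required args
    provider = MlxProvider()       # local Gemma; assumed: no required args
    state = PipelineState()        # assumed: no required args
    agent = AgentLoop(bus, provider, state, channel_id="browser-1")

    # In the real server, BrowserChannel (channels.py) feeds WebSocket turns in here.
    # The CognitiveEvent fields are assumed; handle_event itself only reads .content.
    event = CognitiveEvent(modality=ModalityType.VOICE, content="what can you do?")
    await agent.handle_event(event)


if __name__ == "__main__":
    asyncio.run(demo())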
