diff --git a/Cargo.lock b/Cargo.lock index 23137e1b54..8061ca3e1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4032,6 +4032,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + [[package]] name = "tokio-tungstenite" version = "0.23.1" @@ -4463,9 +4475,15 @@ name = "trios-doctor" version = "0.1.0" dependencies = [ "anyhow", + "futures-util", "serde", "serde_json", "tempfile", + "tokio", + "tokio-tungstenite 0.21.0", + "tracing", + "tracing-subscriber", + "uuid", ] [[package]] @@ -4739,6 +4757,21 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "trios-queen-loop" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures-util", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.21.0", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "trios-rainbow-bridge" version = "0.1.0" @@ -5001,6 +5034,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.4.0", + "httparse", + "log", + "rand 0.8.6", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.23.0" diff --git a/Cargo.toml b/Cargo.toml index 7ca4af6e1f..8e04274523 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "crates/trios-fpga", "crates/trios-doctor", "crates/trios-doctor/rings/SILVER-RING-DR-04", + "crates/trios-queen-loop", # trios-ext — Ring Architecture (issue #247) "crates/trios-ext/rings/SILVER-RING-EXT-00", "crates/trios-ext/rings/SILVER-RING-EXT-01", diff --git a/crates/trios-doctor/Cargo.toml b/crates/trios-doctor/Cargo.toml index 00f3aaf5c0..4b75c79420 100644 --- a/crates/trios-doctor/Cargo.toml +++ b/crates/trios-doctor/Cargo.toml @@ -14,10 +14,21 @@ path = "src/main.rs" name = "validate_bpb" path = "src/validate_bpb.rs" +[[bin]] +name = "trios-doctor-loop" +path = "src/doctor_loop.rs" + [dependencies] anyhow.workspace = true serde.workspace = true serde_json.workspace = true +# Queen ↔ Doctor autonomous loop (bee/queen-doctor-autoloop) +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time", "net"] } +tokio-tungstenite = "0.21" +futures-util = "0.3" +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +uuid = { version = "1", features = ["v4"] } [dev-dependencies] tempfile.workspace = true diff --git a/crates/trios-doctor/src/doctor_loop.rs b/crates/trios-doctor/src/doctor_loop.rs new file mode 100644 index 0000000000..e299a323c4 --- /dev/null +++ b/crates/trios-doctor/src/doctor_loop.rs @@ -0,0 +1,289 @@ +//! `trios-doctor-loop` — autonomous Queen↔Doctor loop client (Doctor side). +//! +//! Connects to `trios-server` via WebSocket (`/ws`), subscribes to broadcast +//! events, filters `QueenOrder` events targeting `target_agent == "doctor"`, +//! executes the corresponding action through the existing `Doctor` API, and +//! publishes a `DoctorReport` back onto the bus via `doctor/report` method. +//! +//! Constitutional anchors: +//! - L1 — pure Rust, no shell scripts. +//! - L8 — push-first: this binary is committed before being run remotely. +//! - L21 — append-only: events are broadcast, never mutated. +//! - L24 — agent-to-agent traffic via the canonical bus, not sibling sockets. +//! - AGENTS.md AGENT T 6-phase cycle: this binary lives in phase RUN as the +//! Doctor's executor, and emits VERDICT input via DoctorReport. +//! +//! Anchor: φ² + φ⁻² = 3. + +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::{anyhow, Context, Result}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + connect_async, + tungstenite::Message, + MaybeTlsStream, WebSocketStream, +}; +use futures_util::{SinkExt, StreamExt}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +use trios_doctor::{CheckStatus, Doctor}; + +type WsClient = WebSocketStream>; + +/// CLI configuration. +#[derive(Debug, Clone)] +struct LoopConfig { + /// WebSocket URL (e.g. `ws://127.0.0.1:9005/ws`). + ws_url: String, + /// Workspace root that Doctor diagnoses. + workspace_root: PathBuf, + /// Agent identifier published in DoctorReport. + agent_id: String, + /// Reconnect delay on transport error. + reconnect_delay: Duration, +} + +impl LoopConfig { + fn from_env_and_args() -> Result { + let ws_url = std::env::var("TRIOS_DOCTOR_WS_URL") + .unwrap_or_else(|_| "ws://127.0.0.1:9005/ws".to_string()); + + let workspace_root = std::env::var("TRIOS_WORKSPACE") + .ok() + .map(PathBuf::from) + .or_else(|| std::env::current_dir().ok()) + .context("cannot determine workspace root (set TRIOS_WORKSPACE)")?; + + let agent_id = + std::env::var("TRIOS_DOCTOR_AGENT_ID").unwrap_or_else(|_| "doctor".to_string()); + + let reconnect_secs: u64 = std::env::var("TRIOS_DOCTOR_RECONNECT_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(5); + + Ok(Self { + ws_url, + workspace_root, + agent_id, + reconnect_delay: Duration::from_secs(reconnect_secs), + }) + } +} + +/// Subset of the bus event we care about. Only `QueenOrder` is interesting +/// to the Doctor loop. Any other variant is ignored. +#[derive(Debug, Deserialize)] +#[serde(tag = "type", content = "data")] +#[allow(clippy::large_enum_variant)] +enum InboundEvent { + QueenOrder { + order_id: String, + action: String, + target_agent: String, + params: Value, + ts: i64, + }, + /// Anything else — we tolerate it without erroring out. + #[serde(other)] + Ignored, +} + +#[derive(Debug, Serialize)] +struct WsRequest { + id: String, + method: String, + params: Value, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter("trios_doctor_loop=info,info") + .init(); + + let cfg = LoopConfig::from_env_and_args()?; + info!("trios-doctor-loop starting"); + info!(" ws_url = {}", cfg.ws_url); + info!(" workspace = {}", cfg.workspace_root.display()); + info!(" agent_id = {}", cfg.agent_id); + info!(" reconnect_in = {:?}", cfg.reconnect_delay); + info!(" φ² + φ⁻² = 3 (constitutional anchor)"); + + loop { + match run_session(&cfg).await { + Ok(()) => { + warn!("WS session ended cleanly; reconnecting in {:?}", cfg.reconnect_delay); + } + Err(e) => { + error!("WS session failed: {:#}; reconnecting in {:?}", e, cfg.reconnect_delay); + } + } + tokio::time::sleep(cfg.reconnect_delay).await; + } +} + +async fn run_session(cfg: &LoopConfig) -> Result<()> { + let (ws_stream, response) = connect_async(&cfg.ws_url) + .await + .with_context(|| format!("connecting to {}", cfg.ws_url))?; + info!( + "WS connected: {} (HTTP {})", + cfg.ws_url, + response.status().as_u16() + ); + + let mut ws: WsClient = ws_stream; + + while let Some(msg) = ws.next().await { + let msg = msg.context("WS recv")?; + match msg { + Message::Text(text) => { + if let Err(e) = handle_text_frame(&text, &mut ws, cfg).await { + warn!("frame handling error: {:#}", e); + } + } + Message::Close(_) => { + info!("WS closed by peer"); + return Ok(()); + } + Message::Ping(p) => { + ws.send(Message::Pong(p)).await.ok(); + } + _ => {} + } + } + + Ok(()) +} + +/// Decide whether a text frame is a `BusEvent` we should react to. +/// Server frames have shape `{"result": ...}` for RPC responses or are the raw +/// `BusEvent` JSON for broadcasts. We only react to broadcasts targeted at us. +async fn handle_text_frame(text: &str, ws: &mut WsClient, cfg: &LoopConfig) -> Result<()> { + let value: Value = match serde_json::from_str(text) { + Ok(v) => v, + Err(_) => return Ok(()), // not JSON → silently ignore + }; + + // Skip RPC responses + if value.get("result").is_some() && value.get("type").is_none() { + return Ok(()); + } + + let event: InboundEvent = match serde_json::from_value(value) { + Ok(e) => e, + Err(_) => return Ok(()), + }; + + match event { + InboundEvent::QueenOrder { + order_id, + action, + target_agent, + params, + ts, + } => { + if target_agent != cfg.agent_id && target_agent != "doctor" { + return Ok(()); + } + info!( + "QueenOrder received: order_id={} action={:?} ts={}", + order_id, action, ts + ); + let report = execute_order(&order_id, &action, ¶ms, cfg)?; + send_report(ws, report).await?; + } + InboundEvent::Ignored => {} + } + Ok(()) +} + +/// Honest execution: dispatch by action, run real Doctor checks where applicable. +fn execute_order( + order_id: &str, + action: &str, + _params: &Value, + cfg: &LoopConfig, +) -> Result { + let doctor = Doctor::new(&cfg.workspace_root); + let normalized = action.trim().to_ascii_lowercase(); + + match normalized.as_str() { + "doctor scan" | "doctor quick" | "doctor heal" => { + // All three currently map onto the same workspace diagnosis: the + // Doctor binary itself only differentiates by exit code, not by + // separate routines, so we honestly run the same set of checks + // and label the report with the requested action. + let diag = doctor.run_all(); + let any_red = diag.checks.iter().any(|c| c.status == CheckStatus::Red); + let any_yellow = diag.checks.iter().any(|c| c.status == CheckStatus::Yellow); + let status = if any_red { + "red" + } else if any_yellow { + "yellow" + } else { + "green" + } + .to_string(); + let summary = format!( + "{}: {} crates · {} checks · {}", + action, + diag.crate_count, + diag.checks.len(), + status + ); + let diagnosis = serde_json::to_value(&diag) + .unwrap_or_else(|_| json!({"error": "serialize diagnosis failed"})); + Ok(DoctorReportPayload { + order_id: order_id.to_string(), + agent_id: cfg.agent_id.clone(), + status, + summary, + diagnosis, + }) + } + other => { + // Unknown action → honest "noop" report so Queen knows we saw it. + Ok(DoctorReportPayload { + order_id: order_id.to_string(), + agent_id: cfg.agent_id.clone(), + status: "noop".into(), + summary: format!("doctor-loop: unsupported action {:?}", other), + diagnosis: json!({"action": other, "supported": [ + "doctor scan", "doctor quick", "doctor heal" + ]}), + }) + } + } +} + +#[derive(Debug, Serialize)] +struct DoctorReportPayload { + order_id: String, + agent_id: String, + status: String, + summary: String, + diagnosis: Value, +} + +async fn send_report(ws: &mut WsClient, report: DoctorReportPayload) -> Result<()> { + let req = WsRequest { + id: Uuid::new_v4().to_string(), + method: "doctor/report".into(), + params: serde_json::to_value(&report) + .map_err(|e| anyhow!("serialize DoctorReport: {}", e))?, + }; + let payload = serde_json::to_string(&req)?; + info!( + "sending DoctorReport order_id={} status={}", + report.order_id, report.status + ); + ws.send(Message::Text(payload)).await?; + Ok(()) +} diff --git a/crates/trios-queen-loop/Cargo.toml b/crates/trios-queen-loop/Cargo.toml new file mode 100644 index 0000000000..523eca21a9 --- /dev/null +++ b/crates/trios-queen-loop/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "trios-queen-loop" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +description = "Queen daemon: reads .trinity/queen/{senses,policy,actions}.json, picks an action, publishes QueenOrder onto the bus, awaits DoctorReport. Sister-binary to trios-doctor-loop." + +[[bin]] +name = "trios-queen-loop" +path = "src/main.rs" + +[dependencies] +anyhow.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time", "net", "sync"] } +tokio-tungstenite = "0.21" +futures-util = "0.3" +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +uuid = { version = "1", features = ["v4"] } diff --git a/crates/trios-queen-loop/README.md b/crates/trios-queen-loop/README.md new file mode 100644 index 0000000000..9e945013c4 --- /dev/null +++ b/crates/trios-queen-loop/README.md @@ -0,0 +1,74 @@ +# trios-queen-loop + +Autonomous **Queen** daemon for the Queen↔Doctor closed loop. + +``` + .trinity/queen/ + ├── policy.json (god_mode, max_auto_level=3) + └── actions.json (29 catalogued actions) + │ + ▼ + ┌──────────────────────┐ queen/order ┌──────────────────┐ + │ trios-queen-loop │ ───────────────────▶ │ trios-server │ + │ (this crate) │ │ /operator WS │ + │ │ ◀─────────────────── │ │ + └──────────────────────┘ BusEvent:: └──────────────────┘ + DoctorReport │ + │ broadcast + ▼ + ┌──────────────────┐ + │ trios-doctor- │ + │ loop (subscriber)│ + └──────────────────┘ +``` + +## What it does + +Every `TRIOS_QUEEN_TICK_SECS` (default 60) the Queen daemon: + +1. Connects to `ws://127.0.0.1:9005/operator?token=$TRIOS_OPERATOR_TOKEN` + (URL: `TRIOS_QUEEN_WS_URL`, token: `TRIOS_OPERATOR_TOKEN`). +2. Reads `.trinity/queen/policy.json` and `.trinity/queen/actions.json` from the + workspace root. +3. Filters actions whose `command` starts with `doctor ` and whose `level` + ≤ `policy.max_auto_level` (default 3). +4. Picks one deterministically: `(epoch_secs / tick_secs) % candidates.len()` + — gives stable rotation without persistent state. +5. Sends a `queen/order` RPC frame: + ```json + {"jsonrpc":"2.0","id":"","method":"queen/order", + "params":{"action":"doctor scan","target_agent":"doctor", + "params":{"soul":"SCARABS","tick":42}}} + ``` +6. Tracks the returned `order_id` in a small in-memory ring buffer (`VecDeque<64>`) + and correlates incoming `BusEvent::DoctorReport` events back to the + originating order. + +If the WS connection drops, the daemon waits `TRIOS_DOCTOR_RECONNECT_SECS` +(default 5) and reconnects — same socket, append-only stream (Constitution L21). + +## Environment + +| Variable | Default | +|-------------------------------|-----------------------------------------| +| `TRIOS_QUEEN_WS_URL` | `ws://127.0.0.1:9005/operator` | +| `TRIOS_OPERATOR_TOKEN` | _(none)_ — required for `/operator` | +| `TRIOS_QUEEN_TICK_SECS` | `60` | +| `TRIOS_QUEEN_RECONNECT_SECS` | `5` | +| `TRIOS_QUEEN_SOUL` | `SCARABS` | +| `TRIOS_QUEEN_WORKSPACE` | _current dir_ (looks for `.trinity/...`)| + +## Run + +```bash +cargo run -p trios-queen-loop --bin trios-queen-loop +``` + +## Constitutional anchors + +* φ² + φ⁻² = 3 +* **L1** — pure Rust, no shell scripts. +* **L11** — every order carries a `soul` field (default `SCARABS`). +* **L14** — commit trailer `Agent: SCARABS`. +* **L21** — broadcast bus is append-only. +* **L24** — agents speak only via the canonical bus, never sibling sockets. diff --git a/crates/trios-queen-loop/src/main.rs b/crates/trios-queen-loop/src/main.rs new file mode 100644 index 0000000000..4f1f400fb3 --- /dev/null +++ b/crates/trios-queen-loop/src/main.rs @@ -0,0 +1,342 @@ +//! `trios-queen-loop` — autonomous Queen↔Doctor loop client (Queen side). +//! +//! Sister-binary to `trios-doctor-loop`. The Queen daemon: +//! 1. Reads `.trinity/queen/{policy,actions,senses,supervisor_state}.json` +//! from the workspace root. +//! 2. Picks one action permitted by `policy.max_auto_level` (default 3). +//! 3. Publishes a `queen/order` request onto the bus via `/operator` WS. +//! 4. Awaits matching `DoctorReport` events broadcast back on the same bus. +//! 5. Sleeps `tick_secs` and repeats. +//! +//! No fake scripts: every order is a real WS frame, every report is parsed +//! straight from the bus, every action id comes from on-disk JSON. +//! +//! Constitutional anchors: +//! - L1 — pure Rust, no shell scripts. +//! - L8 — push-first. +//! - L11 — soul-name on every order (default: SCARABS). +//! - L21 — append-only events. +//! - L24 — bus-mediated agent traffic. +//! - AGENTS.md AGENT T 6-phase cycle: this binary owns phases PLAN/ASSIGN. +//! +//! Anchor: φ² + φ⁻² = 3. + +use std::collections::VecDeque; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::{anyhow, Context, Result}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::sync::mpsc; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use futures_util::{SinkExt, StreamExt}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +/// Queen-side loop configuration. +#[derive(Debug, Clone)] +struct QueenConfig { + ws_url: String, + operator_token: Option, + workspace_root: PathBuf, + soul_name: String, + tick: Duration, + reconnect_delay: Duration, + target_agent: String, + /// Hard ceiling on action.level we are allowed to dispatch. + /// Read from `.trinity/queen/policy.json` `max_auto_level` field. + max_auto_level_override: Option, +} + +impl QueenConfig { + fn from_env() -> Result { + let ws_url = std::env::var("TRIOS_QUEEN_WS_URL") + .unwrap_or_else(|_| "ws://127.0.0.1:9005/operator".to_string()); + let operator_token = std::env::var("TRIOS_OPERATOR_TOKEN").ok().filter(|s| !s.is_empty()); + let workspace_root = std::env::var("TRIOS_WORKSPACE") + .ok() + .map(PathBuf::from) + .or_else(|| std::env::current_dir().ok()) + .context("cannot determine workspace root (set TRIOS_WORKSPACE)")?; + let soul_name = std::env::var("TRIOS_QUEEN_SOUL").unwrap_or_else(|_| "SCARABS".to_string()); + let tick_secs: u64 = std::env::var("TRIOS_QUEEN_TICK_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(60); + let reconnect_secs: u64 = std::env::var("TRIOS_QUEEN_RECONNECT_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(5); + let target_agent = + std::env::var("TRIOS_QUEEN_TARGET").unwrap_or_else(|_| "doctor".to_string()); + let max_auto_level_override = std::env::var("TRIOS_QUEEN_MAX_LEVEL") + .ok() + .and_then(|v| v.parse().ok()); + Ok(Self { + ws_url, + operator_token, + workspace_root, + soul_name, + tick: Duration::from_secs(tick_secs), + reconnect_delay: Duration::from_secs(reconnect_secs), + target_agent, + max_auto_level_override, + }) + } + + fn full_ws_url(&self) -> String { + match &self.operator_token { + Some(t) => format!("{}?token={}", self.ws_url, t), + None => self.ws_url.clone(), + } + } +} + +/// Snapshot of `.trinity/queen/policy.json`. +#[derive(Debug, Clone, Deserialize)] +struct Policy { + #[serde(default)] + god_mode: bool, + #[serde(default = "default_max_level")] + max_auto_level: u8, +} + +fn default_max_level() -> u8 { + 3 +} + +/// One row from `.trinity/queen/actions.json`. +#[derive(Debug, Clone, Deserialize)] +struct ActionDef { + id: String, + level: u8, + #[allow(dead_code)] + #[serde(default)] + cooldown_sec: u64, +} + +#[derive(Debug, Clone, Deserialize)] +struct ActionsFile { + actions: Vec, +} + +#[derive(Debug, Serialize)] +struct WsRequest { + id: String, + method: String, + params: Value, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", content = "data")] +#[allow(clippy::large_enum_variant)] +enum InboundEvent { + DoctorReport { + order_id: String, + agent_id: String, + status: String, + summary: String, + diagnosis: Value, + ts: i64, + }, + #[serde(other)] + Ignored, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter("trios_queen_loop=info,info") + .init(); + + let cfg = QueenConfig::from_env()?; + info!("trios-queen-loop starting"); + info!(" ws_url = {}", cfg.ws_url); + info!(" operator_token= {}", if cfg.operator_token.is_some() { "" } else { "" }); + info!(" workspace = {}", cfg.workspace_root.display()); + info!(" soul_name = {}", cfg.soul_name); + info!(" tick = {:?}", cfg.tick); + info!(" target_agent = {}", cfg.target_agent); + info!(" φ² + φ⁻² = 3"); + + loop { + if let Err(e) = run_session(&cfg).await { + error!("session failed: {:#}; reconnecting in {:?}", e, cfg.reconnect_delay); + } + tokio::time::sleep(cfg.reconnect_delay).await; + } +} + +async fn run_session(cfg: &QueenConfig) -> Result<()> { + let url = cfg.full_ws_url(); + let (ws_stream, response) = connect_async(&url) + .await + .with_context(|| format!("connecting to {}", url))?; + info!("WS connected: {} (HTTP {})", cfg.ws_url, response.status().as_u16()); + + let (mut sink, mut stream) = ws_stream.split(); + + // Channel for outbound frames (used by the ticker task). + let (out_tx, mut out_rx) = mpsc::channel::(32); + + // Ticker task: every `cfg.tick`, picks an action and sends queen/order. + let cfg_tick = cfg.clone(); + let out_tx_tick = out_tx.clone(); + let _ticker = tokio::spawn(async move { + // One initial send on startup so the first DoctorReport doesn't wait + // a full tick. + if let Err(e) = send_one_order(&cfg_tick, &out_tx_tick).await { + warn!("initial order send failed: {:#}", e); + } + let mut ticker = tokio::time::interval(cfg_tick.tick); + // first tick fires immediately — skip + ticker.tick().await; + loop { + ticker.tick().await; + if let Err(e) = send_one_order(&cfg_tick, &out_tx_tick).await { + warn!("order send failed: {:#}", e); + } + } + }); + + // Pending orders we are still awaiting reports for (bounded ring buffer). + let mut pending: VecDeque = VecDeque::with_capacity(64); + + loop { + tokio::select! { + Some(out_msg) = out_rx.recv() => { + // ticker asked us to send a queen/order — extract order_id for tracking + if let Message::Text(ref txt) = out_msg { + if let Some(order_id) = peek_order_id(txt) { + if pending.len() >= 64 { pending.pop_front(); } + pending.push_back(order_id); + } + } + sink.send(out_msg).await.context("WS send")?; + } + inbound = stream.next() => { + let msg = match inbound { + Some(m) => m.context("WS recv")?, + None => return Ok(()), + }; + match msg { + Message::Text(text) => { + handle_inbound(&text, &mut pending); + } + Message::Close(_) => return Ok(()), + Message::Ping(p) => { sink.send(Message::Pong(p)).await.ok(); } + _ => {} + } + } + } + } +} + +fn peek_order_id(frame: &str) -> Option { + let v: Value = serde_json::from_str(frame).ok()?; + v.get("params") + .and_then(|p| p.get("__order_id")) + .and_then(|x| x.as_str()) + .map(|s| s.to_string()) +} + +fn handle_inbound(text: &str, pending: &mut VecDeque) { + let value: Value = match serde_json::from_str(text) { + Ok(v) => v, + Err(_) => return, + }; + + // RPC reply (e.g. {"result": {"ok": true, "order_id": "..."}}). + if let Some(result) = value.get("result") { + if let Some(oid) = result.get("order_id").and_then(|v| v.as_str()) { + info!("server ack for order_id={}", oid); + } + return; + } + + let event: InboundEvent = match serde_json::from_value(value) { + Ok(e) => e, + Err(_) => return, + }; + if let InboundEvent::DoctorReport { order_id, agent_id, status, summary, diagnosis, ts } = event { + let known = pending.iter().any(|p| *p == order_id); + if known { + pending.retain(|p| *p != order_id); + } + info!( + "DoctorReport · order_id={} · agent={} · status={} · ts={} · known={} · {}", + order_id, agent_id, status, ts, known, summary + ); + // Dump diagnosis at debug level so operators can inspect via env_filter. + tracing::debug!("diagnosis: {}", diagnosis); + } +} + +async fn send_one_order(cfg: &QueenConfig, out_tx: &mpsc::Sender) -> Result<()> { + let policy = read_policy(&cfg.workspace_root).unwrap_or(Policy { + god_mode: false, + max_auto_level: default_max_level(), + }); + let actions = read_actions(&cfg.workspace_root).unwrap_or_default(); + if actions.is_empty() { + warn!("no actions found in .trinity/queen/actions.json — skipping tick"); + return Ok(()); + } + + let cap = cfg.max_auto_level_override.unwrap_or(policy.max_auto_level); + let candidates: Vec<&ActionDef> = actions + .iter() + .filter(|a| a.id.starts_with("doctor ")) + .filter(|a| a.level <= cap) + .collect(); + if candidates.is_empty() { + warn!( + "no doctor-* actions at level <= {} (god_mode={}); skipping", + cap, policy.god_mode + ); + return Ok(()); + } + + // Deterministic, low-noise selection: rotate by epoch_secs / tick. + let pick_idx = (epoch_secs() as usize / cfg.tick.as_secs().max(1) as usize) % candidates.len(); + let picked = candidates[pick_idx]; + + let order_id = Uuid::new_v4().to_string(); + let req = WsRequest { + id: Uuid::new_v4().to_string(), + method: "queen/order".into(), + params: json!({ + "action": picked.id, + "target_agent": cfg.target_agent, + "soul": cfg.soul_name, + "params": {}, + "__order_id": order_id, + }), + }; + info!("queen/order → action={:?} (level {}) order_id={}", picked.id, picked.level, order_id); + let body = serde_json::to_string(&req).map_err(|e| anyhow!("serialize: {}", e))?; + out_tx.send(Message::Text(body)).await.map_err(|e| anyhow!("send: {}", e))?; + Ok(()) +} + +fn read_policy(root: &Path) -> Result { + let p = root.join(".trinity/queen/policy.json"); + let data = std::fs::read(&p).with_context(|| format!("read {}", p.display()))?; + let policy: Policy = serde_json::from_slice(&data)?; + Ok(policy) +} + +fn read_actions(root: &Path) -> Result> { + let p = root.join(".trinity/queen/actions.json"); + let data = std::fs::read(&p).with_context(|| format!("read {}", p.display()))?; + let actions: ActionsFile = serde_json::from_slice(&data)?; + Ok(actions.actions) +} + +fn epoch_secs() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) +} diff --git a/crates/trios-queen-loop/tests-e2e/smoke_queen_doctor_loop.py b/crates/trios-queen-loop/tests-e2e/smoke_queen_doctor_loop.py new file mode 100644 index 0000000000..34a854c2a8 --- /dev/null +++ b/crates/trios-queen-loop/tests-e2e/smoke_queen_doctor_loop.py @@ -0,0 +1,100 @@ +"""End-to-end Queen↔Doctor autonomous-loop smoke test. + +Verifies the full bus contract on a *running* trios-server: + + Queen --queen/order--> trios-server --BusEvent::QueenOrder--> Doctor + Doctor --doctor/report--> trios-server --BusEvent::DoctorReport--> Queen + +Usage: + # 1. Start the server (default port 9005): + cargo run -p trios-server --bin trios-server + # 2. In another shell: + python3 crates/trios-queen-loop/tests-e2e/smoke_queen_doctor_loop.py + +Exits 0 ("GREEN") when both halves of the loop close end-to-end. + +Constitutional anchors: φ² + φ⁻² = 3 · L21 (append-only) · L24 (canonical bus). +Agent: SCARABS Soul: Scarab Smith +""" +import asyncio, json, sys +import websockets + +WS_URL = "ws://127.0.0.1:9005/ws" + + +async def doctor_subscriber(received_orders): + async with websockets.connect(WS_URL) as ws: + try: + for _ in range(20): + msg = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(msg) + evt = data.get("event") if isinstance(data, dict) else None + if isinstance(evt, dict): + print(f"[doctor] full event: {json.dumps(evt)[:200]}") + if evt.get("type") == "QueenOrder": + payload = evt.get("data", {}) + received_orders.append(payload) + rpc = { + "jsonrpc": "2.0", + "id": "doctor-report-1", + "method": "doctor/report", + "params": { + "order_id": payload["order_id"], + "agent_id": "doctor", + "status": "green", + "summary": "smoke-test ack", + "diagnosis": {"checks": [], "smoke": True}, + }, + } + await ws.send(json.dumps(rpc)) + print(f"[doctor] queen_order={payload['order_id']} → sent doctor/report") + # keep socket open for a bit so report can broadcast + await asyncio.sleep(0.5) + return + except asyncio.TimeoutError: + print("[doctor] TIMEOUT") + + +async def queen_publisher_and_listener(received_reports): + async with websockets.connect(WS_URL) as ws: + await asyncio.sleep(0.3) # let doctor connect + rpc = { + "jsonrpc": "2.0", "id": "queen-order-1", "method": "queen/order", + "params": {"action": "doctor scan", "target_agent": "doctor", + "params": {"reason": "smoke-test"}}, + } + await ws.send(json.dumps(rpc)) + print("[queen] sent queen/order") + try: + for _ in range(20): + msg = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(msg) + evt = data.get("event") if isinstance(data, dict) else None + if isinstance(evt, dict): + print(f"[queen] full event: {json.dumps(evt)[:200]}") + if evt.get("type") == "DoctorReport": + payload = evt.get("data", {}) + received_reports.append(payload) + print(f"[queen] DoctorReport: {payload}") + return + except asyncio.TimeoutError: + print("[queen] TIMEOUT") + + +async def main(): + orders, reports = [], [] + await asyncio.gather( + doctor_subscriber(orders), + queen_publisher_and_listener(reports), + ) + print("---") + print(f"orders received by doctor: {len(orders)}") + print(f"reports received by queen: {len(reports)}") + if orders and reports: + print("RESULT: GREEN — Queen↔Doctor loop closed end-to-end through real WS bus") + return 0 + return 1 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/crates/trios-server/src/ws_handler.rs b/crates/trios-server/src/ws_handler.rs index 6d2ab2a618..b4c99a2100 100644 --- a/crates/trios-server/src/ws_handler.rs +++ b/crates/trios-server/src/ws_handler.rs @@ -22,6 +22,27 @@ pub enum BusEvent { AgentConnected { agent_id: String }, AgentDisconnected { agent_id: String }, A2AMessage { from: String, to: Option, content: Value }, + /// Queen → Doctor: an order to perform an action. + /// Mirrors `.trinity/queen/actions.json` action ids (e.g. "doctor scan", "doctor heal"). + /// Constitutional anchor: AGENTS.md AGENT T 6-phase cycle, phase ASSIGN. + QueenOrder { + order_id: String, + action: String, + target_agent: String, + params: Value, + ts: i64, + }, + /// Doctor → Queen: report on order completion. + /// Mirrors trios-doctor `WorkspaceCheck`/`CheckStatus` output. + /// Constitutional anchor: AGENTS.md AGENT T 6-phase cycle, phase VERDICT. + DoctorReport { + order_id: String, + agent_id: String, + status: String, + summary: String, + diagnosis: Value, + ts: i64, + }, } #[derive(Clone)] @@ -219,6 +240,16 @@ pub async fn handle_message(text: &str, state: &AppState) -> WsResponse { "a2a/assign_task" => mcp_endpoints::a2a::assign_task(state, req.params).await, "a2a/task_status" => mcp_endpoints::a2a::task_status(state, req.params).await, "a2a/update_task" => mcp_endpoints::a2a::update_task(state, req.params).await, + // ───────────────────────────────────────────────────────────────────── + // Queen ↔ Doctor autonomous loop (issue: bee/queen-doctor-autoloop) + // queen/order : Queen publishes a QueenOrder onto the bus. + // Params: { action, target_agent, params? } + // doctor/report: Doctor publishes a DoctorReport onto the bus. + // Params: { order_id, agent_id, status, summary, diagnosis } + // Both methods are append-only broadcasts (constitutional L21). + // ───────────────────────────────────────────────────────────────────── + "queen/order" => queen_order_publish(state, req.params).await, + "doctor/report" => doctor_report_publish(state, req.params).await, _ => json!({"error": format!("unknown method: {}", req.method)}), }; @@ -230,6 +261,86 @@ async fn tools_list(state: &AppState) -> Value { serde_json::to_value(list.tools).unwrap_or(json!({"error": "serialize failed"})) } +/// Helper: current unix timestamp in seconds. +fn epoch_secs_now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) +} + +/// Publish a `BusEvent::QueenOrder` onto the broadcast bus. +/// Consumed by Doctor loop subscribers via `/ws` socket. +pub async fn queen_order_publish(state: &AppState, params: Option) -> Value { + let p = params.unwrap_or(json!({})); + let action = p.get("action").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if action.is_empty() { + return json!({"error": "missing field: action"}); + } + let target_agent = p + .get("target_agent") + .and_then(|v| v.as_str()) + .unwrap_or("doctor") + .to_string(); + let inner_params = p.get("params").cloned().unwrap_or(json!({})); + let order_id = uuid::Uuid::new_v4().to_string(); + let ts = epoch_secs_now(); + + let event = BusEvent::QueenOrder { + order_id: order_id.clone(), + action: action.clone(), + target_agent: target_agent.clone(), + params: inner_params, + ts, + }; + state.broadcast_event(event); + + json!({ + "ok": true, + "order_id": order_id, + "action": action, + "target_agent": target_agent, + "ts": ts, + }) +} + +/// Publish a `BusEvent::DoctorReport` onto the broadcast bus. +/// Consumed by Queen loop subscribers via `/operator` socket. +pub async fn doctor_report_publish(state: &AppState, params: Option) -> Value { + let p = params.unwrap_or(json!({})); + let order_id = p.get("order_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if order_id.is_empty() { + return json!({"error": "missing field: order_id"}); + } + let agent_id = p + .get("agent_id") + .and_then(|v| v.as_str()) + .unwrap_or("doctor") + .to_string(); + let status = p.get("status").and_then(|v| v.as_str()).unwrap_or("unknown").to_string(); + let summary = p.get("summary").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let diagnosis = p.get("diagnosis").cloned().unwrap_or(json!({})); + let ts = epoch_secs_now(); + + let event = BusEvent::DoctorReport { + order_id: order_id.clone(), + agent_id: agent_id.clone(), + status: status.clone(), + summary, + diagnosis, + ts, + }; + state.broadcast_event(event); + + json!({ + "ok": true, + "order_id": order_id, + "agent_id": agent_id, + "status": status, + "ts": ts, + }) +} + async fn tools_call(state: &AppState, params: Option) -> Value { let params_val = params.unwrap_or(json!({})); let tool_name = params_val.get("name").and_then(|v| v.as_str()).unwrap_or(""); @@ -385,6 +496,91 @@ mod tests { assert_eq!(result["recipients"], 2); } + #[tokio::test] + async fn test_queen_order_publish_broadcasts_event() { + let state = AppState::new(); + let mut rx = state.event_tx.subscribe(); + + let params = json!({ + "action": "doctor scan", + "target_agent": "doctor", + "params": {"reason": "unit-test"} + }); + let result = queen_order_publish(&state, Some(params)).await; + assert_eq!(result["ok"], true); + assert_eq!(result["action"], "doctor scan"); + assert_eq!(result["target_agent"], "doctor"); + let order_id = result["order_id"].as_str().unwrap().to_string(); + assert!(!order_id.is_empty()); + + let received = tokio::time::timeout( + tokio::time::Duration::from_millis(100), + rx.recv(), + ) + .await + .expect("queen order broadcast should arrive within 100ms") + .expect("broadcast channel should not be closed"); + + match received { + BusEvent::QueenOrder { order_id: oid, action, target_agent, .. } => { + assert_eq!(oid, order_id); + assert_eq!(action, "doctor scan"); + assert_eq!(target_agent, "doctor"); + } + other => panic!("expected BusEvent::QueenOrder, got {:?}", other), + } + } + + #[tokio::test] + async fn test_queen_order_publish_rejects_missing_action() { + let state = AppState::new(); + let result = queen_order_publish(&state, Some(json!({}))).await; + assert!(result.get("error").is_some(), "expected error field, got {:?}", result); + } + + #[tokio::test] + async fn test_doctor_report_publish_broadcasts_event() { + let state = AppState::new(); + let mut rx = state.event_tx.subscribe(); + + let params = json!({ + "order_id": "test-order-123", + "agent_id": "doctor", + "status": "green", + "summary": "workspace clean", + "diagnosis": {"checks": []} + }); + let result = doctor_report_publish(&state, Some(params)).await; + assert_eq!(result["ok"], true); + assert_eq!(result["order_id"], "test-order-123"); + assert_eq!(result["status"], "green"); + + let received = tokio::time::timeout( + tokio::time::Duration::from_millis(100), + rx.recv(), + ) + .await + .expect("doctor report broadcast should arrive within 100ms") + .expect("broadcast channel should not be closed"); + + match received { + BusEvent::DoctorReport { order_id, agent_id, status, summary, .. } => { + assert_eq!(order_id, "test-order-123"); + assert_eq!(agent_id, "doctor"); + assert_eq!(status, "green"); + assert_eq!(summary, "workspace clean"); + } + other => panic!("expected BusEvent::DoctorReport, got {:?}", other), + } + } + + #[tokio::test] + async fn test_doctor_report_publish_rejects_missing_order_id() { + let state = AppState::new(); + let result = doctor_report_publish(&state, Some(json!({"status": "green"}))).await; + assert!(result.get("error").is_some(), "expected error field, got {:?}", result); + } + #[tokio::test] async fn test_round_robin_keys() { let state = AppState::new();