From 2f5ed3d0f9becdffa2a838b463c0cd77697abc8f Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 15:02:48 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(supervisor):=20S3=20IN-leg=20driver=20?= =?UTF-8?q?=E2=80=94=20version=20tick=20=E2=86=92=20owner=20forward-arc=20?= =?UTF-8?q?advance?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The actor-side half of S3 (E-SUBSTRATE-IS-THE-SCHEDULER), mirroring the S2 atomic MulAdvance pattern in lance-graph-supervisor::kanban_actor: - KanbanMsg::Tick { at, reply } — atomic in-actor realization of the contract's NextPhaseScheduler: a substrate version tick advances the owner along the Rubicon forward arc (phase().next_phases().first()) in ONE serialized message, reading the phase at the instant of mutation. Absorbing column -> None: the no-op tick is suppressed (forward arc is legal by construction, so the infallible advance_phase is used; no error path). This applies the codex #578 atomicity lesson to the IN-leg. - drive_version_tick(actor, at) — thin async wrapper over Tick. - drive_scheduled_tick(scheduler, view, at, exec, actor) — generic consumer that drives the EXISTING VersionScheduler trait ("propose, don't dispose": the scheduler proposes from a view, the owner disposes via Advance, None suppresses). For custom policies that read a richer view than the owner computes internally; documented as advisory (proposal computed outside the owner message, so it may relay a typed Illegal rather than corrupt). +3 tests (12 passed/0 failed under --features supervisor --lib): forward-arc chain then suppress at absorbing; two concurrent ticks serialize along the arc (no stale-phase collision); generic consumer drives NextPhaseScheduler propose->dispose + suppresses an absorbing proposal. clippy + fmt clean; light build (no lance/datafusion, no disk gate). Remaining S3 (lance/disk-gated): wire the LIVE LanceVersionScheduler::drive_at_latest over a real VersionedGraph::versions() to feed `at`. The apply + no-op-suppress loop is done; only the live versions() poll remains. Board hygiene: plan S3 status annotated; AGENT_LOG cont.34 prepended. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01CcpLeEC3XK8Eye53GKBVvi --- .claude/board/AGENT_LOG.md | 3 + .claude/plans/capstone-out-leg-wiring-v1.md | 33 ++- .../src/kanban_actor.rs | 209 +++++++++++++++++- 3 files changed, 241 insertions(+), 4 deletions(-) diff --git a/.claude/board/AGENT_LOG.md b/.claude/board/AGENT_LOG.md index 31669955..0dc2e369 100644 --- a/.claude/board/AGENT_LOG.md +++ b/.claude/board/AGENT_LOG.md @@ -1,3 +1,6 @@ +## 2026-06-21 (cont.³⁴) — S3 IN-leg driver SHIPPED (actor-side) — version tick → owner forward-arc advance, no-op suppressed + +**Main thread (Opus), self-directed ("PR, easy").** Closed the actor-side half of S3 on the same light crate, mirroring the S2 atomic pattern. New in `lance-graph-supervisor::kanban_actor` (feature `supervisor`): (1) `KanbanMsg::Tick { at, reply }` — the **atomic** in-actor realization of the contract's `NextPhaseScheduler`: a substrate version tick advances the owner along the Rubicon forward arc (`phase().next_phases().first()`) in ONE serialized message, reading the phase at the instant of mutation (the codex-#578 atomicity lesson applied to the IN-leg); absorbing column → `None`, **the no-op tick is suppressed** (not an error; forward arc is legal by construction so the infallible `advance_phase` is used). (2) `drive_version_tick(actor, at)` — thin async wrapper. (3) `drive_scheduled_tick(scheduler, view, at, exec, actor)` — generic consumer that drives the EXISTING `VersionScheduler` trait ("propose, don't dispose": scheduler proposes from a view, owner disposes via `Advance`, `None` suppresses), for custom policies (version-delta gating, `Plan`/`Prune`, batching) reading a richer view; documented as advisory (proposal computed outside the owner message → may relay a typed `Illegal` rather than corrupt). **+3 tests (now green):** `version_tick_advances_forward_arc_then_suppresses_at_absorbing` (Planning→CognitiveWork→Evaluation→Commit then suppressed), `concurrent_version_ticks_serialize_along_the_arc` (two ticks chain, no stale-phase collision), `custom_scheduler_proposes_and_owner_disposes` (drives `NextPhaseScheduler` propose→dispose + suppresses an absorbing proposal). `cargo test -p lance-graph-supervisor --features supervisor --lib` = 12 passed/0 failed; clippy clean (no supervisor-crate warnings; pre-existing ontology/callcenter warnings only) + fmt clean; light build, no lance/disk/symbiont gate. **Remaining S3 (lance/disk-gated):** wire the LIVE `LanceVersionScheduler::drive_at_latest` over a real `VersionedGraph::versions()` to feed `at` — the apply + no-op-suppress loop is now done, only the live `versions()` poll remains. OUT-leg actor side now: S4 owner-advance (#576) + delivery edge (#577) + S2 driver (#578) + **S3 driver (this)**. Plan S3 status annotated. Rides a PR on jirak. ## 2026-06-21 (cont.³³) — S2 MUL→phase driver SHIPPED (actor-side) — gate → owner advance **Main thread (Opus), self-directed (da-capo).** S2→S4 composition on the same light crate: `drive_mul_advance(actor, qualia, mantissa)` in `lance-graph-supervisor::kanban_actor` reads the owner's phase (`KanbanMsg::Phase`), runs the contract's `mul::i4_eval::gate_decision_i4` → `KanbanColumn::advance_on_gate` (Flow→forward, Block→Prune-where-legal, Hold→None), and on a non-Hold gate `cast`s `KanbanMsg::Advance` to the owning actor (the owner advances ITSELF — the operator model). `mul_target` is the pure lowering. Integer i4 path — no f64/NaN. **+1 test (5 total green):** `s2_driver_gate_advances_then_holds` (Flow qualia+mantissa>0 → Planning→CognitiveWork; neutral+0 → Hold → no advance, phase stays). clippy + fmt clean; light build, no disk/symbiont gate. This is the actor-side S2 consumer (`mul_phase_step` node wrapper stays the single-node convenience). **Remaining S2:** the per-row `cognitive-shader-driver` owner loop over the `qualia` column (needs `MailboxSoaView::qualia()` + the shader-driver build = disk) — heavier, deferred. **OUT-leg now real+tested on the light crate: S4 actor (#576) + delivery edge (#577) + S2 actor-side driver (this).** Only S3 (lance `LanceVersionScheduler` consumer) + run-NaN need the heavier builds. Plan S2 status annotated. Rides a PR on jirak. diff --git a/.claude/plans/capstone-out-leg-wiring-v1.md b/.claude/plans/capstone-out-leg-wiring-v1.md index 3f943eca..1560459a 100644 --- a/.claude/plans/capstone-out-leg-wiring-v1.md +++ b/.claude/plans/capstone-out-leg-wiring-v1.md @@ -73,9 +73,36 @@ advances exactly the expected rows; integer i4 gate ⇒ no NaN. ## S3 — version→move gets the LIVE subscription (not a synthetic tick) -**Census state:** PARTIAL. `symbiont::kanban_loop` exercises `on_version` from a -synthetic `u32` tick (`self.cycle`); the lowering is proven, the live source is -open. +**Status (2026-06-21): apply+suppress DRIVER shipped** (actor-side path). +`lance-graph-supervisor::kanban_actor` (feature `supervisor`) now carries the +IN-leg consumer primitives: +- `KanbanMsg::Tick { at, reply }` — the **atomic** in-actor realization of + `NextPhaseScheduler`: a version tick advances the owner along the forward arc + (`phase().next_phases().first()`) in ONE serialized message, reading the phase + at the instant of mutation (the codex-#578 atomicity lesson applied to the + IN-leg). Absorbing column → `None`: **the no-op tick is suppressed**, not an + error. +- `drive_version_tick(actor, at)` — thin async wrapper over `Tick`. +- `drive_scheduled_tick(scheduler, view, at, exec, actor)` — generic consumer + that drives the EXISTING `VersionScheduler` trait ("propose, don't dispose": + the scheduler proposes from a view, the owner disposes via `Advance`; `None` + suppresses). For custom policies (version-delta gating, `Plan`/`Prune`, + batching) that read a richer view than the owner computes internally. + +Tests (light, no lance): forward-arc chain Planning→…→Commit then suppressed at +absorbing; two concurrent ticks serialize along the arc (no stale-phase +collision); the generic consumer drives `NextPhaseScheduler` propose→dispose + +suppresses an absorbing proposal. + +**Remaining (lance/disk-gated):** wire the LIVE source — +`lance-graph::graph::scheduler::LanceVersionScheduler::drive_at_latest` over a +real `VersionedGraph::versions()` — to feed `at` into `drive_version_tick` (or a +custom policy into `drive_scheduled_tick`). The apply + no-op-suppress loop is +done; only the live `versions()` poll remains. + +**Census state (orig):** PARTIAL. `symbiont::kanban_loop` exercises `on_version` +from a synthetic `u32` tick (`self.cycle`); the lowering is proven, the live +source is open. **Enabler:** none — and crucially the **live scheduler ALSO already exists**: `lance-graph::graph::scheduler::LanceVersionScheduler` diff --git a/crates/lance-graph-supervisor/src/kanban_actor.rs b/crates/lance-graph-supervisor/src/kanban_actor.rs index 4fd1636f..f681ba1d 100644 --- a/crates/lance-graph-supervisor/src/kanban_actor.rs +++ b/crates/lance-graph-supervisor/src/kanban_actor.rs @@ -26,7 +26,8 @@ use lance_graph_contract::kanban::{KanbanColumn, KanbanMove, RubiconTransitionError}; use lance_graph_contract::mul::i4_eval::gate_decision_i4; -use lance_graph_contract::soa_view::MailboxSoaOwner; +use lance_graph_contract::scheduler::{DatasetVersion, VersionScheduler}; +use lance_graph_contract::soa_view::{MailboxSoaOwner, MailboxSoaView}; use lance_graph_contract::QualiaI4_16D; use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort}; @@ -52,6 +53,21 @@ pub enum KanbanMsg { mantissa: i8, reply: RpcReplyPort, RubiconTransitionError>>, }, + /// **Atomic** S3 IN-leg step: a substrate version tick (`at`) advances the + /// owner along the Rubicon **forward arc** — `phase().next_phases().first()` — + /// in ONE message, reading the owner's phase at the instant of mutation. This + /// is the in-actor realization of [`scheduler::NextPhaseScheduler`]'s policy + /// (`E-SUBSTRATE-IS-THE-SCHEDULER`): a Lance `versions()` event lowers to the + /// next legal move and the owner applies it. Replies `Some(move)` on advance, + /// or `None` when the owner is in an absorbing column (`Commit`/`Prune`) — a + /// **no-op tick is suppressed**, not an error. No error variant: the forward + /// arc is legal by construction. + /// + /// [`scheduler::NextPhaseScheduler`]: lance_graph_contract::scheduler::NextPhaseScheduler + Tick { + at: DatasetVersion, + reply: RpcReplyPort>, + }, } /// A ractor actor whose `State` IS a [`MailboxSoaOwner`] — the SoA mailbox and @@ -119,6 +135,19 @@ where }; let _ = reply.send(result); } + KanbanMsg::Tick { at: _, reply } => { + // Forward-arc advance, atomic against the owner's live phase. The + // first legal successor is empty exactly for absorbing columns + // (`Commit`/`Prune`) → `None` suppresses the no-op tick. The arc + // is legal by construction, so the infallible `advance_phase` is + // correct here (no `try_`/error path). + let from = state.phase(); + let moved = from + .next_phases() + .first() + .map(|&to| state.advance_phase(to)); + let _ = reply.send(moved); + } } Ok(()) } @@ -239,6 +268,71 @@ pub async fn drive_mul_advance( }) } +// ─── S3 IN-leg: substrate version tick → owner forward-arc advance ───────────── + +/// S3 driver: a substrate version tick advances the owner along the Rubicon +/// forward arc, in ONE atomic actor message ([`KanbanMsg::Tick`]). Returns the +/// emitted [`KanbanMove`] on advance, or `None` when the owner is absorbing +/// (`Commit`/`Prune`) — the **no-op tick is suppressed** (D-MBX-9-IN, +/// `E-SUBSTRATE-IS-THE-SCHEDULER`). +/// +/// **Atomicity:** like [`drive_mul_advance`], the next-phase decision and the +/// transition run inside the SAME serialized mailbox message, so concurrent ticks +/// cannot read a stale phase and collide — they chain along the arc instead +/// (codex #578 lesson, applied to the IN-leg). This is the actor-side realization +/// of the contract's [`NextPhaseScheduler`] policy; use [`drive_scheduled_tick`] +/// when a custom [`VersionScheduler`] policy (version-delta gating, `Plan`/`Prune` +/// over the forward arc, batching) reads a richer view. +/// +/// [`NextPhaseScheduler`]: lance_graph_contract::scheduler::NextPhaseScheduler +pub async fn drive_version_tick( + actor: &ActorRef, + at: DatasetVersion, +) -> Result, KanbanRouteError> { + ractor::call!(actor, |reply| KanbanMsg::Tick { at, reply }) + .map_err(|e| KanbanRouteError::Rpc(e.to_string())) +} + +/// S3 driver (custom policy): drive an arbitrary [`VersionScheduler`] for one +/// version tick. The scheduler **proposes** the next move from `view`; if it +/// yields `Some`, the owner **disposes** it via [`KanbanMsg::Advance`]; `None` +/// **suppresses the no-op tick** ("propose, don't dispose" — the scheduler reads, +/// the owner is the sole mutator). +/// +/// Unlike [`drive_version_tick`], the proposal is computed OUTSIDE the owner's +/// message (from the supplied `view`), so it is **advisory**: if the owner's phase +/// changes between the proposal and the `Advance`, the edge may be rejected +/// ([`KanbanRouteError::Illegal`]) rather than silently corrupting — surfaced, not +/// panicked. For the pure forward-arc policy prefer the atomic +/// [`drive_version_tick`]; reach for this only when the policy needs a richer view +/// than the owner computes internally. +pub async fn drive_scheduled_tick( + scheduler: &S, + view: &V, + at: DatasetVersion, + exec: lance_graph_contract::kanban::ExecTarget, + actor: &ActorRef, +) -> Result, KanbanRouteError> +where + S: VersionScheduler, + V: MailboxSoaView, +{ + // Propose: lower the version event to the next legal move (or `None`). + let Some(proposed) = scheduler.on_version(view, at, exec) else { + return Ok(None); // absorbing / policy-filtered → suppress the no-op tick + }; + // Dispose: the owner applies it (checked); relay an illegal edge as typed. + let inner = ractor::call!(actor, |reply| KanbanMsg::Advance { + to: proposed.to, + reply + }) + .map_err(|e| KanbanRouteError::Rpc(e.to_string()))?; + inner.map(Some).map_err(|e| KanbanRouteError::Illegal { + from: e.from, + to: e.to, + }) +} + #[cfg(test)] mod tests { use super::*; @@ -482,4 +576,117 @@ mod tests { actor.stop(None); handle.await.expect("actor join"); } + + #[tokio::test] + async fn version_tick_advances_forward_arc_then_suppresses_at_absorbing() { + // S3 IN-leg: a version tick advances along the forward arc; once the owner + // reaches an absorbing column the tick is a suppressed no-op (`None`). + let (actor, handle) = Actor::spawn( + None, + KanbanActor::::default(), + board(KanbanColumn::Planning), + ) + .await + .expect("spawn"); + + // Planning → CognitiveWork → Evaluation → Commit, one tick per version. + let expected = [ + KanbanColumn::CognitiveWork, + KanbanColumn::Evaluation, + KanbanColumn::Commit, + ]; + for (i, want) in expected.iter().enumerate() { + let mv = drive_version_tick(&actor, DatasetVersion(i as u64 + 1)) + .await + .expect("tick ok") + .expect("non-absorbing advances"); + assert_eq!(mv.to, *want); + } + + // Commit is absorbing: the next tick advances nothing (no-op suppressed). + let noop = drive_version_tick(&actor, DatasetVersion(99)) + .await + .expect("tick ok"); + assert!(noop.is_none(), "absorbing column must suppress the tick"); + let phase = ractor::call!(actor, |reply| KanbanMsg::Phase { reply }).expect("rpc"); + assert_eq!(phase, KanbanColumn::Commit); + + actor.stop(None); + handle.await.expect("actor join"); + } + + #[tokio::test] + async fn concurrent_version_ticks_serialize_along_the_arc() { + // Two concurrent ticks must NOT both read a stale `Planning`; the atomic + // `Tick` serializes decision+advance in the owner's mailbox, so they chain + // Planning → CognitiveWork → Evaluation (both advance, neither is lost). + let (actor, handle) = Actor::spawn( + None, + KanbanActor::::default(), + board(KanbanColumn::Planning), + ) + .await + .expect("spawn"); + + let a1 = actor.clone(); + let a2 = actor.clone(); + let (r1, r2) = tokio::join!( + drive_version_tick(&a1, DatasetVersion(1)), + drive_version_tick(&a2, DatasetVersion(2)), + ); + assert!(r1.expect("tick1 ok").is_some(), "first advanced"); + assert!(r2.expect("tick2 ok").is_some(), "second advanced"); + + let phase = ractor::call!(actor, |reply| KanbanMsg::Phase { reply }).expect("rpc"); + assert_eq!(phase, KanbanColumn::Evaluation); + + actor.stop(None); + handle.await.expect("actor join"); + } + + #[tokio::test] + async fn custom_scheduler_proposes_and_owner_disposes() { + use lance_graph_contract::scheduler::NextPhaseScheduler; + + // The generic consumer drives the EXISTING `VersionScheduler` trait: the + // reference `NextPhaseScheduler` proposes from a view, the owner disposes. + let (actor, handle) = Actor::spawn( + None, + KanbanActor::::default(), + board(KanbanColumn::Planning), + ) + .await + .expect("spawn"); + + // View mirrors the owner's current phase; scheduler proposes CognitiveWork. + let view = board(KanbanColumn::Planning); + let mv = drive_scheduled_tick( + &NextPhaseScheduler, + &view, + DatasetVersion(1), + ExecTarget::Native, + &actor, + ) + .await + .expect("scheduled ok") + .expect("forward arc proposed + disposed"); + assert_eq!(mv.from, KanbanColumn::Planning); + assert_eq!(mv.to, KanbanColumn::CognitiveWork); + + // An absorbing view → scheduler yields `None` → suppressed, no RPC needed. + let absorbing_view = board(KanbanColumn::Commit); + let noop = drive_scheduled_tick( + &NextPhaseScheduler, + &absorbing_view, + DatasetVersion(2), + ExecTarget::Native, + &actor, + ) + .await + .expect("scheduled ok"); + assert!(noop.is_none(), "absorbing proposal is suppressed"); + + actor.stop(None); + handle.await.expect("actor join"); + } } From fd1d58ffc79b99726017e74a9c3ecb65daa7e6d6 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Jun 2026 15:12:15 +0000 Subject: [PATCH 2/2] fix(supervisor): preserve scheduler exec target in drive_scheduled_tick (codex #579) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The scheduler stamps the chosen backend onto proposed.exec, but the consumer sent only proposed.to and returned the owner's move — and owners default to ExecTarget::Native, so a Jit/SurrealQl/Elixir selection was silently reported and routed as Native. Fix: keep the owner's emitted move (authoritative phase transition, witness position, libet anchor from the real mutation) but overlay proposed.exec — the backend routing tag is the policy's decision, which the owner can't make. +1 test (13 total green): scheduled_tick_preserves_non_native_exec_target asserts Jit/SurrealQl/Elixir survive round-trip. clippy + fmt clean. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01CcpLeEC3XK8Eye53GKBVvi --- .../src/kanban_actor.rs | 62 ++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/crates/lance-graph-supervisor/src/kanban_actor.rs b/crates/lance-graph-supervisor/src/kanban_actor.rs index f681ba1d..de315eed 100644 --- a/crates/lance-graph-supervisor/src/kanban_actor.rs +++ b/crates/lance-graph-supervisor/src/kanban_actor.rs @@ -303,9 +303,12 @@ pub async fn drive_version_tick( /// message (from the supplied `view`), so it is **advisory**: if the owner's phase /// changes between the proposal and the `Advance`, the edge may be rejected /// ([`KanbanRouteError::Illegal`]) rather than silently corrupting — surfaced, not -/// panicked. For the pure forward-arc policy prefer the atomic -/// [`drive_version_tick`]; reach for this only when the policy needs a richer view -/// than the owner computes internally. +/// panicked. The returned move is the owner's (authoritative phase transition, +/// witness position, and libet anchor from the REAL mutation) with the +/// **scheduler's `exec` overlaid** — the backend routing tag is the policy's +/// decision, which the owner (defaulting to `Native`) can't make. For the pure +/// forward-arc policy prefer the atomic [`drive_version_tick`]; reach for this +/// only when the policy needs a richer view than the owner computes internally. pub async fn drive_scheduled_tick( scheduler: &S, view: &V, @@ -327,10 +330,21 @@ where reply }) .map_err(|e| KanbanRouteError::Rpc(e.to_string()))?; - inner.map(Some).map_err(|e| KanbanRouteError::Illegal { - from: e.from, - to: e.to, - }) + match inner { + // The owner's emitted move is authoritative for the phase transition, + // witness position, and libet anchor (from the REAL mutation), but it + // defaults to `ExecTarget::Native` and can't know which backend the policy + // chose — overlay the scheduler's selection so a `Jit`/`SurrealQl`/`Elixir` + // target is not silently reported/routed as Native (codex #579 P2). + Ok(mut emitted) => { + emitted.exec = proposed.exec; + Ok(Some(emitted)) + } + Err(e) => Err(KanbanRouteError::Illegal { + from: e.from, + to: e.to, + }), + } } #[cfg(test)] @@ -689,4 +703,38 @@ mod tests { actor.stop(None); handle.await.expect("actor join"); } + + #[tokio::test] + async fn scheduled_tick_preserves_non_native_exec_target() { + use lance_graph_contract::scheduler::NextPhaseScheduler; + + // codex #579 P2: the scheduler selects the backend; the owner defaults to + // `Native`. The returned move must carry the scheduler's exec, NOT be + // flattened to the owner's Native default. + for exec in [ExecTarget::Jit, ExecTarget::SurrealQl, ExecTarget::Elixir] { + // Fresh owner per exec so the phase starts at Planning each iteration. + let (actor, handle) = Actor::spawn( + None, + KanbanActor::::default(), + board(KanbanColumn::Planning), + ) + .await + .expect("spawn"); + + let view = board(KanbanColumn::Planning); + let mv = + drive_scheduled_tick(&NextPhaseScheduler, &view, DatasetVersion(1), exec, &actor) + .await + .expect("scheduled ok") + .expect("forward arc proposed + disposed"); + assert_eq!(mv.to, KanbanColumn::CognitiveWork); + assert_eq!( + mv.exec, exec, + "scheduler's backend must survive, not be overwritten with Native" + ); + + actor.stop(None); + handle.await.expect("actor join"); + } + } }