Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .claude/board/AGENT_LOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 2026-06-21 (cont.³⁴) — S3 IN-leg driver SHIPPED (actor-side) — version tick → owner forward-arc advance, no-op suppressed

**Main thread (Opus), self-directed ("PR, easy").** Closed the actor-side half of S3 on the same light crate, mirroring the S2 atomic pattern. New in `lance-graph-supervisor::kanban_actor` (feature `supervisor`): (1) `KanbanMsg::Tick { at, reply }` — the **atomic** in-actor realization of the contract's `NextPhaseScheduler`: a substrate version tick advances the owner along the Rubicon forward arc (`phase().next_phases().first()`) in ONE serialized message, reading the phase at the instant of mutation (the codex-#578 atomicity lesson applied to the IN-leg); absorbing column → `None`, **the no-op tick is suppressed** (not an error; forward arc is legal by construction so the infallible `advance_phase` is used). (2) `drive_version_tick(actor, at)` — thin async wrapper. (3) `drive_scheduled_tick(scheduler, view, at, exec, actor)` — generic consumer that drives the EXISTING `VersionScheduler` trait ("propose, don't dispose": scheduler proposes from a view, owner disposes via `Advance`, `None` suppresses), for custom policies (version-delta gating, `Plan`/`Prune`, batching) reading a richer view; documented as advisory (proposal computed outside the owner message → may relay a typed `Illegal` rather than corrupt). **+3 tests (now green):** `version_tick_advances_forward_arc_then_suppresses_at_absorbing` (Planning→CognitiveWork→Evaluation→Commit then suppressed), `concurrent_version_ticks_serialize_along_the_arc` (two ticks chain, no stale-phase collision), `custom_scheduler_proposes_and_owner_disposes` (drives `NextPhaseScheduler` propose→dispose + suppresses an absorbing proposal). `cargo test -p lance-graph-supervisor --features supervisor --lib` = 12 passed/0 failed; clippy clean (no supervisor-crate warnings; pre-existing ontology/callcenter warnings only) + fmt clean; light build, no lance/disk/symbiont gate. **Remaining S3 (lance/disk-gated):** wire the LIVE `LanceVersionScheduler::drive_at_latest` over a real `VersionedGraph::versions()` to feed `at` — the apply + no-op-suppress loop is now done, only the live `versions()` poll remains. OUT-leg actor side now: S4 owner-advance (#576) + delivery edge (#577) + S2 driver (#578) + **S3 driver (this)**. Plan S3 status annotated. Rides a PR on jirak.
## 2026-06-21 (cont.³³) — S2 MUL→phase driver SHIPPED (actor-side) — gate → owner advance

**Main thread (Opus), self-directed (da-capo).** S2→S4 composition on the same light crate: `drive_mul_advance(actor, qualia, mantissa)` in `lance-graph-supervisor::kanban_actor` reads the owner's phase (`KanbanMsg::Phase`), runs the contract's `mul::i4_eval::gate_decision_i4` → `KanbanColumn::advance_on_gate` (Flow→forward, Block→Prune-where-legal, Hold→None), and on a non-Hold gate `cast`s `KanbanMsg::Advance` to the owning actor (the owner advances ITSELF — the operator model). `mul_target` is the pure lowering. Integer i4 path — no f64/NaN. **+1 test (5 total green):** `s2_driver_gate_advances_then_holds` (Flow qualia+mantissa>0 → Planning→CognitiveWork; neutral+0 → Hold → no advance, phase stays). clippy + fmt clean; light build, no disk/symbiont gate. This is the actor-side S2 consumer (`mul_phase_step` node wrapper stays the single-node convenience). **Remaining S2:** the per-row `cognitive-shader-driver` owner loop over the `qualia` column (needs `MailboxSoaView::qualia()` + the shader-driver build = disk) — heavier, deferred. **OUT-leg now real+tested on the light crate: S4 actor (#576) + delivery edge (#577) + S2 actor-side driver (this).** Only S3 (lance `LanceVersionScheduler` consumer) + run-NaN need the heavier builds. Plan S2 status annotated. Rides a PR on jirak.
Expand Down
33 changes: 30 additions & 3 deletions .claude/plans/capstone-out-leg-wiring-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,36 @@ advances exactly the expected rows; integer i4 gate ⇒ no NaN.

## S3 — version→move gets the LIVE subscription (not a synthetic tick)

**Census state:** PARTIAL. `symbiont::kanban_loop` exercises `on_version` from a
synthetic `u32` tick (`self.cycle`); the lowering is proven, the live source is
open.
**Status (2026-06-21): apply+suppress DRIVER shipped** (actor-side path).
`lance-graph-supervisor::kanban_actor` (feature `supervisor`) now carries the
IN-leg consumer primitives:
- `KanbanMsg::Tick { at, reply }` — the **atomic** in-actor realization of
`NextPhaseScheduler`: a version tick advances the owner along the forward arc
(`phase().next_phases().first()`) in ONE serialized message, reading the phase
at the instant of mutation (the codex-#578 atomicity lesson applied to the
IN-leg). Absorbing column → `None`: **the no-op tick is suppressed**, not an
error.
- `drive_version_tick(actor, at)` — thin async wrapper over `Tick`.
- `drive_scheduled_tick(scheduler, view, at, exec, actor)` — generic consumer
that drives the EXISTING `VersionScheduler` trait ("propose, don't dispose":
the scheduler proposes from a view, the owner disposes via `Advance`; `None`
suppresses). For custom policies (version-delta gating, `Plan`/`Prune`,
batching) that read a richer view than the owner computes internally.

Tests (light, no lance): forward-arc chain Planning→…→Commit then suppressed at
absorbing; two concurrent ticks serialize along the arc (no stale-phase
collision); the generic consumer drives `NextPhaseScheduler` propose→dispose +
suppresses an absorbing proposal.

**Remaining (lance/disk-gated):** wire the LIVE source —
`lance-graph::graph::scheduler::LanceVersionScheduler::drive_at_latest` over a
real `VersionedGraph::versions()` — to feed `at` into `drive_version_tick` (or a
custom policy into `drive_scheduled_tick`). The apply + no-op-suppress loop is
done; only the live `versions()` poll remains.

**Census state (orig):** PARTIAL. `symbiont::kanban_loop` exercises `on_version`
from a synthetic `u32` tick (`self.cycle`); the lowering is proven, the live
source is open.

**Enabler:** none — and crucially the **live scheduler ALSO already exists**:
`lance-graph::graph::scheduler::LanceVersionScheduler<S = NextPhaseScheduler>`
Expand Down
257 changes: 256 additions & 1 deletion crates/lance-graph-supervisor/src/kanban_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

use lance_graph_contract::kanban::{KanbanColumn, KanbanMove, RubiconTransitionError};
use lance_graph_contract::mul::i4_eval::gate_decision_i4;
use lance_graph_contract::soa_view::MailboxSoaOwner;
use lance_graph_contract::scheduler::{DatasetVersion, VersionScheduler};
use lance_graph_contract::soa_view::{MailboxSoaOwner, MailboxSoaView};
use lance_graph_contract::QualiaI4_16D;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};

Expand All @@ -52,6 +53,21 @@ pub enum KanbanMsg {
mantissa: i8,
reply: RpcReplyPort<Result<Option<KanbanMove>, RubiconTransitionError>>,
},
/// **Atomic** S3 IN-leg step: a substrate version tick (`at`) advances the
/// owner along the Rubicon **forward arc** — `phase().next_phases().first()` —
/// in ONE message, reading the owner's phase at the instant of mutation. This
/// is the in-actor realization of [`scheduler::NextPhaseScheduler`]'s policy
/// (`E-SUBSTRATE-IS-THE-SCHEDULER`): a Lance `versions()` event lowers to the
/// next legal move and the owner applies it. Replies `Some(move)` on advance,
/// or `None` when the owner is in an absorbing column (`Commit`/`Prune`) — a
/// **no-op tick is suppressed**, not an error. No error variant: the forward
/// arc is legal by construction.
///
/// [`scheduler::NextPhaseScheduler`]: lance_graph_contract::scheduler::NextPhaseScheduler
Tick {
at: DatasetVersion,
reply: RpcReplyPort<Option<KanbanMove>>,
},
}

/// A ractor actor whose `State` IS a [`MailboxSoaOwner`] — the SoA mailbox and
Expand Down Expand Up @@ -119,6 +135,19 @@ where
};
let _ = reply.send(result);
}
KanbanMsg::Tick { at: _, reply } => {
// Forward-arc advance, atomic against the owner's live phase. The
// first legal successor is empty exactly for absorbing columns
// (`Commit`/`Prune`) → `None` suppresses the no-op tick. The arc
// is legal by construction, so the infallible `advance_phase` is
// correct here (no `try_`/error path).
let from = state.phase();
let moved = from
.next_phases()
.first()
.map(|&to| state.advance_phase(to));
let _ = reply.send(moved);
}
}
Ok(())
}
Expand Down Expand Up @@ -239,6 +268,85 @@ pub async fn drive_mul_advance(
})
}

// ─── S3 IN-leg: substrate version tick → owner forward-arc advance ─────────────

/// S3 driver: a substrate version tick advances the owner along the Rubicon
/// forward arc, in ONE atomic actor message ([`KanbanMsg::Tick`]). Returns the
/// emitted [`KanbanMove`] on advance, or `None` when the owner is absorbing
/// (`Commit`/`Prune`) — the **no-op tick is suppressed** (D-MBX-9-IN,
/// `E-SUBSTRATE-IS-THE-SCHEDULER`).
///
/// **Atomicity:** like [`drive_mul_advance`], the next-phase decision and the
/// transition run inside the SAME serialized mailbox message, so concurrent ticks
/// cannot read a stale phase and collide — they chain along the arc instead
/// (codex #578 lesson, applied to the IN-leg). This is the actor-side realization
/// of the contract's [`NextPhaseScheduler`] policy; use [`drive_scheduled_tick`]
/// when a custom [`VersionScheduler`] policy (version-delta gating, `Plan`/`Prune`
/// over the forward arc, batching) reads a richer view.
///
/// [`NextPhaseScheduler`]: lance_graph_contract::scheduler::NextPhaseScheduler
pub async fn drive_version_tick(
actor: &ActorRef<KanbanMsg>,
at: DatasetVersion,
) -> Result<Option<KanbanMove>, KanbanRouteError> {
ractor::call!(actor, |reply| KanbanMsg::Tick { at, reply })
.map_err(|e| KanbanRouteError::Rpc(e.to_string()))
}

/// S3 driver (custom policy): drive an arbitrary [`VersionScheduler`] for one
/// version tick. The scheduler **proposes** the next move from `view`; if it
/// yields `Some`, the owner **disposes** it via [`KanbanMsg::Advance`]; `None`
/// **suppresses the no-op tick** ("propose, don't dispose" — the scheduler reads,
/// the owner is the sole mutator).
///
/// Unlike [`drive_version_tick`], the proposal is computed OUTSIDE the owner's
/// message (from the supplied `view`), so it is **advisory**: if the owner's phase
/// changes between the proposal and the `Advance`, the edge may be rejected
/// ([`KanbanRouteError::Illegal`]) rather than silently corrupting — surfaced, not
/// panicked. The returned move is the owner's (authoritative phase transition,
/// witness position, and libet anchor from the REAL mutation) with the
/// **scheduler's `exec` overlaid** — the backend routing tag is the policy's
/// decision, which the owner (defaulting to `Native`) can't make. For the pure
/// forward-arc policy prefer the atomic [`drive_version_tick`]; reach for this
/// only when the policy needs a richer view than the owner computes internally.
pub async fn drive_scheduled_tick<S, V>(
scheduler: &S,
view: &V,
at: DatasetVersion,
exec: lance_graph_contract::kanban::ExecTarget,
actor: &ActorRef<KanbanMsg>,
) -> Result<Option<KanbanMove>, KanbanRouteError>
where
S: VersionScheduler,
V: MailboxSoaView,
{
// Propose: lower the version event to the next legal move (or `None`).
let Some(proposed) = scheduler.on_version(view, at, exec) else {
return Ok(None); // absorbing / policy-filtered → suppress the no-op tick
};
// Dispose: the owner applies it (checked); relay an illegal edge as typed.
let inner = ractor::call!(actor, |reply| KanbanMsg::Advance {
to: proposed.to,
reply
Comment on lines +328 to +330

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve non-Native exec targets in scheduled moves

When callers use drive_scheduled_tick with ExecTarget::Jit, SurrealQl, or Elixir, scheduler.on_version(...) stamps that backend onto the proposed KanbanMove, but this RPC sends only proposed.to and returns the owner-generated move instead. The current owner implementations hard-code ExecTarget::Native, so the scheduled move is reported/routed as Native even though the scheduler selected a different backend.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in fd1d58ff. drive_scheduled_tick now keeps the owner's emitted move (authoritative phase transition, witness position, libet anchor — from the real mutation) but overlays proposed.exec, so a Jit/SurrealQl/Elixir selection survives instead of being flattened to the owner's Native default. Regression test scheduled_tick_preserves_non_native_exec_target asserts all three non-Native targets round-trip (13 tests green, clippy + fmt clean).

(Note: the atomic drive_version_tick/Tick path is unaffected — it has no scheduler choosing a backend, so the owner's Native is correct there.)


Generated by Claude Code

})
.map_err(|e| KanbanRouteError::Rpc(e.to_string()))?;
match inner {
// The owner's emitted move is authoritative for the phase transition,
// witness position, and libet anchor (from the REAL mutation), but it
// defaults to `ExecTarget::Native` and can't know which backend the policy
// chose — overlay the scheduler's selection so a `Jit`/`SurrealQl`/`Elixir`
// target is not silently reported/routed as Native (codex #579 P2).
Ok(mut emitted) => {
emitted.exec = proposed.exec;
Ok(Some(emitted))
}
Err(e) => Err(KanbanRouteError::Illegal {
from: e.from,
to: e.to,
}),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -482,4 +590,151 @@ mod tests {
actor.stop(None);
handle.await.expect("actor join");
}

#[tokio::test]
async fn version_tick_advances_forward_arc_then_suppresses_at_absorbing() {
// S3 IN-leg: a version tick advances along the forward arc; once the owner
// reaches an absorbing column the tick is a suppressed no-op (`None`).
let (actor, handle) = Actor::spawn(
None,
KanbanActor::<TestBoard>::default(),
board(KanbanColumn::Planning),
)
.await
.expect("spawn");

// Planning → CognitiveWork → Evaluation → Commit, one tick per version.
let expected = [
KanbanColumn::CognitiveWork,
KanbanColumn::Evaluation,
KanbanColumn::Commit,
];
for (i, want) in expected.iter().enumerate() {
let mv = drive_version_tick(&actor, DatasetVersion(i as u64 + 1))
.await
.expect("tick ok")
.expect("non-absorbing advances");
assert_eq!(mv.to, *want);
}

// Commit is absorbing: the next tick advances nothing (no-op suppressed).
let noop = drive_version_tick(&actor, DatasetVersion(99))
.await
.expect("tick ok");
assert!(noop.is_none(), "absorbing column must suppress the tick");
let phase = ractor::call!(actor, |reply| KanbanMsg::Phase { reply }).expect("rpc");
assert_eq!(phase, KanbanColumn::Commit);

actor.stop(None);
handle.await.expect("actor join");
}

#[tokio::test]
async fn concurrent_version_ticks_serialize_along_the_arc() {
// Two concurrent ticks must NOT both read a stale `Planning`; the atomic
// `Tick` serializes decision+advance in the owner's mailbox, so they chain
// Planning → CognitiveWork → Evaluation (both advance, neither is lost).
let (actor, handle) = Actor::spawn(
None,
KanbanActor::<TestBoard>::default(),
board(KanbanColumn::Planning),
)
.await
.expect("spawn");

let a1 = actor.clone();
let a2 = actor.clone();
let (r1, r2) = tokio::join!(
drive_version_tick(&a1, DatasetVersion(1)),
drive_version_tick(&a2, DatasetVersion(2)),
);
assert!(r1.expect("tick1 ok").is_some(), "first advanced");
assert!(r2.expect("tick2 ok").is_some(), "second advanced");

let phase = ractor::call!(actor, |reply| KanbanMsg::Phase { reply }).expect("rpc");
assert_eq!(phase, KanbanColumn::Evaluation);

actor.stop(None);
handle.await.expect("actor join");
}

#[tokio::test]
async fn custom_scheduler_proposes_and_owner_disposes() {
use lance_graph_contract::scheduler::NextPhaseScheduler;

// The generic consumer drives the EXISTING `VersionScheduler` trait: the
// reference `NextPhaseScheduler` proposes from a view, the owner disposes.
let (actor, handle) = Actor::spawn(
None,
KanbanActor::<TestBoard>::default(),
board(KanbanColumn::Planning),
)
.await
.expect("spawn");

// View mirrors the owner's current phase; scheduler proposes CognitiveWork.
let view = board(KanbanColumn::Planning);
let mv = drive_scheduled_tick(
&NextPhaseScheduler,
&view,
DatasetVersion(1),
ExecTarget::Native,
&actor,
)
.await
.expect("scheduled ok")
.expect("forward arc proposed + disposed");
assert_eq!(mv.from, KanbanColumn::Planning);
assert_eq!(mv.to, KanbanColumn::CognitiveWork);

// An absorbing view → scheduler yields `None` → suppressed, no RPC needed.
let absorbing_view = board(KanbanColumn::Commit);
let noop = drive_scheduled_tick(
&NextPhaseScheduler,
&absorbing_view,
DatasetVersion(2),
ExecTarget::Native,
&actor,
)
.await
.expect("scheduled ok");
assert!(noop.is_none(), "absorbing proposal is suppressed");

actor.stop(None);
handle.await.expect("actor join");
}

#[tokio::test]
async fn scheduled_tick_preserves_non_native_exec_target() {
use lance_graph_contract::scheduler::NextPhaseScheduler;

// codex #579 P2: the scheduler selects the backend; the owner defaults to
// `Native`. The returned move must carry the scheduler's exec, NOT be
// flattened to the owner's Native default.
for exec in [ExecTarget::Jit, ExecTarget::SurrealQl, ExecTarget::Elixir] {
// Fresh owner per exec so the phase starts at Planning each iteration.
let (actor, handle) = Actor::spawn(
None,
KanbanActor::<TestBoard>::default(),
board(KanbanColumn::Planning),
)
.await
.expect("spawn");

let view = board(KanbanColumn::Planning);
let mv =
drive_scheduled_tick(&NextPhaseScheduler, &view, DatasetVersion(1), exec, &actor)
.await
.expect("scheduled ok")
.expect("forward arc proposed + disposed");
assert_eq!(mv.to, KanbanColumn::CognitiveWork);
assert_eq!(
mv.exec, exec,
"scheduler's backend must survive, not be overwritten with Native"
);

actor.stop(None);
handle.await.expect("actor join");
}
}
}
Loading