From eeefd6989f7daf63f237df14ad3bb63b80ba16c2 Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Sun, 21 Jun 2026 22:52:29 +0300 Subject: [PATCH 1/4] feat(decision): add the three RunSupervision advise-rung choices Slice A of the observation-signal advise rung. Adds SupervisionQuieted (run-age liveness backstop), SupervisionStalled (Rule R rate-dropout), and SupervisionBreached (Rule Q quality-below-limit) to the RunSupervisionChoice Literal + RUN_SUPERVISION_CHOICES frozenset (7 -> 10), with the vocab test updated to the 10-value set + a work-noun guard on the new dispositions. WHY: promoting the shipped shadow observation-signal + run-liveness rules one rung (observe -> advise) means the supervisor records one Decision per breach edge for a human; that Decision's choice must exist in the closed set first. Decision-only dispositions (never a command). SupervisionBreached is the naming-r3 rename of the originally-proposed SupervisionDoubted: "Doubted" read as the supervisor's epistemic state; "Breached" names the objective limit-crossing, family-uniform with Deferred / Conflicted / Stalled. This slice adds vocabulary only; the supervisor emission lands next. Co-Authored-By: Claude Opus 4.8 --- .../decision/aggregates/decision/state.py | 22 ++++++++++++++++++- .../decision/test_run_supervision_vocab.py | 17 ++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/apps/api/src/cora/decision/aggregates/decision/state.py b/apps/api/src/cora/decision/aggregates/decision/state.py index 995ceb98fb..bc41a2f589 100644 --- a/apps/api/src/cora/decision/aggregates/decision/state.py +++ b/apps/api/src/cora/decision/aggregates/decision/state.py @@ -299,7 +299,10 @@ # Closed `choice` value set for `context = "RunSupervision"` Decisions. # Projection-validated, not domain-enforced (the open-string -# `DecisionContext` + `DecisionChoice` shape is preserved). Seven values: +# `DecisionContext` + `DecisionChoice` shape is preserved). 
Ten values: +# five beam-Hold/Resume + two audit-fallback + three advise-rung +# observe->advise dispositions (Quieted / Stalled / Breached), the last +# three Decision-only (one per breach edge, never a command). # # - `Continue` -- no wind-down trigger met; no command # issued (the NoAction-bias default). @@ -320,6 +323,17 @@ # - `SupervisionConflicted` -- audit-only: lost the per-Run lease race # to another supervisor evaluator (parallel # to DebriefConflicted / CautionDraftConflicted). +# - `SupervisionQuieted` -- advise-rung: the run-age run-liveness +# backstop fired (a Run has been Running +# implausibly long). Decision-only, no command. +# - `SupervisionStalled` -- advise-rung: a live observation channel's +# arrivals stopped (Rule R rate-dropout) while +# beam up + feeder alive. Decision-only. +# - `SupervisionBreached` -- advise-rung: a quality channel's latest +# value crossed below the operator-set limit +# (Rule Q). Decision-only. (Named by naming-r3: +# the limit was breached, an objective edge, +# not the supervisor's epistemic state.) 
RunSupervisionChoice = Literal[ "Continue", "Hold", @@ -328,6 +342,9 @@ "Abort", "SupervisionDeferred", "SupervisionConflicted", + "SupervisionQuieted", + "SupervisionStalled", + "SupervisionBreached", ] RUN_SUPERVISION_CHOICES: Final = frozenset( { @@ -338,6 +355,9 @@ "Abort", "SupervisionDeferred", "SupervisionConflicted", + "SupervisionQuieted", + "SupervisionStalled", + "SupervisionBreached", } ) diff --git a/apps/api/tests/unit/decision/test_run_supervision_vocab.py b/apps/api/tests/unit/decision/test_run_supervision_vocab.py index e9acd99b33..0f3bda17d6 100644 --- a/apps/api/tests/unit/decision/test_run_supervision_vocab.py +++ b/apps/api/tests/unit/decision/test_run_supervision_vocab.py @@ -36,6 +36,9 @@ def test_run_supervision_choices_closed_set() -> None: "Abort", "SupervisionDeferred", "SupervisionConflicted", + "SupervisionQuieted", + "SupervisionStalled", + "SupervisionBreached", } ) == RUN_SUPERVISION_CHOICES @@ -57,3 +60,17 @@ def test_audit_fallback_choices_are_work_noun_qualified() -> None: assert "Conflicted" not in RUN_SUPERVISION_CHOICES assert "SupervisionDeferred" in RUN_SUPERVISION_CHOICES assert "SupervisionConflicted" in RUN_SUPERVISION_CHOICES + + +@pytest.mark.unit +def test_advise_rung_choices_are_work_noun_qualified() -> None: + """The three advise-rung dispositions carry the Supervision work-noun + (no bare Quieted / Stalled / Breached) for the same namespace-collision + reason, and read as objective edges (naming-r3: Breached, not Doubted).""" + assert "Quieted" not in RUN_SUPERVISION_CHOICES + assert "Stalled" not in RUN_SUPERVISION_CHOICES + assert "Breached" not in RUN_SUPERVISION_CHOICES + assert "Doubted" not in RUN_SUPERVISION_CHOICES # renamed away by naming-r3 + assert {"SupervisionQuieted", "SupervisionStalled", "SupervisionBreached"} <= ( + RUN_SUPERVISION_CHOICES + ) From cff45a5da433449a59c8483fead31f6406f6b6ac Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Sun, 21 Jun 2026 23:11:01 +0300 Subject: [PATCH 2/4] feat(api): 
promote the RunSupervisor shadow rules to the advise rung Slice B of the observation-signal advise rung. Adds run_supervisor_advise_enabled (default off, a further opt-in above each rule's own enable) and, when on, emits exactly one Decision per breach EDGE from the three shadow rules -- still issuing NO command (advise rung): - run-liveness backstop -> SupervisionQuieted - Rule R rate-dropout -> SupervisionStalled - Rule Q quality breach -> SupervisionBreached WHY: the shadow rules (#288 / #273) log would_flag but leave no durable record a human can triage. The advise rung climbs exactly one step (observe -> advise), recording one RunSupervision Decision per breach episode for a human while keeping the act rung (auto-Hold) deferred. Emission is edge-triggered off the already-walled per-rule memory (one Decision per episode; nothing on a standing breach across ticks), beam-free (the liveness rule runs before the beam read), and reuses the existing DecisionRegistered shape under the RunSupervisor identity + Authorize path. Shadow logging is unchanged; advise only adds the Decision. cannot-tell still defers (no Decision). Tests cover advise-off (no Decision), each disposition under advise-on (one Decision, no command), and edge-triggering (one Decision across two ticks of a standing breach). 
Co-Authored-By: Claude Opus 4.8 --- apps/api/src/cora/api/_run_supervisor.py | 141 +++++++++++++-- apps/api/src/cora/infrastructure/config.py | 10 ++ .../test_2bm_run_supervisor_auto_resume.py | 4 + .../api/tests/unit/api/test_run_supervisor.py | 165 ++++++++++++++++++ 4 files changed, 310 insertions(+), 10 deletions(-) diff --git a/apps/api/src/cora/api/_run_supervisor.py b/apps/api/src/cora/api/_run_supervisor.py index 417c80bc0d..cef6ac3dfa 100644 --- a/apps/api/src/cora/api/_run_supervisor.py +++ b/apps/api/src/cora/api/_run_supervisor.py @@ -432,6 +432,65 @@ async def _record_decision( _log.info("run_supervisor.decision_already_written", choice=choice) +async def _record_supervision_advice( + deps: Kernel, + *, + run_id: UUID, + choice: str, + inputs: dict[str, str], + reasoning: str, +) -> None: + """Append one advise-rung DecisionRegistered for a shadow-rule breach edge. + + The advise rung: when `run_supervisor_advise_enabled` is on, the run-liveness + / quality / stall rules emit ONE Decision(context=RunSupervision, + choice=SupervisionQuieted/Breached/Stalled) per breach edge for a human, and + still issue NO command (Decision-only). Beam-free (unlike `_record_decision`): + the liveness rule runs before the beam read, and the quality/stall evidence is + the rule's own inputs. Edge-triggered by the caller's per-rule memory, so a + fresh id per episode is fine; a ConcurrencyError (a Decision stream already + exists for that id) is treated as success (same posture as `_record_decision`). 
+ """ + now = deps.clock.now() + decision_id = deps.id_generator.new_id() + decision_inputs = {"run_id": str(run_id), **inputs} + domain_event = DecisionRegistered( + decision_id=decision_id, + decided_by=ActorId(RUN_SUPERVISOR_AGENT_ID), + context=DecisionContext(DECISION_CONTEXT_RUN_SUPERVISION).value, + choice=DecisionChoice(choice).value, + parent_id=None, + override_kind=None, + rule=DecisionRule(_RULE).value, + reasoning=validate_reasoning(reasoning), + confidence=validate_confidence(None), + confidence_source=DecisionConfidenceSource.SELF_REPORTED, + alternatives=(), + inputs=validate_inputs(decision_inputs), + reasoning_signature=None, + occurred_at=now, + ) + new_event = to_new_event( + event_type=event_type_name(domain_event), + payload=to_payload(domain_event), + occurred_at=now, + event_id=uuid5(decision_id, "event:0"), + command_name=_COMMAND_NAME, + correlation_id=deps.id_generator.new_id(), + causation_id=None, + principal_id=RUN_SUPERVISOR_AGENT_ID, + ) + try: + await deps.event_store.append( + stream_type=_STREAM_TYPE, + stream_id=decision_id, + expected_version=0, + events=[new_event], + ) + except ConcurrencyError: + _log.info("run_supervisor.decision_already_written", choice=choice) + + async def _issue_hold( deps: Kernel, hold_run: HoldRunHandler, @@ -594,15 +653,17 @@ async def _observe_run_signals( stall: set[UUID], stall_streak: dict[UUID, int], feed_dead_warned: set[UUID], + advise_enabled: bool, ) -> None: - """Shadow observation rules (Rule Q + Rule R): OBSERVE-ONLY. - - Logs `run_quality.would_flag` / `run_stall.would_flag`, records NO Decision - and issues NO command (byte-identical posture to the run-liveness shadow). - Each rule keeps its OWN edge-trigger state (`quality`, `stall` + the - `stall_streak` hysteresis counter), walled off from the beam-Hold FSM - memory, the liveness set, and each other. Reuses the tick's single beam - read for Rule R's beam-awareness. 
+ """Observation rules (Rule Q + Rule R): shadow log + optional advise. + + Always logs `run_quality.would_flag` / `run_stall.would_flag` and issues NO + command. When `advise_enabled`, ALSO records one Decision per breach EDGE + (SupervisionBreached for Rule Q, SupervisionStalled for Rule R); still no + command (advise rung). Each rule keeps its OWN edge-trigger state (`quality`, + `stall` + the `stall_streak` hysteresis counter), walled off from the + beam-Hold FSM memory, the liveness set, and each other. Reuses the tick's + single beam read for Rule R's beam-awareness. """ now = deps.clock.now() beam_open = beam.quality_ok and beam.fes_open and beam.sbs_open and beam.fes_permit @@ -630,6 +691,25 @@ async def _observe_run_signals( snr_limit=item.snr_limit, is_simulated=latest.is_simulated if latest is not None else None, ) + if advise_enabled: + await _record_supervision_advice( + deps, + run_id=run_id, + choice="SupervisionBreached", + inputs={ + "channel": quality_channel, + "value": str(latest.value) if latest is not None else "None", + "snr_limit": str(item.snr_limit), + "is_simulated": str(latest.is_simulated) + if latest is not None + else "None", + }, + reasoning=( + "A quality channel's latest value crossed below the " + "operator-set limit; flagged for a human to review data " + "quality. No command issued (advise rung)." 
+ ), + ) elif not disp.would_flag: quality.discard(run_id) @@ -685,6 +765,24 @@ async def _observe_run_signals( streak=streak, is_simulated=signal.is_simulated_window, ) + if advise_enabled: + await _record_supervision_advice( + deps, + run_id=run_id, + choice="SupervisionStalled", + inputs={ + "channel": stall_channel, + "window_seconds": str(window_seconds), + "expected_interval": str(interval), + "is_simulated": str(signal.is_simulated_window), + }, + reasoning=( + "A live observation channel stopped arriving (no values " + "for longer than the expected interval) while the beam is " + "up and the feeder is alive; flagged as a possible stall. " + "No command issued (advise rung)." + ), + ) else: stall_streak.pop(run_id, None) stall.discard(run_id) @@ -709,9 +807,10 @@ async def _supervise_tick( resume_enabled: bool, resume_settle_ticks: int, liveness_ceiling_seconds: float | None, + advise_enabled: bool, ) -> None: """One supervision pass over all in-flight Runs (hold + gated resume + - shadow liveness).""" + shadow liveness + optional advise).""" actor = await load_actor(deps.event_store, RUN_SUPERVISOR_AGENT_ID) if actor is None or not actor.active: # Supervisor not seeded yet, or deactivated by an operator: stand down. 
@@ -758,12 +857,28 @@ async def _supervise_tick( ): if item.run_id not in liveness: liveness.add(item.run_id) + running_seconds = int((now - running_since).total_seconds()) _log.info( "run_liveness.would_flag", run_id=str(item.run_id), - running_seconds=int((now - running_since).total_seconds()), + running_seconds=running_seconds, ceiling_seconds=liveness_ceiling_seconds, ) + if advise_enabled: + await _record_supervision_advice( + deps, + run_id=item.run_id, + choice="SupervisionQuieted", + inputs={ + "running_seconds": str(running_seconds), + "ceiling_seconds": str(liveness_ceiling_seconds), + }, + reasoning=( + "The Run has been Running far past the operator run-age " + "ceiling without progressing; flagged for a human to check " + "whether it is hung. No command issued (advise rung)." + ), + ) else: liveness.discard(item.run_id) @@ -805,6 +920,7 @@ async def _supervise_tick( stall=stall, stall_streak=stall_streak, feed_dead_warned=feed_dead_warned, + advise_enabled=advise_enabled, ) # Gated resume pass (Held Runs the supervisor holds). @@ -858,6 +974,7 @@ async def _supervise_loop( resume_enabled: bool, resume_settle_ticks: int, liveness_ceiling_seconds: float | None, + advise_enabled: bool, ) -> None: """Periodic supervision loop. 
A failed tick is logged; the next tick retries.""" memory: dict[UUID, str] = {} @@ -887,6 +1004,7 @@ async def _supervise_loop( resume_enabled=resume_enabled, resume_settle_ticks=resume_settle_ticks, liveness_ceiling_seconds=liveness_ceiling_seconds, + advise_enabled=advise_enabled, ) except asyncio.CancelledError: raise @@ -940,6 +1058,7 @@ async def run_supervisor_lifespan( resume_enabled = deps.settings.run_supervisor_resume_enabled resume_settle_ticks = deps.settings.run_supervisor_resume_settle_ticks liveness_ceiling_seconds = deps.settings.run_liveness_ceiling_seconds + advise_enabled = deps.settings.run_supervisor_advise_enabled rules_config = ObservationRuleConfig( quality_channel_name=deps.settings.run_quality_channel_name, stall_channel_name=deps.settings.run_stall_channel_name, @@ -954,6 +1073,7 @@ async def run_supervisor_lifespan( liveness_ceiling_seconds=liveness_ceiling_seconds, quality_channel=rules_config.quality_channel_name, stall_channel=rules_config.stall_channel_name, + advise_enabled=advise_enabled, ) task = asyncio.create_task( _supervise_loop( @@ -968,6 +1088,7 @@ async def run_supervisor_lifespan( resume_enabled, resume_settle_ticks, liveness_ceiling_seconds, + advise_enabled, ), name="run-supervisor", ) diff --git a/apps/api/src/cora/infrastructure/config.py b/apps/api/src/cora/infrastructure/config.py index ef92a1b15b..44389909a5 100644 --- a/apps/api/src/cora/infrastructure/config.py +++ b/apps/api/src/cora/infrastructure/config.py @@ -209,6 +209,16 @@ class Settings(BaseSettings): run_supervisor_resume_enabled: bool = False run_supervisor_resume_settle_ticks: int = 2 + # `run_supervisor_advise_enabled` promotes the supervisor's shadow rules + # (run-liveness, signal-quality, signal-stall) one rung from observe to + # advise: on each breach EDGE the supervisor records ONE + # Decision(context=RunSupervision, choice=SupervisionQuieted/Stalled/ + # Breached) for a human, still issuing NO command. 
Default off, a further + # opt-in above each rule's own enable (a rule with no channel / ceiling + # configured stays silent regardless). Shadow logging continues unchanged; + # advise only adds the edge-triggered Decision. + run_supervisor_advise_enabled: bool = False + # `run_liveness_ceiling_seconds` gates the run-liveness shadow rule # inside the RunSupervisor loop: a Run that has been Running longer than this # (now - running_since) is flagged as possibly-hung. Default None = OFF (a diff --git a/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py b/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py index 56b18f9116..ad744fb3a8 100644 --- a/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py +++ b/apps/api/tests/integration/scenarios/test_2bm_run_supervisor_auto_resume.py @@ -389,6 +389,7 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool) stall_streak={}, feed_dead_warned=set(), liveness_ceiling_seconds=None, + advise_enabled=False, resume_enabled=True, resume_settle_ticks=1, ) @@ -413,6 +414,7 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool) stall_streak={}, feed_dead_warned=set(), liveness_ceiling_seconds=None, + advise_enabled=False, resume_enabled=True, resume_settle_ticks=1, ) @@ -465,6 +467,7 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo stall_streak={}, feed_dead_warned=set(), liveness_ceiling_seconds=None, + advise_enabled=False, resume_enabled=True, resume_settle_ticks=1, ) @@ -496,6 +499,7 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo stall_streak={}, feed_dead_warned=set(), liveness_ceiling_seconds=None, + advise_enabled=False, resume_enabled=True, resume_settle_ticks=1, ) diff --git a/apps/api/tests/unit/api/test_run_supervisor.py b/apps/api/tests/unit/api/test_run_supervisor.py index c6b6f99012..abd587223b 100644 --- 
a/apps/api/tests/unit/api/test_run_supervisor.py +++ b/apps/api/tests/unit/api/test_run_supervisor.py @@ -36,6 +36,7 @@ run_supervisor_lifespan, ) from cora.decision.aggregates.decision import load_decision +from cora.infrastructure.adapters.in_memory_event_store import InMemoryEventStore from cora.infrastructure.config import Settings from cora.infrastructure.deps import make_inmemory_kernel from cora.infrastructure.kernel import Kernel @@ -326,6 +327,7 @@ async def _tick( stall: set[UUID] | None = None, stall_streak: dict[UUID, int] | None = None, feed_dead_warned: set[UUID] | None = None, + advise_enabled: bool = False, ) -> None: """Call _supervise_tick, defaulting the resume wiring (off) for hold-only tests.""" if resume_run is None: @@ -348,6 +350,7 @@ async def _tick( resume_enabled=resume_enabled, resume_settle_ticks=resume_settle_ticks, liveness_ceiling_seconds=liveness_ceiling_seconds, + advise_enabled=advise_enabled, ) @@ -1608,3 +1611,165 @@ async def test_shadow_quality_would_flag_log_carries_is_simulated_provenance() - assert len(flagged) == 1 assert flagged[0]["is_simulated"] is True assert flagged[0]["run_id"] == str(run_id) + + +# ---------- advise rung: edge-triggered Decision, still no command ---------- + + +async def _supervision_decision_choices(kernel: Kernel) -> list[str]: + """All RunSupervision Decision choices written to the event store.""" + choices: list[str] = [] + store = kernel.event_store + assert isinstance(store, InMemoryEventStore) + for stream_type, stream_id in store._streams: + if stream_type != "Decision": + continue + decision = await load_decision(store, stream_id) + if decision is not None and decision.context.value == "RunSupervision": + choices.append(decision.choice.value) + return choices + + +@pytest.mark.unit +async def test_advise_off_quality_breach_records_no_decision() -> None: + """Default (advise off): a quality breach logs the shadow would-flag but + records NO Decision and issues no command.""" + kernel = 
_kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, snr_limit=5.0)]) + hold_run, hold_calls = _make_recording_hold() + lookup = InMemoryRunChannelLookup() + lookup.register(run_id=run_id, channel_name="snr", value=2.0, recorded_at=_NOW) + + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + channel_lookup=lookup, + rules_config=_rules_quality(), + quality=set(), + advise_enabled=False, + ) + + assert await _supervision_decision_choices(kernel) == [] + assert hold_calls == [] + + +@pytest.mark.unit +async def test_advise_quality_breach_records_supervision_breached_without_command() -> None: + """Advise on: a quality breach records exactly one SupervisionBreached + Decision and still issues no command (advise rung).""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, snr_limit=5.0)]) + hold_run, hold_calls = _make_recording_hold() + resume_run, resume_calls = _make_recording_resume() + lookup = InMemoryRunChannelLookup() + lookup.register(run_id=run_id, channel_name="snr", value=2.0, recorded_at=_NOW) + + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + resume_run=resume_run, + beam_lookup=_BeamOpen(), + memory={}, + channel_lookup=lookup, + rules_config=_rules_quality(), + quality=set(), + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == ["SupervisionBreached"] + assert hold_calls == [] + assert resume_calls == [] + + +@pytest.mark.unit +async def test_advise_quality_breach_is_edge_triggered_single_decision() -> None: + """Two ticks with the same standing breach record only ONE Decision + (edge-triggered off the shared quality set).""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, snr_limit=5.0)]) + hold_run, 
_hold = _make_recording_hold() + lookup = InMemoryRunChannelLookup() + lookup.register(run_id=run_id, channel_name="snr", value=2.0, recorded_at=_NOW) + quality: set[UUID] = set() + + for _ in range(2): + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + channel_lookup=lookup, + rules_config=_rules_quality(), + quality=quality, + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == ["SupervisionBreached"] + + +@pytest.mark.unit +async def test_advise_stall_breach_records_supervision_stalled_without_command() -> None: + """Advise on: a stall (alive feeder, beam up, zero arrivals) records one + SupervisionStalled Decision once the hysteresis streak is met; no command.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, expected_observation_interval_seconds=10.0)]) + hold_run, hold_calls = _make_recording_hold() + lookup = InMemoryRunChannelLookup() + lookup.register_heartbeat(run_id=run_id, recorded_at=_NOW) + stall: set[UUID] = set() + stall_streak: dict[UUID, int] = {} + + for _ in range(2): # hysteresis = 2 + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + channel_lookup=lookup, + rules_config=_rules_stall(hysteresis=2), + stall=stall, + stall_streak=stall_streak, + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == ["SupervisionStalled"] + assert hold_calls == [] + + +@pytest.mark.unit +async def test_advise_liveness_stale_records_supervision_quieted_without_command() -> None: + """Advise on: a Run past the run-age ceiling records one SupervisionQuieted + Decision; no command.""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, running_since=_NOW - timedelta(hours=2))]) + hold_run, hold_calls = 
_make_recording_hold() + + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + liveness=set(), + liveness_ceiling_seconds=3600.0, + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == ["SupervisionQuieted"] + assert hold_calls == [] From f90f7498468cfb8e0e7adf70b7f428f494f56406 Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Sun, 21 Jun 2026 23:25:44 +0300 Subject: [PATCH 3/4] test(api): cover advise-rung edge-trigger + cannot-tell gates Gate-review follow-ups (the advise diff drew 2 ship + 1 changes_needed, the last purely a test-coverage gap; the correctness/trust lens passed clean). Adds three tests: - advise liveness is edge-triggered: two ticks of a standing stale Run record only ONE SupervisionQuieted Decision (parity with the quality + stall edge-trigger tests). - advise records no Decision when the quality channel has no observation (cannot-tell -> defer; pins that the value-None path never emits, which a reviewer worried about -- the decider returns would_flag=False on None). - advise records no Decision when the rule is disabled (snr_limit None): advise respects each rule's own enable, not just the global advise flag. Test-only; no production change. 
Co-Authored-By: Claude Opus 4.8 --- .../api/tests/unit/api/test_run_supervisor.py | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/apps/api/tests/unit/api/test_run_supervisor.py b/apps/api/tests/unit/api/test_run_supervisor.py index abd587223b..ede16c424f 100644 --- a/apps/api/tests/unit/api/test_run_supervisor.py +++ b/apps/api/tests/unit/api/test_run_supervisor.py @@ -1773,3 +1773,83 @@ async def test_advise_liveness_stale_records_supervision_quieted_without_command assert await _supervision_decision_choices(kernel) == ["SupervisionQuieted"] assert hold_calls == [] + + +@pytest.mark.unit +async def test_advise_liveness_stale_is_edge_triggered_single_decision() -> None: + """Two ticks with the same standing stale Run record only ONE + SupervisionQuieted Decision (edge-triggered off the shared liveness set).""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, running_since=_NOW - timedelta(hours=2))]) + hold_run, _hold = _make_recording_hold() + liveness: set[UUID] = set() + + for _ in range(2): + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + liveness=liveness, + liveness_ceiling_seconds=3600.0, + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == ["SupervisionQuieted"] + + +@pytest.mark.unit +async def test_advise_records_no_decision_when_quality_channel_has_no_observation() -> None: + """Advise on but the quality channel never produced a value: cannot-tell, so + no flag and no Decision (the value-None case never emits a SupervisionBreached).""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, snr_limit=5.0)]) + hold_run, _hold = _make_recording_hold() + lookup = InMemoryRunChannelLookup() # no snr value registered for this Run + + await _tick( + kernel, + list_runs=list_runs, + 
hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + channel_lookup=lookup, + rules_config=_rules_quality(), + quality=set(), + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == [] + + +@pytest.mark.unit +async def test_advise_records_no_decision_when_rule_disabled() -> None: + """Advise on but the Run has no precomputed snr_limit: Rule Q is disabled for + it, so advise stays silent (advise respects each rule's own enable, not just + the global advise flag).""" + kernel = _kernel() + await seed_run_supervisor_agent(kernel) + run_id = uuid4() + list_runs = _make_list_runs([_running_item(run_id, snr_limit=None)]) + hold_run, _hold = _make_recording_hold() + lookup = InMemoryRunChannelLookup() + lookup.register(run_id=run_id, channel_name="snr", value=0.1, recorded_at=_NOW) + + await _tick( + kernel, + list_runs=list_runs, + hold_run=hold_run, + beam_lookup=_BeamOpen(), + memory={}, + channel_lookup=lookup, + rules_config=_rules_quality(), + quality=set(), + advise_enabled=True, + ) + + assert await _supervision_decision_choices(kernel) == [] From bcbfe56d0a63fa6536a8f9417005297be60ea03f Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Sun, 21 Jun 2026 23:45:55 +0300 Subject: [PATCH 4/4] test(api): cover the advise-emitter ConcurrencyError no-op branch The diff-coverage gate (hard 90% on changed lines) flagged _run_supervisor.py at 88.9%: the new _record_supervision_advice except ConcurrencyError branch (lines 490-491) was uncovered. Adds an idempotency test that re-derives the same advise Decision id (via a FixedIdGenerator repeating the id) so the second append collides and is swallowed -- mirrors the existing test_record_decision_is_idempotent_on_repeated_id for the beam-Hold path. Test-only; covers the cross-restart re-emission no-op. 
Co-Authored-By: Claude Opus 4.8 --- .../api/tests/unit/api/test_run_supervisor.py | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/apps/api/tests/unit/api/test_run_supervisor.py b/apps/api/tests/unit/api/test_run_supervisor.py index ede16c424f..4a00646180 100644 --- a/apps/api/tests/unit/api/test_run_supervisor.py +++ b/apps/api/tests/unit/api/test_run_supervisor.py @@ -40,7 +40,12 @@ from cora.infrastructure.config import Settings from cora.infrastructure.deps import make_inmemory_kernel from cora.infrastructure.kernel import Kernel -from cora.infrastructure.ports import AllowAllAuthorize, FakeClock, UUIDv7Generator +from cora.infrastructure.ports import ( + AllowAllAuthorize, + FakeClock, + FixedIdGenerator, + UUIDv7Generator, +) from cora.infrastructure.ports.beam_availability_lookup import ( BeamAvailabilityLookup, BeamAvailabilityLookupResult, @@ -1853,3 +1858,32 @@ async def test_advise_records_no_decision_when_rule_disabled() -> None: ) assert await _supervision_decision_choices(kernel) == [] + + +@pytest.mark.unit +async def test_record_supervision_advice_is_idempotent_on_repeated_id() -> None: + """A re-derived advise Decision id (cross-restart re-emission) is a + ConcurrencyError no-op, not a crash (mirrors _record_decision).""" + from cora.api._run_supervisor import _record_supervision_advice + + shared_id = uuid4() + # Each emission mints decision_id then correlation_id; repeat decision_id so + # the second append collides on the existing Decision stream. 
+ kernel = make_inmemory_kernel( + settings=Settings(), # type: ignore[call-arg] + clock=FakeClock(_NOW), + id_generator=FixedIdGenerator([shared_id, uuid4(), shared_id, uuid4()]), + authz=AllowAllAuthorize(), + ) + run_id = uuid4() + for _ in range(2): + await _record_supervision_advice( + kernel, + run_id=run_id, + choice="SupervisionStalled", + inputs={"channel": "projection_index"}, + reasoning="stalled (advise rung)", + ) + + # The second emission re-derived the same id and was swallowed: one Decision. + assert await _supervision_decision_choices(kernel) == ["SupervisionStalled"]