Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 131 additions & 10 deletions apps/api/src/cora/api/_run_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,65 @@ async def _record_decision(
_log.info("run_supervisor.decision_already_written", choice=choice)


async def _record_supervision_advice(
deps: Kernel,
*,
run_id: UUID,
choice: str,
inputs: dict[str, str],
reasoning: str,
) -> None:
"""Append one advise-rung DecisionRegistered for a shadow-rule breach edge.

The advise rung: when `run_supervisor_advise_enabled` is on, the run-liveness
/ quality / stall rules emit ONE Decision(context=RunSupervision,
choice=SupervisionQuieted/Breached/Stalled) per breach edge for a human, and
still issue NO command (Decision-only). Beam-free (unlike `_record_decision`):
the liveness rule runs before the beam read, and the quality/stall evidence is
the rule's own inputs. Edge-triggered by the caller's per-rule memory, so a
fresh id per episode is fine; a ConcurrencyError on cross-restart
re-derivation is treated as success (same posture as `_record_decision`).
"""
now = deps.clock.now()
decision_id = deps.id_generator.new_id()
decision_inputs = {"run_id": str(run_id), **inputs}
domain_event = DecisionRegistered(
decision_id=decision_id,
decided_by=ActorId(RUN_SUPERVISOR_AGENT_ID),
context=DecisionContext(DECISION_CONTEXT_RUN_SUPERVISION).value,
choice=DecisionChoice(choice).value,
parent_id=None,
override_kind=None,
rule=DecisionRule(_RULE).value,
reasoning=validate_reasoning(reasoning),
confidence=validate_confidence(None),
confidence_source=DecisionConfidenceSource.SELF_REPORTED,
alternatives=(),
inputs=validate_inputs(decision_inputs),
reasoning_signature=None,
occurred_at=now,
)
new_event = to_new_event(
event_type=event_type_name(domain_event),
payload=to_payload(domain_event),
occurred_at=now,
event_id=uuid5(decision_id, "event:0"),
command_name=_COMMAND_NAME,
correlation_id=deps.id_generator.new_id(),
causation_id=None,
principal_id=RUN_SUPERVISOR_AGENT_ID,
)
try:
await deps.event_store.append(
stream_type=_STREAM_TYPE,
stream_id=decision_id,
expected_version=0,
events=[new_event],
)
except ConcurrencyError:
_log.info("run_supervisor.decision_already_written", choice=choice)


async def _issue_hold(
deps: Kernel,
hold_run: HoldRunHandler,
Expand Down Expand Up @@ -594,15 +653,17 @@ async def _observe_run_signals(
stall: set[UUID],
stall_streak: dict[UUID, int],
feed_dead_warned: set[UUID],
advise_enabled: bool,
) -> None:
"""Shadow observation rules (Rule Q + Rule R): OBSERVE-ONLY.

Logs `run_quality.would_flag` / `run_stall.would_flag`, records NO Decision
and issues NO command (byte-identical posture to the run-liveness shadow).
Each rule keeps its OWN edge-trigger state (`quality`, `stall` + the
`stall_streak` hysteresis counter), walled off from the beam-Hold FSM
memory, the liveness set, and each other. Reuses the tick's single beam
read for Rule R's beam-awareness.
"""Observation rules (Rule Q + Rule R): shadow log + optional advise.

Always logs `run_quality.would_flag` / `run_stall.would_flag` and issues NO
command. When `advise_enabled`, ALSO records one Decision per breach EDGE
(SupervisionBreached for Rule Q, SupervisionStalled for Rule R); still no
command (advise rung). Each rule keeps its OWN edge-trigger state (`quality`,
`stall` + the `stall_streak` hysteresis counter), walled off from the
beam-Hold FSM memory, the liveness set, and each other. Reuses the tick's
single beam read for Rule R's beam-awareness.
"""
now = deps.clock.now()
beam_open = beam.quality_ok and beam.fes_open and beam.sbs_open and beam.fes_permit
Expand Down Expand Up @@ -630,6 +691,25 @@ async def _observe_run_signals(
snr_limit=item.snr_limit,
is_simulated=latest.is_simulated if latest is not None else None,
)
if advise_enabled:
await _record_supervision_advice(
deps,
run_id=run_id,
choice="SupervisionBreached",
inputs={
"channel": quality_channel,
"value": str(latest.value) if latest is not None else "None",
"snr_limit": str(item.snr_limit),
"is_simulated": str(latest.is_simulated)
if latest is not None
else "None",
},
reasoning=(
"A quality channel's latest value crossed below the "
"operator-set limit; flagged for a human to review data "
"quality. No command issued (advise rung)."
),
)
elif not disp.would_flag:
quality.discard(run_id)

Expand Down Expand Up @@ -685,6 +765,24 @@ async def _observe_run_signals(
streak=streak,
is_simulated=signal.is_simulated_window,
)
if advise_enabled:
await _record_supervision_advice(
deps,
run_id=run_id,
choice="SupervisionStalled",
inputs={
"channel": stall_channel,
"window_seconds": str(window_seconds),
"expected_interval": str(interval),
"is_simulated": str(signal.is_simulated_window),
},
reasoning=(
"A live observation channel stopped arriving (no values "
"for longer than the expected interval) while the beam is "
"up and the feeder is alive; flagged as a possible stall. "
"No command issued (advise rung)."
),
)
else:
stall_streak.pop(run_id, None)
stall.discard(run_id)
Expand All @@ -709,9 +807,10 @@ async def _supervise_tick(
resume_enabled: bool,
resume_settle_ticks: int,
liveness_ceiling_seconds: float | None,
advise_enabled: bool,
) -> None:
"""One supervision pass over all in-flight Runs (hold + gated resume +
shadow liveness)."""
shadow liveness + optional advise)."""
actor = await load_actor(deps.event_store, RUN_SUPERVISOR_AGENT_ID)
if actor is None or not actor.active:
# Supervisor not seeded yet, or deactivated by an operator: stand down.
Expand Down Expand Up @@ -758,12 +857,28 @@ async def _supervise_tick(
):
if item.run_id not in liveness:
liveness.add(item.run_id)
running_seconds = int((now - running_since).total_seconds())
_log.info(
"run_liveness.would_flag",
run_id=str(item.run_id),
running_seconds=int((now - running_since).total_seconds()),
running_seconds=running_seconds,
ceiling_seconds=liveness_ceiling_seconds,
)
if advise_enabled:
await _record_supervision_advice(
deps,
run_id=item.run_id,
choice="SupervisionQuieted",
inputs={
"running_seconds": str(running_seconds),
"ceiling_seconds": str(liveness_ceiling_seconds),
},
reasoning=(
"The Run has been Running far past the operator run-age "
"ceiling without progressing; flagged for a human to check "
"whether it is hung. No command issued (advise rung)."
),
)
else:
liveness.discard(item.run_id)

Expand Down Expand Up @@ -805,6 +920,7 @@ async def _supervise_tick(
stall=stall,
stall_streak=stall_streak,
feed_dead_warned=feed_dead_warned,
advise_enabled=advise_enabled,
)

# Gated resume pass (Held Runs the supervisor holds).
Expand Down Expand Up @@ -858,6 +974,7 @@ async def _supervise_loop(
resume_enabled: bool,
resume_settle_ticks: int,
liveness_ceiling_seconds: float | None,
advise_enabled: bool,
) -> None:
"""Periodic supervision loop. A failed tick is logged; the next tick retries."""
memory: dict[UUID, str] = {}
Expand Down Expand Up @@ -887,6 +1004,7 @@ async def _supervise_loop(
resume_enabled=resume_enabled,
resume_settle_ticks=resume_settle_ticks,
liveness_ceiling_seconds=liveness_ceiling_seconds,
advise_enabled=advise_enabled,
)
except asyncio.CancelledError:
raise
Expand Down Expand Up @@ -940,6 +1058,7 @@ async def run_supervisor_lifespan(
resume_enabled = deps.settings.run_supervisor_resume_enabled
resume_settle_ticks = deps.settings.run_supervisor_resume_settle_ticks
liveness_ceiling_seconds = deps.settings.run_liveness_ceiling_seconds
advise_enabled = deps.settings.run_supervisor_advise_enabled
rules_config = ObservationRuleConfig(
quality_channel_name=deps.settings.run_quality_channel_name,
stall_channel_name=deps.settings.run_stall_channel_name,
Expand All @@ -954,6 +1073,7 @@ async def run_supervisor_lifespan(
liveness_ceiling_seconds=liveness_ceiling_seconds,
quality_channel=rules_config.quality_channel_name,
stall_channel=rules_config.stall_channel_name,
advise_enabled=advise_enabled,
)
task = asyncio.create_task(
_supervise_loop(
Expand All @@ -968,6 +1088,7 @@ async def run_supervisor_lifespan(
resume_enabled,
resume_settle_ticks,
liveness_ceiling_seconds,
advise_enabled,
),
name="run-supervisor",
)
Expand Down
22 changes: 21 additions & 1 deletion apps/api/src/cora/decision/aggregates/decision/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@

# Closed `choice` value set for `context = "RunSupervision"` Decisions.
# Projection-validated, not domain-enforced (the open-string
# `DecisionContext` + `DecisionChoice` shape is preserved). Seven values:
# `DecisionContext` + `DecisionChoice` shape is preserved). Ten values:
# five beam-Hold/Resume + two audit-fallback + three advise-rung
# observe->advise dispositions (Quieted / Stalled / Breached), the last
# three Decision-only (one per breach edge, never a command).
#
# - `Continue` -- no wind-down trigger met; no command
# issued (the NoAction-bias default).
Expand All @@ -320,6 +323,17 @@
# - `SupervisionConflicted` -- audit-only: lost the per-Run lease race
# to another supervisor evaluator (parallel
# to DebriefConflicted / CautionDraftConflicted).
# - `SupervisionQuieted` -- advise-rung: the run-age run-liveness
# backstop fired (a Run has been Running
# implausibly long). Decision-only, no command.
# - `SupervisionStalled` -- advise-rung: a live observation channel's
# arrivals stopped (Rule R rate-dropout) while
# beam up + feeder alive. Decision-only.
# - `SupervisionBreached` -- advise-rung: a quality channel's latest
# value crossed below the operator-set limit
# (Rule Q). Decision-only. (Named by naming-r3:
# the limit was breached, an objective edge,
# not the supervisor's epistemic state.)
RunSupervisionChoice = Literal[
"Continue",
"Hold",
Expand All @@ -328,6 +342,9 @@
"Abort",
"SupervisionDeferred",
"SupervisionConflicted",
"SupervisionQuieted",
"SupervisionStalled",
"SupervisionBreached",
]
RUN_SUPERVISION_CHOICES: Final = frozenset(
{
Expand All @@ -338,6 +355,9 @@
"Abort",
"SupervisionDeferred",
"SupervisionConflicted",
"SupervisionQuieted",
"SupervisionStalled",
"SupervisionBreached",
}
)

Expand Down
10 changes: 10 additions & 0 deletions apps/api/src/cora/infrastructure/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ class Settings(BaseSettings):
run_supervisor_resume_enabled: bool = False
run_supervisor_resume_settle_ticks: int = 2

# `run_supervisor_advise_enabled` promotes the supervisor's shadow rules
# (run-liveness, signal-quality, signal-stall) one rung from observe to
# advise: on each breach EDGE the supervisor records ONE
# Decision(context=RunSupervision, choice=SupervisionQuieted/Stalled/
# Breached) for a human, still issuing NO command. Default off, a further
# opt-in above each rule's own enable (a rule with no channel / ceiling
# configured stays silent regardless). Shadow logging continues unchanged;
# advise only adds the edge-triggered Decision.
run_supervisor_advise_enabled: bool = False

# `run_liveness_ceiling_seconds` gates the run-liveness shadow rule
# inside the RunSupervisor loop: a Run that has been Running longer than this
# (now - running_since) is flagged as possibly-hung. Default None = OFF (a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool)
stall_streak={},
feed_dead_warned=set(),
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
resume_settle_ticks=1,
)
Expand All @@ -413,6 +414,7 @@ async def test_supervisor_auto_resumes_when_envelope_safe(db_pool: asyncpg.Pool)
stall_streak={},
feed_dead_warned=set(),
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
resume_settle_ticks=1,
)
Expand Down Expand Up @@ -465,6 +467,7 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo
stall_streak={},
feed_dead_warned=set(),
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
resume_settle_ticks=1,
)
Expand Down Expand Up @@ -496,6 +499,7 @@ async def test_supervisor_stays_held_when_clearance_expired(db_pool: asyncpg.Poo
stall_streak={},
feed_dead_warned=set(),
liveness_ceiling_seconds=None,
advise_enabled=False,
resume_enabled=True,
resume_settle_ticks=1,
)
Expand Down
Loading
Loading