From ee393be87b8fabec2225b40dc4b5f245da7c2ce0 Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Mon, 22 Jun 2026 09:59:59 +0300 Subject: [PATCH 1/2] feat(agent): ProcedureWatcher flags stalled in-conduct procedures CORA's 8th seeded agent and first liveness automation on the Operation BC. A deterministic, flag-only, composition-root periodic watcher: each tick it lists in-conduct procedures (Running / Held) and records one Decision(context=ProcedureProgress, choice=Stall) per stall episode for any that has sat past an operator window without progressing. It issues no command (it surfaces the stall so a human acts before an experiment hangs unnoticed mid-procedure). Procedure is a distinct aggregate from Run, so this is a liveness gap RunSupervisor does not cover. The load-bearing guard is the anti-false-flag fold. Appending activity steps does not advance proj_operation_procedure_summary.last_status_changed_at (the projection NO-OPs ProcedureActivitiesLogbookOpened against it; activity is orthogonal to lifecycle), so a Running procedure actively logging steps would look frozen by its status timestamp alone. Keying on it without folding in activity recency would false-flag an actively-progressing conduct, a foolable watchdog that is worse than none. So a Running candidate already past its status-timestamp window gets one read of the latest activity recorded_at before it is flagged; Held is not folded (a paused conduct accepts no activity). This mirrors ClearanceWatcher folding ReviewStep.decided_at for UnderReview. Activity recency had no existing read path: activity rows land in the write-only ActivityStore side table and the aggregate stream carries only the one-time ProcedureActivitiesLogbookOpened marker. So this adds a BC-local ProcedureActivityLookup read port (+ Postgres adapter + in-memory stub), keyed on recorded_at (the CORA write-time trust anchor, not the spoofable sampled_at), riding a new (procedure_id, recorded_at DESC) index. Mirrors the RunChannelLookup pattern. naming-r3: context ProcedureProgress (the lifecycle dimension, family-clean with ClearanceProgress; not the state-noun ProcedureStall, not the existing conduct context ProcedureExecution); choice Stall (Flag and Stale are taken by sibling contexts, and choice tokens must be globally unique in the DecisionChoice projection); agent kind ProcedureWatcher (two-word aggregate-named, like ClearanceWatcher / CalibrationWatcher). Agent id in a new 0c0c block. Off by default, gates on Actor.active. Co-Authored-By: Claude Opus 4.8 --- apps/api/src/cora/agent/__init__.py | 6 + .../src/cora/agent/seed_procedure_watcher.py | 105 +++++ apps/api/src/cora/api/_procedure_watcher.py | 332 ++++++++++++++ apps/api/src/cora/api/main.py | 8 + .../decision/aggregates/decision/__init__.py | 6 + .../decision/aggregates/decision/state.py | 26 ++ apps/api/src/cora/infrastructure/config.py | 36 ++ .../postgres_procedure_activity_lookup.py | 48 +++ apps/api/src/cora/operation/ports/__init__.py | 8 + .../ports/procedure_activity_lookup.py | 108 +++++ ...test_procedure_activity_lookup_postgres.py | 84 ++++ .../unit/agent/test_procedure_watcher_seed.py | 105 +++++ .../tests/unit/api/test_procedure_watcher.py | 408 ++++++++++++++++++ .../decision/test_procedure_progress_vocab.py | 69 +++ .../test_procedure_activity_lookup.py | 52 +++ docs/architecture/modules/agent/index.md | 9 +- ...procedure_activities_proc_recorded_idx.sql | 22 + infra/atlas/migrations/atlas.sum | 3 +- 18 files changed, 1430 insertions(+), 5 deletions(-) create mode 100644 apps/api/src/cora/agent/seed_procedure_watcher.py create mode 100644 apps/api/src/cora/api/_procedure_watcher.py create mode 100644 apps/api/src/cora/operation/adapters/postgres_procedure_activity_lookup.py create mode 100644 apps/api/src/cora/operation/ports/procedure_activity_lookup.py create mode 100644 apps/api/tests/integration/test_procedure_activity_lookup_postgres.py create mode 100644 apps/api/tests/unit/agent/test_procedure_watcher_seed.py create mode 100644 apps/api/tests/unit/api/test_procedure_watcher.py create mode 100644 apps/api/tests/unit/decision/test_procedure_progress_vocab.py create mode 100644 apps/api/tests/unit/operation/test_procedure_activity_lookup.py create mode 100644 infra/atlas/migrations/20260622010000_add_entries_operation_procedure_activities_proc_recorded_idx.sql diff --git a/apps/api/src/cora/agent/__init__.py b/apps/api/src/cora/agent/__init__.py index f776cc8679..06680f99b3 100644 --- a/apps/api/src/cora/agent/__init__.py +++ b/apps/api/src/cora/agent/__init__.py @@ -54,6 +54,10 @@ CLEARANCE_WATCHER_AGENT_ID, seed_clearance_watcher_agent, ) +from cora.agent.seed_procedure_watcher import ( + PROCEDURE_WATCHER_AGENT_ID, + seed_procedure_watcher_agent, +) from cora.agent.seed_run_supervisor import ( RUN_SUPERVISOR_AGENT_ID, seed_run_supervisor_agent, @@ -66,6 +70,7 @@ "CAUTION_PROMOTER_AGENT_ID", "CLEARANCE_EXPIRER_AGENT_ID", "CLEARANCE_WATCHER_AGENT_ID", + "PROCEDURE_WATCHER_AGENT_ID", "RUN_SUPERVISOR_AGENT_ID", "AgentHandlers", "CautionProposalMalformedError", @@ -83,6 +88,7 @@ "seed_caution_promoter_agent", "seed_clearance_expirer_agent", "seed_clearance_watcher_agent", + "seed_procedure_watcher_agent", "seed_run_debriefer_agent", "seed_run_supervisor_agent", "wire_agent", diff --git a/apps/api/src/cora/agent/seed_procedure_watcher.py b/apps/api/src/cora/agent/seed_procedure_watcher.py new file mode 100644 index 0000000000..4dee441544 --- /dev/null +++ b/apps/api/src/cora/agent/seed_procedure_watcher.py @@ -0,0 +1,105 @@ +"""Bootstrap-time seed for the ProcedureWatcher Agent. + +The ProcedureWatcher runtime (CORA's 8th seeded agent, a composition-root +periodic loop, deterministic flag-only) needs an Agent record (and its +co-registered Actor) at the pinned `PROCEDURE_WATCHER_AGENT_ID` so it can author +ProcedureProgress Decisions (`decided_by = ActorId(PROCEDURE_WATCHER_AGENT_ID)`) +as an agent-kind principal. Mirrors `cora.agent.seed_calibration_watcher` except +for the per-agent constants; the shared scaffolding lives in +`cora.agent._agent_seed`. + +Per [[project-procedure-watcher-design]]: + - Pinned UUID in the `0c0c` block (eighth seeded identity, sibling to + RunDebriefer `aaaa00XX`, CautionDrafter `bbbb00XX`, RunSupervisor + `cccc00XX`, CautionPromoter `dddd00XX`, ClearanceExpirer `eeee00XX`, + ClearanceWatcher `ffff00XX`, CalibrationWatcher `ca1100XX`); + deployment-stable forever. + - DETERMINISTIC agent (rule-based, NOT LLM): no prompt template and a sentinel + `ModelRef` (`provider="deterministic"`), never used to build an LLM (the + runtime is a periodic staleness comparison over in-conduct procedures). + - FLAG-ONLY: the runtime issues NO command. It records one + Decision(context=ProcedureProgress, choice=Stall) per stall episode for a + human to act on. Permission to record Decisions is granted at + agent-definition time (the RunDebriefer stance); there is no + authorized-command leg and so no per-command Policy grant to seed. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from uuid import UUID + +from cora.agent._agent_seed import AgentSeedIdentity, seed_agent +from cora.agent.aggregates.agent import ModelRef + +if TYPE_CHECKING: + from cora.infrastructure.kernel import Kernel + + +# --------------------------------------------------------------------------- +# ProcedureWatcher agent identity (deployment-stable constants) +# --------------------------------------------------------------------------- + +# Treat as FOREVER-STABLE. Same change-cost rationale as the other seeded +# agents: changing this orphans every prior ProcedureWatcher-authored Decision +# (their actor_id pointers go stale). UUID is in the deployment-controlled +# `0c0c` block (eighth seeded identity). +PROCEDURE_WATCHER_AGENT_ID = UUID("01900000-0000-7000-8000-00000c0c0010") +PROCEDURE_WATCHER_AGENT_NAME = "ProcedureWatcher" +PROCEDURE_WATCHER_AGENT_KIND = "ProcedureWatcher" +PROCEDURE_WATCHER_AGENT_VERSION = "1.0.0" +PROCEDURE_WATCHER_AGENT_DESCRIPTION = ( + "Deterministic in-loop agent: watches in-conduct procedures (Running / Held) " + "and records one Decision(context=ProcedureProgress, choice=Stall) per " + "procedure that has sat past the staleness window without progressing. For a " + "Running candidate it folds in the latest activity recorded_at first, so an " + "actively-logging conduct is not falsely flagged. Flag-only (advise a " + "human); issues no command." +) + + +# Sentinel model ref: ProcedureWatcher is rule-based, not an LLM agent. The +# Agent aggregate requires a ModelRef; this value is never used to build an LLM +# (the runtime is a periodic staleness comparison, no build_llm call). +_DETERMINISTIC_MODEL_REF = ModelRef( + provider="deterministic", + model="agent:ProcedureWatcher:v1", + snapshot_pin=None, +) + + +# --------------------------------------------------------------------------- +# Deterministic IDs for the bootstrap write envelope +# --------------------------------------------------------------------------- + +_AGENT_EVENT_ID = UUID("01900000-0000-7000-8000-00000c0c0012") +_ACTOR_EVENT_ID = UUID("01900000-0000-7000-8000-00000c0c0013") +_BOOTSTRAP_CORRELATION_ID = UUID("01900000-0000-7000-8000-00000c0c0014") + + +async def seed_procedure_watcher_agent(kernel: Kernel) -> None: + """Seed the ProcedureWatcher Agent + co-registered Actor (idempotent).""" + identity = AgentSeedIdentity( + agent_id=PROCEDURE_WATCHER_AGENT_ID, + name=PROCEDURE_WATCHER_AGENT_NAME, + kind=PROCEDURE_WATCHER_AGENT_KIND, + version=PROCEDURE_WATCHER_AGENT_VERSION, + description=PROCEDURE_WATCHER_AGENT_DESCRIPTION, + model_ref=_DETERMINISTIC_MODEL_REF, + prompt_template_id=None, + agent_event_id=_AGENT_EVENT_ID, + actor_event_id=_ACTOR_EVENT_ID, + correlation_id=_BOOTSTRAP_CORRELATION_ID, + command_name="SeedProcedureWatcherAgent", + ) + await seed_agent(kernel, identity) + + +__all__ = [ + "PROCEDURE_WATCHER_AGENT_DESCRIPTION", + "PROCEDURE_WATCHER_AGENT_ID", + "PROCEDURE_WATCHER_AGENT_KIND", + "PROCEDURE_WATCHER_AGENT_NAME", + "PROCEDURE_WATCHER_AGENT_VERSION", + "seed_procedure_watcher_agent", +] diff --git a/apps/api/src/cora/api/_procedure_watcher.py b/apps/api/src/cora/api/_procedure_watcher.py new file mode 100644 index 0000000000..8461d0b638 --- /dev/null +++ b/apps/api/src/cora/api/_procedure_watcher.py @@ -0,0 +1,332 @@ +"""ProcedureWatcher runtime: the 8th seeded agent, third pure flag-only watcher. + +A periodic background task, hosted at the composition root (`cora.api`) because +it reads the Operation BC AND composes Decision BC events; only `cora.api` may +depend on both (same placement rationale as `_clearance_watcher`, +`_calibration_watcher`, and `_run_supervisor`). See +[[project-procedure-watcher-design]]. + +## What v1 does + +Each tick it lists in-conduct procedures (`Running` and `Held`), selects those +that have sat past the operator-config staleness window without progressing, and +records one `Decision(context=ProcedureProgress, choice=Stall)` per stall +EPISODE. It is FLAG-ONLY: it issues NO command (it cannot un-stick a conduct; it +surfaces the stall so a human acts before an experiment hangs unnoticed +mid-procedure). Procedure is a distinct aggregate from Run, so this liveness gap +is one `_run_supervisor` does not cover. + +## Staleness clock and the active-conduct false-positive guard + +`stalled_seconds = now - last_progress_at`. + +For `Held` the conduct is paused and accepts no activity, so +`last_status_changed_at` (the time it was held, on the list projection) is the +correct clock. For `Running`, `proj_operation_procedure_summary` advances +`last_status_changed_at` only on real lifecycle transitions and NO-OPs it for +`ProcedureActivitiesLogbookOpened` / `ProcedureIterationStarted` (activity is +orthogonal to lifecycle); so keying on it alone would FALSE-FLAG a procedure +that is actively logging steps. Therefore a `Running` candidate that already +looks stale by its status timestamp gets ONE per-candidate +`read_procedure_activity_recency` to fold in the latest activity `recorded_at` +before it is flagged. Bounding that read to already-looks-stale `Running` +candidates keeps the per-tick cost low. This mirrors `_clearance_watcher` +folding the latest `ReviewStep.decided_at` for an `UnderReview` clearance. + +## Idempotency / edge-trigger: one Stall per stall episode + +The Decision id is `uuid5(namespace, "decision:{procedure_id}:{last_progress_at}")`, +so re-ticks while the procedure is still stalled hit the same id +(`ConcurrencyError` no-op). A status change or a later activity advances +`last_progress_at`, opening a fresh episode that can flag again if it re-stalls. + +## Fail-safe and bounded + +Flag-only and reversible by construction: the worst outcome is an advisory +Decision a human can ignore. The runtime gates on `Actor.active`, so +deactivating the agent Actor stops it. Off by default +(`settings.procedure_watcher_enabled`). +""" + +from __future__ import annotations + +import asyncio +import contextlib +from typing import TYPE_CHECKING +from uuid import UUID, uuid5 + +from cora.access.aggregates.actor import load_actor +from cora.agent.seed_procedure_watcher import PROCEDURE_WATCHER_AGENT_ID +from cora.decision.aggregates.decision import ( + DECISION_CONTEXT_PROCEDURE_PROGRESS, + DecisionChoice, + DecisionConfidenceSource, + DecisionContext, + DecisionRegistered, + DecisionRule, + event_type_name, + to_payload, + validate_confidence, + validate_inputs, + validate_reasoning, +) +from cora.infrastructure.event_envelope import to_new_event +from cora.infrastructure.logging import get_logger +from cora.infrastructure.ports import ConcurrencyError +from cora.infrastructure.routing import NIL_SENTINEL_ID +from cora.operation.adapters.postgres_procedure_activity_lookup import ( + PostgresProcedureActivityLookup, +) +from cora.operation.features.list_procedures import ListProcedures +from cora.operation.ports import InMemoryProcedureActivityLookup +from cora.shared.identity import ActorId + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + from datetime import datetime + + from cora.infrastructure.kernel import Kernel + from cora.operation.features.list_procedures import ProcedureSummaryItem + from cora.operation.features.list_procedures.handler import Handler as ListProceduresHandler + from cora.operation.features.list_procedures.query import ProcedureStatusFilter + from cora.operation.ports import ProcedureActivityLookup + +_log = get_logger(__name__) + +_RULE = "agent:ProcedureWatcher:v1" +_COMMAND_NAME = "ProcedureWatcherTick" +_STREAM_TYPE = "Decision" +_PAGE_LIMIT = 100 +_CHOICE_STALL = "Stall" +_STATUS_RUNNING = "Running" + +# The two in-conduct lifecycle states the watcher surveys. Defined (registered, +# not started) and the terminal states (Completed / Aborted / Truncated) are out +# of scope: only an active or paused conduct can hang mid-flight. +_WATCHED_STATUSES: tuple[ProcedureStatusFilter, ...] = ("Running", "Held") + +# Stable namespace for deriving the deterministic Decision id from the procedure +# id + the stall-episode timestamp (0c0c block, distinct from the seed envelope +# ids), so one Stall is written per stall episode and re-ticks are no-ops. +_DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-00000c0c0002") + + +def is_stalled(last_progress_at: datetime, now: datetime, stale_after_seconds: float) -> bool: + """Pure rule: an in-conduct procedure is stalled once it has sat past the + staleness window without progress. + + Inclusive boundary: elapsed == window FLAGS (`>=`). + """ + return (now - last_progress_at).total_seconds() >= stale_after_seconds + + +def _derive_decision_id(procedure_id: UUID, last_progress_at: datetime) -> UUID: + """Deterministic ProcedureProgress Decision id for one stall episode.""" + return uuid5(_DECISION_NAMESPACE, f"decision:{procedure_id}:{last_progress_at.isoformat()}") + + +def _default_activity_lookup(deps: Kernel) -> ProcedureActivityLookup: + """Build the BC-local ProcedureActivityLookup: Postgres when a pool is + present, the in-memory stub otherwise (the test / pool-less default).""" + if deps.pool is not None: + return PostgresProcedureActivityLookup(deps.pool) + return InMemoryProcedureActivityLookup() + + +async def _record_decision( + deps: Kernel, + *, + procedure_id: UUID, + status: str, + last_progress_at: datetime, + now: datetime, +) -> None: + """Append one DecisionRegistered(context=ProcedureProgress, choice=Stall). + + Idempotent: the deterministic id makes a re-flag of the same stall episode a + ConcurrencyError no-op (mirrors `_calibration_watcher._record_decision`). + """ + decision_id = _derive_decision_id(procedure_id, last_progress_at) + stalled_seconds = int((now - last_progress_at).total_seconds()) + domain_event = DecisionRegistered( + decision_id=decision_id, + decided_by=ActorId(PROCEDURE_WATCHER_AGENT_ID), + context=DecisionContext(DECISION_CONTEXT_PROCEDURE_PROGRESS).value, + choice=DecisionChoice(_CHOICE_STALL).value, + parent_id=None, + override_kind=None, + rule=DecisionRule(_RULE).value, + reasoning=validate_reasoning( + f"Procedure has been {status} for {stalled_seconds}s without progressing " + "(past the staleness window, no recent activity); surfaced for operator " + "follow-up." + ), + confidence=validate_confidence(None), + confidence_source=DecisionConfidenceSource.SELF_REPORTED, + alternatives=(), + inputs=validate_inputs( + { + "procedure_id": str(procedure_id), + "status": status, + "last_progress_at": last_progress_at.isoformat(), + "stalled_seconds": str(stalled_seconds), + "occurred_at": now.isoformat(), + } + ), + reasoning_signature=None, + occurred_at=now, + ) + new_event = to_new_event( + event_type=event_type_name(domain_event), + payload=to_payload(domain_event), + occurred_at=now, + event_id=uuid5(decision_id, "event:0"), + command_name=_COMMAND_NAME, + correlation_id=deps.id_generator.new_id(), + causation_id=None, + principal_id=PROCEDURE_WATCHER_AGENT_ID, + ) + try: + await deps.event_store.append( + stream_type=_STREAM_TYPE, + stream_id=decision_id, + expected_version=0, + events=[new_event], + ) + except ConcurrencyError: + _log.info("procedure_watcher.decision_already_written", procedure_id=str(procedure_id)) + return + _log.info("procedure_watcher.flagged", procedure_id=str(procedure_id), status=status) + + +async def _drain_watched_procedures( + list_procedures: ListProceduresHandler, deps: Kernel +) -> list[ProcedureSummaryItem]: + """Page through list_procedures for each watched in-conduct status.""" + items: list[ProcedureSummaryItem] = [] + for status in _WATCHED_STATUSES: + cursor: str | None = None + while True: + page = await list_procedures( + ListProcedures(status=status, cursor=cursor, limit=_PAGE_LIMIT), + principal_id=PROCEDURE_WATCHER_AGENT_ID, + correlation_id=deps.id_generator.new_id(), + surface_id=NIL_SENTINEL_ID, + ) + items.extend(page.items) + if page.next_cursor is None: + break + cursor = page.next_cursor + return items + + +async def _watch_tick( + *, + deps: Kernel, + list_procedures: ListProceduresHandler, + activity_lookup: ProcedureActivityLookup, +) -> None: + """One watch sweep over all in-conduct procedures.""" + actor = await load_actor(deps.event_store, PROCEDURE_WATCHER_AGENT_ID) + if actor is None or not actor.active: + # Agent not seeded yet, or deactivated by an operator: stand down. + return + + now = deps.clock.now() + stale_after = deps.settings.procedure_watcher_stale_after_seconds + items = await _drain_watched_procedures(list_procedures, deps) + for item in items: + if item.status not in _WATCHED_STATUSES: + # Defensive guard against a future filter widening: only flag the + # in-conduct states this watcher owns. + continue + base = item.last_status_changed_at + if base is None: + # No status-change timestamp recorded: cannot evaluate; defer. + continue + if not is_stalled(base, now, stale_after): + # Fresh by status timestamp. For Running a later activity only makes + # it fresher, so skipping here cannot hide a stall. + continue + last_progress_at = base + if item.status == _STATUS_RUNNING: + # Looks stale by status ts, but appending activity does NOT advance + # it; confirm against the latest activity recorded_at before flagging. + recency = await activity_lookup.read_procedure_activity_recency( + procedure_id=item.procedure_id + ) + activity_at = recency.latest_recorded_at + if activity_at is not None and activity_at > last_progress_at: + last_progress_at = activity_at + if not is_stalled(last_progress_at, now, stale_after): + continue # actively logging: progressing, not stalled + await _record_decision( + deps, + procedure_id=item.procedure_id, + status=item.status, + last_progress_at=last_progress_at, + now=now, + ) + + +async def _watch_loop( + deps: Kernel, + list_procedures: ListProceduresHandler, + activity_lookup: ProcedureActivityLookup, + interval_seconds: float, +) -> None: + """Periodic watch loop. A failed tick is logged; the next tick retries.""" + while True: + try: + await _watch_tick( + deps=deps, + list_procedures=list_procedures, + activity_lookup=activity_lookup, + ) + except asyncio.CancelledError: + raise + except Exception: + _log.exception("procedure_watcher.tick_failed") + await asyncio.sleep(interval_seconds) + + +@contextlib.asynccontextmanager +async def procedure_watcher_lifespan( + deps: Kernel, + *, + list_procedures: ListProceduresHandler, + activity_lookup: ProcedureActivityLookup | None = None, + interval_seconds: float | None = None, +) -> AsyncGenerator[None]: + """Spawn the ProcedureWatcher loop for the duration of the context. + + No-op unless `settings.procedure_watcher_enabled` is True (default off, so a + deployment opts in explicitly). `activity_lookup` defaults to the BC-local + `_default_activity_lookup(deps)` (Postgres when a pool is present). + """ + if not deps.settings.procedure_watcher_enabled: + _log.info("procedure_watcher.skipped", reason="disabled") + yield + return + + lookup = activity_lookup if activity_lookup is not None else _default_activity_lookup(deps) + interval = ( + interval_seconds + if interval_seconds is not None + else deps.settings.procedure_watcher_tick_seconds + ) + _log.info("procedure_watcher.started", interval_seconds=interval) + task = asyncio.create_task( + _watch_loop(deps, list_procedures, lookup, interval), + name="procedure-watcher", + ) + try: + yield + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + _log.info("procedure_watcher.stopped") + + +__all__ = ["is_stalled", "procedure_watcher_lifespan"] diff --git a/apps/api/src/cora/api/main.py b/apps/api/src/cora/api/main.py index 7b4ba94f2d..91f91f7540 100644 --- a/apps/api/src/cora/api/main.py +++ b/apps/api/src/cora/api/main.py @@ -59,6 +59,7 @@ seed_caution_promoter_agent, seed_clearance_expirer_agent, seed_clearance_watcher_agent, + seed_procedure_watcher_agent, seed_run_debriefer_agent, seed_run_supervisor_agent, wire_agent, @@ -71,6 +72,7 @@ from cora.api._conduct_run_tool import register_conduct_run_tools from cora.api._enclosure_permit_observer import ControlPortEnclosureObserver from cora.api._inference_recorder import DelegatingInferenceRecorder +from cora.api._procedure_watcher import procedure_watcher_lifespan from cora.api._run_supervisor import run_supervisor_lifespan from cora.api.middleware import BodySizeLimitMiddleware from cora.api.protected_resource_metadata import register_protected_resource_metadata_route @@ -800,6 +802,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: await seed_clearance_watcher_agent(deps) # same shape for CalibrationWatcher (deterministic flag-only agent). await seed_calibration_watcher_agent(deps) + # same shape for ProcedureWatcher (deterministic flag-only agent). + await seed_procedure_watcher_agent(deps) # Drain Federation-owned projections so the Postgres-backed # FacilityLookup.list_active() resolves the self-Facility row @@ -876,6 +880,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: deps, list_calibrations=app.state.calibration.list_calibrations, ), + procedure_watcher_lifespan( + deps, + list_procedures=app.state.operation.list_procedures, + ), ): yield finally: diff --git a/apps/api/src/cora/decision/aggregates/decision/__init__.py b/apps/api/src/cora/decision/aggregates/decision/__init__.py index 26df2333c0..f30f167827 100644 --- a/apps/api/src/cora/decision/aggregates/decision/__init__.py +++ b/apps/api/src/cora/decision/aggregates/decision/__init__.py @@ -47,6 +47,7 @@ DECISION_CONTEXT_MAX_LENGTH, DECISION_CONTEXT_POLICY_GRANT, DECISION_CONTEXT_PROCEDURE_EXECUTION, + DECISION_CONTEXT_PROCEDURE_PROGRESS, DECISION_CONTEXT_RECIPE_APPROVAL, DECISION_CONTEXT_RESOURCE_ALLOCATION, DECISION_CONTEXT_RUN_ABORT, @@ -61,6 +62,7 @@ DECISION_REASONING_SIGNATURE_MAX_LENGTH, DECISION_RULE_MAX_LENGTH, LOGBOOK_KIND_INFERENCE, + PROCEDURE_PROGRESS_CHOICES, RUN_DEBRIEF_CHOICES, RUN_SUPERVISION_CHOICES, CalibrationVerificationChoice, @@ -94,6 +96,7 @@ InvalidDecisionReasoningError, InvalidDecisionRuleError, InvalidReasoningSignatureError, + ProcedureProgressChoice, RunDebriefChoice, RunSupervisionChoice, confidence_band, @@ -127,6 +130,7 @@ "DECISION_CONTEXT_MAX_LENGTH", "DECISION_CONTEXT_POLICY_GRANT", "DECISION_CONTEXT_PROCEDURE_EXECUTION", + "DECISION_CONTEXT_PROCEDURE_PROGRESS", "DECISION_CONTEXT_RECIPE_APPROVAL", "DECISION_CONTEXT_RESOURCE_ALLOCATION", "DECISION_CONTEXT_RUN_ABORT", @@ -148,6 +152,7 @@ "DECISION_RULE_MAX_LENGTH", "INFERENCE_LOGBOOK_SCHEMA", "LOGBOOK_KIND_INFERENCE", + "PROCEDURE_PROGRESS_CHOICES", "RUN_DEBRIEF_CHOICES", "RUN_SUPERVISION_CHOICES", "CalibrationVerificationChoice", @@ -191,6 +196,7 @@ "InvalidDecisionRuleError", "InvalidReasoningSignatureError", "PostgresInferenceStore", + "ProcedureProgressChoice", "RunDebriefChoice", "RunSupervisionChoice", "confidence_band", diff --git a/apps/api/src/cora/decision/aggregates/decision/state.py b/apps/api/src/cora/decision/aggregates/decision/state.py index 5491a0609d..18d3f3d2dc 100644 --- a/apps/api/src/cora/decision/aggregates/decision/state.py +++ b/apps/api/src/cora/decision/aggregates/decision/state.py @@ -466,6 +466,32 @@ CALIBRATION_VERIFICATION_CHOICES: Final = frozenset({"Stale"}) +# Context for the ProcedureWatcher agent's Decisions about the PROCEDURE-PROGRESS +# lifecycle dimension it patrols: whether an in-conduct procedure is still +# advancing, or has sat in Running / Held past the operator window without +# progressing. Open-ended convention identical to ClearanceProgress / +# CalibrationVerification; the closed choice vocabulary lives in the +# `ProcedureProgressChoice` Literal below. FLAG-ONLY: the agent records one +# Decision per stall episode and issues NO command. The context noun is +# `ProcedureProgress` (the lifecycle dimension being judged, family-clean with +# ClearanceProgress; naming-r3 chose it over the state-noun `ProcedureStall` and +# over the existing conduct-decision context `ProcedureExecution`); the agent kind +# is `ProcedureWatcher` (the doer) -- the same dimension-vs-doer asymmetry as the +# Clearance and Calibration pairs. +DECISION_CONTEXT_PROCEDURE_PROGRESS = "ProcedureProgress" + + +# Closed `choice` value set for `context = "ProcedureProgress"` Decisions. +# Projection-validated, not domain-enforced. Single value today (the agent only +# ever flags a stalled procedure); the Literal exists for symmetry and so a +# future qualified disposition can land additively. `Stall` is globally unique in +# the shared, filtered DecisionChoice projection column (naming-r3: `Flag` is +# taken by ClearanceProgress and `Stale` by CalibrationVerification, so this +# context owns its own disposition token). +ProcedureProgressChoice = Literal["Stall"] +PROCEDURE_PROGRESS_CHOICES: Final = frozenset({"Stall"}) + + # acceptance-signal capture: closed 3-value rating set on # the new `DecisionRated` event. `useful` and `misleading` are # operator-affirmative; `ignored` is a positive marker ("operator saw diff --git a/apps/api/src/cora/infrastructure/config.py b/apps/api/src/cora/infrastructure/config.py index 86aa5baf82..732f0faea8 100644 --- a/apps/api/src/cora/infrastructure/config.py +++ b/apps/api/src/cora/infrastructure/config.py @@ -290,6 +290,17 @@ class Settings(BaseSettings): calibration_watcher_tick_seconds: float = 300.0 calibration_watcher_stale_after_seconds: float = 2592000.0 + # `procedure_watcher_enabled` gates the ProcedureWatcher background runtime + # (8th seeded agent, deterministic flag-only). Default off: deployments opt in + # explicitly. `procedure_watcher_tick_seconds` is the sweep cadence (>= 0.1s). + # `procedure_watcher_stale_after_seconds` is how long an in-conduct procedure + # (Running / Held) may sit without progressing before it is flagged; live + # conduct is far shorter-lived than a clearance or calibration, so the default + # is an hour (off by default; an operator sets the real window on enable). + procedure_watcher_enabled: bool = False + procedure_watcher_tick_seconds: float = 300.0 + procedure_watcher_stale_after_seconds: float = 3600.0 + # Edge auth # `identity_providers` is the list of IdPs CORA accepts tokens # from. Empty (default) keeps the legacy X-Principal-Id-with- @@ -628,3 +639,28 @@ def _validate_calibration_watcher_stale_after_seconds(cls, value: float) -> floa ) raise ValueError(msg) return value + + @field_validator("procedure_watcher_tick_seconds") + @classmethod + def _validate_procedure_watcher_tick_seconds(cls, value: float) -> float: + """Floor of 0.1s prevents a tight watch-sweep loop.""" + if value < 0.1: + msg = ( + f"procedure_watcher_tick_seconds must be >= 0.1, got {value}; " + "values below 100ms would tight-loop the watcher" + ) + raise ValueError(msg) + return value + + @field_validator("procedure_watcher_stale_after_seconds") + @classmethod + def _validate_procedure_watcher_stale_after_seconds(cls, value: float) -> float: + """Must be positive: a non-positive window would flag every in-conduct + procedure.""" + if value <= 0: + msg = ( + f"procedure_watcher_stale_after_seconds must be > 0, got {value}; " + "a non-positive window would flag every in-conduct procedure" + ) + raise ValueError(msg) + return value diff --git a/apps/api/src/cora/operation/adapters/postgres_procedure_activity_lookup.py b/apps/api/src/cora/operation/adapters/postgres_procedure_activity_lookup.py new file mode 100644 index 0000000000..ddb58783ea --- /dev/null +++ b/apps/api/src/cora/operation/adapters/postgres_procedure_activity_lookup.py @@ -0,0 +1,48 @@ +"""asyncpg `ProcedureActivityLookup` over entries_operation_procedure_activities. + +Queries the activity entry table directly for the newest `recorded_at` of +one procedure: the read the ProcedureWatcher anti-false-flag fold needs +and the write-only `ActivityStore` does not provide. No projection (a +projection would cost a permanent fold to serve a rare per-tick point +read; the watcher only folds for a Running candidate already past its +status-timestamp window). + +Keys on `recorded_at` (the CORA write-time trust anchor, not the +spoofable `sampled_at`) and rides the +`entries_operation_procedure_activities_proc_recorded_idx` +`(procedure_id, recorded_at DESC)` btree added alongside this adapter. +The pre-existing indexes are keyed on `sampled_at` (plus a BRIN on +recorded_at), so none serves a procedure-scoped max(recorded_at) without +scanning the procedure's full activity history. +""" + +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false +# asyncpg's stubs are loose; suppress at module level for the adapter. + +from uuid import UUID + +import asyncpg + +from cora.operation.ports.procedure_activity_lookup import ProcedureActivityRecency + +_RECENCY_SQL = """ +SELECT max(recorded_at) AS latest_recorded_at +FROM entries_operation_procedure_activities +WHERE procedure_id = $1 +""" + + +class PostgresProcedureActivityLookup: + """Production `ProcedureActivityLookup`; reads the activity entry table.""" + + def __init__(self, pool: asyncpg.Pool) -> None: + self._pool = pool + + async def read_procedure_activity_recency( + self, *, procedure_id: UUID + ) -> ProcedureActivityRecency: + async with self._pool.acquire() as conn: + row = await conn.fetchrow(_RECENCY_SQL, procedure_id) + # max() over an empty set returns one row with a NULL aggregate. + latest = row["latest_recorded_at"] if row is not None else None + return ProcedureActivityRecency(latest_recorded_at=latest) diff --git a/apps/api/src/cora/operation/ports/__init__.py b/apps/api/src/cora/operation/ports/__init__.py index e1a3f444cf..417553a3b6 100644 --- a/apps/api/src/cora/operation/ports/__init__.py +++ b/apps/api/src/cora/operation/ports/__init__.py @@ -44,6 +44,11 @@ Reading, ReadingKind, ) +from cora.operation.ports.procedure_activity_lookup import ( + InMemoryProcedureActivityLookup, + ProcedureActivityLookup, + ProcedureActivityRecency, +) __all__ = [ "ArtifactNotFoundError", @@ -62,9 +67,12 @@ "ControlTimeoutError", "ControlValueCoercionError", "ControlWriteRejectedError", + "InMemoryProcedureActivityLookup", "JobId", "JobSpec", "NoAdapterForAddressError", + "ProcedureActivityLookup", + "ProcedureActivityRecency", "Quality", "Reading", "ReadingKind", diff --git a/apps/api/src/cora/operation/ports/procedure_activity_lookup.py b/apps/api/src/cora/operation/ports/procedure_activity_lookup.py new file mode 100644 index 0000000000..631925d596 --- /dev/null +++ b/apps/api/src/cora/operation/ports/procedure_activity_lookup.py @@ -0,0 +1,108 @@ +"""Operation-BC-local read port over a Procedure's append-only activity log. + +The read half a watcher needs that the write side does not provide. The +activity entries written by `append_activities` land in +`entries_operation_procedure_activities` via a write-only `ActivityStore`, +and the procedure aggregate stream carries only the one-time +`ProcedureActivitiesLogbookOpened` marker, not per-activity recency. So +"when did this procedure last log activity" has no existing read path. + +## Why it exists: the ProcedureWatcher anti-false-flag fold + +ProcedureWatcher flags a Running / Held procedure that has sat past an +operator window without progressing, keyed on +`proj_operation_procedure_summary.last_status_changed_at`. But appending +activity steps does NOT advance that timestamp: the projection touches +`last_status_changed_at` only on real lifecycle transitions and NO-OPs it +for `ProcedureActivitiesLogbookOpened` / `ProcedureIterationStarted` +(activity is orthogonal to lifecycle). So a Running procedure actively +logging steps for hours looks frozen by its status timestamp alone. +Keying on it without folding in activity recency would FALSE-FLAG an +actively-progressing conduct, a foolable watchdog that is worse than +none. This read returns the newest activity `recorded_at` so the watcher +folds it in before flagging, exactly as ClearanceWatcher folds the latest +ReviewStep.decided_at for an UnderReview clearance. + +## BC-local, not promoted to infrastructure/ports + +The sole consumer is the composition-root ProcedureWatcher, which already +imports Operation-BC symbols directly; the data-owning sibling +`ActivityStore` is itself BC-internal. So this read counterpart lives +beside the BC, mirroring the RunChannelLookup single-root-consumer +precedent. Promote to `infrastructure/ports/` only on a real second +cross-BC consumer (rule-of-three). + +## recorded_at is the trust anchor, not sampled_at + +`recorded_at` is the Postgres write time (`DEFAULT now()`, CORA-owned); +`sampled_at` is the producer's phenomenonTime and is spoofable / +backfillable. The anti-false-flag fold keys on `recorded_at` so a +producer cannot backdate an entry to make a stalled conduct look active. +This mirrors RunChannelLookup keying freshness on `recorded_at`. +""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Protocol +from uuid import UUID + + +@dataclass(frozen=True) +class ProcedureActivityRecency: + """Newest activity recorded_at for one Procedure (the anti-false-flag fold). + + `latest_recorded_at` is None exactly when the procedure has never + logged an activity entry: the cannot-tell case in which the watcher + keeps the status timestamp as the staleness clock. Wrapped in a + dataclass rather than a bare `datetime | None` for parity with + RunFeedHealth and so a future field (e.g. an arrival count) can land + additively. + """ + + latest_recorded_at: datetime | None + + +class ProcedureActivityLookup(Protocol): + """Read a Procedure's append-only activity log for the liveness fold. + + One method: the newest activity `recorded_at`. Production adapter: + `PostgresProcedureActivityLookup` (operation/adapters/), backed by + querying the existing `entries_operation_procedure_activities` table. + """ + + async def read_procedure_activity_recency( + self, *, procedure_id: UUID + ) -> ProcedureActivityRecency: + """Newest activity `recorded_at` for `procedure_id`; latest_recorded_at + is None when the procedure has never logged an activity entry.""" + ... + + +class InMemoryProcedureActivityLookup: + """Dict-backed, seedable `ProcedureActivityLookup` for unit tests. + + An unseeded instance is the always-quiet default: reads return a None + recency, so the watcher tick is testable with no activity recorded. + Seeded via `register(...)`, which carries an explicit `recorded_at` + (the read surfaces recorded_at, which the write-model activity entry + does not expose to the watcher). + """ + + def __init__(self) -> None: + self._rows: dict[UUID, list[datetime]] = {} + + def register(self, *, procedure_id: UUID, recorded_at: datetime) -> None: + self._rows.setdefault(procedure_id, []).append(recorded_at) + + async def read_procedure_activity_recency( + self, *, procedure_id: UUID + ) -> ProcedureActivityRecency: + rows = self._rows.get(procedure_id) + return ProcedureActivityRecency(latest_recorded_at=max(rows) if rows else None) + + +__all__ = [ + "InMemoryProcedureActivityLookup", + "ProcedureActivityLookup", + "ProcedureActivityRecency", +] diff --git a/apps/api/tests/integration/test_procedure_activity_lookup_postgres.py b/apps/api/tests/integration/test_procedure_activity_lookup_postgres.py new file mode 100644 index 0000000000..1c7a6aeb6d --- /dev/null +++ b/apps/api/tests/integration/test_procedure_activity_lookup_postgres.py @@ -0,0 +1,84 @@ +"""Integration: PostgresProcedureActivityLookup against real Postgres. + +Seeds rows through PostgresActivityStore (so recorded_at is the real DB +DEFAULT now()), then exercises the recency read. The max assertion keys +on the real DB-assigned recorded_at (read back from the table) so it is +deterministic, not clock-guessed. +""" + +# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false + +import asyncio +from datetime import UTC, datetime +from uuid import UUID, uuid4 + +import asyncpg +import pytest + +from cora.operation.adapters.postgres_procedure_activity_lookup import ( + PostgresProcedureActivityLookup, +) +from cora.operation.aggregates.procedure.entries import Activity, PostgresActivityStore + +_NOW = datetime(2026, 6, 22, 12, 0, 0, tzinfo=UTC) + + +def _activity(procedure_id: UUID, logbook_id: UUID) -> Activity: + return Activity( + event_id=uuid4(), + procedure_id=procedure_id, + logbook_id=logbook_id, + actor_id=uuid4(), + command_name="AppendProcedureActivities", + step_kind="action", + payload={"note": "step"}, + sampled_at=_NOW, + occurred_at=_NOW, + correlation_id=uuid4(), + causation_id=None, + ) + + +async def _recorded_at(pool: asyncpg.Pool, event_id: UUID) -> datetime: + async with pool.acquire() as conn: + return await conn.fetchval( + "SELECT recorded_at FROM entries_operation_procedure_activities WHERE event_id = $1", + event_id, + ) + + +@pytest.mark.integration +async def test_recency_is_none_for_procedure_with_no_activity(db_pool: asyncpg.Pool) -> None: + lookup = PostgresProcedureActivityLookup(db_pool) + recency = await lookup.read_procedure_activity_recency(procedure_id=uuid4()) + assert recency.latest_recorded_at is None + + +@pytest.mark.integration +async def test_recency_returns_max_recorded_at(db_pool: asyncpg.Pool) -> None: + procedure_id = uuid4() + logbook_id = uuid4() + store = PostgresActivityStore(db_pool) + lookup = PostgresProcedureActivityLookup(db_pool) + + first = _activity(procedure_id, logbook_id) + await store.append([first]) + await asyncio.sleep(0.01) # guarantee a strictly later recorded_at + second = _activity(procedure_id, logbook_id) + await store.append([second]) + + recency = await lookup.read_procedure_activity_recency(procedure_id=procedure_id) + assert recency.latest_recorded_at == await _recorded_at(db_pool, second.event_id) + + +@pytest.mark.integration +async def test_recency_is_scoped_per_procedure(db_pool: asyncpg.Pool) -> None: + procedure_id = uuid4() + other = uuid4() + store = PostgresActivityStore(db_pool) + lookup = PostgresProcedureActivityLookup(db_pool) + + await store.append([_activity(procedure_id, uuid4())]) + + recency = await lookup.read_procedure_activity_recency(procedure_id=other) + assert recency.latest_recorded_at is None diff --git a/apps/api/tests/unit/agent/test_procedure_watcher_seed.py b/apps/api/tests/unit/agent/test_procedure_watcher_seed.py new file mode 100644 index 0000000000..5936fca55b --- /dev/null +++ b/apps/api/tests/unit/agent/test_procedure_watcher_seed.py @@ -0,0 +1,105 @@ +"""Unit tests for the ProcedureWatcher Agent bootstrap seed. + +ProcedureWatcher is the eighth seeded agent and (with ClearanceWatcher and +CalibrationWatcher) a deterministic flag-only watcher: no prompt template and a +sentinel ModelRef (it is rule-based, a periodic staleness comparison, never +builds an LLM). These tests pin that shape alongside the shared seed scaffolding. +""" + +from datetime import UTC, datetime + +import pytest + +from cora.agent.aggregates.agent import load_agent +from cora.agent.seed_procedure_watcher import ( + PROCEDURE_WATCHER_AGENT_ID, + PROCEDURE_WATCHER_AGENT_KIND, + PROCEDURE_WATCHER_AGENT_NAME, + PROCEDURE_WATCHER_AGENT_VERSION, + seed_procedure_watcher_agent, +) +from cora.infrastructure.config import Settings +from cora.infrastructure.deps import make_inmemory_kernel +from cora.infrastructure.kernel import Kernel +from cora.infrastructure.ports import AllowAllAuthorize, FakeClock, FixedIdGenerator + + +def _kernel() -> Kernel: + settings = Settings() # type: ignore[call-arg] + return make_inmemory_kernel( + settings=settings, + clock=FakeClock(datetime(2026, 6, 22, 14, 0, 0, tzinfo=UTC)), + id_generator=FixedIdGenerator([]), + authz=AllowAllAuthorize(), + ) + + +@pytest.mark.unit +async def test_seed_creates_procedure_watcher_at_pinned_id() -> None: + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + + agent = await load_agent(kernel.event_store, PROCEDURE_WATCHER_AGENT_ID) + assert agent is not None + assert agent.id == PROCEDURE_WATCHER_AGENT_ID + assert agent.name.value == PROCEDURE_WATCHER_AGENT_NAME + assert agent.kind.value == PROCEDURE_WATCHER_AGENT_KIND + assert agent.version.value == PROCEDURE_WATCHER_AGENT_VERSION + + +@pytest.mark.unit +async def test_seed_is_deterministic_no_prompt_sentinel_model() -> None: + """Deterministic agent: no prompt template, sentinel (non-LLM) model_ref.""" + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + + agent = await load_agent(kernel.event_store, PROCEDURE_WATCHER_AGENT_ID) + assert agent is not None + assert agent.prompt_template_id is None + assert agent.model_ref.provider == "deterministic" + assert agent.model_ref.model == "agent:ProcedureWatcher:v1" + + +@pytest.mark.unit +async def test_seed_creates_co_registered_actor() -> None: + """The cross-BC genesis: Actor (kind=agent) at the pinned id.""" + from cora.access.aggregates.actor import load_actor + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + + actor = await load_actor(kernel.event_store, PROCEDURE_WATCHER_AGENT_ID) + assert actor is not None + assert actor.id == PROCEDURE_WATCHER_AGENT_ID + assert actor.kind.value == "agent" + + +@pytest.mark.unit +async def test_seed_is_idempotent() -> None: + """Re-running the seed is a no-op (ConcurrencyError-as-success pattern).""" + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + await seed_procedure_watcher_agent(kernel) + + +@pytest.mark.unit +async def test_procedure_watcher_id_distinct_from_all_prior_agents() -> None: + """The eight seeded agents share the UUID-range scheme but must NOT collide.""" + from cora.agent.seed import RUN_DEBRIEFER_AGENT_ID + from cora.agent.seed_calibration_watcher import CALIBRATION_WATCHER_AGENT_ID + from cora.agent.seed_caution_drafter import CAUTION_DRAFTER_AGENT_ID + from cora.agent.seed_caution_promoter import CAUTION_PROMOTER_AGENT_ID + from cora.agent.seed_clearance_expirer import CLEARANCE_EXPIRER_AGENT_ID + from cora.agent.seed_clearance_watcher import CLEARANCE_WATCHER_AGENT_ID + from cora.agent.seed_run_supervisor import RUN_SUPERVISOR_AGENT_ID + + prior = { + RUN_DEBRIEFER_AGENT_ID, + CAUTION_DRAFTER_AGENT_ID, + RUN_SUPERVISOR_AGENT_ID, + CAUTION_PROMOTER_AGENT_ID, + CLEARANCE_EXPIRER_AGENT_ID, + CLEARANCE_WATCHER_AGENT_ID, + CALIBRATION_WATCHER_AGENT_ID, + } + assert PROCEDURE_WATCHER_AGENT_ID not in prior diff --git a/apps/api/tests/unit/api/test_procedure_watcher.py b/apps/api/tests/unit/api/test_procedure_watcher.py new file mode 100644 index 0000000000..930a03b483 --- /dev/null +++ b/apps/api/tests/unit/api/test_procedure_watcher.py @@ -0,0 +1,408 @@ +"""Tests for the ProcedureWatcher runtime (cora.api._procedure_watcher). + +Covers the pure staleness rule (is_stalled) on both sides of the inclusive +boundary, plus a fakes-driven tick that exercises the drain -> flag Decision +loop for both watched statuses, the Running activity-recency fold (the +anti-false-flag guard), the Held no-fold path, the defensive status guard, the +Actor.active revocation gate, idempotency, and the disabled no-op. +""" + +# white-box test of the runtime internals (private functions / constants) +# pyright: reportPrivateUsage=false + +import asyncio +from datetime import UTC, datetime, timedelta +from uuid import UUID, uuid4 + +import pytest + +from cora.agent.seed_procedure_watcher import ( + PROCEDURE_WATCHER_AGENT_ID, + seed_procedure_watcher_agent, +) +from cora.api._procedure_watcher import ( + _derive_decision_id, + is_stalled, + procedure_watcher_lifespan, +) +from cora.decision.aggregates.decision import load_decision +from cora.infrastructure.adapters.in_memory_event_store import InMemoryEventStore +from cora.infrastructure.config import Settings +from cora.infrastructure.deps import make_inmemory_kernel +from cora.infrastructure.kernel import Kernel +from cora.infrastructure.ports import AllowAllAuthorize, FakeClock, UUIDv7Generator +from cora.infrastructure.routing import NIL_SENTINEL_ID +from cora.operation.features.list_procedures import ( + ListProcedures, + ProcedureListPage, + ProcedureSummaryItem, +) +from cora.operation.ports import InMemoryProcedureActivityLookup +from cora.shared.identity import ActorId + +_NOW = datetime(2026, 6, 22, 12, 0, 0, tzinfo=UTC) +_STALE_AFTER = 3600.0 # 1 hour +_OLD = _NOW - timedelta(hours=2) # clearly stale at a 1-hour window +_RECENT = _NOW - timedelta(minutes=1) # fresh +_STILL_STALE = _NOW - timedelta(minutes=90) # newer than _OLD but still > window +_BOUNDARY = _NOW - timedelta(seconds=int(_STALE_AFTER)) + + +# ---------- pure rule: is_stalled ---------- + + +@pytest.mark.unit +def test_is_stalled_when_status_old() -> None: + assert is_stalled(_OLD, _NOW, _STALE_AFTER) is True + + +@pytest.mark.unit +def test_is_not_stalled_when_status_recent() -> None: + assert is_stalled(_RECENT, _NOW, _STALE_AFTER) is False + + +@pytest.mark.unit +def test_stalled_is_inclusive_at_boundary() -> None: + """Elapsed == window FLAGS (inclusive >=); pins the `>`-vs-`>=` mutant.""" + assert is_stalled(_BOUNDARY, _NOW, _STALE_AFTER) is True + + +@pytest.mark.unit +def test_not_stalled_just_under_boundary() -> None: + just_under = _NOW - timedelta(seconds=int(_STALE_AFTER) - 1) + assert is_stalled(just_under, _NOW, _STALE_AFTER) is False + + +# ---------- tick: full loop with fakes ---------- + + +def _kernel(*, enabled: bool = False, stale_after: float = _STALE_AFTER) -> Kernel: + settings = Settings( # type: ignore[call-arg] + procedure_watcher_enabled=enabled, + procedure_watcher_stale_after_seconds=stale_after, + ) + return make_inmemory_kernel( + settings=settings, + clock=FakeClock(_NOW), + id_generator=UUIDv7Generator(), + authz=AllowAllAuthorize(), + ) + + +def _item( + procedure_id: UUID, + *, + status: str = "Running", + last_status_changed_at: datetime | None, +) -> ProcedureSummaryItem: + return ProcedureSummaryItem( + procedure_id=procedure_id, + name="scan-01", + kind="tomography", + target_asset_ids=[uuid4()], + parent_run_id=uuid4(), + status=status, + activity_logbook_id=uuid4(), + registered_at=_OLD, + last_status_changed_at=last_status_changed_at, + last_status_reason=None, + interrupted_at=None, + iteration_count=0, + ) + + +def _make_list_procedures(items: list[ProcedureSummaryItem], *, honor_filter: bool = True): + async def list_procedures( + query: ListProcedures, + *, + principal_id: UUID, + correlation_id: UUID, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> ProcedureListPage: + if honor_filter and query.status is not None: + matching = [i for i in items if i.status == query.status] + else: + matching = list(items) + return ProcedureListPage(items=matching, next_cursor=None) + + return list_procedures + + +@pytest.mark.unit +async def test_tick_flags_stale_running_with_no_activity() -> None: + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + decision = await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) + assert decision is not None + assert decision.context.value == "ProcedureProgress" + assert decision.choice.value == "Stall" + assert decision.decided_by == ActorId(PROCEDURE_WATCHER_AGENT_ID) + + +@pytest.mark.unit +async def test_tick_flags_stale_held_without_folding_activity() -> None: + """A Held conduct accepts no activity, so it is clocked on its status + timestamp directly; a (defensively seeded) recent activity is ignored.""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures( + [_item(pid, status="Held", last_status_changed_at=_OLD)] + ) + lookup = InMemoryProcedureActivityLookup() + lookup.register(procedure_id=pid, recorded_at=_RECENT) # must NOT rescue a Held + + await _watch_tick(deps=kernel, list_procedures=list_procedures, activity_lookup=lookup) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is not None + + +@pytest.mark.unit +async def test_tick_does_not_flag_running_with_recent_activity() -> None: + """The anti-false-flag fold: a Running procedure that looks stale by its + status timestamp but is actively logging activity is NOT flagged.""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + lookup = InMemoryProcedureActivityLookup() + lookup.register(procedure_id=pid, recorded_at=_RECENT) + + await _watch_tick(deps=kernel, list_procedures=list_procedures, activity_lookup=lookup) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _RECENT)) is None + + +@pytest.mark.unit +async def test_tick_flags_running_when_latest_activity_also_stale() -> None: + """The fold folds in activity recency but still flags when even the newest + activity is past the window; the episode keys on that folded timestamp.""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + lookup = InMemoryProcedureActivityLookup() + lookup.register(procedure_id=pid, recorded_at=_STILL_STALE) # newer than _OLD, still stale + + await _watch_tick(deps=kernel, list_procedures=list_procedures, activity_lookup=lookup) + + # Episode keys on the folded last_progress_at (the latest activity), not _OLD. + assert ( + await load_decision(kernel.event_store, _derive_decision_id(pid, _STILL_STALE)) is not None + ) + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None + + +@pytest.mark.unit +async def test_tick_skips_fresh_running() -> None: + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_RECENT)]) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _RECENT)) is None + + +async def _procedure_progress_decision_count(kernel: Kernel) -> int: + """Count ProcedureProgress Decisions written to the event store.""" + store = kernel.event_store + assert isinstance(store, InMemoryEventStore) + count = 0 + for stream_type, stream_id in store._streams: + if stream_type != "Decision": + continue + decision = await load_decision(store, stream_id) + if decision is not None and decision.context.value == "ProcedureProgress": + count += 1 + return count + + +@pytest.mark.unit +async def test_tick_does_not_flag_when_status_timestamp_missing() -> None: + """cannot-tell -> defer: a row with no last_status_changed_at is skipped, so + no ProcedureProgress Decision is written.""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=None)]) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + assert await _procedure_progress_decision_count(kernel) == 0 + + +@pytest.mark.unit +async def test_tick_skips_terminal_status_even_if_unfiltered() -> None: + """Defensive guard: a non-watched status (Completed) is never flagged, even + if the drain returned it. Pins the status check against a filter widening.""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures( + [_item(pid, status="Completed", last_status_changed_at=_OLD)], honor_filter=False + ) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None + + +@pytest.mark.unit +async def test_record_decision_is_idempotent_on_repeated_episode() -> None: + """Re-flagging the same stall episode is a ConcurrencyError no-op, not a crash.""" + from cora.api._procedure_watcher import _record_decision + + kernel = _kernel() + pid = uuid4() + await _record_decision( + kernel, procedure_id=pid, status="Running", last_progress_at=_OLD, now=_NOW + ) + await _record_decision( + kernel, procedure_id=pid, status="Running", last_progress_at=_OLD, now=_NOW + ) + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is not None + + +@pytest.mark.unit +async def test_tick_is_noop_when_watcher_actor_absent() -> None: + """Revocation gate: with no seeded (active) ProcedureWatcher Actor, do nothing.""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() # NOT seeded + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None + + +@pytest.mark.unit +def test_default_activity_lookup_is_in_memory_without_a_pool() -> None: + """The production lookup selector (used when the lifespan is not handed an + explicit activity_lookup) builds the in-memory stub when there is no pool, + which is the path a pool-less deployment / the test kernel takes.""" + from cora.api._procedure_watcher import _default_activity_lookup + + kernel = _kernel() # make_inmemory_kernel -> pool is None + assert isinstance(_default_activity_lookup(kernel), InMemoryProcedureActivityLookup) + + +@pytest.mark.unit +async def test_lifespan_is_noop_when_disabled() -> None: + """Default settings (procedure_watcher_enabled=False): clean no-op, no task.""" + kernel = _kernel() + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + + async with procedure_watcher_lifespan(kernel, list_procedures=list_procedures): + pass + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None + + +@pytest.mark.unit +async def test_lifespan_enabled_runs_the_loop_and_flags() -> None: + """Enabled: the lifespan spawns the loop, which flags a stale procedure.""" + kernel = _kernel(enabled=True) + await seed_procedure_watcher_agent(kernel) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + + async with procedure_watcher_lifespan( + kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + interval_seconds=0.01, + ): + await asyncio.sleep(0.1) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is not None + + +@pytest.mark.unit +async def test_loop_survives_a_failing_tick() -> None: + """A tick that raises is logged and the loop keeps going; lifespan exits cleanly.""" + kernel = _kernel(enabled=True) + await seed_procedure_watcher_agent(kernel) + + async def failing_list_procedures( + query: ListProcedures, + *, + principal_id: UUID, + correlation_id: UUID, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> ProcedureListPage: + raise RuntimeError("list_procedures boom") + + async with procedure_watcher_lifespan( + kernel, + list_procedures=failing_list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + interval_seconds=0.01, + ): + await asyncio.sleep(0.05) + + +@pytest.mark.unit +def test_procedure_watcher_tick_seconds_rejects_sub_floor() -> None: + with pytest.raises(ValueError, match="procedure_watcher_tick_seconds"): + Settings(procedure_watcher_tick_seconds=0.05) # type: ignore[call-arg] + + +@pytest.mark.unit +def test_procedure_watcher_stale_after_rejects_non_positive() -> None: + with pytest.raises(ValueError, match="procedure_watcher_stale_after_seconds"): + Settings(procedure_watcher_stale_after_seconds=0.0) # type: ignore[call-arg] + + +@pytest.mark.unit +def test_procedure_watcher_settings_accept_valid() -> None: + settings = Settings( # type: ignore[call-arg] + procedure_watcher_tick_seconds=120.0, + procedure_watcher_stale_after_seconds=7200.0, + ) + assert settings.procedure_watcher_tick_seconds == 120.0 + assert settings.procedure_watcher_stale_after_seconds == 7200.0 diff --git a/apps/api/tests/unit/decision/test_procedure_progress_vocab.py b/apps/api/tests/unit/decision/test_procedure_progress_vocab.py new file mode 100644 index 0000000000..8de6c7d2da --- /dev/null +++ b/apps/api/tests/unit/decision/test_procedure_progress_vocab.py @@ -0,0 +1,69 @@ +"""Tests for the ProcedureProgress Decision vocabulary. + +Covers the DECISION_CONTEXT_PROCEDURE_PROGRESS context constant, the closed +PROCEDURE_PROGRESS_CHOICES set, its parity with the ProcedureProgressChoice +Literal, and that `Stall` does not collide in the shared, globally-filtered +DecisionChoice projection column. The ProcedureWatcher agent is flag-only (one +Decision per stall episode); the context noun is `ProcedureProgress` (the +lifecycle dimension) while the agent kind is `ProcedureWatcher` (a deliberate +context-noun-vs-doer asymmetry, not drift). + +The disjointness check unions EVERY closed sibling choice set, including +REACTION_DISMISSAL_CHOICES (imported from the submodule because it is not +re-exported from the package); the calibration vocab test omits it, so this is +the most complete cross-context uniqueness assertion in the suite. +""" + +from typing import get_args + +import pytest + +from cora.decision.aggregates.decision import ( + CALIBRATION_VERIFICATION_CHOICES, + CAUTION_PROMOTION_CHOICES, + CAUTION_PROPOSAL_CHOICES, + CLEARANCE_EXPIRY_CHOICES, + CLEARANCE_PROGRESS_CHOICES, + DECISION_CONTEXT_PROCEDURE_PROGRESS, + PROCEDURE_PROGRESS_CHOICES, + RUN_DEBRIEF_CHOICES, + RUN_SUPERVISION_CHOICES, + ProcedureProgressChoice, +) +from cora.decision.aggregates.decision.state import REACTION_DISMISSAL_CHOICES + + +@pytest.mark.unit +def test_decision_context_procedure_progress_constant() -> None: + assert DECISION_CONTEXT_PROCEDURE_PROGRESS == "ProcedureProgress" + + +@pytest.mark.unit +def test_procedure_progress_choices_closed_set() -> None: + assert frozenset({"Stall"}) == PROCEDURE_PROGRESS_CHOICES + + +@pytest.mark.unit +def test_procedure_progress_choices_match_literal() -> None: + """The frozenset and the Literal stay in lockstep.""" + assert frozenset(get_args(ProcedureProgressChoice)) == PROCEDURE_PROGRESS_CHOICES + + +@pytest.mark.unit +def test_procedure_progress_choice_is_unique_in_shared_namespace() -> None: + """`Stall` does not collide with any sibling context's choice values in the + globally-filtered DecisionChoice projection column (Flag / Stale / Expire / + EventDismissed / ...). naming-r3 chose `Stall` precisely because `Flag` was + taken by ClearanceProgress and `Stale` by CalibrationVerification.""" + siblings = ( + CALIBRATION_VERIFICATION_CHOICES + | CAUTION_PROMOTION_CHOICES + | CAUTION_PROPOSAL_CHOICES + | CLEARANCE_EXPIRY_CHOICES + | CLEARANCE_PROGRESS_CHOICES + | REACTION_DISMISSAL_CHOICES + | RUN_DEBRIEF_CHOICES + | RUN_SUPERVISION_CHOICES + ) + assert PROCEDURE_PROGRESS_CHOICES.isdisjoint(siblings) + assert "Stall" in PROCEDURE_PROGRESS_CHOICES diff --git a/apps/api/tests/unit/operation/test_procedure_activity_lookup.py b/apps/api/tests/unit/operation/test_procedure_activity_lookup.py new file mode 100644 index 0000000000..17467d62b6 --- /dev/null +++ b/apps/api/tests/unit/operation/test_procedure_activity_lookup.py @@ -0,0 +1,52 @@ +"""Unit tests for the ProcedureActivityLookup read port (in-memory stub). + +The stub is the contract the PostgresProcedureActivityLookup adapter +mirrors: an unseeded procedure reads a None recency (cannot-tell, the +watcher keeps the status timestamp as the clock), and a seeded one +returns the newest `recorded_at`, not insertion order. The Postgres +parity is covered in the integration suite. +""" + +from datetime import UTC, datetime, timedelta +from uuid import uuid4 + +import pytest + +from cora.operation.ports import InMemoryProcedureActivityLookup + +_PROC = uuid4() +_T0 = datetime(2026, 6, 22, 12, 0, 0, tzinfo=UTC) + + +def _at(seconds: int) -> datetime: + return _T0 + timedelta(seconds=seconds) + + +@pytest.mark.unit +async def test_recency_returns_none_when_procedure_never_logged_activity() -> None: + """An unseeded procedure is the cannot-tell case: the watcher keeps the + status timestamp as the staleness clock rather than folding.""" + lookup = InMemoryProcedureActivityLookup() + recency = await lookup.read_procedure_activity_recency(procedure_id=_PROC) + assert recency.latest_recorded_at is None + + +@pytest.mark.unit +async def test_recency_returns_newest_recorded_at_not_insertion_order() -> None: + """Recency keys on max(recorded_at), independent of registration order.""" + lookup = InMemoryProcedureActivityLookup() + lookup.register(procedure_id=_PROC, recorded_at=_at(30)) + lookup.register(procedure_id=_PROC, recorded_at=_at(10)) + lookup.register(procedure_id=_PROC, recorded_at=_at(20)) + recency = await lookup.read_procedure_activity_recency(procedure_id=_PROC) + assert recency.latest_recorded_at == _at(30) + + +@pytest.mark.unit +async def test_recency_is_scoped_per_procedure() -> None: + """One procedure's activity does not leak into another's recency.""" + other = uuid4() + lookup = InMemoryProcedureActivityLookup() + lookup.register(procedure_id=_PROC, recorded_at=_at(5)) + recency = await lookup.read_procedure_activity_recency(procedure_id=other) + assert recency.latest_recorded_at is None diff --git a/docs/architecture/modules/agent/index.md b/docs/architecture/modules/agent/index.md index fdc3f7bdf6..5b1689162b 100644 --- a/docs/architecture/modules/agent/index.md +++ b/docs/architecture/modules/agent/index.md @@ -1,4 +1,4 @@ -# Agent module stable +# Agent module stable ## Purpose & Scope @@ -16,13 +16,13 @@ An Agent carries five roles: ### The agent fleet -Seven agents are seeded today. They split two ways. By **how they decide**: the two LLM agents (RunDebriefer, CautionDrafter) call a model; the five deterministic agents (RunSupervisor, CautionPromoter, ClearanceExpirer, ClearanceWatcher, CalibrationWatcher) apply a fixed rule and carry a sentinel `model_ref` with no prompt template. By **what they do**: passive agents only advise (they write a Decision and stop); active agents decide and then act, but only by issuing an existing spine command through the same authorized path a human uses, so the resulting record is byte-identical whether a human or the agent acted. +Eight agents are seeded today. They split two ways. By **how they decide**: the two LLM agents (RunDebriefer, CautionDrafter) call a model; the six deterministic agents (RunSupervisor, CautionPromoter, ClearanceExpirer, ClearanceWatcher, CalibrationWatcher, ProcedureWatcher) apply a fixed rule and carry a sentinel `model_ref` with no prompt template. By **what they do**: passive agents only advise (they write a Decision and stop); active agents decide and then act, but only by issuing an existing spine command through the same authorized path a human uses, so the resulting record is byte-identical whether a human or the agent acted. The runtime that drives an agent takes one of three host shapes: - **On-demand slice.** A REST/MCP call invokes the agent directly (`regenerate_run_debrief`). - **Event-triggered subscriber.** A subscriber in the projection worker reacts to a domain event: RunDebriefer and CautionDrafter on a terminal Run, CautionPromoter on a registered `CautionProposal` Decision. -- **Composition-root periodic loop.** A background task sweeps on a timer: RunSupervisor watches in-flight Runs and issues `hold_run` / `resume_run` / `stop_run` as facility beam is lost and returns; ClearanceExpirer sweeps Active clearances and issues `expire_clearance` once a validity window has passed; ClearanceWatcher watches front-of-lifecycle clearances (Submitted, UnderReview, Approved) and records a flag Decision when one stalls past an operator window; CalibrationWatcher watches Provisional calibrations and records a flag Decision when one's newest revision has sat unverified past an operator window. +- **Composition-root periodic loop.** A background task sweeps on a timer: RunSupervisor watches in-flight Runs and issues `hold_run` / `resume_run` / `stop_run` as facility beam is lost and returns; ClearanceExpirer sweeps Active clearances and issues `expire_clearance` once a validity window has passed; ClearanceWatcher watches front-of-lifecycle clearances (Submitted, UnderReview, Approved) and records a flag Decision when one stalls past an operator window; CalibrationWatcher watches Provisional calibrations and records a flag Decision when one's newest revision has sat unverified past an operator window; ProcedureWatcher watches in-conduct procedures (Running, Held) and records a flag Decision when one has sat past an operator window without progressing, folding in the latest activity recency first so an actively-logging Running conduct is not falsely flagged. | Agent | Decides | Host | Acts | |---|---|---|---| @@ -33,8 +33,9 @@ The runtime that drives an agent takes one of three host shapes: | ClearanceExpirer | deterministic | periodic loop | `expire_clearance` + `ClearanceExpiry` Decision | | ClearanceWatcher | deterministic | periodic loop | writes a `ClearanceProgress` flag Decision (passive) | | CalibrationWatcher | deterministic | periodic loop | writes a `CalibrationVerification` (Stale) flag Decision (passive) | +| ProcedureWatcher | deterministic | periodic loop | writes a `ProcedureProgress` (Stall) flag Decision (passive) | -The active runtimes (RunSupervisor, CautionPromoter, ClearanceExpirer) ship off by default, gate every actuation through the Authorize port like any principal, and stand down the moment their Actor is deactivated. None of them reaches past the spine onto the real-time floor: an active agent only issues a command the spine already exposes. ClearanceWatcher and CalibrationWatcher are passive (each records a flag Decision and issues no command) and likewise ship off by default. RunSupervisor additionally carries shadow observe-only rules (run-liveness, plus signal-quality and signal-stall against a live Run's observation channels) that log a would-flag and take no further action; each is a separate opt-in above the agent's own enable, and advise / act promotions are deferred. +The active runtimes (RunSupervisor, CautionPromoter, ClearanceExpirer) ship off by default, gate every actuation through the Authorize port like any principal, and stand down the moment their Actor is deactivated. None of them reaches past the spine onto the real-time floor: an active agent only issues a command the spine already exposes. ClearanceWatcher, CalibrationWatcher, and ProcedureWatcher are passive (each records a flag Decision and issues no command) and likewise ship off by default. RunSupervisor additionally carries shadow observe-only rules (run-liveness, plus signal-quality and signal-stall against a live Run's observation channels) that log a would-flag and take no further action; each is a separate opt-in above the agent's own enable, and advise / act promotions are deferred.
diff --git a/infra/atlas/migrations/20260622010000_add_entries_operation_procedure_activities_proc_recorded_idx.sql b/infra/atlas/migrations/20260622010000_add_entries_operation_procedure_activities_proc_recorded_idx.sql new file mode 100644 index 0000000000..da57c80e3c --- /dev/null +++ b/infra/atlas/migrations/20260622010000_add_entries_operation_procedure_activities_proc_recorded_idx.sql @@ -0,0 +1,22 @@ +-- Procedure-scoped, recorded_at-keyed index for the ProcedureWatcher fold. +-- +-- See [[project-procedure-watcher-design]]. The ProcedureActivityLookup +-- read port serves one query, keyed on recorded_at (the CORA write-time +-- trust anchor, not the spoofable sampled_at) and procedure-scoped: +-- +-- - read_procedure_activity_recency: +-- SELECT max(recorded_at) WHERE procedure_id = $1 +-- +-- This composite btree is LOAD-BEARING for the anti-false-flag fold: the +-- pre-existing indexes on entries_operation_procedure_activities are +-- keyed on sampled_at (plus a BRIN on recorded_at), so none serves a +-- procedure-scoped max(recorded_at) without scanning the procedure's full +-- activity history. The DESC ordering lets the planner satisfy the +-- aggregate from the index head. +-- +-- Additive + forward-only; CREATE INDEX IF NOT EXISTS so a re-run is a +-- no-op. (Not CONCURRENTLY: Atlas wraps each migration file in a +-- transaction, matching every existing index migration in this repo.) + +CREATE INDEX IF NOT EXISTS entries_operation_procedure_activities_proc_recorded_idx + ON entries_operation_procedure_activities (procedure_id, recorded_at DESC); diff --git a/infra/atlas/migrations/atlas.sum b/infra/atlas/migrations/atlas.sum index e37fb9565f..6d1eb075d0 100644 --- a/infra/atlas/migrations/atlas.sum +++ b/infra/atlas/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:Y+l/6SoAljHGZB3HpMSkt1T+g8sLWBXjasyGqKmGdZ8= +h1:49WQSMewzwuL/6HKJr530Qmv7WZF7mVr6MdvB5/Onwo= 20260509120000_init_events.sql h1:GmgCZKfaqXu1m96/cKAks2vhaLWTdEaHTLkFtUo9FXg= 20260509170000_init_idempotency.sql h1:Nbu8DIE4Sv1WiHw3G22+tYffPhKc5Jryw3PMK8wB2zY= 20260510010000_add_event_id.sql h1:RbtYP6uMnOB20zhJ9dNXUi4YVqbmlEzf562pmygnRW8= @@ -150,3 +150,4 @@ h1:Y+l/6SoAljHGZB3HpMSkt1T+g8sLWBXjasyGqKmGdZ8= 20260621040000_init_entries_run_feed_heartbeats.sql h1:MlR+EKgFhxmTKqOpa5DD5WKchzTzwEcDeKoOAC8hTc0= 20260621050000_add_proj_run_summary_rule_inputs.sql h1:W6pzjGGbLEABcxj60nNOyNlTiczU4T+N8mYErqJJwaQ= 20260621060000_proc_summary_status_admit_held.sql h1:XNZsm+19l14iXCiquKSPJ/kMXoSbpuWqojPw+2NFS6o= +20260622010000_add_entries_operation_procedure_activities_proc_recorded_idx.sql h1:BuEvK3A8kfBWl5ih0T3F1Re0tkJAy0KhtzC8Nr/6p1o= From c170c7aa8efe8c1be1dc6ed98dd409606125988d Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Mon, 22 Jun 2026 10:34:35 +0300 Subject: [PATCH 2/2] test(agent): harden ProcedureWatcher drain + kill-switch coverage Folds the two in-scope gaps the fleet review found: - Paginated drain was untested (the fake hardcoded next_cursor=None), so the `cursor = page.next_cursor` continuation never ran; a mutant dropping it survived. Adds test_tick_drains_paginated_procedures: a stale procedure on page 2 is reached only if the cursor advances. - The Actor.active kill switch was only tested via the actor-absent arm. Adds test_tick_is_noop_when_watcher_actor_deactivated: seed then deactivate the agent Actor and assert the tick writes nothing (pins the `not actor.active` disjunct). Also corrects the seed docstring: the watcher issues no write command, but it does issue an authz-gated ListProcedures read each tick, so under a real Authorize policy the agent principal still needs that read grant. Co-Authored-By: Claude Opus 4.8 --- .../src/cora/agent/seed_procedure_watcher.py | 10 +-- .../tests/unit/api/test_procedure_watcher.py | 64 +++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/apps/api/src/cora/agent/seed_procedure_watcher.py b/apps/api/src/cora/agent/seed_procedure_watcher.py index 4dee441544..c4acd78566 100644 --- a/apps/api/src/cora/agent/seed_procedure_watcher.py +++ b/apps/api/src/cora/agent/seed_procedure_watcher.py @@ -17,11 +17,13 @@ - DETERMINISTIC agent (rule-based, NOT LLM): no prompt template and a sentinel `ModelRef` (`provider="deterministic"`), never used to build an LLM (the runtime is a periodic staleness comparison over in-conduct procedures). - - FLAG-ONLY: the runtime issues NO command. It records one + - FLAG-ONLY: the runtime issues NO write command (unlike ClearanceExpirer's + expire_clearance), so there is no per-command Policy grant to seed. It does + issue an authz-gated `ListProcedures` read each tick, so under a real + Authorize policy (not the dev AllowAllAuthorize) the agent principal still + needs that read grant or every drain is denied; it records one Decision(context=ProcedureProgress, choice=Stall) per stall episode for a - human to act on. Permission to record Decisions is granted at - agent-definition time (the RunDebriefer stance); there is no - authorized-command leg and so no per-command Policy grant to seed. + human to act on (the append-only authorship path, the RunDebriefer stance). """ from __future__ import annotations diff --git a/apps/api/tests/unit/api/test_procedure_watcher.py b/apps/api/tests/unit/api/test_procedure_watcher.py index 930a03b483..366824aa6d 100644 --- a/apps/api/tests/unit/api/test_procedure_watcher.py +++ b/apps/api/tests/unit/api/test_procedure_watcher.py @@ -301,6 +301,43 @@ async def test_record_decision_is_idempotent_on_repeated_episode() -> None: assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is not None +@pytest.mark.unit +async def test_tick_drains_paginated_procedures() -> None: + """The drain follows next_cursor across pages, so a stale procedure on a + later page is still flagged (pins the cursor-advance against a mutant).""" + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + page1_pid, page2_pid = uuid4(), uuid4() + + async def list_procedures( + query: ListProcedures, + *, + principal_id: UUID, + correlation_id: UUID, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> ProcedureListPage: + if query.status != "Running": + return ProcedureListPage(items=[], next_cursor=None) + if query.cursor is None: + return ProcedureListPage( + items=[_item(page1_pid, last_status_changed_at=_OLD)], next_cursor="page2" + ) + return ProcedureListPage( + items=[_item(page2_pid, last_status_changed_at=_OLD)], next_cursor=None + ) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + # The page-2 procedure is only reached if the cursor advance ran. + assert await load_decision(kernel.event_store, _derive_decision_id(page2_pid, _OLD)) is not None + + @pytest.mark.unit async def test_tick_is_noop_when_watcher_actor_absent() -> None: """Revocation gate: with no seeded (active) ProcedureWatcher Actor, do nothing.""" @@ -319,6 +356,33 @@ async def test_tick_is_noop_when_watcher_actor_absent() -> None: assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None +@pytest.mark.unit +async def test_tick_is_noop_when_watcher_actor_deactivated() -> None: + """Kill switch: an operator deactivating the agent Actor stands the watcher + down even while seeded. Pins the `not actor.active` disjunct of the gate.""" + from cora.access.features import deactivate_actor + from cora.access.features.deactivate_actor import DeactivateActor + from cora.api._procedure_watcher import _watch_tick + + kernel = _kernel() + await seed_procedure_watcher_agent(kernel) + await deactivate_actor.bind(kernel)( + DeactivateActor(actor_id=PROCEDURE_WATCHER_AGENT_ID), + principal_id=PROCEDURE_WATCHER_AGENT_ID, + correlation_id=uuid4(), + ) + pid = uuid4() + list_procedures = _make_list_procedures([_item(pid, last_status_changed_at=_OLD)]) + + await _watch_tick( + deps=kernel, + list_procedures=list_procedures, + activity_lookup=InMemoryProcedureActivityLookup(), + ) + + assert await load_decision(kernel.event_store, _derive_decision_id(pid, _OLD)) is None + + @pytest.mark.unit def test_default_activity_lookup_is_in_memory_without_a_pool() -> None: """The production lookup selector (used when the lifespan is not handed an