From ee0f073d36c9a2a14732005867b31df3bc83fc8d Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Mon, 22 Jun 2026 13:56:08 +0300 Subject: [PATCH] feat(agent): CampaignWatcher flags Held campaigns stuck past a window CORA's 9th seeded agent and the first new consumer of the shared cora.api._flag_watcher scaffold (PR #308). A deterministic, flag-only, composition-root periodic watcher: each tick it lists Held campaigns (operator-paused) and records one Decision(context=CampaignProgress, choice=Stuck) per stuck episode for any whose last_status_changed_at (the time it was held) has sat past an operator window without being resumed or closed. It issues no command (it surfaces the forgotten pause so a human resumes or closes the campaign). Off by default; gates on Actor.active. On the scaffold it is a thin module: the staleness rule, the per-episode Decision id, the DecisionRegistered envelope, and the loop/lifespan come from _flag_watcher; this module owns only the Held drain, the campaign vocabulary, and the namespace. The simplest consumer yet: no activity fold needed, because Held makes no run-execution progress (last_status_changed_at, advanced only by resume/close, is the true clock; membership curation touches only run_count). A defensive status==Held re-check guards a future filter widening. naming-r3: context CampaignProgress (family-clean with ClearanceProgress / ProcedureProgress); choice Stuck -- the ideation's proposed "reuse Stall" would have collided (Stall is owned by ProcedureProgress, and choice tokens must be globally unique in the DecisionChoice projection), so this context owns its own token. Agent kind CampaignWatcher; agent id in a new cab1 block. No migration: proj_campaign_summary already carries last_status_changed_at + admits Held, and list_campaigns already filters by status. v1 watches Held only; Planned (legitimately not-started-yet) is deferred to a later variant. Co-Authored-By: Claude Opus 4.8 --- apps/api/src/cora/agent/__init__.py | 6 + .../src/cora/agent/seed_campaign_watcher.py | 104 ++++++ apps/api/src/cora/api/_campaign_watcher.py | 206 ++++++++++ apps/api/src/cora/api/main.py | 8 + .../decision/aggregates/decision/__init__.py | 6 + .../decision/aggregates/decision/state.py | 24 ++ apps/api/src/cora/infrastructure/config.py | 35 ++ .../unit/agent/test_campaign_watcher_seed.py | 107 ++++++ .../tests/unit/api/test_campaign_watcher.py | 351 ++++++++++++++++++ .../decision/test_campaign_progress_vocab.py | 70 ++++ docs/architecture/modules/agent/index.md | 9 +- 11 files changed, 922 insertions(+), 4 deletions(-) create mode 100644 apps/api/src/cora/agent/seed_campaign_watcher.py create mode 100644 apps/api/src/cora/api/_campaign_watcher.py create mode 100644 apps/api/tests/unit/agent/test_campaign_watcher_seed.py create mode 100644 apps/api/tests/unit/api/test_campaign_watcher.py create mode 100644 apps/api/tests/unit/decision/test_campaign_progress_vocab.py diff --git a/apps/api/src/cora/agent/__init__.py b/apps/api/src/cora/agent/__init__.py index 06680f99b3..dae5206b79 100644 --- a/apps/api/src/cora/agent/__init__.py +++ b/apps/api/src/cora/agent/__init__.py @@ -41,6 +41,10 @@ CALIBRATION_WATCHER_AGENT_ID, seed_calibration_watcher_agent, ) +from cora.agent.seed_campaign_watcher import ( + CAMPAIGN_WATCHER_AGENT_ID, + seed_campaign_watcher_agent, +) from cora.agent.seed_caution_drafter import seed_caution_drafter_agent from cora.agent.seed_caution_promoter import ( CAUTION_PROMOTER_AGENT_ID, @@ -67,6 +71,7 @@ __all__ = [ "CALIBRATION_WATCHER_AGENT_ID", + "CAMPAIGN_WATCHER_AGENT_ID", "CAUTION_PROMOTER_AGENT_ID", "CLEARANCE_EXPIRER_AGENT_ID", "CLEARANCE_WATCHER_AGENT_ID", @@ -84,6 +89,7 @@ "register_agent_subscribers", "register_agent_tools", "seed_calibration_watcher_agent", + "seed_campaign_watcher_agent", "seed_caution_drafter_agent", "seed_caution_promoter_agent", "seed_clearance_expirer_agent", diff --git a/apps/api/src/cora/agent/seed_campaign_watcher.py b/apps/api/src/cora/agent/seed_campaign_watcher.py new file mode 100644 index 0000000000..81384da48c --- /dev/null +++ b/apps/api/src/cora/agent/seed_campaign_watcher.py @@ -0,0 +1,104 @@ +"""Bootstrap-time seed for the CampaignWatcher Agent. + +The CampaignWatcher runtime (CORA's 9th seeded agent, a composition-root periodic +loop, deterministic flag-only) needs an Agent record (and its co-registered +Actor) at the pinned `CAMPAIGN_WATCHER_AGENT_ID` so it can author CampaignProgress +Decisions (`decided_by = ActorId(CAMPAIGN_WATCHER_AGENT_ID)`) as an agent-kind +principal. Mirrors `cora.agent.seed_procedure_watcher` except for the per-agent +constants; the shared scaffolding lives in `cora.agent._agent_seed`, and the +runtime mechanics in `cora.api._flag_watcher`. + +Per [[project-campaign-watcher-design]]: + - Pinned UUID in the `cab1` block (ninth seeded identity, sibling to + RunDebriefer `aaaa00XX`, CautionDrafter `bbbb00XX`, RunSupervisor + `cccc00XX`, CautionPromoter `dddd00XX`, ClearanceExpirer `eeee00XX`, + ClearanceWatcher `ffff00XX`, CalibrationWatcher `ca1100XX`, ProcedureWatcher + `0c0c00XX`); deployment-stable forever. `cab1` is distinct from calibration's + `ca11`. + - DETERMINISTIC agent (rule-based, NOT LLM): no prompt template and a sentinel + `ModelRef` (`provider="deterministic"`), never used to build an LLM (the + runtime is a periodic staleness comparison over Held campaigns). + - FLAG-ONLY: the runtime issues NO command. It records one + Decision(context=CampaignProgress, choice=Stuck) per stuck-Held episode for a + human to act on. Permission to record Decisions is granted at agent-definition + time (the RunDebriefer stance); it issues no write command, though it does + issue an authz-gated ListCampaigns read each tick. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from uuid import UUID + +from cora.agent._agent_seed import AgentSeedIdentity, seed_agent +from cora.agent.aggregates.agent import ModelRef + +if TYPE_CHECKING: + from cora.infrastructure.kernel import Kernel + + +# --------------------------------------------------------------------------- +# CampaignWatcher agent identity (deployment-stable constants) +# --------------------------------------------------------------------------- + +# Treat as FOREVER-STABLE. Same change-cost rationale as the other seeded +# agents: changing this orphans every prior CampaignWatcher-authored Decision +# (their actor_id pointers go stale). UUID is in the deployment-controlled +# `cab1` block (ninth seeded identity). +CAMPAIGN_WATCHER_AGENT_ID = UUID("01900000-0000-7000-8000-0000cab10010") +CAMPAIGN_WATCHER_AGENT_NAME = "CampaignWatcher" +CAMPAIGN_WATCHER_AGENT_KIND = "CampaignWatcher" +CAMPAIGN_WATCHER_AGENT_VERSION = "1.0.0" +CAMPAIGN_WATCHER_AGENT_DESCRIPTION = ( + "Deterministic in-loop agent: watches Held campaigns (operator-paused) and " + "records one Decision(context=CampaignProgress, choice=Stuck) per campaign " + "that has sat Held past the staleness window without being resumed or closed. " + "Flag-only (advise a human); issues no command." +) + + +# Sentinel model ref: CampaignWatcher is rule-based, not an LLM agent. The Agent +# aggregate requires a ModelRef; this value is never used to build an LLM (the +# runtime is a periodic staleness comparison, no build_llm call). +_DETERMINISTIC_MODEL_REF = ModelRef( + provider="deterministic", + model="agent:CampaignWatcher:v1", + snapshot_pin=None, +) + + +# --------------------------------------------------------------------------- +# Deterministic IDs for the bootstrap write envelope +# --------------------------------------------------------------------------- + +_AGENT_EVENT_ID = UUID("01900000-0000-7000-8000-0000cab10012") +_ACTOR_EVENT_ID = UUID("01900000-0000-7000-8000-0000cab10013") +_BOOTSTRAP_CORRELATION_ID = UUID("01900000-0000-7000-8000-0000cab10014") + + +async def seed_campaign_watcher_agent(kernel: Kernel) -> None: + """Seed the CampaignWatcher Agent + co-registered Actor (idempotent).""" + identity = AgentSeedIdentity( + agent_id=CAMPAIGN_WATCHER_AGENT_ID, + name=CAMPAIGN_WATCHER_AGENT_NAME, + kind=CAMPAIGN_WATCHER_AGENT_KIND, + version=CAMPAIGN_WATCHER_AGENT_VERSION, + description=CAMPAIGN_WATCHER_AGENT_DESCRIPTION, + model_ref=_DETERMINISTIC_MODEL_REF, + prompt_template_id=None, + agent_event_id=_AGENT_EVENT_ID, + actor_event_id=_ACTOR_EVENT_ID, + correlation_id=_BOOTSTRAP_CORRELATION_ID, + command_name="SeedCampaignWatcherAgent", + ) + await seed_agent(kernel, identity) + + +__all__ = [ + "CAMPAIGN_WATCHER_AGENT_DESCRIPTION", + "CAMPAIGN_WATCHER_AGENT_ID", + "CAMPAIGN_WATCHER_AGENT_KIND", + "CAMPAIGN_WATCHER_AGENT_NAME", + "CAMPAIGN_WATCHER_AGENT_VERSION", + "seed_campaign_watcher_agent", +] diff --git a/apps/api/src/cora/api/_campaign_watcher.py b/apps/api/src/cora/api/_campaign_watcher.py new file mode 100644 index 0000000000..21afacdf3c --- /dev/null +++ b/apps/api/src/cora/api/_campaign_watcher.py @@ -0,0 +1,206 @@ +"""CampaignWatcher runtime: the 9th seeded agent (deterministic flag-only). + +A periodic background task, hosted at the composition root (`cora.api`) because +it reads the Campaign BC AND composes Decision BC events; only `cora.api` may +depend on both (same placement rationale as `_clearance_watcher`, +`_calibration_watcher`, `_procedure_watcher`). The agent-invariant mechanics (the +staleness rule, the per-episode Decision id, the DecisionRegistered envelope, and +the periodic loop / lifespan) live in `cora.api._flag_watcher`; this module owns +only what is specific to campaigns. See [[project-campaign-watcher-design]]. + +## What v1 does + +Each tick it lists `Held` campaigns (operator-paused), selects those whose +`last_status_changed_at` (the time they were held) has sat past the +operator-config staleness window without being resumed or closed, and records one +`Decision(context=CampaignProgress, choice=Stuck)` per stuck EPISODE. It is +FLAG-ONLY: it issues NO command (it surfaces the forgotten pause so a human +resumes or closes the campaign). v1 watches only `Held`; `Planned` (a campaign +that has legitimately not started yet) is deferred to a later variant with its +own window. + +## No activity fold needed + +Unlike `_procedure_watcher` (whose Running state churns activity without advancing +the status timestamp), `Held` makes no run-EXECUTION progress: `last_status_changed_at` +(set on `CampaignHeld`, advanced only by `resume_campaign` / `close_campaign`) is +the correct clock and no recency fold is required. Membership curation +(`add_run_to_campaign` / remove) is permitted while Held but deliberately touches +only `run_count`, never the status clock, so a long forgotten Hold still flags. A +defensive `status == Held` re-check guards a future filter widening (mirrors +`_calibration_watcher`). + +## Idempotency / edge-trigger: one Stuck flag per episode + +The Decision id is `uuid5(namespace, "decision:{campaign_id}:{last_status_changed_at}")`, +so re-ticks while the campaign is still Held hit the same id (`ConcurrencyError` +no-op). `resume_campaign` advances `last_status_changed_at` (and leaves Held), and +`close_campaign` / `abandon_campaign` leave the Held filter, so a resumed campaign +that is re-held opens a fresh episode. + +## Fail-safe and bounded + +Flag-only and reversible by construction: the worst outcome is an advisory +Decision a human can ignore. The runtime gates on `Actor.active`, so deactivating +the agent Actor stops it. Off by default (`settings.campaign_watcher_enabled`). +""" + +from __future__ import annotations + +import contextlib +from typing import TYPE_CHECKING +from uuid import UUID + +from cora.access.aggregates.actor import load_actor +from cora.agent.seed_campaign_watcher import CAMPAIGN_WATCHER_AGENT_ID +from cora.api._flag_watcher import ( + derive_watcher_decision_id, + flag_watcher_lifespan, + is_stalled, + record_watcher_decision, +) +from cora.campaign.features.list_campaigns import ListCampaigns +from cora.decision.aggregates.decision import DECISION_CONTEXT_CAMPAIGN_PROGRESS +from cora.infrastructure.routing import NIL_SENTINEL_ID + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + from datetime import datetime + + from cora.campaign.features.list_campaigns import CampaignSummaryItem + from cora.campaign.features.list_campaigns.handler import Handler as ListCampaignsHandler + from cora.infrastructure.kernel import Kernel + +_LOG_PREFIX = "campaign_watcher" +_RULE = "agent:CampaignWatcher:v1" +_COMMAND_NAME = "CampaignWatcherTick" +_PAGE_LIMIT = 100 +_CHOICE_STUCK = "Stuck" +_STATUS_HELD = "Held" + +# Stable namespace for deriving the deterministic Decision id from the campaign +# id + the held-episode timestamp (cab1 block, distinct from the seed envelope +# ids), so one Stuck flag is written per stuck episode and re-ticks are no-ops. +_DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-0000cab10002") + + +def _derive_decision_id(campaign_id: UUID, last_status_changed_at: datetime) -> UUID: + """Deterministic CampaignProgress Decision id for one stuck episode.""" + return derive_watcher_decision_id(_DECISION_NAMESPACE, campaign_id, last_status_changed_at) + + +async def _record_decision( + deps: Kernel, + *, + campaign_id: UUID, + name: str, + last_status_changed_at: datetime, + now: datetime, +) -> None: + """Append one DecisionRegistered(context=CampaignProgress, choice=Stuck).""" + stuck_seconds = int((now - last_status_changed_at).total_seconds()) + await record_watcher_decision( + deps, + agent_id=CAMPAIGN_WATCHER_AGENT_ID, + context=DECISION_CONTEXT_CAMPAIGN_PROGRESS, + choice=_CHOICE_STUCK, + rule=_RULE, + command_name=_COMMAND_NAME, + decision_id=_derive_decision_id(campaign_id, last_status_changed_at), + entity_id=campaign_id, + now=now, + reasoning=( + f"Campaign ({name}) has sat Held for {stuck_seconds}s without being resumed " + "or closed (past the staleness window); surfaced for operator follow-up." + ), + inputs={ + "campaign_id": str(campaign_id), + "name": name, + "last_status_changed_at": last_status_changed_at.isoformat(), + "stuck_seconds": str(stuck_seconds), + "occurred_at": now.isoformat(), + }, + log_prefix=_LOG_PREFIX, + ) + + +async def _drain_held_campaigns( + list_campaigns: ListCampaignsHandler, deps: Kernel +) -> list[CampaignSummaryItem]: + """Page through list_campaigns filtered to status Held.""" + items: list[CampaignSummaryItem] = [] + cursor: str | None = None + while True: + page = await list_campaigns( + ListCampaigns(statuses=[_STATUS_HELD], cursor=cursor, limit=_PAGE_LIMIT), + principal_id=CAMPAIGN_WATCHER_AGENT_ID, + correlation_id=deps.id_generator.new_id(), + surface_id=NIL_SENTINEL_ID, + ) + items.extend(page.items) + if page.next_cursor is None: + break + cursor = page.next_cursor + return items + + +async def _watch_tick( + *, + deps: Kernel, + list_campaigns: ListCampaignsHandler, +) -> None: + """One watch sweep over all Held campaigns.""" + actor = await load_actor(deps.event_store, CAMPAIGN_WATCHER_AGENT_ID) + if actor is None or not actor.active: + # Agent not seeded yet, or deactivated by an operator: stand down. + return + + now = deps.clock.now() + stale_after = deps.settings.campaign_watcher_stale_after_seconds + for item in await _drain_held_campaigns(list_campaigns, deps): + # Defensive: the drain filters to Held, but re-check so a future filter + # change cannot widen what gets flagged. + if item.status != _STATUS_HELD: + continue + if item.last_status_changed_at is None: + # No status-change timestamp recorded: cannot evaluate; defer. + continue + if not is_stalled(item.last_status_changed_at, now, stale_after): + continue + await _record_decision( + deps, + campaign_id=item.campaign_id, + name=item.name, + last_status_changed_at=item.last_status_changed_at, + now=now, + ) + + +@contextlib.asynccontextmanager +async def campaign_watcher_lifespan( + deps: Kernel, + *, + list_campaigns: ListCampaignsHandler, + interval_seconds: float | None = None, +) -> AsyncGenerator[None]: + """Spawn the CampaignWatcher loop for the duration of the context. + + No-op unless `settings.campaign_watcher_enabled` is True (default off, so a + deployment opts in explicitly). + """ + + async def tick() -> None: + await _watch_tick(deps=deps, list_campaigns=list_campaigns) + + async with flag_watcher_lifespan( + enabled=deps.settings.campaign_watcher_enabled, + default_tick_seconds=deps.settings.campaign_watcher_tick_seconds, + log_prefix=_LOG_PREFIX, + task_name="campaign-watcher", + tick=tick, + interval_seconds=interval_seconds, + ): + yield + + +__all__ = ["campaign_watcher_lifespan", "is_stalled"] diff --git a/apps/api/src/cora/api/main.py b/apps/api/src/cora/api/main.py index 91f91f7540..7543b4d2f0 100644 --- a/apps/api/src/cora/api/main.py +++ b/apps/api/src/cora/api/main.py @@ -55,6 +55,7 @@ register_agent_subscribers, register_agent_tools, seed_calibration_watcher_agent, + seed_campaign_watcher_agent, seed_caution_drafter_agent, seed_caution_promoter_agent, seed_clearance_expirer_agent, @@ -65,6 +66,7 @@ wire_agent, ) from cora.api._calibration_watcher import calibration_watcher_lifespan +from cora.api._campaign_watcher import campaign_watcher_lifespan from cora.api._clearance_expirer import clearance_expirer_lifespan from cora.api._clearance_watcher import clearance_watcher_lifespan from cora.api._compute_runtime import ComputeRuntime @@ -804,6 +806,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: await seed_calibration_watcher_agent(deps) # same shape for ProcedureWatcher (deterministic flag-only agent). await seed_procedure_watcher_agent(deps) + # same shape for CampaignWatcher (deterministic flag-only agent). + await seed_campaign_watcher_agent(deps) # Drain Federation-owned projections so the Postgres-backed # FacilityLookup.list_active() resolves the self-Facility row @@ -884,6 +888,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: deps, list_procedures=app.state.operation.list_procedures, ), + campaign_watcher_lifespan( + deps, + list_campaigns=app.state.campaign.list_campaigns, + ), ): yield finally: diff --git a/apps/api/src/cora/decision/aggregates/decision/__init__.py b/apps/api/src/cora/decision/aggregates/decision/__init__.py index f30f167827..2e2b5f3e65 100644 --- a/apps/api/src/cora/decision/aggregates/decision/__init__.py +++ b/apps/api/src/cora/decision/aggregates/decision/__init__.py @@ -28,6 +28,7 @@ from cora.decision.aggregates.decision.read import load_decision from cora.decision.aggregates.decision.state import ( CALIBRATION_VERIFICATION_CHOICES, + CAMPAIGN_PROGRESS_CHOICES, CAUTION_PROMOTION_CHOICES, CAUTION_PROPOSAL_CHOICES, CLEARANCE_EXPIRY_CHOICES, @@ -39,6 +40,7 @@ DECISION_ALTERNATIVES_MAX_ENTRIES, DECISION_CHOICE_MAX_LENGTH, DECISION_CONTEXT_CALIBRATION_VERIFICATION, + DECISION_CONTEXT_CAMPAIGN_PROGRESS, DECISION_CONTEXT_CAUTION_PROMOTION, DECISION_CONTEXT_CAUTION_PROPOSAL, DECISION_CONTEXT_CLEARANCE_EXPIRY, @@ -66,6 +68,7 @@ RUN_DEBRIEF_CHOICES, RUN_SUPERVISION_CHOICES, CalibrationVerificationChoice, + CampaignProgressChoice, CautionPromotionChoice, CautionProposalChoice, ClearanceExpiryChoice, @@ -111,6 +114,7 @@ __all__ = [ "CALIBRATION_VERIFICATION_CHOICES", + "CAMPAIGN_PROGRESS_CHOICES", "CAUTION_PROMOTION_CHOICES", "CAUTION_PROPOSAL_CHOICES", "CLEARANCE_EXPIRY_CHOICES", @@ -122,6 +126,7 @@ "DECISION_ALTERNATIVE_ENTRY_MAX_LENGTH", "DECISION_CHOICE_MAX_LENGTH", "DECISION_CONTEXT_CALIBRATION_VERIFICATION", + "DECISION_CONTEXT_CAMPAIGN_PROGRESS", "DECISION_CONTEXT_CAUTION_PROMOTION", "DECISION_CONTEXT_CAUTION_PROPOSAL", "DECISION_CONTEXT_CLEARANCE_EXPIRY", @@ -156,6 +161,7 @@ "RUN_DEBRIEF_CHOICES", "RUN_SUPERVISION_CHOICES", "CalibrationVerificationChoice", + "CampaignProgressChoice", "CautionPromotionChoice", "CautionProposalChoice", "ClearanceExpiryChoice", diff --git a/apps/api/src/cora/decision/aggregates/decision/state.py b/apps/api/src/cora/decision/aggregates/decision/state.py index 18d3f3d2dc..ce36afe252 100644 --- a/apps/api/src/cora/decision/aggregates/decision/state.py +++ b/apps/api/src/cora/decision/aggregates/decision/state.py @@ -492,6 +492,30 @@ PROCEDURE_PROGRESS_CHOICES: Final = frozenset({"Stall"}) +# Context for the CampaignWatcher agent's Decisions about the CAMPAIGN-PROGRESS +# lifecycle dimension it patrols: whether a campaign is still advancing, or has +# sat Held (operator-paused) past the operator window without being resumed or +# closed. Open-ended convention identical to ClearanceProgress / ProcedureProgress; +# the closed choice vocabulary lives in the `CampaignProgressChoice` Literal below. +# FLAG-ONLY: the agent records one Decision per stuck episode and issues NO +# command. The context noun is `CampaignProgress` (the lifecycle dimension being +# judged, family-clean with ClearanceProgress / ProcedureProgress); the agent kind +# is `CampaignWatcher` (the doer) -- the same dimension-vs-doer asymmetry as the +# Clearance, Calibration, and Procedure trios. +DECISION_CONTEXT_CAMPAIGN_PROGRESS = "CampaignProgress" + + +# Closed `choice` value set for `context = "CampaignProgress"` Decisions. +# Projection-validated, not domain-enforced. Single value today (the agent only +# ever flags a stuck-Held campaign); the Literal exists for symmetry and so a +# future qualified disposition can land additively. `Stuck` is globally unique in +# the shared, filtered DecisionChoice projection column (naming-r3: `Flag` is +# taken by ClearanceProgress, `Stale` by CalibrationVerification, and `Stall` by +# ProcedureProgress, so this context owns its own disposition token). +CampaignProgressChoice = Literal["Stuck"] +CAMPAIGN_PROGRESS_CHOICES: Final = frozenset({"Stuck"}) + + # acceptance-signal capture: closed 3-value rating set on # the new `DecisionRated` event. `useful` and `misleading` are # operator-affirmative; `ignored` is a positive marker ("operator saw diff --git a/apps/api/src/cora/infrastructure/config.py b/apps/api/src/cora/infrastructure/config.py index 732f0faea8..5a893009bb 100644 --- a/apps/api/src/cora/infrastructure/config.py +++ b/apps/api/src/cora/infrastructure/config.py @@ -301,6 +301,17 @@ class Settings(BaseSettings): procedure_watcher_tick_seconds: float = 300.0 procedure_watcher_stale_after_seconds: float = 3600.0 + # `campaign_watcher_enabled` gates the CampaignWatcher background runtime + # (9th seeded agent, deterministic flag-only). Default off: deployments opt in + # explicitly. `campaign_watcher_tick_seconds` is the sweep cadence (>= 0.1s). + # `campaign_watcher_stale_after_seconds` is how long a campaign may sit Held + # (operator-paused) without being resumed or closed before it is flagged; a + # forgotten pause is a slow failure, so the default is a week (off by default; + # an operator sets the real window on enable). + campaign_watcher_enabled: bool = False + campaign_watcher_tick_seconds: float = 300.0 + campaign_watcher_stale_after_seconds: float = 604800.0 + # Edge auth # `identity_providers` is the list of IdPs CORA accepts tokens # from. Empty (default) keeps the legacy X-Principal-Id-with- @@ -664,3 +675,27 @@ def _validate_procedure_watcher_stale_after_seconds(cls, value: float) -> float: ) raise ValueError(msg) return value + + @field_validator("campaign_watcher_tick_seconds") + @classmethod + def _validate_campaign_watcher_tick_seconds(cls, value: float) -> float: + """Floor of 0.1s prevents a tight watch-sweep loop.""" + if value < 0.1: + msg = ( + f"campaign_watcher_tick_seconds must be >= 0.1, got {value}; " + "values below 100ms would tight-loop the watcher" + ) + raise ValueError(msg) + return value + + @field_validator("campaign_watcher_stale_after_seconds") + @classmethod + def _validate_campaign_watcher_stale_after_seconds(cls, value: float) -> float: + """Must be positive: a non-positive window would flag every Held campaign.""" + if value <= 0: + msg = ( + f"campaign_watcher_stale_after_seconds must be > 0, got {value}; " + "a non-positive window would flag every Held campaign" + ) + raise ValueError(msg) + return value diff --git a/apps/api/tests/unit/agent/test_campaign_watcher_seed.py b/apps/api/tests/unit/agent/test_campaign_watcher_seed.py new file mode 100644 index 0000000000..b3d21cbce2 --- /dev/null +++ b/apps/api/tests/unit/agent/test_campaign_watcher_seed.py @@ -0,0 +1,107 @@ +"""Unit tests for the CampaignWatcher Agent bootstrap seed. + +CampaignWatcher is the ninth seeded agent and a deterministic flag-only watcher: +no prompt template and a sentinel ModelRef (it is rule-based, a periodic +staleness comparison over Held campaigns, never builds an LLM). These tests pin +that shape alongside the shared seed scaffolding. +""" + +from datetime import UTC, datetime + +import pytest + +from cora.agent.aggregates.agent import load_agent +from cora.agent.seed_campaign_watcher import ( + CAMPAIGN_WATCHER_AGENT_ID, + CAMPAIGN_WATCHER_AGENT_KIND, + CAMPAIGN_WATCHER_AGENT_NAME, + CAMPAIGN_WATCHER_AGENT_VERSION, + seed_campaign_watcher_agent, +) +from cora.infrastructure.config import Settings +from cora.infrastructure.deps import make_inmemory_kernel +from cora.infrastructure.kernel import Kernel +from cora.infrastructure.ports import AllowAllAuthorize, FakeClock, FixedIdGenerator + + +def _kernel() -> Kernel: + settings = Settings() # type: ignore[call-arg] + return make_inmemory_kernel( + settings=settings, + clock=FakeClock(datetime(2026, 6, 22, 14, 0, 0, tzinfo=UTC)), + id_generator=FixedIdGenerator([]), + authz=AllowAllAuthorize(), + ) + + +@pytest.mark.unit +async def test_seed_creates_campaign_watcher_at_pinned_id() -> None: + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + + agent = await load_agent(kernel.event_store, CAMPAIGN_WATCHER_AGENT_ID) + assert agent is not None + assert agent.id == CAMPAIGN_WATCHER_AGENT_ID + assert agent.name.value == CAMPAIGN_WATCHER_AGENT_NAME + assert agent.kind.value == CAMPAIGN_WATCHER_AGENT_KIND + assert agent.version.value == CAMPAIGN_WATCHER_AGENT_VERSION + + +@pytest.mark.unit +async def test_seed_is_deterministic_no_prompt_sentinel_model() -> None: + """Deterministic agent: no prompt template, sentinel (non-LLM) model_ref.""" + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + + agent = await load_agent(kernel.event_store, CAMPAIGN_WATCHER_AGENT_ID) + assert agent is not None + assert agent.prompt_template_id is None + assert agent.model_ref.provider == "deterministic" + assert agent.model_ref.model == "agent:CampaignWatcher:v1" + + +@pytest.mark.unit +async def test_seed_creates_co_registered_actor() -> None: + """The cross-BC genesis: Actor (kind=agent) at the pinned id.""" + from cora.access.aggregates.actor import load_actor + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + + actor = await load_actor(kernel.event_store, CAMPAIGN_WATCHER_AGENT_ID) + assert actor is not None + assert actor.id == CAMPAIGN_WATCHER_AGENT_ID + assert actor.kind.value == "agent" + + +@pytest.mark.unit +async def test_seed_is_idempotent() -> None: + """Re-running the seed is a no-op (ConcurrencyError-as-success pattern).""" + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + await seed_campaign_watcher_agent(kernel) + + +@pytest.mark.unit +async def test_campaign_watcher_id_distinct_from_all_prior_agents() -> None: + """The nine seeded agents share the UUID-range scheme but must NOT collide.""" + from cora.agent.seed import RUN_DEBRIEFER_AGENT_ID + from cora.agent.seed_calibration_watcher import CALIBRATION_WATCHER_AGENT_ID + from cora.agent.seed_caution_drafter import CAUTION_DRAFTER_AGENT_ID + from cora.agent.seed_caution_promoter import CAUTION_PROMOTER_AGENT_ID + from cora.agent.seed_clearance_expirer import CLEARANCE_EXPIRER_AGENT_ID + from cora.agent.seed_clearance_watcher import CLEARANCE_WATCHER_AGENT_ID + from cora.agent.seed_procedure_watcher import PROCEDURE_WATCHER_AGENT_ID + from cora.agent.seed_run_supervisor import RUN_SUPERVISOR_AGENT_ID + + prior = { + RUN_DEBRIEFER_AGENT_ID, + CAUTION_DRAFTER_AGENT_ID, + RUN_SUPERVISOR_AGENT_ID, + CAUTION_PROMOTER_AGENT_ID, + CLEARANCE_EXPIRER_AGENT_ID, + CLEARANCE_WATCHER_AGENT_ID, + CALIBRATION_WATCHER_AGENT_ID, + PROCEDURE_WATCHER_AGENT_ID, + } + assert CAMPAIGN_WATCHER_AGENT_ID not in prior diff --git a/apps/api/tests/unit/api/test_campaign_watcher.py b/apps/api/tests/unit/api/test_campaign_watcher.py new file mode 100644 index 0000000000..1eafbcbfed --- /dev/null +++ b/apps/api/tests/unit/api/test_campaign_watcher.py @@ -0,0 +1,351 @@ +"""Tests for the CampaignWatcher runtime (cora.api._campaign_watcher). + +Covers the pure staleness rule (is_stalled, from the shared scaffold) on both +sides of the inclusive boundary, plus a fakes-driven tick that exercises the +drain -> flag Decision loop, the Held-only / defensive-status guard, the +cannot-tell defer, the paginated drain, the Actor.active revocation + deactivation +gates, idempotency, and the disabled / failing-tick lifespan behavior. +""" + +# white-box test of the runtime internals (private functions / constants) +# pyright: reportPrivateUsage=false + +import asyncio +from datetime import UTC, datetime, timedelta +from uuid import UUID, uuid4 + +import pytest + +from cora.agent.seed_campaign_watcher import ( + CAMPAIGN_WATCHER_AGENT_ID, + seed_campaign_watcher_agent, +) +from cora.api._campaign_watcher import ( + _derive_decision_id, + campaign_watcher_lifespan, + is_stalled, +) +from cora.campaign.features.list_campaigns import ( + CampaignListPage, + CampaignSummaryItem, + ListCampaigns, +) +from cora.decision.aggregates.decision import load_decision +from cora.infrastructure.adapters.in_memory_event_store import InMemoryEventStore +from cora.infrastructure.config import Settings +from cora.infrastructure.deps import make_inmemory_kernel +from cora.infrastructure.kernel import Kernel +from cora.infrastructure.ports import AllowAllAuthorize, FakeClock, UUIDv7Generator +from cora.infrastructure.routing import NIL_SENTINEL_ID + +_NOW = datetime(2026, 6, 22, 12, 0, 0, tzinfo=UTC) +_STALE_AFTER = 604800.0 # 7 days +_OLD = _NOW - timedelta(days=14) # clearly stale at a 7-day window +_RECENT = _NOW - timedelta(hours=1) # fresh +_BOUNDARY = _NOW - timedelta(seconds=int(_STALE_AFTER)) + + +# ---------- pure rule: is_stalled ---------- + + +@pytest.mark.unit +def test_is_stalled_when_held_old() -> None: + assert is_stalled(_OLD, _NOW, _STALE_AFTER) is True + + +@pytest.mark.unit +def test_is_not_stalled_when_held_recent() -> None: + assert is_stalled(_RECENT, _NOW, _STALE_AFTER) is False + + +@pytest.mark.unit +def test_stalled_is_inclusive_at_boundary() -> None: + """Elapsed == window FLAGS (inclusive >=); pins the `>`-vs-`>=` mutant.""" + assert is_stalled(_BOUNDARY, _NOW, _STALE_AFTER) is True + + +# ---------- tick: full loop with fakes ---------- + + +def _kernel(*, enabled: bool = False, stale_after: float = _STALE_AFTER) -> Kernel: + settings = Settings( # type: ignore[call-arg] + campaign_watcher_enabled=enabled, + campaign_watcher_stale_after_seconds=stale_after, + ) + return make_inmemory_kernel( + settings=settings, + clock=FakeClock(_NOW), + id_generator=UUIDv7Generator(), + authz=AllowAllAuthorize(), + ) + + +def _item( + campaign_id: UUID, + *, + status: str = "Held", + last_status_changed_at: datetime | None, +) -> CampaignSummaryItem: + return CampaignSummaryItem( + campaign_id=campaign_id, + name="winter-tomo-series", + intent="Series", + status=status, + lead_actor_id=uuid4(), + subject_id=None, + description=None, + tags=[], + external_id=None, + run_count=0, + registered_at=_OLD, + started_at=_OLD, + last_status_changed_at=last_status_changed_at, + last_status_reason="paused for beam study", + ) + + +def _make_list_campaigns(items: list[CampaignSummaryItem], *, honor_filter: bool = True): + async def list_campaigns( + query: ListCampaigns, + *, + principal_id: UUID, + correlation_id: UUID, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> CampaignListPage: + wanted = query.statuses + if honor_filter and wanted: + matching = [i for i in items if i.status in wanted] + else: + matching = list(items) + return CampaignListPage(items=matching, next_cursor=None) + + return list_campaigns + + +async def _campaign_progress_decision_count(kernel: Kernel) -> int: + """Count CampaignProgress Decisions written to the event store.""" + store = kernel.event_store + assert isinstance(store, InMemoryEventStore) + count = 0 + for stream_type, stream_id in store._streams: + if stream_type != "Decision": + continue + decision = await load_decision(store, stream_id) + if decision is not None and decision.context.value == "CampaignProgress": + count += 1 + return count + + +@pytest.mark.unit +async def test_tick_flags_stuck_held_and_records_decision() -> None: + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=_OLD)]) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + decision = await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) + assert decision is not None + assert decision.context.value == "CampaignProgress" + assert decision.choice.value == "Stuck" + + +@pytest.mark.unit +async def test_tick_skips_fresh_held() -> None: + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=_RECENT)]) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _RECENT)) is None + + +@pytest.mark.unit +async def test_tick_does_not_flag_when_status_timestamp_missing() -> None: + """cannot-tell -> defer: a Held row with no last_status_changed_at is skipped.""" + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=None)]) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + assert await _campaign_progress_decision_count(kernel) == 0 + + +@pytest.mark.unit +async def test_tick_skips_non_held_status_even_if_unfiltered() -> None: + """Defensive guard: a non-Held campaign is never flagged, even if the drain + returned it. Pins the status check against a filter widening.""" + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + cid = uuid4() + list_campaigns = _make_list_campaigns( + [_item(cid, status="Active", last_status_changed_at=_OLD)], honor_filter=False + ) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) is None + + +@pytest.mark.unit +async def test_tick_drains_paginated_campaigns() -> None: + """The drain follows next_cursor across pages, so a stuck campaign on a later + page is still flagged (pins the cursor-advance against a mutant).""" + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + page1_cid, page2_cid = uuid4(), uuid4() + + async def list_campaigns( + query: ListCampaigns, + *, + principal_id: UUID, + correlation_id: UUID, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> CampaignListPage: + if query.cursor is None: + return CampaignListPage( + items=[_item(page1_cid, last_status_changed_at=_OLD)], next_cursor="page2" + ) + return CampaignListPage( + items=[_item(page2_cid, last_status_changed_at=_OLD)], next_cursor=None + ) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + assert await load_decision(kernel.event_store, _derive_decision_id(page2_cid, _OLD)) is not None + + +@pytest.mark.unit +async def test_record_decision_is_idempotent_on_repeated_episode() -> None: + """Re-flagging the same stuck episode is a ConcurrencyError no-op, not a crash.""" + from cora.api._campaign_watcher import _record_decision + + kernel = _kernel() + cid = uuid4() + await _record_decision(kernel, campaign_id=cid, name="c", last_status_changed_at=_OLD, now=_NOW) + await _record_decision(kernel, campaign_id=cid, name="c", last_status_changed_at=_OLD, now=_NOW) + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) is not None + + +@pytest.mark.unit +async def test_tick_is_noop_when_watcher_actor_absent() -> None: + """Revocation gate: with no seeded (active) CampaignWatcher Actor, do nothing.""" + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() # NOT seeded + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=_OLD)]) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) is None + + +@pytest.mark.unit +async def test_tick_is_noop_when_watcher_actor_deactivated() -> None: + """Kill switch: an operator deactivating the agent Actor stands the watcher + down even while seeded. Pins the `not actor.active` disjunct of the gate.""" + from cora.access.features import deactivate_actor + from cora.access.features.deactivate_actor import DeactivateActor + from cora.api._campaign_watcher import _watch_tick + + kernel = _kernel() + await seed_campaign_watcher_agent(kernel) + await deactivate_actor.bind(kernel)( + DeactivateActor(actor_id=CAMPAIGN_WATCHER_AGENT_ID), + principal_id=CAMPAIGN_WATCHER_AGENT_ID, + correlation_id=uuid4(), + ) + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=_OLD)]) + + await _watch_tick(deps=kernel, list_campaigns=list_campaigns) + + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) is None + + +@pytest.mark.unit +async def test_lifespan_is_noop_when_disabled() -> None: + """Default settings (campaign_watcher_enabled=False): clean no-op, no task.""" + kernel = _kernel() + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=_OLD)]) + + async with campaign_watcher_lifespan(kernel, list_campaigns=list_campaigns): + pass + + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) is None + + +@pytest.mark.unit +async def test_lifespan_enabled_runs_the_loop_and_flags() -> None: + """Enabled: the lifespan spawns the loop, which flags a stuck campaign.""" + kernel = _kernel(enabled=True) + await seed_campaign_watcher_agent(kernel) + cid = uuid4() + list_campaigns = _make_list_campaigns([_item(cid, last_status_changed_at=_OLD)]) + + async with campaign_watcher_lifespan( + kernel, list_campaigns=list_campaigns, interval_seconds=0.01 + ): + await asyncio.sleep(0.1) + + assert await load_decision(kernel.event_store, _derive_decision_id(cid, _OLD)) is not None + + +@pytest.mark.unit +async def test_loop_survives_a_failing_tick() -> None: + """A tick that raises is logged and the loop keeps going; lifespan exits cleanly.""" + kernel = _kernel(enabled=True) + await seed_campaign_watcher_agent(kernel) + + async def failing_list_campaigns( + query: ListCampaigns, + *, + principal_id: UUID, + correlation_id: UUID, + surface_id: UUID = NIL_SENTINEL_ID, + ) -> CampaignListPage: + raise RuntimeError("list_campaigns boom") + + async with campaign_watcher_lifespan( + kernel, list_campaigns=failing_list_campaigns, interval_seconds=0.01 + ): + await asyncio.sleep(0.05) + + +@pytest.mark.unit +def test_campaign_watcher_tick_seconds_rejects_sub_floor() -> None: + with pytest.raises(ValueError, match="campaign_watcher_tick_seconds"): + Settings(campaign_watcher_tick_seconds=0.05) # type: ignore[call-arg] + + +@pytest.mark.unit +def test_campaign_watcher_stale_after_rejects_non_positive() -> None: + with pytest.raises(ValueError, match="campaign_watcher_stale_after_seconds"): + Settings(campaign_watcher_stale_after_seconds=0.0) # type: ignore[call-arg] + + +@pytest.mark.unit +def test_campaign_watcher_settings_accept_valid() -> None: + settings = Settings( # type: ignore[call-arg] + campaign_watcher_tick_seconds=120.0, + campaign_watcher_stale_after_seconds=86400.0, + ) + assert settings.campaign_watcher_tick_seconds == 120.0 + assert settings.campaign_watcher_stale_after_seconds == 86400.0 diff --git a/apps/api/tests/unit/decision/test_campaign_progress_vocab.py b/apps/api/tests/unit/decision/test_campaign_progress_vocab.py new file mode 100644 index 0000000000..48298717dc --- /dev/null +++ b/apps/api/tests/unit/decision/test_campaign_progress_vocab.py @@ -0,0 +1,70 @@ +"""Tests for the CampaignProgress Decision vocabulary. + +Covers the DECISION_CONTEXT_CAMPAIGN_PROGRESS context constant, the closed +CAMPAIGN_PROGRESS_CHOICES set, its parity with the CampaignProgressChoice +Literal, and that `Stuck` does not collide in the shared, globally-filtered +DecisionChoice projection column. The CampaignWatcher agent is flag-only (one +Decision per stuck-Held episode); the context noun is `CampaignProgress` (the +lifecycle dimension) while the agent kind is `CampaignWatcher` (a deliberate +context-noun-vs-doer asymmetry, not drift). + +The disjointness check unions EVERY closed sibling choice set, including +REACTION_DISMISSAL_CHOICES (imported from the submodule because it is not +re-exported from the package). +""" + +from typing import get_args + +import pytest + +from cora.decision.aggregates.decision import ( + CALIBRATION_VERIFICATION_CHOICES, + CAMPAIGN_PROGRESS_CHOICES, + CAUTION_PROMOTION_CHOICES, + CAUTION_PROPOSAL_CHOICES, + CLEARANCE_EXPIRY_CHOICES, + CLEARANCE_PROGRESS_CHOICES, + DECISION_CONTEXT_CAMPAIGN_PROGRESS, + PROCEDURE_PROGRESS_CHOICES, + RUN_DEBRIEF_CHOICES, + RUN_SUPERVISION_CHOICES, + CampaignProgressChoice, +) +from cora.decision.aggregates.decision.state import REACTION_DISMISSAL_CHOICES + + +@pytest.mark.unit +def test_decision_context_campaign_progress_constant() -> None: + assert DECISION_CONTEXT_CAMPAIGN_PROGRESS == "CampaignProgress" + + +@pytest.mark.unit +def test_campaign_progress_choices_closed_set() -> None: + assert frozenset({"Stuck"}) == CAMPAIGN_PROGRESS_CHOICES + + +@pytest.mark.unit +def test_campaign_progress_choices_match_literal() -> None: + """The frozenset and the Literal stay in lockstep.""" + assert frozenset(get_args(CampaignProgressChoice)) == CAMPAIGN_PROGRESS_CHOICES + + +@pytest.mark.unit +def test_campaign_progress_choice_is_unique_in_shared_namespace() -> None: + """`Stuck` does not collide with any sibling context's choice values in the + globally-filtered DecisionChoice projection column (Flag / Stale / Stall / + Expire / EventDismissed / ...). naming-r3 chose `Stuck` precisely because + `Stall` was already taken by ProcedureProgress.""" + siblings = ( + CALIBRATION_VERIFICATION_CHOICES + | CAUTION_PROMOTION_CHOICES + | CAUTION_PROPOSAL_CHOICES + | CLEARANCE_EXPIRY_CHOICES + | CLEARANCE_PROGRESS_CHOICES + | PROCEDURE_PROGRESS_CHOICES + | REACTION_DISMISSAL_CHOICES + | RUN_DEBRIEF_CHOICES + | RUN_SUPERVISION_CHOICES + ) + assert CAMPAIGN_PROGRESS_CHOICES.isdisjoint(siblings) + assert "Stuck" in CAMPAIGN_PROGRESS_CHOICES diff --git a/docs/architecture/modules/agent/index.md b/docs/architecture/modules/agent/index.md index 5b1689162b..20f45e64b7 100644 --- a/docs/architecture/modules/agent/index.md +++ b/docs/architecture/modules/agent/index.md @@ -1,4 +1,4 @@ -# Agent module stable +# Agent module stable ## Purpose & Scope @@ -16,13 +16,13 @@ An Agent carries five roles: ### The agent fleet -Eight agents are seeded today. They split two ways. By **how they decide**: the two LLM agents (RunDebriefer, CautionDrafter) call a model; the six deterministic agents (RunSupervisor, CautionPromoter, ClearanceExpirer, ClearanceWatcher, CalibrationWatcher, ProcedureWatcher) apply a fixed rule and carry a sentinel `model_ref` with no prompt template. By **what they do**: passive agents only advise (they write a Decision and stop); active agents decide and then act, but only by issuing an existing spine command through the same authorized path a human uses, so the resulting record is byte-identical whether a human or the agent acted. +Nine agents are seeded today. They split two ways. By **how they decide**: the two LLM agents (RunDebriefer, CautionDrafter) call a model; the seven deterministic agents (RunSupervisor, CautionPromoter, ClearanceExpirer, ClearanceWatcher, CalibrationWatcher, ProcedureWatcher, CampaignWatcher) apply a fixed rule and carry a sentinel `model_ref` with no prompt template. By **what they do**: passive agents only advise (they write a Decision and stop); active agents decide and then act, but only by issuing an existing spine command through the same authorized path a human uses, so the resulting record is byte-identical whether a human or the agent acted. The runtime that drives an agent takes one of three host shapes: - **On-demand slice.** A REST/MCP call invokes the agent directly (`regenerate_run_debrief`). - **Event-triggered subscriber.** A subscriber in the projection worker reacts to a domain event: RunDebriefer and CautionDrafter on a terminal Run, CautionPromoter on a registered `CautionProposal` Decision. -- **Composition-root periodic loop.** A background task sweeps on a timer: RunSupervisor watches in-flight Runs and issues `hold_run` / `resume_run` / `stop_run` as facility beam is lost and returns; ClearanceExpirer sweeps Active clearances and issues `expire_clearance` once a validity window has passed; ClearanceWatcher watches front-of-lifecycle clearances (Submitted, UnderReview, Approved) and records a flag Decision when one stalls past an operator window; CalibrationWatcher watches Provisional calibrations and records a flag Decision when one's newest revision has sat unverified past an operator window; ProcedureWatcher watches in-conduct procedures (Running, Held) and records a flag Decision when one has sat past an operator window without progressing, folding in the latest activity recency first so an actively-logging Running conduct is not falsely flagged. +- **Composition-root periodic loop.** A background task sweeps on a timer: RunSupervisor watches in-flight Runs and issues `hold_run` / `resume_run` / `stop_run` as facility beam is lost and returns; ClearanceExpirer sweeps Active clearances and issues `expire_clearance` once a validity window has passed; ClearanceWatcher watches front-of-lifecycle clearances (Submitted, UnderReview, Approved) and records a flag Decision when one stalls past an operator window; CalibrationWatcher watches Provisional calibrations and records a flag Decision when one's newest revision has sat unverified past an operator window; ProcedureWatcher watches in-conduct procedures (Running, Held) and records a flag Decision when one has sat past an operator window without progressing, folding in the latest activity recency first so an actively-logging Running conduct is not falsely flagged; CampaignWatcher watches Held campaigns and records a flag Decision when one has sat operator-paused past an operator window without being resumed or closed. | Agent | Decides | Host | Acts | |---|---|---|---| @@ -34,8 +34,9 @@ The runtime that drives an agent takes one of three host shapes: | ClearanceWatcher | deterministic | periodic loop | writes a `ClearanceProgress` flag Decision (passive) | | CalibrationWatcher | deterministic | periodic loop | writes a `CalibrationVerification` (Stale) flag Decision (passive) | | ProcedureWatcher | deterministic | periodic loop | writes a `ProcedureProgress` (Stall) flag Decision (passive) | +| CampaignWatcher | deterministic | periodic loop | writes a `CampaignProgress` (Stuck) flag Decision (passive) | -The active runtimes (RunSupervisor, CautionPromoter, ClearanceExpirer) ship off by default, gate every actuation through the Authorize port like any principal, and stand down the moment their Actor is deactivated. None of them reaches past the spine onto the real-time floor: an active agent only issues a command the spine already exposes. ClearanceWatcher, CalibrationWatcher, and ProcedureWatcher are passive (each records a flag Decision and issues no command) and likewise ship off by default. RunSupervisor additionally carries shadow observe-only rules (run-liveness, plus signal-quality and signal-stall against a live Run's observation channels) that log a would-flag and take no further action; each is a separate opt-in above the agent's own enable, and advise / act promotions are deferred. +The active runtimes (RunSupervisor, CautionPromoter, ClearanceExpirer) ship off by default, gate every actuation through the Authorize port like any principal, and stand down the moment their Actor is deactivated. None of them reaches past the spine onto the real-time floor: an active agent only issues a command the spine already exposes. ClearanceWatcher, CalibrationWatcher, ProcedureWatcher, and CampaignWatcher are passive (each records a flag Decision and issues no command) and likewise ship off by default. RunSupervisor additionally carries shadow observe-only rules (run-liveness, plus signal-quality and signal-stall against a live Run's observation channels) that log a would-flag and take no further action; each is a separate opt-in above the agent's own enable, and advise / act promotions are deferred.