Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/api/src/cora/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
CALIBRATION_WATCHER_AGENT_ID,
seed_calibration_watcher_agent,
)
from cora.agent.seed_campaign_watcher import (
CAMPAIGN_WATCHER_AGENT_ID,
seed_campaign_watcher_agent,
)
from cora.agent.seed_caution_drafter import seed_caution_drafter_agent
from cora.agent.seed_caution_promoter import (
CAUTION_PROMOTER_AGENT_ID,
Expand All @@ -67,6 +71,7 @@

__all__ = [
"CALIBRATION_WATCHER_AGENT_ID",
"CAMPAIGN_WATCHER_AGENT_ID",
"CAUTION_PROMOTER_AGENT_ID",
"CLEARANCE_EXPIRER_AGENT_ID",
"CLEARANCE_WATCHER_AGENT_ID",
Expand All @@ -84,6 +89,7 @@
"register_agent_subscribers",
"register_agent_tools",
"seed_calibration_watcher_agent",
"seed_campaign_watcher_agent",
"seed_caution_drafter_agent",
"seed_caution_promoter_agent",
"seed_clearance_expirer_agent",
Expand Down
104 changes: 104 additions & 0 deletions apps/api/src/cora/agent/seed_campaign_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Bootstrap-time seed for the CampaignWatcher Agent.

The CampaignWatcher runtime (CORA's 9th seeded agent, a composition-root periodic
loop, deterministic flag-only) needs an Agent record (and its co-registered
Actor) at the pinned `CAMPAIGN_WATCHER_AGENT_ID` so it can author CampaignProgress
Decisions (`decided_by = ActorId(CAMPAIGN_WATCHER_AGENT_ID)`) as an agent-kind
principal. Mirrors `cora.agent.seed_procedure_watcher` except for the per-agent
constants; the shared scaffolding lives in `cora.agent._agent_seed`, and the
runtime mechanics in `cora.api._flag_watcher`.

Per [[project-campaign-watcher-design]]:
- Pinned UUID in the `cab1` block (ninth seeded identity, sibling to
RunDebriefer `aaaa00XX`, CautionDrafter `bbbb00XX`, RunSupervisor
`cccc00XX`, CautionPromoter `dddd00XX`, ClearanceExpirer `eeee00XX`,
ClearanceWatcher `ffff00XX`, CalibrationWatcher `ca1100XX`, ProcedureWatcher
`0c0c00XX`); deployment-stable forever. `cab1` is distinct from calibration's
`ca11`.
- DETERMINISTIC agent (rule-based, NOT LLM): no prompt template and a sentinel
`ModelRef` (`provider="deterministic"`), never used to build an LLM (the
runtime is a periodic staleness comparison over Held campaigns).
- FLAG-ONLY: the runtime issues NO command. It records one
Decision(context=CampaignProgress, choice=Stuck) per stuck-Held episode for a
human to act on. Permission to record Decisions is granted at agent-definition
time (the RunDebriefer stance); it issues no write command, though it does
issue an authz-gated ListCampaigns read each tick.
"""

from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from cora.agent._agent_seed import AgentSeedIdentity, seed_agent
from cora.agent.aggregates.agent import ModelRef

if TYPE_CHECKING:
from cora.infrastructure.kernel import Kernel


# ---------------------------------------------------------------------------
# CampaignWatcher agent identity (deployment-stable constants)
# ---------------------------------------------------------------------------

# Treat as FOREVER-STABLE. Same change-cost rationale as the other seeded
# agents: changing this orphans every prior CampaignWatcher-authored Decision
# (their actor_id pointers go stale). UUID is in the deployment-controlled
# `cab1` block (ninth seeded identity).
CAMPAIGN_WATCHER_AGENT_ID = UUID("01900000-0000-7000-8000-0000cab10010")
CAMPAIGN_WATCHER_AGENT_NAME = "CampaignWatcher"
CAMPAIGN_WATCHER_AGENT_KIND = "CampaignWatcher"
CAMPAIGN_WATCHER_AGENT_VERSION = "1.0.0"
CAMPAIGN_WATCHER_AGENT_DESCRIPTION = (
"Deterministic in-loop agent: watches Held campaigns (operator-paused) and "
"records one Decision(context=CampaignProgress, choice=Stuck) per campaign "
"that has sat Held past the staleness window without being resumed or closed. "
"Flag-only (advise a human); issues no command."
)


# Sentinel model ref: CampaignWatcher is rule-based, not an LLM agent. The Agent
# aggregate requires a ModelRef; this value is never used to build an LLM (the
# runtime is a periodic staleness comparison, no build_llm call).
_DETERMINISTIC_MODEL_REF = ModelRef(
provider="deterministic",
model="agent:CampaignWatcher:v1",
snapshot_pin=None,
)


# ---------------------------------------------------------------------------
# Deterministic IDs for the bootstrap write envelope
# ---------------------------------------------------------------------------

_AGENT_EVENT_ID = UUID("01900000-0000-7000-8000-0000cab10012")
_ACTOR_EVENT_ID = UUID("01900000-0000-7000-8000-0000cab10013")
_BOOTSTRAP_CORRELATION_ID = UUID("01900000-0000-7000-8000-0000cab10014")


async def seed_campaign_watcher_agent(kernel: Kernel) -> None:
"""Seed the CampaignWatcher Agent + co-registered Actor (idempotent)."""
identity = AgentSeedIdentity(
agent_id=CAMPAIGN_WATCHER_AGENT_ID,
name=CAMPAIGN_WATCHER_AGENT_NAME,
kind=CAMPAIGN_WATCHER_AGENT_KIND,
version=CAMPAIGN_WATCHER_AGENT_VERSION,
description=CAMPAIGN_WATCHER_AGENT_DESCRIPTION,
model_ref=_DETERMINISTIC_MODEL_REF,
prompt_template_id=None,
agent_event_id=_AGENT_EVENT_ID,
actor_event_id=_ACTOR_EVENT_ID,
correlation_id=_BOOTSTRAP_CORRELATION_ID,
command_name="SeedCampaignWatcherAgent",
)
await seed_agent(kernel, identity)


__all__ = [
"CAMPAIGN_WATCHER_AGENT_DESCRIPTION",
"CAMPAIGN_WATCHER_AGENT_ID",
"CAMPAIGN_WATCHER_AGENT_KIND",
"CAMPAIGN_WATCHER_AGENT_NAME",
"CAMPAIGN_WATCHER_AGENT_VERSION",
"seed_campaign_watcher_agent",
]
206 changes: 206 additions & 0 deletions apps/api/src/cora/api/_campaign_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
"""CampaignWatcher runtime: the 9th seeded agent (deterministic flag-only).

A periodic background task, hosted at the composition root (`cora.api`) because
it reads the Campaign BC AND composes Decision BC events; only `cora.api` may
depend on both (same placement rationale as `_clearance_watcher`,
`_calibration_watcher`, `_procedure_watcher`). The agent-invariant mechanics (the
staleness rule, the per-episode Decision id, the DecisionRegistered envelope, and
the periodic loop / lifespan) live in `cora.api._flag_watcher`; this module owns
only what is specific to campaigns. See [[project-campaign-watcher-design]].

## What v1 does

Each tick it lists `Held` campaigns (operator-paused), selects those whose
`last_status_changed_at` (the time they were held) has sat past the
operator-config staleness window without being resumed or closed, and records one
`Decision(context=CampaignProgress, choice=Stuck)` per stuck EPISODE. It is
FLAG-ONLY: it issues NO command (it surfaces the forgotten pause so a human
resumes or closes the campaign). v1 watches only `Held`; `Planned` (a campaign
that has legitimately not started yet) is deferred to a later variant with its
own window.

## No activity fold needed

Unlike `_procedure_watcher` (whose Running state churns activity without advancing
the status timestamp), `Held` makes no run-EXECUTION progress: `last_status_changed_at`
(set on `CampaignHeld`, advanced only by `resume_campaign` / `close_campaign`) is
the correct clock and no recency fold is required. Membership curation
(`add_run_to_campaign` / remove) is permitted while Held but deliberately touches
only `run_count`, never the status clock, so a long forgotten Hold still flags. A
defensive `status == Held` re-check guards a future filter widening (mirrors
`_calibration_watcher`).

## Idempotency / edge-trigger: one Stuck flag per episode

The Decision id is `uuid5(namespace, "decision:{campaign_id}:{last_status_changed_at}")`,
so re-ticks while the campaign is still Held hit the same id (`ConcurrencyError`
no-op). `resume_campaign` advances `last_status_changed_at` (and leaves Held), and
`close_campaign` / `abandon_campaign` leave the Held filter, so a resumed campaign
that is re-held opens a fresh episode.

## Fail-safe and bounded

Flag-only and reversible by construction: the worst outcome is an advisory
Decision a human can ignore. The runtime gates on `Actor.active`, so deactivating
the agent Actor stops it. Off by default (`settings.campaign_watcher_enabled`).
"""

from __future__ import annotations

import contextlib
from typing import TYPE_CHECKING
from uuid import UUID

from cora.access.aggregates.actor import load_actor
from cora.agent.seed_campaign_watcher import CAMPAIGN_WATCHER_AGENT_ID
from cora.api._flag_watcher import (
derive_watcher_decision_id,
flag_watcher_lifespan,
is_stalled,
record_watcher_decision,
)
from cora.campaign.features.list_campaigns import ListCampaigns
from cora.decision.aggregates.decision import DECISION_CONTEXT_CAMPAIGN_PROGRESS
from cora.infrastructure.routing import NIL_SENTINEL_ID

if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from datetime import datetime

from cora.campaign.features.list_campaigns import CampaignSummaryItem
from cora.campaign.features.list_campaigns.handler import Handler as ListCampaignsHandler
from cora.infrastructure.kernel import Kernel

_LOG_PREFIX = "campaign_watcher"
_RULE = "agent:CampaignWatcher:v1"
_COMMAND_NAME = "CampaignWatcherTick"
_PAGE_LIMIT = 100
_CHOICE_STUCK = "Stuck"
_STATUS_HELD = "Held"

# Stable namespace for deriving the deterministic Decision id from the campaign
# id + the held-episode timestamp (cab1 block, distinct from the seed envelope
# ids), so one Stuck flag is written per stuck episode and re-ticks are no-ops.
_DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-0000cab10002")


def _derive_decision_id(campaign_id: UUID, last_status_changed_at: datetime) -> UUID:
"""Deterministic CampaignProgress Decision id for one stuck episode."""
return derive_watcher_decision_id(_DECISION_NAMESPACE, campaign_id, last_status_changed_at)


async def _record_decision(
deps: Kernel,
*,
campaign_id: UUID,
name: str,
last_status_changed_at: datetime,
now: datetime,
) -> None:
"""Append one DecisionRegistered(context=CampaignProgress, choice=Stuck)."""
stuck_seconds = int((now - last_status_changed_at).total_seconds())
await record_watcher_decision(
deps,
agent_id=CAMPAIGN_WATCHER_AGENT_ID,
context=DECISION_CONTEXT_CAMPAIGN_PROGRESS,
choice=_CHOICE_STUCK,
rule=_RULE,
command_name=_COMMAND_NAME,
decision_id=_derive_decision_id(campaign_id, last_status_changed_at),
entity_id=campaign_id,
now=now,
reasoning=(
f"Campaign ({name}) has sat Held for {stuck_seconds}s without being resumed "
"or closed (past the staleness window); surfaced for operator follow-up."
),
inputs={
"campaign_id": str(campaign_id),
"name": name,
"last_status_changed_at": last_status_changed_at.isoformat(),
"stuck_seconds": str(stuck_seconds),
"occurred_at": now.isoformat(),
},
log_prefix=_LOG_PREFIX,
)


async def _drain_held_campaigns(
list_campaigns: ListCampaignsHandler, deps: Kernel
) -> list[CampaignSummaryItem]:
"""Page through list_campaigns filtered to status Held."""
items: list[CampaignSummaryItem] = []
cursor: str | None = None
while True:
page = await list_campaigns(
ListCampaigns(statuses=[_STATUS_HELD], cursor=cursor, limit=_PAGE_LIMIT),
principal_id=CAMPAIGN_WATCHER_AGENT_ID,
correlation_id=deps.id_generator.new_id(),
surface_id=NIL_SENTINEL_ID,
)
items.extend(page.items)
if page.next_cursor is None:
break
cursor = page.next_cursor
return items


async def _watch_tick(
*,
deps: Kernel,
list_campaigns: ListCampaignsHandler,
) -> None:
"""One watch sweep over all Held campaigns."""
actor = await load_actor(deps.event_store, CAMPAIGN_WATCHER_AGENT_ID)
if actor is None or not actor.active:
# Agent not seeded yet, or deactivated by an operator: stand down.
return

now = deps.clock.now()
stale_after = deps.settings.campaign_watcher_stale_after_seconds
for item in await _drain_held_campaigns(list_campaigns, deps):
# Defensive: the drain filters to Held, but re-check so a future filter
# change cannot widen what gets flagged.
if item.status != _STATUS_HELD:
continue
if item.last_status_changed_at is None:
# No status-change timestamp recorded: cannot evaluate; defer.
continue
if not is_stalled(item.last_status_changed_at, now, stale_after):
continue
await _record_decision(
deps,
campaign_id=item.campaign_id,
name=item.name,
last_status_changed_at=item.last_status_changed_at,
now=now,
)


@contextlib.asynccontextmanager
async def campaign_watcher_lifespan(
deps: Kernel,
*,
list_campaigns: ListCampaignsHandler,
interval_seconds: float | None = None,
) -> AsyncGenerator[None]:
"""Spawn the CampaignWatcher loop for the duration of the context.

No-op unless `settings.campaign_watcher_enabled` is True (default off, so a
deployment opts in explicitly).
"""

async def tick() -> None:
await _watch_tick(deps=deps, list_campaigns=list_campaigns)

async with flag_watcher_lifespan(
enabled=deps.settings.campaign_watcher_enabled,
default_tick_seconds=deps.settings.campaign_watcher_tick_seconds,
log_prefix=_LOG_PREFIX,
task_name="campaign-watcher",
tick=tick,
interval_seconds=interval_seconds,
):
yield


__all__ = ["campaign_watcher_lifespan", "is_stalled"]
8 changes: 8 additions & 0 deletions apps/api/src/cora/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
register_agent_subscribers,
register_agent_tools,
seed_calibration_watcher_agent,
seed_campaign_watcher_agent,
seed_caution_drafter_agent,
seed_caution_promoter_agent,
seed_clearance_expirer_agent,
Expand All @@ -65,6 +66,7 @@
wire_agent,
)
from cora.api._calibration_watcher import calibration_watcher_lifespan
from cora.api._campaign_watcher import campaign_watcher_lifespan
from cora.api._clearance_expirer import clearance_expirer_lifespan
from cora.api._clearance_watcher import clearance_watcher_lifespan
from cora.api._compute_runtime import ComputeRuntime
Expand Down Expand Up @@ -804,6 +806,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
await seed_calibration_watcher_agent(deps)
# same shape for ProcedureWatcher (deterministic flag-only agent).
await seed_procedure_watcher_agent(deps)
# same shape for CampaignWatcher (deterministic flag-only agent).
await seed_campaign_watcher_agent(deps)

# Drain Federation-owned projections so the Postgres-backed
# FacilityLookup.list_active() resolves the self-Facility row
Expand Down Expand Up @@ -884,6 +888,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
deps,
list_procedures=app.state.operation.list_procedures,
),
campaign_watcher_lifespan(
deps,
list_campaigns=app.state.campaign.list_campaigns,
),
):
yield
finally:
Expand Down
Loading
Loading