From 87630d743125c46023b0e61787b39c4cdae1ac6f Mon Sep 17 00:00:00 2001 From: Doga Gursoy Date: Mon, 22 Jun 2026 11:35:53 +0300 Subject: [PATCH] refactor(agent): extract the shared flag-only-watcher scaffold The three deterministic flag-only watchers (ClearanceWatcher, CalibrationWatcher, ProcedureWatcher) had fired the rule-of-three: each was a near-clone carrying the same agent-invariant mechanics. This hoists those into a new cora.api._flag_watcher module: - is_stalled: the pure staleness comparison (inclusive >= boundary). - derive_watcher_decision_id: the per-episode deterministic id (uuid5(namespace, "decision:{entity_id}:{episode_at}")) that makes a re-flag of the same stall episode a ConcurrencyError no-op. - record_watcher_decision: the DecisionRegistered envelope + idempotent append. - flag_watcher_lifespan: the off-by-default gate, the periodic loop (a failed tick is logged and retried, cancellation propagates), and task teardown. Each watcher keeps what genuinely differs per agent: its drain (which list query + status filter), its recency fold (clearance UnderReview review-step; procedure Running activity recency; calibration none), its clock source, its Decision vocabulary, and its namespace UUID. Each also keeps a thin per-agent _record_decision / _derive_decision_id / is_stalled surface delegating to the scaffold, so the recorded Decisions, their deterministic ids, and the loop / lifespan behavior (and every existing test) are unchanged: all 56 watcher unit tests pass verbatim, the behavior-preservation proof. The one observable difference is in the structured log lines: the decision_already_written / flagged events are now emitted through the cora.api._flag_watcher logger with a generic entity_id field (previously clearance_id / calibration_id / procedure_id), and the clearance and procedure "flagged" lines no longer carry status; the event names ("<watcher>.flagged", "<watcher>.tick_failed", ...) are unchanged. naming: the shared envelope/id helpers are record_watcher_decision / derive_watcher_decision_id, not *_flag_* -- "flag" is the agent-family adjective (flag-only watcher), but a Decision's choice can literally be "Flag" (owned by ClearanceProgress), so it must not modify "decision". A new test_flag_watcher.py pins the loop's cancel-propagation contract. 
Co-Authored-By: Claude Opus 4.8 --- apps/api/src/cora/api/_calibration_watcher.py | 167 ++++----------- apps/api/src/cora/api/_clearance_watcher.py | 169 ++++----------- apps/api/src/cora/api/_flag_watcher.py | 194 ++++++++++++++++++ apps/api/src/cora/api/_procedure_watcher.py | 171 +++++---------- apps/api/tests/unit/api/test_flag_watcher.py | 41 ++++ 5 files changed, 371 insertions(+), 371 deletions(-) create mode 100644 apps/api/src/cora/api/_flag_watcher.py create mode 100644 apps/api/tests/unit/api/test_flag_watcher.py diff --git a/apps/api/src/cora/api/_calibration_watcher.py b/apps/api/src/cora/api/_calibration_watcher.py index 7c5f46bf8a..895d1e8abb 100644 --- a/apps/api/src/cora/api/_calibration_watcher.py +++ b/apps/api/src/cora/api/_calibration_watcher.py @@ -3,8 +3,10 @@ A periodic background task, hosted at the composition root (`cora.api`) because it reads the Calibration BC AND composes Decision BC events; only `cora.api` may depend on both (same placement rationale as `_clearance_watcher`, -`_clearance_expirer`, `_run_supervisor`). See -[[project-calibration-watcher-design]]. +`_clearance_expirer`, `_run_supervisor`). The agent-invariant mechanics (the +staleness rule, the per-episode Decision id, the DecisionRegistered envelope, and +the periodic loop / lifespan) live in `cora.api._flag_watcher`; this module owns +only what is specific to calibrations. See [[project-calibration-watcher-design]]. 
## What v1 does @@ -35,32 +37,21 @@ from __future__ import annotations -import asyncio import contextlib from typing import TYPE_CHECKING -from uuid import UUID, uuid5 +from uuid import UUID from cora.access.aggregates.actor import load_actor from cora.agent.seed_calibration_watcher import CALIBRATION_WATCHER_AGENT_ID -from cora.calibration.features.list_calibrations import ListCalibrations -from cora.decision.aggregates.decision import ( - DECISION_CONTEXT_CALIBRATION_VERIFICATION, - DecisionChoice, - DecisionConfidenceSource, - DecisionContext, - DecisionRegistered, - DecisionRule, - event_type_name, - to_payload, - validate_confidence, - validate_inputs, - validate_reasoning, +from cora.api._flag_watcher import ( + derive_watcher_decision_id, + flag_watcher_lifespan, + is_stalled, + record_watcher_decision, ) -from cora.infrastructure.event_envelope import to_new_event -from cora.infrastructure.logging import get_logger -from cora.infrastructure.ports import ConcurrencyError +from cora.calibration.features.list_calibrations import ListCalibrations +from cora.decision.aggregates.decision import DECISION_CONTEXT_CALIBRATION_VERIFICATION from cora.infrastructure.routing import NIL_SENTINEL_ID -from cora.shared.identity import ActorId if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -72,11 +63,9 @@ ) from cora.infrastructure.kernel import Kernel -_log = get_logger(__name__) - +_LOG_PREFIX = "calibration_watcher" _RULE = "agent:CalibrationWatcher:v1" _COMMAND_NAME = "CalibrationWatcherTick" -_STREAM_TYPE = "Decision" _PAGE_LIMIT = 100 _CHOICE_STALE = "Stale" _STATUS_PROVISIONAL = "Provisional" @@ -88,18 +77,9 @@ _DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-0000ca110002") -def is_stalled(last_revised_at: datetime, now: datetime, stale_after_seconds: float) -> bool: - """Pure rule: a Provisional calibration is stale once its newest revision has - sat past the staleness window without verification. 
- - Inclusive boundary: elapsed == window FLAGS (`>=`). - """ - return (now - last_revised_at).total_seconds() >= stale_after_seconds - - def _derive_decision_id(calibration_id: UUID, last_revised_at: datetime) -> UUID: """Deterministic CalibrationVerification Decision id for one stale episode.""" - return uuid5(_DECISION_NAMESPACE, f"decision:{calibration_id}:{last_revised_at.isoformat()}") + return derive_watcher_decision_id(_DECISION_NAMESPACE, calibration_id, last_revised_at) async def _record_decision( @@ -110,64 +90,32 @@ async def _record_decision( last_revised_at: datetime, now: datetime, ) -> None: - """Append one DecisionRegistered(context=CalibrationVerification, choice=Stale). - - Idempotent: the deterministic id makes a re-flag of the same stale episode a - ConcurrencyError no-op (mirrors `_clearance_watcher._record_decision`). - """ - decision_id = _derive_decision_id(calibration_id, last_revised_at) + """Append one DecisionRegistered(context=CalibrationVerification, choice=Stale).""" stale_seconds = int((now - last_revised_at).total_seconds()) - domain_event = DecisionRegistered( - decision_id=decision_id, - decided_by=ActorId(CALIBRATION_WATCHER_AGENT_ID), - context=DecisionContext(DECISION_CONTEXT_CALIBRATION_VERIFICATION).value, - choice=DecisionChoice(_CHOICE_STALE).value, - parent_id=None, - override_kind=None, - rule=DecisionRule(_RULE).value, - reasoning=validate_reasoning( + await record_watcher_decision( + deps, + agent_id=CALIBRATION_WATCHER_AGENT_ID, + context=DECISION_CONTEXT_CALIBRATION_VERIFICATION, + choice=_CHOICE_STALE, + rule=_RULE, + command_name=_COMMAND_NAME, + decision_id=_derive_decision_id(calibration_id, last_revised_at), + entity_id=calibration_id, + now=now, + reasoning=( f"Calibration ({quantity}) has had a Provisional revision unverified for " f"{stale_seconds}s (past the staleness window); surfaced for operator " "re-verification before a Run acquires against it." 
), - confidence=validate_confidence(None), - confidence_source=DecisionConfidenceSource.SELF_REPORTED, - alternatives=(), - inputs=validate_inputs( - { - "calibration_id": str(calibration_id), - "quantity": quantity, - "last_revised_at": last_revised_at.isoformat(), - "stale_seconds": str(stale_seconds), - "occurred_at": now.isoformat(), - } - ), - reasoning_signature=None, - occurred_at=now, - ) - new_event = to_new_event( - event_type=event_type_name(domain_event), - payload=to_payload(domain_event), - occurred_at=now, - event_id=uuid5(decision_id, "event:0"), - command_name=_COMMAND_NAME, - correlation_id=deps.id_generator.new_id(), - causation_id=None, - principal_id=CALIBRATION_WATCHER_AGENT_ID, + inputs={ + "calibration_id": str(calibration_id), + "quantity": quantity, + "last_revised_at": last_revised_at.isoformat(), + "stale_seconds": str(stale_seconds), + "occurred_at": now.isoformat(), + }, + log_prefix=_LOG_PREFIX, ) - try: - await deps.event_store.append( - stream_type=_STREAM_TYPE, - stream_id=decision_id, - expected_version=0, - events=[new_event], - ) - except ConcurrencyError: - _log.info( - "calibration_watcher.decision_already_written", calibration_id=str(calibration_id) - ) - return - _log.info("calibration_watcher.flagged", calibration_id=str(calibration_id)) async def _drain_provisional_calibrations( @@ -223,22 +171,6 @@ async def _watch_tick( ) -async def _watch_loop( - deps: Kernel, - list_calibrations: ListCalibrationsHandler, - interval_seconds: float, -) -> None: - """Periodic watch loop. 
A failed tick is logged; the next tick retries.""" - while True: - try: - await _watch_tick(deps=deps, list_calibrations=list_calibrations) - except asyncio.CancelledError: - raise - except Exception: - _log.exception("calibration_watcher.tick_failed") - await asyncio.sleep(interval_seconds) - - @contextlib.asynccontextmanager async def calibration_watcher_lifespan( deps: Kernel, @@ -251,28 +183,19 @@ async def calibration_watcher_lifespan( No-op unless `settings.calibration_watcher_enabled` is True (default off, so a deployment opts in explicitly). """ - if not deps.settings.calibration_watcher_enabled: - _log.info("calibration_watcher.skipped", reason="disabled") - yield - return - interval = ( - interval_seconds - if interval_seconds is not None - else deps.settings.calibration_watcher_tick_seconds - ) - _log.info("calibration_watcher.started", interval_seconds=interval) - task = asyncio.create_task( - _watch_loop(deps, list_calibrations, interval), - name="calibration-watcher", - ) - try: + async def tick() -> None: + await _watch_tick(deps=deps, list_calibrations=list_calibrations) + + async with flag_watcher_lifespan( + enabled=deps.settings.calibration_watcher_enabled, + default_tick_seconds=deps.settings.calibration_watcher_tick_seconds, + log_prefix=_LOG_PREFIX, + task_name="calibration-watcher", + tick=tick, + interval_seconds=interval_seconds, + ): yield - finally: - task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await task - _log.info("calibration_watcher.stopped") __all__ = ["calibration_watcher_lifespan", "is_stalled"] diff --git a/apps/api/src/cora/api/_clearance_watcher.py b/apps/api/src/cora/api/_clearance_watcher.py index c9c23ea893..b0d2d71911 100644 --- a/apps/api/src/cora/api/_clearance_watcher.py +++ b/apps/api/src/cora/api/_clearance_watcher.py @@ -3,8 +3,11 @@ A periodic background task, hosted at the composition root (`cora.api`) because it reads the Safety BC AND composes Decision BC events; only `cora.api` may depend 
on both (same placement rationale as `_clearance_expirer`, -`_run_supervisor`, and `_enclosure_permit_observer`). See -[[project-clearance-watcher-design]]. +`_run_supervisor`, and `_enclosure_permit_observer`). The agent-invariant +mechanics (the staleness rule, the per-episode Decision id, the +DecisionRegistered envelope, and the periodic loop / lifespan) live in +`cora.api._flag_watcher`; this module owns only what is specific to clearances, +chiefly the UnderReview review-step fold. See [[project-clearance-watcher-design]]. ## What v1 does @@ -49,33 +52,22 @@ from __future__ import annotations -import asyncio import contextlib from typing import TYPE_CHECKING -from uuid import UUID, uuid5 +from uuid import UUID from cora.access.aggregates.actor import load_actor from cora.agent.seed_clearance_watcher import CLEARANCE_WATCHER_AGENT_ID -from cora.decision.aggregates.decision import ( - DECISION_CONTEXT_CLEARANCE_PROGRESS, - DecisionChoice, - DecisionConfidenceSource, - DecisionContext, - DecisionRegistered, - DecisionRule, - event_type_name, - to_payload, - validate_confidence, - validate_inputs, - validate_reasoning, +from cora.api._flag_watcher import ( + derive_watcher_decision_id, + flag_watcher_lifespan, + is_stalled, + record_watcher_decision, ) -from cora.infrastructure.event_envelope import to_new_event -from cora.infrastructure.logging import get_logger -from cora.infrastructure.ports import ConcurrencyError +from cora.decision.aggregates.decision import DECISION_CONTEXT_CLEARANCE_PROGRESS from cora.infrastructure.routing import NIL_SENTINEL_ID from cora.safety.features.get_clearance import GetClearance from cora.safety.features.list_clearances import ListClearances -from cora.shared.identity import ActorId if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -87,11 +79,9 @@ from cora.safety.features.list_clearances.handler import Handler as ListClearancesHandler from cora.safety.features.list_clearances.query import ClearanceStatusFilter 
-_log = get_logger(__name__) - +_LOG_PREFIX = "clearance_watcher" _RULE = "agent:ClearanceWatcher:v1" _COMMAND_NAME = "ClearanceWatcherTick" -_STREAM_TYPE = "Decision" _PAGE_LIMIT = 100 _CHOICE_FLAG = "Flag" _STATUS_UNDER_REVIEW = "UnderReview" @@ -107,18 +97,9 @@ _DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-0000ffff0002") -def is_stalled(last_progress_at: datetime, now: datetime, stale_after_seconds: float) -> bool: - """Pure rule: a front-of-lifecycle clearance is stalled once it has sat past - the staleness window without progress. - - Inclusive boundary: elapsed == window FLAGS (`>=`). - """ - return (now - last_progress_at).total_seconds() >= stale_after_seconds - - def _derive_decision_id(clearance_id: UUID, last_progress_at: datetime) -> UUID: """Deterministic ClearanceProgress Decision id for one stall episode.""" - return uuid5(_DECISION_NAMESPACE, f"decision:{clearance_id}:{last_progress_at.isoformat()}") + return derive_watcher_decision_id(_DECISION_NAMESPACE, clearance_id, last_progress_at) async def _record_decision( @@ -129,61 +110,31 @@ async def _record_decision( last_progress_at: datetime, now: datetime, ) -> None: - """Append one DecisionRegistered(context=ClearanceProgress, choice=Flag). - - Idempotent: the deterministic id makes a re-flag of the same stall episode a - ConcurrencyError no-op (mirrors `_clearance_expirer._record_decision`). 
- """ - decision_id = _derive_decision_id(clearance_id, last_progress_at) + """Append one DecisionRegistered(context=ClearanceProgress, choice=Flag).""" stalled_seconds = int((now - last_progress_at).total_seconds()) - domain_event = DecisionRegistered( - decision_id=decision_id, - decided_by=ActorId(CLEARANCE_WATCHER_AGENT_ID), - context=DecisionContext(DECISION_CONTEXT_CLEARANCE_PROGRESS).value, - choice=DecisionChoice(_CHOICE_FLAG).value, - parent_id=None, - override_kind=None, - rule=DecisionRule(_RULE).value, - reasoning=validate_reasoning( + await record_watcher_decision( + deps, + agent_id=CLEARANCE_WATCHER_AGENT_ID, + context=DECISION_CONTEXT_CLEARANCE_PROGRESS, + choice=_CHOICE_FLAG, + rule=_RULE, + command_name=_COMMAND_NAME, + decision_id=_derive_decision_id(clearance_id, last_progress_at), + entity_id=clearance_id, + now=now, + reasoning=( f"Clearance has been {status} for {stalled_seconds}s without progressing " "toward Active (past the staleness window); surfaced for operator follow-up." 
), - confidence=validate_confidence(None), - confidence_source=DecisionConfidenceSource.SELF_REPORTED, - alternatives=(), - inputs=validate_inputs( - { - "clearance_id": str(clearance_id), - "status": status, - "last_progress_at": last_progress_at.isoformat(), - "stalled_seconds": str(stalled_seconds), - "occurred_at": now.isoformat(), - } - ), - reasoning_signature=None, - occurred_at=now, - ) - new_event = to_new_event( - event_type=event_type_name(domain_event), - payload=to_payload(domain_event), - occurred_at=now, - event_id=uuid5(decision_id, "event:0"), - command_name=_COMMAND_NAME, - correlation_id=deps.id_generator.new_id(), - causation_id=None, - principal_id=CLEARANCE_WATCHER_AGENT_ID, + inputs={ + "clearance_id": str(clearance_id), + "status": status, + "last_progress_at": last_progress_at.isoformat(), + "stalled_seconds": str(stalled_seconds), + "occurred_at": now.isoformat(), + }, + log_prefix=_LOG_PREFIX, ) - try: - await deps.event_store.append( - stream_type=_STREAM_TYPE, - stream_id=decision_id, - expected_version=0, - events=[new_event], - ) - except ConcurrencyError: - _log.info("clearance_watcher.decision_already_written", clearance_id=str(clearance_id)) - return - _log.info("clearance_watcher.flagged", clearance_id=str(clearance_id), status=status) async def _drain_watched_clearances( @@ -269,27 +220,6 @@ async def _watch_tick( ) -async def _watch_loop( - deps: Kernel, - list_clearances: ListClearancesHandler, - get_clearance: GetClearanceHandler, - interval_seconds: float, -) -> None: - """Periodic watch loop. 
A failed tick is logged; the next tick retries.""" - while True: - try: - await _watch_tick( - deps=deps, - list_clearances=list_clearances, - get_clearance=get_clearance, - ) - except asyncio.CancelledError: - raise - except Exception: - _log.exception("clearance_watcher.tick_failed") - await asyncio.sleep(interval_seconds) - - @contextlib.asynccontextmanager async def clearance_watcher_lifespan( deps: Kernel, @@ -303,28 +233,19 @@ async def clearance_watcher_lifespan( No-op unless `settings.clearance_watcher_enabled` is True (default off, so a deployment opts in explicitly). """ - if not deps.settings.clearance_watcher_enabled: - _log.info("clearance_watcher.skipped", reason="disabled") - yield - return - interval = ( - interval_seconds - if interval_seconds is not None - else deps.settings.clearance_watcher_tick_seconds - ) - _log.info("clearance_watcher.started", interval_seconds=interval) - task = asyncio.create_task( - _watch_loop(deps, list_clearances, get_clearance, interval), - name="clearance-watcher", - ) - try: + async def tick() -> None: + await _watch_tick(deps=deps, list_clearances=list_clearances, get_clearance=get_clearance) + + async with flag_watcher_lifespan( + enabled=deps.settings.clearance_watcher_enabled, + default_tick_seconds=deps.settings.clearance_watcher_tick_seconds, + log_prefix=_LOG_PREFIX, + task_name="clearance-watcher", + tick=tick, + interval_seconds=interval_seconds, + ): yield - finally: - task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await task - _log.info("clearance_watcher.stopped") __all__ = ["clearance_watcher_lifespan", "is_stalled"] diff --git a/apps/api/src/cora/api/_flag_watcher.py b/apps/api/src/cora/api/_flag_watcher.py new file mode 100644 index 0000000000..718f90c7f9 --- /dev/null +++ b/apps/api/src/cora/api/_flag_watcher.py @@ -0,0 +1,194 @@ +"""Shared scaffold for the deterministic flag-only watcher agents. 
+ +Three near-identical composition-root watchers (ClearanceWatcher, +CalibrationWatcher, ProcedureWatcher) fired the rule-of-three: each is a periodic +loop that drains a list query, clocks each candidate against an operator +staleness window, and records ONE flag `Decision` per stall EPISODE, issuing no +command. The agent-INVARIANT mechanics live here; what genuinely differs per +agent (which list query it drains, which timestamp clocks it, whether it folds in +a recency signal to avoid false-flagging an active entity, and the Decision +context/choice vocabulary) stays in each watcher's own `_watch_tick`. + +This module owns four invariants: + +- `is_stalled` -- the pure staleness comparison (inclusive `>=` boundary). +- `derive_watcher_decision_id` -- the per-episode deterministic id + (`uuid5(namespace, "decision:{entity_id}:{episode_at}")`) that makes a re-flag + of the same stall episode a `ConcurrencyError` no-op. +- `record_watcher_decision` -- the `DecisionRegistered` envelope + append, including + the idempotent ConcurrencyError swallow. The caller passes the per-agent + vocabulary (context / choice / rule) and the already-formatted reasoning + + inputs (which carry entity-specific fields). +- `flag_watcher_lifespan` -- the off-by-default gate, the periodic loop (a failed + tick is logged and retried, cancellation propagates), and task teardown. The + caller passes its `tick` closure (which closes over its own query handlers). + +Each watcher keeps a thin per-agent `_record_decision` / `_derive_decision_id` / +`is_stalled` surface delegating here, so its behavior (and its tests) are +unchanged by the extraction. 
+""" + +from __future__ import annotations + +import asyncio +import contextlib +from typing import TYPE_CHECKING +from uuid import UUID, uuid5 + +from cora.decision.aggregates.decision import ( + DecisionChoice, + DecisionConfidenceSource, + DecisionContext, + DecisionRegistered, + DecisionRule, + event_type_name, + to_payload, + validate_confidence, + validate_inputs, + validate_reasoning, +) +from cora.infrastructure.event_envelope import to_new_event +from cora.infrastructure.logging import get_logger +from cora.infrastructure.ports import ConcurrencyError +from cora.shared.identity import ActorId + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Awaitable, Callable + from datetime import datetime + + from cora.infrastructure.kernel import Kernel + +_log = get_logger(__name__) + +_STREAM_TYPE = "Decision" + + +def is_stalled(at: datetime, now: datetime, stale_after_seconds: float) -> bool: + """Pure rule: an entity is stalled once it has sat past the staleness window + without progress. + + Inclusive boundary: elapsed == window FLAGS (`>=`). + """ + return (now - at).total_seconds() >= stale_after_seconds + + +def derive_watcher_decision_id(namespace: UUID, entity_id: UUID, episode_at: datetime) -> UUID: + """Deterministic flag-Decision id for one stall episode of one entity.""" + return uuid5(namespace, f"decision:{entity_id}:{episode_at.isoformat()}") + + +async def record_watcher_decision( + deps: Kernel, + *, + agent_id: UUID, + context: str, + choice: str, + rule: str, + command_name: str, + decision_id: UUID, + entity_id: UUID, + now: datetime, + reasoning: str, + inputs: dict[str, str], + log_prefix: str, +) -> None: + """Append one DecisionRegistered flag for a stall episode. + + Idempotent: `decision_id` is the per-episode deterministic id (the caller + derives it via `derive_watcher_decision_id` with its own namespace), so a re-flag + of the same episode is a ConcurrencyError no-op. 
The caller owns the per-agent + vocabulary and the entity-specific reasoning / inputs; `entity_id` is carried + for the log line. + """ + domain_event = DecisionRegistered( + decision_id=decision_id, + decided_by=ActorId(agent_id), + context=DecisionContext(context).value, + choice=DecisionChoice(choice).value, + parent_id=None, + override_kind=None, + rule=DecisionRule(rule).value, + reasoning=validate_reasoning(reasoning), + confidence=validate_confidence(None), + confidence_source=DecisionConfidenceSource.SELF_REPORTED, + alternatives=(), + inputs=validate_inputs(inputs), + reasoning_signature=None, + occurred_at=now, + ) + new_event = to_new_event( + event_type=event_type_name(domain_event), + payload=to_payload(domain_event), + occurred_at=now, + event_id=uuid5(decision_id, "event:0"), + command_name=command_name, + correlation_id=deps.id_generator.new_id(), + causation_id=None, + principal_id=agent_id, + ) + try: + await deps.event_store.append( + stream_type=_STREAM_TYPE, + stream_id=decision_id, + expected_version=0, + events=[new_event], + ) + except ConcurrencyError: + _log.info(f"{log_prefix}.decision_already_written", entity_id=str(entity_id)) + return + _log.info(f"{log_prefix}.flagged", entity_id=str(entity_id)) + + +async def _watch_loop( + tick: Callable[[], Awaitable[None]], interval_seconds: float, log_prefix: str +) -> None: + """Periodic watch loop. A failed tick is logged; the next tick retries.""" + while True: + try: + await tick() + except asyncio.CancelledError: + raise + except Exception: + _log.exception(f"{log_prefix}.tick_failed") + await asyncio.sleep(interval_seconds) + + +@contextlib.asynccontextmanager +async def flag_watcher_lifespan( + *, + enabled: bool, + default_tick_seconds: float, + log_prefix: str, + task_name: str, + tick: Callable[[], Awaitable[None]], + interval_seconds: float | None = None, +) -> AsyncGenerator[None]: + """Spawn a flag-watcher loop for the duration of the context. 
+ + No-op unless `enabled` is True (the watchers ship off by default, so a + deployment opts in explicitly). The caller supplies the `tick` closure and + its own settings-derived `enabled` / `default_tick_seconds`. + """ + if not enabled: + _log.info(f"{log_prefix}.skipped", reason="disabled") + yield + return + + interval = interval_seconds if interval_seconds is not None else default_tick_seconds + _log.info(f"{log_prefix}.started", interval_seconds=interval) + task = asyncio.create_task(_watch_loop(tick, interval, log_prefix), name=task_name) + try: + yield + finally: + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + _log.info(f"{log_prefix}.stopped") + + +__all__ = [ + "derive_watcher_decision_id", + "flag_watcher_lifespan", + "is_stalled", + "record_watcher_decision", +] diff --git a/apps/api/src/cora/api/_procedure_watcher.py b/apps/api/src/cora/api/_procedure_watcher.py index 8461d0b638..935fd8a7c5 100644 --- a/apps/api/src/cora/api/_procedure_watcher.py +++ b/apps/api/src/cora/api/_procedure_watcher.py @@ -3,8 +3,11 @@ A periodic background task, hosted at the composition root (`cora.api`) because it reads the Operation BC AND composes Decision BC events; only `cora.api` may depend on both (same placement rationale as `_clearance_watcher`, -`_calibration_watcher`, and `_run_supervisor`). See -[[project-procedure-watcher-design]]. +`_calibration_watcher`, and `_run_supervisor`). The agent-invariant mechanics +(the staleness rule, the per-episode Decision id, the DecisionRegistered +envelope, and the periodic loop / lifespan) live in `cora.api._flag_watcher`; +this module owns only what is specific to procedures, chiefly the Running +activity-recency fold. See [[project-procedure-watcher-design]]. 
## What v1 does @@ -50,36 +53,25 @@ from __future__ import annotations -import asyncio import contextlib from typing import TYPE_CHECKING -from uuid import UUID, uuid5 +from uuid import UUID from cora.access.aggregates.actor import load_actor from cora.agent.seed_procedure_watcher import PROCEDURE_WATCHER_AGENT_ID -from cora.decision.aggregates.decision import ( - DECISION_CONTEXT_PROCEDURE_PROGRESS, - DecisionChoice, - DecisionConfidenceSource, - DecisionContext, - DecisionRegistered, - DecisionRule, - event_type_name, - to_payload, - validate_confidence, - validate_inputs, - validate_reasoning, +from cora.api._flag_watcher import ( + derive_watcher_decision_id, + flag_watcher_lifespan, + is_stalled, + record_watcher_decision, ) -from cora.infrastructure.event_envelope import to_new_event -from cora.infrastructure.logging import get_logger -from cora.infrastructure.ports import ConcurrencyError +from cora.decision.aggregates.decision import DECISION_CONTEXT_PROCEDURE_PROGRESS from cora.infrastructure.routing import NIL_SENTINEL_ID from cora.operation.adapters.postgres_procedure_activity_lookup import ( PostgresProcedureActivityLookup, ) from cora.operation.features.list_procedures import ListProcedures from cora.operation.ports import InMemoryProcedureActivityLookup -from cora.shared.identity import ActorId if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -91,11 +83,9 @@ from cora.operation.features.list_procedures.query import ProcedureStatusFilter from cora.operation.ports import ProcedureActivityLookup -_log = get_logger(__name__) - +_LOG_PREFIX = "procedure_watcher" _RULE = "agent:ProcedureWatcher:v1" _COMMAND_NAME = "ProcedureWatcherTick" -_STREAM_TYPE = "Decision" _PAGE_LIMIT = 100 _CHOICE_STALL = "Stall" _STATUS_RUNNING = "Running" @@ -111,18 +101,9 @@ _DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-00000c0c0002") -def is_stalled(last_progress_at: datetime, now: datetime, stale_after_seconds: float) -> bool: - """Pure rule: an in-conduct 
procedure is stalled once it has sat past the - staleness window without progress. - - Inclusive boundary: elapsed == window FLAGS (`>=`). - """ - return (now - last_progress_at).total_seconds() >= stale_after_seconds - - def _derive_decision_id(procedure_id: UUID, last_progress_at: datetime) -> UUID: """Deterministic ProcedureProgress Decision id for one stall episode.""" - return uuid5(_DECISION_NAMESPACE, f"decision:{procedure_id}:{last_progress_at.isoformat()}") + return derive_watcher_decision_id(_DECISION_NAMESPACE, procedure_id, last_progress_at) def _default_activity_lookup(deps: Kernel) -> ProcedureActivityLookup: @@ -141,62 +122,32 @@ async def _record_decision( last_progress_at: datetime, now: datetime, ) -> None: - """Append one DecisionRegistered(context=ProcedureProgress, choice=Stall). - - Idempotent: the deterministic id makes a re-flag of the same stall episode a - ConcurrencyError no-op (mirrors `_calibration_watcher._record_decision`). - """ - decision_id = _derive_decision_id(procedure_id, last_progress_at) + """Append one DecisionRegistered(context=ProcedureProgress, choice=Stall).""" stalled_seconds = int((now - last_progress_at).total_seconds()) - domain_event = DecisionRegistered( - decision_id=decision_id, - decided_by=ActorId(PROCEDURE_WATCHER_AGENT_ID), - context=DecisionContext(DECISION_CONTEXT_PROCEDURE_PROGRESS).value, - choice=DecisionChoice(_CHOICE_STALL).value, - parent_id=None, - override_kind=None, - rule=DecisionRule(_RULE).value, - reasoning=validate_reasoning( + await record_watcher_decision( + deps, + agent_id=PROCEDURE_WATCHER_AGENT_ID, + context=DECISION_CONTEXT_PROCEDURE_PROGRESS, + choice=_CHOICE_STALL, + rule=_RULE, + command_name=_COMMAND_NAME, + decision_id=_derive_decision_id(procedure_id, last_progress_at), + entity_id=procedure_id, + now=now, + reasoning=( f"Procedure has been {status} for {stalled_seconds}s without progressing " "(past the staleness window, no recent activity); surfaced for operator " "follow-up." 
), - confidence=validate_confidence(None), - confidence_source=DecisionConfidenceSource.SELF_REPORTED, - alternatives=(), - inputs=validate_inputs( - { - "procedure_id": str(procedure_id), - "status": status, - "last_progress_at": last_progress_at.isoformat(), - "stalled_seconds": str(stalled_seconds), - "occurred_at": now.isoformat(), - } - ), - reasoning_signature=None, - occurred_at=now, - ) - new_event = to_new_event( - event_type=event_type_name(domain_event), - payload=to_payload(domain_event), - occurred_at=now, - event_id=uuid5(decision_id, "event:0"), - command_name=_COMMAND_NAME, - correlation_id=deps.id_generator.new_id(), - causation_id=None, - principal_id=PROCEDURE_WATCHER_AGENT_ID, + inputs={ + "procedure_id": str(procedure_id), + "status": status, + "last_progress_at": last_progress_at.isoformat(), + "stalled_seconds": str(stalled_seconds), + "occurred_at": now.isoformat(), + }, + log_prefix=_LOG_PREFIX, ) - try: - await deps.event_store.append( - stream_type=_STREAM_TYPE, - stream_id=decision_id, - expected_version=0, - events=[new_event], - ) - except ConcurrencyError: - _log.info("procedure_watcher.decision_already_written", procedure_id=str(procedure_id)) - return - _log.info("procedure_watcher.flagged", procedure_id=str(procedure_id), status=status) async def _drain_watched_procedures( @@ -269,27 +220,6 @@ async def _watch_tick( ) -async def _watch_loop( - deps: Kernel, - list_procedures: ListProceduresHandler, - activity_lookup: ProcedureActivityLookup, - interval_seconds: float, -) -> None: - """Periodic watch loop. 
A failed tick is logged; the next tick retries.""" - while True: - try: - await _watch_tick( - deps=deps, - list_procedures=list_procedures, - activity_lookup=activity_lookup, - ) - except asyncio.CancelledError: - raise - except Exception: - _log.exception("procedure_watcher.tick_failed") - await asyncio.sleep(interval_seconds) - - @contextlib.asynccontextmanager async def procedure_watcher_lifespan( deps: Kernel, @@ -304,29 +234,20 @@ async def procedure_watcher_lifespan( deployment opts in explicitly). `activity_lookup` defaults to the BC-local `_default_activity_lookup(deps)` (Postgres when a pool is present). """ - if not deps.settings.procedure_watcher_enabled: - _log.info("procedure_watcher.skipped", reason="disabled") - yield - return - lookup = activity_lookup if activity_lookup is not None else _default_activity_lookup(deps) - interval = ( - interval_seconds - if interval_seconds is not None - else deps.settings.procedure_watcher_tick_seconds - ) - _log.info("procedure_watcher.started", interval_seconds=interval) - task = asyncio.create_task( - _watch_loop(deps, list_procedures, lookup, interval), - name="procedure-watcher", - ) - try: + + async def tick() -> None: + await _watch_tick(deps=deps, list_procedures=list_procedures, activity_lookup=lookup) + + async with flag_watcher_lifespan( + enabled=deps.settings.procedure_watcher_enabled, + default_tick_seconds=deps.settings.procedure_watcher_tick_seconds, + log_prefix=_LOG_PREFIX, + task_name="procedure-watcher", + tick=tick, + interval_seconds=interval_seconds, + ): yield - finally: - task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await task - _log.info("procedure_watcher.stopped") __all__ = ["is_stalled", "procedure_watcher_lifespan"] diff --git a/apps/api/tests/unit/api/test_flag_watcher.py b/apps/api/tests/unit/api/test_flag_watcher.py new file mode 100644 index 0000000000..6404dd9a66 --- /dev/null +++ b/apps/api/tests/unit/api/test_flag_watcher.py @@ -0,0 +1,41 @@ +"""Unit tests 
for the shared flag-only-watcher scaffold (cora.api._flag_watcher). + +The per-agent behavior (envelope, drain, fold, gates) is covered by each +watcher's own suite, which exercises record_watcher_decision / is_stalled / +derive_watcher_decision_id through the watchers. This module pins the one +scaffold contract those suites do not reach: the loop re-raises CancelledError so +a lifespan exit actually stops an in-flight tick rather than swallowing the +cancellation and hanging shutdown. +""" + +import asyncio + +import pytest + +from cora.api._flag_watcher import flag_watcher_lifespan + + +@pytest.mark.unit +async def test_in_flight_tick_is_cancelled_on_lifespan_exit() -> None: + """Exiting the lifespan while a tick is blocked cancels it cleanly: the loop + propagates CancelledError so task teardown completes without hanging.""" + started = asyncio.Event() + released = False + + async def tick() -> None: + nonlocal released + started.set() + await asyncio.Event().wait() # block forever until cancelled + released = True # unreachable: cancellation interrupts the wait + + async with flag_watcher_lifespan( + enabled=True, + default_tick_seconds=0.01, + log_prefix="test_watcher", + task_name="test-watcher", + tick=tick, + ): + await asyncio.wait_for(started.wait(), timeout=1.0) + + # Reaching here means the context exited without hanging on the blocked tick. + assert released is False