Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 45 additions & 122 deletions apps/api/src/cora/api/_calibration_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
A periodic background task, hosted at the composition root (`cora.api`) because
it reads the Calibration BC AND composes Decision BC events; only `cora.api` may
depend on both (same placement rationale as `_clearance_watcher`,
`_clearance_expirer`, `_run_supervisor`). See
[[project-calibration-watcher-design]].
`_clearance_expirer`, `_run_supervisor`). The agent-invariant mechanics (the
staleness rule, the per-episode Decision id, the DecisionRegistered envelope, and
the periodic loop / lifespan) live in `cora.api._flag_watcher`; this module owns
only what is specific to calibrations. See [[project-calibration-watcher-design]].

## What v1 does

Expand Down Expand Up @@ -35,32 +37,21 @@

from __future__ import annotations

import asyncio
import contextlib
from typing import TYPE_CHECKING
from uuid import UUID, uuid5
from uuid import UUID

from cora.access.aggregates.actor import load_actor
from cora.agent.seed_calibration_watcher import CALIBRATION_WATCHER_AGENT_ID
from cora.calibration.features.list_calibrations import ListCalibrations
from cora.decision.aggregates.decision import (
DECISION_CONTEXT_CALIBRATION_VERIFICATION,
DecisionChoice,
DecisionConfidenceSource,
DecisionContext,
DecisionRegistered,
DecisionRule,
event_type_name,
to_payload,
validate_confidence,
validate_inputs,
validate_reasoning,
from cora.api._flag_watcher import (
derive_watcher_decision_id,
flag_watcher_lifespan,
is_stalled,
record_watcher_decision,
)
from cora.infrastructure.event_envelope import to_new_event
from cora.infrastructure.logging import get_logger
from cora.infrastructure.ports import ConcurrencyError
from cora.calibration.features.list_calibrations import ListCalibrations
from cora.decision.aggregates.decision import DECISION_CONTEXT_CALIBRATION_VERIFICATION
from cora.infrastructure.routing import NIL_SENTINEL_ID
from cora.shared.identity import ActorId

if TYPE_CHECKING:
from collections.abc import AsyncGenerator
Expand All @@ -72,11 +63,9 @@
)
from cora.infrastructure.kernel import Kernel

_log = get_logger(__name__)

_LOG_PREFIX = "calibration_watcher"
_RULE = "agent:CalibrationWatcher:v1"
_COMMAND_NAME = "CalibrationWatcherTick"
_STREAM_TYPE = "Decision"
_PAGE_LIMIT = 100
_CHOICE_STALE = "Stale"
_STATUS_PROVISIONAL = "Provisional"
Expand All @@ -88,18 +77,9 @@
_DECISION_NAMESPACE = UUID("01900000-0000-7000-8000-0000ca110002")


def is_stalled(last_revised_at: datetime, now: datetime, stale_after_seconds: float) -> bool:
"""Pure rule: a Provisional calibration is stale once its newest revision has
sat past the staleness window without verification.

Inclusive boundary: elapsed == window FLAGS (`>=`).
"""
return (now - last_revised_at).total_seconds() >= stale_after_seconds


def _derive_decision_id(calibration_id: UUID, last_revised_at: datetime) -> UUID:
"""Deterministic CalibrationVerification Decision id for one stale episode."""
return uuid5(_DECISION_NAMESPACE, f"decision:{calibration_id}:{last_revised_at.isoformat()}")
return derive_watcher_decision_id(_DECISION_NAMESPACE, calibration_id, last_revised_at)


async def _record_decision(
Expand All @@ -110,64 +90,32 @@ async def _record_decision(
last_revised_at: datetime,
now: datetime,
) -> None:
"""Append one DecisionRegistered(context=CalibrationVerification, choice=Stale).

Idempotent: the deterministic id makes a re-flag of the same stale episode a
ConcurrencyError no-op (mirrors `_clearance_watcher._record_decision`).
"""
decision_id = _derive_decision_id(calibration_id, last_revised_at)
"""Append one DecisionRegistered(context=CalibrationVerification, choice=Stale)."""
stale_seconds = int((now - last_revised_at).total_seconds())
domain_event = DecisionRegistered(
decision_id=decision_id,
decided_by=ActorId(CALIBRATION_WATCHER_AGENT_ID),
context=DecisionContext(DECISION_CONTEXT_CALIBRATION_VERIFICATION).value,
choice=DecisionChoice(_CHOICE_STALE).value,
parent_id=None,
override_kind=None,
rule=DecisionRule(_RULE).value,
reasoning=validate_reasoning(
await record_watcher_decision(
deps,
agent_id=CALIBRATION_WATCHER_AGENT_ID,
context=DECISION_CONTEXT_CALIBRATION_VERIFICATION,
choice=_CHOICE_STALE,
rule=_RULE,
command_name=_COMMAND_NAME,
decision_id=_derive_decision_id(calibration_id, last_revised_at),
entity_id=calibration_id,
now=now,
reasoning=(
f"Calibration ({quantity}) has had a Provisional revision unverified for "
f"{stale_seconds}s (past the staleness window); surfaced for operator "
"re-verification before a Run acquires against it."
),
confidence=validate_confidence(None),
confidence_source=DecisionConfidenceSource.SELF_REPORTED,
alternatives=(),
inputs=validate_inputs(
{
"calibration_id": str(calibration_id),
"quantity": quantity,
"last_revised_at": last_revised_at.isoformat(),
"stale_seconds": str(stale_seconds),
"occurred_at": now.isoformat(),
}
),
reasoning_signature=None,
occurred_at=now,
)
new_event = to_new_event(
event_type=event_type_name(domain_event),
payload=to_payload(domain_event),
occurred_at=now,
event_id=uuid5(decision_id, "event:0"),
command_name=_COMMAND_NAME,
correlation_id=deps.id_generator.new_id(),
causation_id=None,
principal_id=CALIBRATION_WATCHER_AGENT_ID,
inputs={
"calibration_id": str(calibration_id),
"quantity": quantity,
"last_revised_at": last_revised_at.isoformat(),
"stale_seconds": str(stale_seconds),
"occurred_at": now.isoformat(),
},
log_prefix=_LOG_PREFIX,
)
try:
await deps.event_store.append(
stream_type=_STREAM_TYPE,
stream_id=decision_id,
expected_version=0,
events=[new_event],
)
except ConcurrencyError:
_log.info(
"calibration_watcher.decision_already_written", calibration_id=str(calibration_id)
)
return
_log.info("calibration_watcher.flagged", calibration_id=str(calibration_id))


async def _drain_provisional_calibrations(
Expand Down Expand Up @@ -223,22 +171,6 @@ async def _watch_tick(
)


async def _watch_loop(
deps: Kernel,
list_calibrations: ListCalibrationsHandler,
interval_seconds: float,
) -> None:
"""Periodic watch loop. A failed tick is logged; the next tick retries."""
while True:
try:
await _watch_tick(deps=deps, list_calibrations=list_calibrations)
except asyncio.CancelledError:
raise
except Exception:
_log.exception("calibration_watcher.tick_failed")
await asyncio.sleep(interval_seconds)


@contextlib.asynccontextmanager
async def calibration_watcher_lifespan(
deps: Kernel,
Expand All @@ -251,28 +183,19 @@ async def calibration_watcher_lifespan(
No-op unless `settings.calibration_watcher_enabled` is True (default off, so a
deployment opts in explicitly).
"""
if not deps.settings.calibration_watcher_enabled:
_log.info("calibration_watcher.skipped", reason="disabled")
yield
return

interval = (
interval_seconds
if interval_seconds is not None
else deps.settings.calibration_watcher_tick_seconds
)
_log.info("calibration_watcher.started", interval_seconds=interval)
task = asyncio.create_task(
_watch_loop(deps, list_calibrations, interval),
name="calibration-watcher",
)
try:
async def tick() -> None:
await _watch_tick(deps=deps, list_calibrations=list_calibrations)

async with flag_watcher_lifespan(
enabled=deps.settings.calibration_watcher_enabled,
default_tick_seconds=deps.settings.calibration_watcher_tick_seconds,
log_prefix=_LOG_PREFIX,
task_name="calibration-watcher",
tick=tick,
interval_seconds=interval_seconds,
):
yield
finally:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
_log.info("calibration_watcher.stopped")


__all__ = ["calibration_watcher_lifespan", "is_stalled"]
Loading
Loading