Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions apps/api/src/cora/operation/acquisitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,15 @@ def _check_trigger_constraints(self) -> CollectParams:
return self


async def _run_collect_cycle(ctx: ActionContext, params: CollectParams) -> Mapping[str, Any]:
async def run_collect_cycle(ctx: ActionContext, params: CollectParams) -> Mapping[str, Any]:
"""One collect cycle: configure detector, arm, poll until Done, read state.

Internal helper shared by the `collect` action body and the
composing `discrete` / `continuous` bodies. Takes a validated
`CollectParams` (or any subclass that exposes the
same fields, e.g., `DiscreteParams` inherits all of them), so the
callers don't re-validate or re-wrap `ActionContext` per cycle.
Shared helper used by the `collect` action body and the composing
bodies `discrete` / `continuous` (this module) and `flats`
(`cora.operation.staging`). Takes a validated `CollectParams` (or any
subclass that exposes the same fields, e.g., `DiscreteParams` /
`FlatsParams` inherit all of them), so the callers don't re-validate or
re-wrap `ActionContext` per cycle.
Returns the same evidence Mapping the `collect` action body returns,
so per-point composition stays uniform.
"""
Expand Down Expand Up @@ -179,7 +180,7 @@ async def collect(ctx: ActionContext) -> Mapping[str, Any]:
`source` as evidence-only fields (the trigger EMITTER is configured
by caller-authored setpoint steps before this action step).
"""
return await _run_collect_cycle(ctx, CollectParams.model_validate(ctx.params))
return await run_collect_cycle(ctx, CollectParams.model_validate(ctx.params))


class DiscreteParams(CollectParams):
Expand Down Expand Up @@ -236,7 +237,7 @@ async def discrete(ctx: ActionContext) -> Mapping[str, Any]:
await ctx.control_port.write(params.axis, point, wait=True)
if params.wait > 0:
await asyncio.sleep(params.wait)
cycle = await _run_collect_cycle(ctx, params)
cycle = await run_collect_cycle(ctx, params)
results.append({"point": point, "collect": cycle})
return {
"axis": params.axis,
Expand Down Expand Up @@ -362,4 +363,5 @@ async def continuous(ctx: ActionContext) -> Mapping[str, Any]:
"collect",
"continuous",
"discrete",
"run_collect_cycle",
]
27 changes: 20 additions & 7 deletions apps/api/src/cora/operation/ports/control_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,26 @@ def __init__(self, address: str, reason: str) -> None:


class ControlValueCoercionError(Exception):
"""Adapter cannot unpack the substrate value into a `Reading` shape.

Triggered when an adapter sees a novel structured type (e.g., a
new EPICS V4 NT variant) or a substrate primitive that does not
fit the closed `ReadingKind` set. Carries the substrate's raw
type label so operators can extend the kind set in a follow-up
rather than silently dropping data.
"""A control value cannot be coerced to the kind a consumer requires.

`target_kind` names the kind the value could not satisfy; it is a
free-form label, not constrained to `ReadingKind`. Three live cases,
all carrying the raw type label so the failure is debuggable rather
than a silently dropped value:

- Adapter read-unpack: an adapter sees a structured type or a
substrate primitive that does not fit the closed `ReadingKind`
set; `target_kind` is the `ReadingKind` it was unpacking toward.
- Adapter write-coercion: a write value cannot be encoded for the
substrate; `target_kind` is an operation label (e.g. the PVA
adapter's `"pva put"`), not a `ReadingKind`.
- Consumer-side: a `Reading` unpacked cleanly but its `value` is
the wrong type for a downstream consumer (e.g., an action body
needs a numeric axis position but read a categorical leaf). Here
`target_kind` is a LOGICAL kind the consumer names (e.g.
`"number"`). Action bodies raise this so the Conductor records a
structured step failure (it is a member of the Conductor's caught
`_CONTROL_ERRORS`) instead of letting a bare `TypeError` escape.
"""

def __init__(self, address: str, raw_type: str, target_kind: str) -> None:
Expand Down
150 changes: 150 additions & 0 deletions apps/api/src/cora/operation/staging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Sample-staging action bodies for the Conductor (composition, not primitives).

The scan primitives in `cora.operation.acquisitions` (`collect` /
`discrete` / `continuous`) are the acquisition-MOTION taxonomy: single /
stepped / swept capture. This module holds a different KIND of action: a
ceremony STAGING composition that brackets an acquisition with a
save-and-restore of an axis. `flats` is `collect` plus sample retraction,
not a fourth scan primitive, so it lives here rather than alongside the
primitives.

## Why staging needs a body at all (the conduct variable-binding gap)

A flat-field capture retracts the sample off the beam, collects, then
restores the sample to its aligned centre. The restore target is the
position read at runtime, and CORA has no relative-move primitive, so the
restore is an absolute write of the read-back value. The conduct step
model is static: `RecipeSetpointStep.value` is a literal or a `BindingRef`,
never "the value I just read at runtime". So a read-then-restore cannot be
expressed as recipe steps today and must live inside a body.

`flats` is therefore a pragmatic stand-in for a missing conduct capability.
The principled fix is conduct-level runtime VARIABLE BINDING (read a value
into a binding, reference it in a later setpoint step), which would turn
the save-and-restore into three ordinary recipe steps (read axis -> collect
-> setpoint from the read) and RETIRE this body. Variable binding is the
designated next design; see [[project_flat_dark_prologue_design]]. Until it
lands, keep staging compositions here and out of the scan-primitive family.
"""

from __future__ import annotations

import math
from typing import TYPE_CHECKING, Any

from pydantic import Field, model_validator

from cora.operation.acquisitions import CollectParams, run_collect_cycle
from cora.operation.ports.control_port import ControlValueCoercionError

if TYPE_CHECKING:
from collections.abc import Mapping

from cora.operation.conductor import ActionContext


def _require_numeric(value: Any, address: str) -> float:
"""Return `value` as a number or raise a Conductor-recordable failure.

The save-and-restore arithmetic (`saved + clearance`) needs a numeric
axis read. `Reading.value` is typed `Any`; a non-numeric read (a
mis-addressed or categorical leaf) would otherwise raise a bare
`TypeError` that escapes the Conductor's `_CONTROL_ERRORS`-only catch
and strands the Procedure in Running. Mapping it to
`ControlValueCoercionError` (in `_CONTROL_ERRORS`) lets the Conductor
record a structured step failure instead. The read precedes any axis
move, so nothing has actuated when this raises.

Non-finite floats (NaN / +-inf) are also rejected: a NaN/inf axis read
(EPICS UDF, uninitialized record) would otherwise propagate through the
`saved + clearance` arithmetic into an absolute write of an undefined
setpoint with no recorded failure.
"""
if isinstance(value, bool) or not isinstance(value, (int, float)):
raise ControlValueCoercionError(address, type(value).__name__, "number")
if not math.isfinite(value):
raise ControlValueCoercionError(address, repr(value), "finite number")
return value


class FlatsParams(CollectParams):
"""Validated parameters for the `flats` staging action body.

Extends `CollectParams` with the sample-retraction definition (`axis`
+ `clearance`). Inherits the detector / trigger / dwell / repetitions
fields and the three conditional `@model_validator` rules unchanged: a
flat capture runs the same collect cycle as `collect`, just with the
sample retracted.

`clearance` is a SIGNED NONZERO offset added to the saved position to
form the off-centre target. CORA has no relative-move primitive, so the
retract and the restore are both absolute writes (read the position, add
the clearance, write it; later write the saved value back). The sign
chooses the retraction direction; the magnitude is the clear-of-beam
travel. Zero is rejected (it would leave the sample in the beam and
label an in-beam capture a flat-field), mirroring
`ContinuousParams._check_sweep_range`'s zero-range guard. Per
[[project_units_design]] both `dwell` (inherited) and `clearance` carry
the canonical unit annotation; `clearance` is mm.
"""

axis: str
clearance: float = Field(
...,
json_schema_extra={"unit": {"system": "udunits", "code": "mm"}},
)

@model_validator(mode="after")
def _check_clearance_nonzero(self) -> FlatsParams:
if self.clearance == 0:
raise ValueError("clearance must be nonzero (zero leaves the sample in the beam)")
return self


async def flats(ctx: ActionContext) -> Mapping[str, Any]:
"""Flat-field capture with the sample retracted, then restored.

Save-and-restore around one `collect` cycle: read the current `axis`
position, drive it off the beam centre by `clearance` (an absolute
write, since CORA has no relative-move primitive), run the collect
cycle to capture the flat frames, then restore the axis to the saved
position. Both the retract and the restore are blocking writes
(`wait=True`) so the frames land at the settled off-centre position and
the sample is back at its aligned centre before the body returns.

Runs the shared `run_collect_cycle` with the already-validated
`FlatsParams` (a `CollectParams` subclass), the same way `discrete` and
`continuous` compose a collect cycle without re-validating.

On a `Control*Error` mid-cycle the exception propagates unchanged (no
rollback try/finally, matching `collect` / `discrete` / `continuous`):
the Conductor records the step failure and the axis is left retracted
(off the beam), an acceptable fault state for a sample-out capture.
Operators reconcile the retracted axis via state inspection, as with
any halted conduct.

Evidence shape carries the save-and-restore positions (`saved_value`,
`offcenter_target`) plus the nested `collect` cycle evidence. The
restore landing is proven by re-reading the axis (see the integration
test), not by an echoed field here.
"""
params = FlatsParams.model_validate(ctx.params)
saved = await ctx.control_port.read(params.axis)
saved_value = _require_numeric(saved.value, params.axis)
offcenter_target = saved_value + params.clearance
await ctx.control_port.write(params.axis, offcenter_target, wait=True)
cycle = await run_collect_cycle(ctx, params)
await ctx.control_port.write(params.axis, saved_value, wait=True)
return {
"axis": params.axis,
"saved_value": saved_value,
"clearance": params.clearance,
"offcenter_target": offcenter_target,
"collect": cycle,
}


__all__ = [
"FlatsParams",
"flats",
]
7 changes: 5 additions & 2 deletions apps/api/src/cora/operation/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
try_conduct_procedure,
)
from cora.operation.ports.control_port import ControlPort
from cora.operation.staging import flats

_BC = "operation"

Expand Down Expand Up @@ -150,7 +151,9 @@ def wire_operation(deps: Kernel, *, control_port: ControlPort | None = None) ->
`ControlPortRegistry` with the configured substrate adapters per
prefix. The action registry is hand-seeded with the three
substrate-neutral scan-acquisition primitives `collect` +
`discrete` + `continuous`. Per-deployment registry-from-config
`discrete` + `continuous`, plus the `flats` staging action (a
save-and-restore composition over `collect`, from
`cora.operation.staging`). Per-deployment registry-from-config
plumbing remains deferred.
"""
step_store: ActivityStore = (
Expand Down Expand Up @@ -212,7 +215,7 @@ def wire_operation(deps: Kernel, *, control_port: ControlPort | None = None) ->
else build_control_port(deps.settings.control_port_routes)
)
action_registry = InMemoryActionRegistry(
{"collect": collect, "discrete": discrete, "continuous": continuous}
{"collect": collect, "discrete": discrete, "continuous": continuous, "flats": flats}
)
conductor = Conductor(
control_port=control_port,
Expand Down
28 changes: 22 additions & 6 deletions apps/api/tests/architecture/test_procedure_kind_naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,20 @@
)

# Whole-kind carve-outs, with rationale:
# first_light - whole-system milestone, no single subject
# {dark,flat}_baseline - capture-and-store; the trailing noun is the
# produced artifact, not the operation
# first_light - whole-system milestone, no single subject
# {dark,flat}_baseline - capture-and-store; the trailing noun is the
# produced artifact, not the operation
# normalization_baseline - the combined darks+flats normalization
# ceremony; same capture-and-store idiom as
# its two predecessors (trailing noun is the
# produced artifact), composing them rather
# than superseding them
CARVE_OUT_KINDS = frozenset(
{
"first_light",
"dark_baseline",
"flat_baseline",
"normalization_baseline",
}
)

Expand All @@ -66,8 +72,18 @@ def _scenario_files() -> list[Path]:
)


_REGISTRATION_COMMANDS = frozenset({"RegisterProcedure", "RegisterProcedureFromRecipe"})
"""The two procedure-registration commands that exist today, both carrying
an identically-shaped ``kind`` field that the noun-LAST convention governs.
A recipe-driven Procedure (``RegisterProcedureFromRecipe``) must satisfy it
too, else a verb-first recipe-path kind would slip past the guard. This is
an explicit enumeration, NOT a verb-agnostic match: a third
procedure-registration command with a ``kind`` field must be added here, or
its scenarios escape the scan."""


def _register_procedure_kinds(tree: ast.AST) -> list[tuple[int, str]]:
"""(lineno, kind) for every ``RegisterProcedure(kind="<literal>")`` call.
"""(lineno, kind) for every ``Register[Procedure|ProcedureFromRecipe](kind="<literal>")`` call.

Non-literal kinds (variables, f-strings) cannot be checked statically
and are skipped; scenario tests use string literals.
Expand All @@ -84,7 +100,7 @@ def _register_procedure_kinds(tree: ast.AST) -> list[tuple[int, str]]:
if isinstance(func, ast.Attribute)
else None
)
if name != "RegisterProcedure":
if name not in _REGISTRATION_COMMANDS:
continue
kind_kw = next((kw for kw in node.keywords if kw.arg == "kind"), None)
if kind_kw is None:
Expand Down Expand Up @@ -112,7 +128,7 @@ def test_scenario_procedure_kinds_follow_noun_last_convention() -> None:
violations.append(f" {path.name}:{lineno}: kind={kind!r}")

assert seen, (
"no RegisterProcedure(kind=...) literals found under "
"no Register[Procedure|ProcedureFromRecipe](kind=...) literals found under "
"tests/integration/scenarios/ -- scope regression?"
)

Expand Down
Loading
Loading