diff --git a/CLAUDE.md b/CLAUDE.md index 632e26c..0d8a252 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -67,11 +67,12 @@ This applies to every PLAN.md generated by `/gsd-plan-phase`, every PROJECT.md " Standalone live adapter for Citizen Weather Observer Program (PWS) data over the APRS-IS TCP stream. **Deliberately isolated from the parity-critical core** — a hot-rooftop or indoor PWS must never corrupt a Kalshi NHIGH/NLOW settlement target. -- **Never wired into** `research()`, `merge/observations.py` (`SOURCE_PRIORITY`), or `live/_sources.py`. CWOP carries its own `schema.cwop.v1` — `schema.observation.v1`'s `observation_type` enum is untouched. These four files are the parity firewall; do not register "cwop" in any of them. -- **Public surface (sync unless noted):** `nearby()` (start-here ICAO→scan), `scan()`, `stream()` (async generator), `snapshot()` (→ `schema.cwop.v1` DataFrame), `latest()`. Source tag `"cwop.live"`. Errors raise `NoCWOPDataError` (subclass of `NoLiveDataError`) — never return `[]`/`None`. -- **File layout:** `weather/_aprs.py` (in-house MIT parser, no GPLv2 aprslib), `weather/_fetchers/_aprs_stream.py` (`_CWOPReaderThread` daemon owns socket → `threading.Queue` → sync/async consumers; `loop.call_soon_threadsafe` bridges async), `weather/cwop/*` (public API + `_schema.py` + `_registry.py`), `weather/qc/_cwop.py` (QC). Pure stdlib (`socket` + `re`); `[cwop]` extra is empty. +- **Never wired into** `research()`, `merge/observations.py` (`SOURCE_PRIORITY`), or `live/_sources.py`. CWOP carries its own `schema.cwop.v1` — `schema.observation.v1`'s `observation_type` enum is untouched. These four files are the parity firewall; do not register "cwop" in any of them. This holds in v0.2: persistence + `history()` add NO wiring into those four files. 
+- **Public surface (sync unless noted):** `nearby()` (start-here ICAO→scan), `scan()`, `stream()` (async generator), `snapshot()` (→ `schema.cwop.v1` DataFrame; `persist=True` also writes the backtest cache), `latest()`. v0.2 adds `history(station, from_date, to_date, *, qc_status=None)` (→ `schema.cwop.v1` DataFrame for backtest replay) and `persist_observations(observations)`. Source tag `"cwop.live"` (live) / `"cwop.cache"` (persisted-then-read); `schema.cwop.v1` accepts both via `_registered_sources`. Errors raise `NoCWOPDataError` (subclass of `NoLiveDataError`) — never return `[]`/`None`. +- **File layout:** `weather/_aprs.py` (in-house MIT parser, no GPLv2 aprslib), `weather/_fetchers/_aprs_stream.py` (`_CWOPReaderThread` daemon owns socket → `threading.Queue` → sync/async consumers; `loop.call_soon_threadsafe` bridges async), `weather/cwop/*` (public API + `_schema.py` + `_registry.py` + v0.2 `_cache.py`/`_history.py`), `weather/qc/_cwop.py` (QC). Pure stdlib (`socket` + `re`) for the live path; persistence pulls `pyarrow`/`filelock`. `[cwop]` extra pulls pandas (for `snapshot()`/`history()` DataFrames). +- **Persistence (v0.2):** monthly parquet at `$HOME/.mostlyright/cache/cwop/{station}/{year}/{month}.parquet` (honors `MOSTLYRIGHT_CACHE_DIR`). Standalone `weather/cwop/_cache.py` — does NOT import the parity-coupled `weather/cache.py`; `filelock`-guarded read-modify-write merge, dedup by `(station_id, observed_at)` first-seen-wins. **NO current-month skip** (CWOP is ephemeral — unlike re-fetchable AWC/IEM/GHCNh, the current month is the only chance to retain it). CWOP station ids (CW0875, ham callsigns) are NOT ICAO, so the cache uses its own `_CWOP_STATION_RE` path validator, not `STATION_CODE_RE`. - **6-layer QC thresholds:** each layer scores 0.0–1.0; combined = weighted geometric mean. `< 0.3` → dropped, `0.3–0.7` → flagged, `> 0.7` → clean. 
Layers: range (0.20), temporal consistency (0.20), indoor detection (0.15), buddy/ASOS check (0.20), solar-radiation bias (0.10), reliability (0.15). Layers needing history (indoor ≥24h, solar ≥7d) pass (1.0) until enough data accumulates. -- **v0.1 limits:** live-only (no persistence/backtest), no `research()` integration. Persistence + research wiring are v0.2. See [`docs/cwop-adapter.md`](docs/cwop-adapter.md). +- **Why no `include_cwop` on `research()`:** `research()` aggregates `max/min(temp_f)` source-blind, so a hot-rooftop/indoor PWS would corrupt a Kalshi NHIGH/NLOW target. CWOP reaches models ONLY through `cwop.history()` (a DataFrame the strategy joins itself), never the settlement join. See [`docs/cwop-adapter.md`](docs/cwop-adapter.md). ## Testing diff --git a/docs/cwop-adapter.md b/docs/cwop-adapter.md index 356174d..61d3fd5 100644 --- a/docs/cwop-adapter.md +++ b/docs/cwop-adapter.md @@ -10,7 +10,9 @@ stations report every 5–15 minutes. The only real-time path is a TCP stream > parity-frozen `schema.observation.v1`). A hot-rooftop or indoor PWS must never > silently corrupt a Kalshi NHIGH/NLOW settlement target — so CWOP lives on its > own island, gated by a 6-layer QC pipeline. There is no `research()` -> integration and no backtesting in v0.1. +> integration: CWOP reaches ML strategies only through its **own** access path +> (`cwop.history()`), which returns a `schema.cwop.v1` DataFrame the strategy +> joins itself — fully aware it is unofficial PWS data. ## Install @@ -47,6 +49,48 @@ async for obs in cwop.stream("CW0875"): print(obs.observed_at, obs.temp_f) ``` +## Persistence + backtest (v0.2) + +CWOP data is ephemeral — the APRS-IS stream has no REST backfill, so the only +way to build a history for model training is to **persist what you collect**. +v0.2 adds a monthly parquet cache and a `history()` replay query, both behind +the same firewall as the live surface (CWOP never enters `research()`/merge). 
+ +```python +from datetime import date +from mostlyright.weather import cwop + +# Collect AND persist in one call (side effect — the returned live frame is +# unchanged and still tagged source="cwop.live"). +cwop.snapshot("CW0875", duration_seconds=120, persist=True) + +# Or persist the output of any collection path (e.g. a long stream run). +obs = [o async for o in cwop.stream("CW0875")] # illustrative +cwop.persist_observations(obs) + +# Replay persisted history for backtesting — returns a schema.cwop.v1 frame +# tagged source="cwop.cache" that your strategy joins itself. +hist = cwop.history("CW0875", date(2026, 1, 1), date(2026, 6, 30)) + +# Drop QC-rejected PWS rows up front: +clean = cwop.history("CW0875", date(2026, 1, 1), date(2026, 6, 30), qc_status="clean") +``` + +- **Cache layout:** `$HOME/.mostlyright/cache/cwop/{station}/{year}/{month}.parquet` + (honors `MOSTLYRIGHT_CACHE_DIR`). `filelock`-guarded read-modify-write merge; + dedup by `(station_id, observed_at)`, first-seen wins. +- **No current-month skip.** Unlike the parity observation cache (which skips + the mutable current month because AWC/IEM/GHCNh are re-fetchable), CWOP + persists everything — the current month is exactly what a live collector + needs to keep. +- **Source identity:** live pulls are `"cwop.live"`; anything read back from the + cache is `"cwop.cache"`. `schema.cwop.v1` accepts both. +- **Why no `include_cwop=True` on `research()`?** `research()` aggregates + `max(temp_f)/min(temp_f)` source-blind, so a single hot-rooftop or indoor PWS + would raise `obs_high_f` / lower `obs_low_f` and corrupt a Kalshi NHIGH/NLOW + training target. CWOP therefore stays on its own `history()` path; the + strategy decides how (and whether) to fold it in. + Every failure mode raises `NoCWOPDataError` (a `NoLiveDataError` subclass) with the requested station/area and a `reason` — the API never returns `[]`/`None`. @@ -92,10 +136,15 @@ geometric mean. 
Status: `<0.3` dropped, `0.3–0.7` flagged, `>0.7` clean. Layers without enough history pass (1.0) until data accumulates, so a freshly-seen station is scored mostly on the range + temporal layers. -## Known limitations (v0.1) +## Known limitations (v0.2) -- **Live-only** — no persistence, no backtesting. CWOP data is ephemeral. -- **No `research()` integration** — CWOP cannot be used for model training yet. +- **Persistence is collect-then-replay** — CWOP has no REST backfill, so + `history()` only returns what you previously persisted (via `snapshot(persist=True)` + or `persist_observations()`). There is no way to retrieve CWOP data for a date + before you started collecting. +- **No `research()` integration** — by design. CWOP reaches models only through + the standalone `history()` path (see *Persistence + backtest* above); it is + never folded into the parity-critical settlement join. - **Buddy check needs an ASOS** within 50km; degraded scoring otherwise. - **Solar-bias / indoor detection need 7d / 24h** of history (the live session rarely has it), so they pass until enough data accumulates. diff --git a/packages/weather/src/mostlyright/weather/cwop/__init__.py b/packages/weather/src/mostlyright/weather/cwop/__init__.py index 424420f..7f72b74 100644 --- a/packages/weather/src/mostlyright/weather/cwop/__init__.py +++ b/packages/weather/src/mostlyright/weather/cwop/__init__.py @@ -15,9 +15,19 @@ coordinates and scan around it. - :func:`scan` — discover stations near a lat/lon by listening on APRS-IS. - :func:`stream` — async generator of fresh observations. -- :func:`snapshot` — collect a fixed window into a ``schema.cwop.v1`` DataFrame. +- :func:`snapshot` — collect a fixed window into a ``schema.cwop.v1`` DataFrame + (pass ``persist=True`` to also write it to the backtest cache). - :func:`latest` — the most-recent observation for one station. 
+Persistence + backtest (v0.2): + +- :func:`history` — replay persisted CWOP observations for a date range as a + ``schema.cwop.v1`` DataFrame (``source="cwop.cache"``). The parity-safe access + path for ML strategies — CWOP is NEVER wired into ``research()`` / merge / + ``live._sources``; a strategy joins this frame itself. +- :func:`persist_observations` — write a list of observations to the monthly + parquet cache (the collection seam for ``snapshot()`` / ``stream()`` output). + Data classes: :class:`CWOPStation`, :class:`CWOPObservation`. Errors: :class:`NoCWOPDataError`. @@ -29,17 +39,23 @@ from mostlyright.core.exceptions import NoCWOPDataError +from ._cache import persist_observations +from ._history import history from ._scan import CWOPStation, nearby, scan -from ._schema import CWOPObservation +from ._schema import CWOP_CACHE_SOURCE, CWOP_SOURCE, CWOPObservation from ._snapshot import latest, snapshot from ._stream import stream __all__ = [ + "CWOP_CACHE_SOURCE", + "CWOP_SOURCE", "CWOPObservation", "CWOPStation", "NoCWOPDataError", + "history", "latest", "nearby", + "persist_observations", "scan", "snapshot", "stream", diff --git a/packages/weather/src/mostlyright/weather/cwop/_cache.py b/packages/weather/src/mostlyright/weather/cwop/_cache.py new file mode 100644 index 0000000..eb15350 --- /dev/null +++ b/packages/weather/src/mostlyright/weather/cwop/_cache.py @@ -0,0 +1,401 @@ +"""CWOP persistence — monthly parquet cache for backtest replay (v0.2). + +Path layout:: + + $HOME/.mostlyright/cache/cwop/<station>/<year>/<month>.parquet + +Honors ``MOSTLYRIGHT_CACHE_DIR`` (the shared SDK cache root) so relocating the +cache moves CWOP partitions with everything else. + +**Deliberately a standalone island.** This module does NOT import +``mostlyright.weather.cache`` — that module is parity-coupled (it pulls +``OBSERVATION_SCHEMA`` and the LST tz map, and applies the +current-LST-month-skip rule designed for *re-fetchable* AWC/IEM/GHCNh data). 
+CWOP is the opposite: a live-only APRS-IS stream with NO REST backfill, so its +data is **ephemeral** — persisting the current month is the ONLY way to retain +CWOP history for backtesting. There is therefore NO current-month skip here. + +Re-using the parity cache's atomic-write helper would couple CWOP persistence +to the parity firewall's invalidation semantics; the ~20-line atomic writer is +duplicated on purpose so CWOP stays on its own island (see CLAUDE.md "CWOP +adapter"). + +Concurrency: writes are a read-modify-write *merge* into an existing monthly +partition, serialized under a single per-path ``FileLock`` so two writers +targeting the same ``(station, year, month)`` cannot lost-update each other — +the second writer reads the first's already-committed rows and dedups on top. +This mirrors the satellite cache tier's merge contract. + +Source identity: persisted rows are stamped ``source="cwop.cache"`` (distinct +from the live ``"cwop.live"``) so a consumer reading them back via +:func:`mostlyright.weather.cwop.history` knows the provenance. CWOP data still +never enters the parity-frozen merge/research path. +""" + +from __future__ import annotations + +import logging +import os +import re +from datetime import UTC, date, datetime +from pathlib import Path +from typing import TYPE_CHECKING + +import pyarrow as pa +import pyarrow.parquet as pq +from filelock import FileLock +from mostlyright._internal._bounds import assert_path_under +from mostlyright._internal._cache_dir import resolve_cache_root_without_v1 + +from ._schema import CWOP_CACHE_SOURCE + +if TYPE_CHECKING: + from ._schema import CWOPObservation + +logger = logging.getLogger(__name__) + +#: FileLock timeout in seconds (mirrors the observation/satellite cache tiers). +LOCK_TIMEOUT_SECONDS = 30 + +# CWOP station IDs are NOT 4-letter ICAO codes, so ``STATION_CODE_RE`` +# (``[A-Z]{3,4}``) is the wrong validator. 
PWS ids are CW/DW/EW/FW/GW + 4 +# digits; ham-callsign reporters add letters and an optional ``-SSID`` suffix. +# The path-safe subset: start alphanumeric, then uppercase alnum or hyphen, +# 3-12 chars total. This forbids ``/``, ``\``, ``.`` and ``..`` so the id can +# never escape its single path segment. ``assert_path_under`` is the final +# defense-in-depth backstop. +_CWOP_STATION_RE = re.compile(r"\A[A-Z0-9][A-Z0-9-]{2,11}\Z") + + +def _validate_cwop_station(station: object) -> str: + """Normalize + path-validate a CWOP station id, returning the upper form. + + Raises: + TypeError: ``station`` is not a ``str``. + ValueError: ``station`` does not match the path-safe CWOP id pattern. + """ + if not isinstance(station, str): + raise TypeError(f"station must be a str, got {type(station).__name__}") + normalized = station.strip().upper() + if not _CWOP_STATION_RE.match(normalized): + raise ValueError( + f"station={station!r} is not a path-safe CWOP id " + f"(expected {_CWOP_STATION_RE.pattern}, e.g. 'CW0875')" + ) + return normalized + + +def _cwop_cache_root() -> Path: + """Return ``<cache_root>/cwop`` (honors ``MOSTLYRIGHT_CACHE_DIR``). + + Resolved on each call so tests can monkeypatch the env var between cases. + """ + return resolve_cache_root_without_v1() / "cwop" + + +def cwop_cache_path(station: str, year: int, month: int) -> Path: + """Return the parquet partition path for ``(station, year, month)``. + + Example:: + + cwop_cache_path("CW0875", 2026, 6) + # -> ~/.mostlyright/cache/cwop/CW0875/2026/06.parquet + + Validates ``station`` against the CWOP id pattern and asserts the resolved + path stays under the CWOP cache root (path-traversal backstop). 
+ """ + safe = _validate_cwop_station(station) + root = _cwop_cache_root() + raw = root / safe / f"{year:04d}" / f"{month:02d}.parquet" + assert_path_under(raw, root, field="cwop_cache_path") + return raw + + +# --------------------------------------------------------------------------- +# Dedup + atomic write +# --------------------------------------------------------------------------- +def _ensure_utc(value: object) -> object: + """Coerce a ``datetime`` cell to tz-aware UTC; pass anything else through. + + Naive datetimes are assumed UTC (consistent with :func:`persist_observations` + bucketing); aware datetimes in another zone are converted to UTC. This is + applied to ``observed_at``/``knowledge_time`` at the write chokepoint so the + dedup key is stable (a naive and an aware spelling of the same instant must + collapse) AND the on-disk pyarrow column is uniformly ``timestamp[us, UTC]`` + rather than a tz-stripped mix. + """ + if isinstance(value, datetime): + return value.replace(tzinfo=UTC) if value.tzinfo is None else value.astimezone(UTC) + return value + + +def _normalize_row_timestamps(row: dict) -> dict: + """Return ``row`` with ``observed_at``/``knowledge_time`` normalized to UTC.""" + return { + **row, + "observed_at": _ensure_utc(row.get("observed_at")), + "knowledge_time": _ensure_utc(row.get("knowledge_time")), + } + + +def _dedup_cwop_rows(rows: list[dict]) -> list[dict]: + """First-seen-wins dedup by ``(station_id, observed_at)``. + + Deterministic and order-preserving: the first row carrying a given + ``(station_id, observed_at)`` key survives, later duplicates are dropped. + A station reports one observation per timestamp; re-persisting an + overlapping window must not double-count it. 
+ """ + seen: set[tuple[object, object]] = set() + out: list[dict] = [] + for row in rows: + key = (row.get("station_id"), row.get("observed_at")) + if key in seen: + continue + seen.add(key) + out.append(row) + return out + + +def _write_table_atomic(path: Path, table: pa.Table) -> None: + """Write ``table`` to ``path`` via ``.tmp`` + ``os.replace``. + + Caller MUST already hold ``FileLock(str(path) + '.lock')`` — this is the + inner write half of the read-modify-write merge in :func:`write_cwop_cache`. + ``os.replace`` is atomic on POSIX and Windows, so a concurrent reader sees + either the old partition or the new one, never a truncated parquet. + Parquet ``version="2.6"`` + ``coerce_timestamps="us"`` keep the bytes stable + across pyarrow versions and preserve microsecond timestamps on roundtrip. + """ + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(".tmp") + pq.write_table(table, tmp, version="2.6", coerce_timestamps="us") + os.replace(tmp, path) + + +def write_cwop_cache(station: str, year: int, month: int, rows: list[dict]) -> int: + """Merge ``rows`` into the ``(station, year, month)`` partition. Idempotent. + + Stamps ``source="cwop.cache"`` on every persisted row (the persistence + provenance tag) and dedups by ``(station_id, observed_at)`` against any rows + already on disk. No current-month skip — CWOP data is ephemeral and the + current month is exactly what a live collector wants to retain. + + Returns the total number of rows in the partition after the merge. No-op + (returns the existing count, or 0) when ``rows`` is empty. + + The entire read → concat → dedup → write runs under ONE ``FileLock`` so + concurrent same-partition writers cannot lost-update. 
+ """ + safe_station = _validate_cwop_station(station) + path = cwop_cache_path(station, year, month) + lock = FileLock(str(path) + ".lock", timeout=LOCK_TIMEOUT_SECONDS) + with lock: + existing: list[dict] = [] + if path.exists(): + try: + existing = pq.read_table(path).to_pylist() + except (FileNotFoundError, OSError): + existing = [] + if not rows and not existing: + return 0 + # Stamp cache-source provenance AND the normalized (upper) station id + # AND normalize timestamps to UTC-aware BEFORE dedup. Stamping the + # partition's normalized station onto every row is authoritative (like + # ``source``): a row persisted via a lowercase id (``cw0875``) lands in + # the same ``CW0875`` partition, so its on-disk ``station_id`` and the + # ``(station_id, observed_at)`` dedup key must match the partition or + # casing variants would store as distinct rows and re-surface as + # duplicates from ``history()``. + stamped = [ + _normalize_row_timestamps( + {**row, "source": CWOP_CACHE_SOURCE, "station_id": safe_station} + ) + for row in rows + ] + merged = _dedup_cwop_rows(existing + stamped) + if not merged: + return 0 + table = pa.Table.from_pylist(merged) + _write_table_atomic(path, table) + return len(merged) + + +def read_cwop_cache(station: str, year: int, month: int) -> list[dict] | None: + """Return persisted rows for ``(station, year, month)`` or ``None`` on miss. + + ``None`` when the partition file does not exist (or is unlinked between the + ``exists()`` check and the read — treated as a miss, mirroring the + observation cache TOCTOU handling). + """ + path = cwop_cache_path(station, year, month) + if not path.exists(): + return None + try: + return pq.read_table(path).to_pylist() + except (FileNotFoundError, OSError): + return None + + +def invalidate_cwop(station: str, year: int, month: int) -> bool: + """Remove the ``(station, year, month)`` partition; return whether removed. 
+ + Acquires the same ``FileLock`` as :func:`write_cwop_cache` so an + invalidation racing a write runs strictly before or after it. + """ + path = cwop_cache_path(station, year, month) + lock = FileLock(str(path) + ".lock", timeout=LOCK_TIMEOUT_SECONDS) + with lock: + if path.exists(): + path.unlink() + return True + return False + + +# --------------------------------------------------------------------------- +# Window read + observation persistence +# --------------------------------------------------------------------------- +def _as_utc_datetime(value: date | datetime, *, end_of_day: bool) -> datetime: + """Coerce a ``date``/``datetime`` to a tz-aware UTC ``datetime``. + + A bare ``date`` becomes midnight UTC (``end_of_day=False``) or the last + microsecond of that day (``end_of_day=True``) so an inclusive + ``[from_date, to_date]`` day range captures the whole final day. A naive + ``datetime`` is assumed UTC; a tz-aware one is converted to UTC. + """ + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=UTC) + return value.astimezone(UTC) + # date (but not datetime — datetime is a date subclass, handled above) + if end_of_day: + return datetime(value.year, value.month, value.day, 23, 59, 59, 999999, tzinfo=UTC) + return datetime(value.year, value.month, value.day, tzinfo=UTC) + + +def _iter_year_months(start: datetime, end: datetime) -> list[tuple[int, int]]: + """Yield every ``(year, month)`` partition overlapping ``[start, end]``.""" + if end < start: + return [] + months: list[tuple[int, int]] = [] + y, m = start.year, start.month + while (y, m) <= (end.year, end.month): + months.append((y, m)) + if m == 12: + y, m = y + 1, 1 + else: + m += 1 + return months + + +def read_cwop_window( + station: str, + from_date: date | datetime, + to_date: date | datetime, +) -> list[dict]: + """Read all persisted rows for ``station`` in ``[from_date, to_date]``. + + Inclusive on both ends. 
Iterates the monthly partitions overlapping the + range, filters rows by ``observed_at`` falling inside the window, and + returns them sorted by ``observed_at`` (then ``station_id`` for stability). + Returns an empty list when nothing is persisted in range — callers raise + :class:`NoCWOPDataError`. + """ + start = _as_utc_datetime(from_date, end_of_day=False) + end = _as_utc_datetime(to_date, end_of_day=True) + out: list[dict] = [] + for year, month in _iter_year_months(start, end): + rows = read_cwop_cache(station, year, month) + if not rows: + continue + for row in rows: + observed = _coerce_observed(row.get("observed_at")) + if observed is None: + continue + if start <= observed <= end: + out.append(row) + out.sort(key=lambda r: (_sort_key(r.get("observed_at")), str(r.get("station_id")))) + return out + + +def _coerce_observed(value: object) -> datetime | None: + """Coerce a parquet ``observed_at`` cell to a tz-aware UTC ``datetime``.""" + if isinstance(value, datetime): + return value if value.tzinfo is not None else value.replace(tzinfo=UTC) + return None + + +def _sort_key(value: object) -> datetime: + """Total-order sort key for ``observed_at`` (nulls sort first).""" + coerced = _coerce_observed(value) + return coerced if coerced is not None else datetime.min.replace(tzinfo=UTC) + + +def persist_observations(observations: list[CWOPObservation]) -> int: + """Persist live ``CWOPObservation`` rows to the monthly parquet cache. + + Groups observations by ``(station_id, year, month)`` of their UTC + ``observed_at`` (so a window straddling a month boundary lands in the right + partitions) and merge-writes each partition. Returns the number of + observations SUBMITTED for writing, not the number of new rows on disk: + :func:`write_cwop_cache` dedups per partition, so re-persisting an + overlapping window can add fewer rows than this count. + + This is the collection-side seam: feed it the output of ``snapshot()`` / + ``stream()`` to build a persisted CWOP history for backtesting. 
+ """ + if not observations: + return 0 + + buckets: dict[tuple[str, int, int], list[dict]] = {} + for obs in observations: + observed = obs.observed_at + if observed.tzinfo is None: + observed = observed.replace(tzinfo=UTC) + else: + observed = observed.astimezone(UTC) + key = (obs.station_id, observed.year, observed.month) + buckets.setdefault(key, []).append(_observation_to_row(obs)) + + written = 0 + for (station, year, month), rows in buckets.items(): + write_cwop_cache(station, year, month, rows) + written += len(rows) + return written + + +def _observation_to_row(obs: CWOPObservation) -> dict: + """Flatten a :class:`CWOPObservation` to a persistable parquet row dict. + + ``source`` is intentionally omitted here — :func:`write_cwop_cache` stamps + ``"cwop.cache"`` on every row at write time so the persisted provenance is + authoritative regardless of the live tag the observation carried. + """ + return { + "station_id": obs.station_id, + "observed_at": obs.observed_at, + "knowledge_time": obs.knowledge_time, + "temp_f": obs.temp_f, + "humidity": obs.humidity, + "pressure_mb": obs.pressure_mb, + "wind_speed_mph": obs.wind_speed_mph, + "wind_dir_deg": obs.wind_dir_deg, + "wind_gust_mph": obs.wind_gust_mph, + "rain_1h_in": obs.rain_1h_in, + "luminosity": obs.luminosity, + "lat": obs.lat, + "lon": obs.lon, + "raw_aprs": obs.raw_aprs, + "qc_score": obs.qc_score, + "qc_status": obs.qc_status, + } + + +__all__ = [ + "CWOP_CACHE_SOURCE", + "cwop_cache_path", + "invalidate_cwop", + "persist_observations", + "read_cwop_cache", + "read_cwop_window", + "write_cwop_cache", +] diff --git a/packages/weather/src/mostlyright/weather/cwop/_history.py b/packages/weather/src/mostlyright/weather/cwop/_history.py new file mode 100644 index 0000000..35a0c80 --- /dev/null +++ b/packages/weather/src/mostlyright/weather/cwop/_history.py @@ -0,0 +1,120 @@ +"""``cwop.history()`` — backtest replay of persisted CWOP observations. 
+ +The standalone, parity-safe access path for ML strategies (v0.2). CWOP is NEVER +wired into ``research()`` / ``merge.observations`` / ``live._sources`` — +``research()`` aggregates ``max(temp_f)/min(temp_f)`` source-blind, so a single +hot-rooftop or indoor PWS would corrupt a Kalshi NHIGH/NLOW training target. +Instead, CWOP exposes its own ``history()`` returning a ``schema.cwop.v1`` +DataFrame that a strategy joins itself, on its own terms, fully aware it is +unofficial PWS data. + +Reads come from the monthly parquet cache written by +:func:`mostlyright.weather.cwop.persist_observations` (or ``snapshot(persist=True)``), +so the returned frame is tagged ``source="cwop.cache"``. +""" + +from __future__ import annotations + +from datetime import date, datetime +from typing import TYPE_CHECKING + +from mostlyright.core.exceptions import NoCWOPDataError + +from ._cache import _as_utc_datetime, read_cwop_window +from ._schema import ( + CWOP_CACHE_SOURCE, + CWOP_QC_STATUS_VALUES, + CWOPObservation, + build_cwop_dataframe, + validate_cwop_dataframe, +) + +if TYPE_CHECKING: + import pandas as pd + +__all__ = ["history"] + + +def _row_to_observation(row: dict) -> CWOPObservation: + """Rebuild a :class:`CWOPObservation` from a persisted parquet row dict.""" + return CWOPObservation( + station_id=row["station_id"], + observed_at=row["observed_at"], + knowledge_time=row["knowledge_time"], + lat=row.get("lat"), + lon=row.get("lon"), + raw_aprs=row.get("raw_aprs", ""), + temp_f=row.get("temp_f"), + humidity=row.get("humidity"), + pressure_mb=row.get("pressure_mb"), + wind_speed_mph=row.get("wind_speed_mph"), + wind_dir_deg=row.get("wind_dir_deg"), + wind_gust_mph=row.get("wind_gust_mph"), + rain_1h_in=row.get("rain_1h_in"), + luminosity=row.get("luminosity"), + source=CWOP_CACHE_SOURCE, + qc_score=row.get("qc_score"), + # `or` (not dict default) so an explicit null cell coerces to "unknown" + # — the enum column is non-nullable, so None would fail validation. 
+ qc_status=row.get("qc_status") or "unknown", + ) + + +def history( + station: str, + from_date: date | datetime, + to_date: date | datetime, + *, + qc_status: str | None = None, +) -> pd.DataFrame: + """Return persisted CWOP observations for ``station`` in ``[from_date, to_date]``. + + The backtest-replay entry point: reads the monthly parquet cache and returns + a ``schema.cwop.v1`` DataFrame (``source="cwop.cache"``, validated before + return) that an ML strategy can join against its official-obs features. The + date range is **inclusive** on both ends; bare ``date`` arguments cover the + whole calendar day in UTC. + + Args: + station: CWOP station id (e.g. ``"CW0875"``). + from_date: window start (inclusive). ``date`` or ``datetime``. + to_date: window end (inclusive). ``date`` or ``datetime``. + qc_status: optional filter — keep only rows with this QC verdict + (``"clean"`` / ``"flagged"`` / ``"dropped"`` / ``"unknown"``). A + backtest typically passes ``"clean"`` to drop QC-rejected PWS rows. + + Returns: + A ``schema.cwop.v1`` DataFrame sorted by ``observed_at``. + + Raises: + ValueError: ``from_date`` is after ``to_date``, or ``qc_status`` is not + a recognized QC verdict. + NoCWOPDataError: no persisted CWOP data in range (after the optional + ``qc_status`` filter) — never returns an empty frame. + """ + if qc_status is not None and qc_status not in CWOP_QC_STATUS_VALUES: + raise ValueError(f"qc_status={qc_status!r} must be one of {CWOP_QC_STATUS_VALUES} or None") + # Compare with the SAME normalization read_cwop_window applies (from = start + # of day, to = end of day) so the guard never rejects a valid asymmetric + # range like (datetime 18:00, date same-day) that the window would accept. 
+ if _as_utc_datetime(from_date, end_of_day=False) > _as_utc_datetime(to_date, end_of_day=True): + raise ValueError(f"from_date {from_date!r} is after to_date {to_date!r}") + + rows = read_cwop_window(station, from_date, to_date) + if qc_status is not None: + # Filter on the SAME coercion _row_to_observation applies (null → + # "unknown") so the predicate matches the value the caller sees in the + # returned frame — a null-qc row is reachable via qc_status="unknown". + rows = [r for r in rows if (r.get("qc_status") or "unknown") == qc_status] + + if not rows: + suffix = f" with qc_status={qc_status!r}" if qc_status is not None else "" + raise NoCWOPDataError( + station, + f"no persisted CWOP observations in [{from_date}, {to_date}]{suffix}", + ) + + observations = [_row_to_observation(r) for r in rows] + df = build_cwop_dataframe(observations, source=CWOP_CACHE_SOURCE) + validate_cwop_dataframe(df) + return df diff --git a/packages/weather/src/mostlyright/weather/cwop/_schema.py b/packages/weather/src/mostlyright/weather/cwop/_schema.py index 2e856e6..87e536b 100644 --- a/packages/weather/src/mostlyright/weather/cwop/_schema.py +++ b/packages/weather/src/mostlyright/weather/cwop/_schema.py @@ -6,9 +6,10 @@ imported, so a base install that never touches CWOP pays nothing and the canonical observation/forecast/settlement schemas are untouched. -Source identity is the single fixed tag ``"cwop.live"`` (``_registered_source``) -— CWOP data never enters the merge/research path, so there is no union of -sources here. +Source identity is the CWOP-only union ``{"cwop.live", "cwop.cache"}`` +(``_registered_sources``) — ``"cwop.live"`` for fresh APRS-IS pulls, +``"cwop.cache"`` for persisted-then-read history (v0.2). CWOP data never enters +the merge/research path, so this union is local to ``schema.cwop.v1``. 
Also home to :class:`CWOPObservation`, the row model the public surface yields, and the :func:`build_cwop_dataframe` / :func:`validate_cwop_dataframe` helpers @@ -27,9 +28,20 @@ if TYPE_CHECKING: import pandas as pd -#: Source-identity tag stamped on every CWOP row + ``df.attrs["source"]``. +#: Source-identity tag stamped on every CWOP row + ``df.attrs["source"]`` for +#: data pulled LIVE off the APRS-IS stream. CWOP_SOURCE = "cwop.live" +#: Source-identity tag for data read back from the persisted parquet cache +#: (``cwop.history()`` / backtest replay). A consumer joining CWOP history into +#: a model sees ``"cwop.cache"`` and knows the provenance is persisted-then-read, +#: NOT a fresh socket pull. CWOP carries BOTH tags in its own ``schema.cwop.v1`` +#: source union — it still never enters the parity-frozen merge/research path. +CWOP_CACHE_SOURCE = "cwop.cache" + +#: The full set of source identities ``schema.cwop.v1`` accepts (live + cache). +CWOP_SOURCES: frozenset[str] = frozenset({CWOP_SOURCE, CWOP_CACHE_SOURCE}) + #: QC verdict vocabulary (mirrors the qc/_cwop.py thresholds). ``"unknown"`` is #: the pre-QC default carried by a freshly-parsed observation. CWOP_QC_STATUS_VALUES: tuple[str, ...] = ("unknown", "clean", "flagged", "dropped") @@ -68,15 +80,21 @@ class CWOPObservation: class CwopSchema(Schema): """``schema.cwop.v1`` — one row per parsed CWOP weather observation. - Standalone (NOT a member of ``schema.observation.v1``). Single registered - source ``"cwop.live"``. Nullable numeric fields reflect that a PWS may not - report every variable in every packet. + Standalone (NOT a member of ``schema.observation.v1``). Registered source + union ``{"cwop.live", "cwop.cache"}`` (live pull vs persisted-then-read). + Nullable numeric fields reflect that a PWS may not report every variable in + every packet. """ schema_id = "schema.cwop.v1" - #: Single permitted source identity (singular guard, not a union). 
- _registered_source: ClassVar[str] = CWOP_SOURCE + #: Permitted source identities. ``cwop.live`` for fresh APRS-IS pulls, + #: ``cwop.cache`` for persisted-then-read history (v0.2). Declared as the + #: union (plural) ONLY — the validator's source-drift audit keys off the + #: legacy singular ``_registered_source``, so setting it here would log a + #: spurious ``source_drift_allowed`` event on every legitimate cache-frame + #: read and pollute the train/infer-mismatch audit trail. + _registered_sources: ClassVar[frozenset[str]] = CWOP_SOURCES COLUMNS: ClassVar[list[ColumnSpec]] = [ ColumnSpec(name="station_id", dtype="string", units=None, nullable=False), @@ -128,7 +146,7 @@ class CwopSchema(Schema): dtype="string", units=None, nullable=False, - notes="per-row source identity == df.attrs['source'] == 'cwop.live'", + notes="per-row source identity == df.attrs['source'] ('cwop.live' | 'cwop.cache')", ), ColumnSpec( name="qc_score", @@ -174,13 +192,21 @@ def build_cwop_dataframe( observations: list[CWOPObservation], *, retrieved_at: datetime | None = None, + source: str = CWOP_SOURCE, ) -> pd.DataFrame: """Assemble CWOP observations into a ``schema.cwop.v1`` DataFrame. - Stamps ``df.attrs["source"] = "cwop.live"`` + ``df.attrs["retrieved_at"]`` + Stamps ``df.attrs["source"] = source`` + ``df.attrs["retrieved_at"]`` (the validator's required provenance) and coerces dtypes so the frame passes :func:`validate_cwop_dataframe` (tz-aware UTC timestamps, nullable ``Int64`` integer columns, ``float64`` numerics). + + ``source`` defaults to ``"cwop.live"`` (the live ``snapshot()`` path). The + persisted-history path (:func:`mostlyright.weather.cwop.history`) passes + ``"cwop.cache"`` so the frame is tagged as cache-read provenance. The + per-row ``source`` column is overwritten to match ``source`` so the + validator's row-vs-attrs source check passes uniformly regardless of what + tag the individual ``CWOPObservation`` instances carried. 
""" import pandas as pd @@ -203,7 +229,7 @@ def build_cwop_dataframe( "lat": o.lat, "lon": o.lon, "raw_aprs": o.raw_aprs, - "source": o.source, + "source": source, "qc_score": o.qc_score, "qc_status": o.qc_status, } @@ -221,7 +247,7 @@ def build_cwop_dataframe( for col in ("station_id", "raw_aprs", "source", "qc_status"): df[col] = df[col].astype("object") - df.attrs["source"] = CWOP_SOURCE + df.attrs["source"] = source df.attrs["retrieved_at"] = retrieved_at return df @@ -232,8 +258,10 @@ def validate_cwop_dataframe(df: pd.DataFrame) -> None: __all__ = [ + "CWOP_CACHE_SOURCE", "CWOP_QC_STATUS_VALUES", "CWOP_SOURCE", + "CWOP_SOURCES", "CWOPObservation", "CwopSchema", "build_cwop_dataframe", diff --git a/packages/weather/src/mostlyright/weather/cwop/_snapshot.py b/packages/weather/src/mostlyright/weather/cwop/_snapshot.py index 84be05e..501a39b 100644 --- a/packages/weather/src/mostlyright/weather/cwop/_snapshot.py +++ b/packages/weather/src/mostlyright/weather/cwop/_snapshot.py @@ -105,12 +105,20 @@ def snapshot( *, duration_seconds: int = 60, qc: bool = True, + persist: bool = False, ) -> pd.DataFrame: """Collect CWOP observations for ``duration_seconds`` into a DataFrame. Synchronous. Returns a ``schema.cwop.v1`` frame (validated before return), with QC scores/status populated when ``qc=True``. + When ``persist=True`` the collected observations are also written to the + monthly parquet cache (``$HOME/.mostlyright/cache/cwop/...``) so they can be + replayed later via :func:`mostlyright.weather.cwop.history` for backtesting. + Persistence is a side effect — the returned (live) frame is unchanged and + still tagged ``source="cwop.live"``; only the on-disk rows carry the + ``"cwop.cache"`` provenance tag. + Raises: NoCWOPDataError: no observations arrived during the window. 
""" @@ -124,6 +132,12 @@ def snapshot( history: dict[str, list[dict[str, Any]]] = {} observations = [_packet_to_observation(p, qc=qc, history=history) for p in packets] + if persist: + # Local import keeps the cache (pyarrow) dependency lazy — callers who + # never persist pay nothing. + from ._cache import persist_observations + + persist_observations(observations) df = build_cwop_dataframe(observations) validate_cwop_dataframe(df) return df diff --git a/tests/test_cwop_cache.py b/tests/test_cwop_cache.py new file mode 100644 index 0000000..af82d63 --- /dev/null +++ b/tests/test_cwop_cache.py @@ -0,0 +1,267 @@ +"""Tests for the CWOP persistence layer (``weather.cwop._cache``). + +The parquet cache root is isolated per-test via ``MOSTLYRIGHT_CACHE_DIR`` so a +test never touches the user's real ``~/.mostlyright/cache``. +""" + +from __future__ import annotations + +from datetime import UTC, date, datetime + +import pytest +from mostlyright.weather.cwop import _cache +from mostlyright.weather.cwop._cache import ( + CWOP_CACHE_SOURCE, + cwop_cache_path, + invalidate_cwop, + persist_observations, + read_cwop_cache, + read_cwop_window, + write_cwop_cache, +) +from mostlyright.weather.cwop._schema import CWOPObservation + + +@pytest.fixture(autouse=True) +def _isolated_cache(tmp_path, monkeypatch): + """Point the shared cache root at a tmpdir for every test in this module.""" + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + return tmp_path + + +def _obs(station="CW0875", *, ts: datetime, temp=72.0, qc_status="clean", qc_score=0.9): + return CWOPObservation( + station_id=station, + observed_at=ts, + knowledge_time=ts, + lat=40.7, + lon=-73.9, + raw_aprs=f"{station}>APRS:_{ts:%d%H%M}", + temp_f=temp, + humidity=55, + qc_status=qc_status, + qc_score=qc_score, + ) + + +# --------------------------------------------------------------------------- +# Path construction + validation +# --------------------------------------------------------------------------- +def 
test_cache_path_layout(_isolated_cache): + path = cwop_cache_path("CW0875", 2026, 6) + assert path == _isolated_cache / "cwop" / "CW0875" / "2026" / "06.parquet" + + +def test_cache_path_zero_pads_month(_isolated_cache): + assert cwop_cache_path("CW0875", 2026, 1).name == "01.parquet" + + +def test_cache_path_normalizes_case(_isolated_cache): + assert cwop_cache_path("cw0875", 2026, 6).parent.parent.name == "CW0875" + + +@pytest.mark.parametrize("bad", ["../etc", "CW/0875", "CW.0875", "A", "AB", "", "x" * 13]) +def test_cache_path_rejects_unsafe_station(bad): + with pytest.raises(ValueError, match="path-safe CWOP id"): + cwop_cache_path(bad, 2026, 6) + + +def test_cache_path_rejects_non_str(): + with pytest.raises(TypeError): + cwop_cache_path(12345, 2026, 6) # type: ignore[arg-type] + + +def test_cache_path_accepts_ham_callsign_with_ssid(_isolated_cache): + # Ham-callsign reporters carry a -SSID suffix; hyphen is path-safe. + path = cwop_cache_path("N0CALL-1", 2026, 6) + assert path.parent.parent.name == "N0CALL-1" + + +# --------------------------------------------------------------------------- +# write / read / merge / dedup +# --------------------------------------------------------------------------- +def test_write_then_read_roundtrip(): + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + rows = [_observation_row(_obs(ts=ts))] + count = write_cwop_cache("CW0875", 2026, 6, rows) + assert count == 1 + back = read_cwop_cache("CW0875", 2026, 6) + assert back is not None and len(back) == 1 + assert back[0]["station_id"] == "CW0875" + + +def test_write_stamps_cache_source(): + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + row = _observation_row(_obs(ts=ts)) + row["source"] = "cwop.live" # caller-provided live tag is overwritten + write_cwop_cache("CW0875", 2026, 6, [row]) + back = read_cwop_cache("CW0875", 2026, 6) + assert back[0]["source"] == CWOP_CACHE_SOURCE + + +def test_write_empty_is_noop(): + assert write_cwop_cache("CW0875", 2026, 6, []) == 0 + assert 
read_cwop_cache("CW0875", 2026, 6) is None + + +def test_read_miss_returns_none(): + assert read_cwop_cache("CW0875", 2099, 1) is None + + +def test_merge_accumulates_distinct_observations(): + t1 = datetime(2026, 6, 15, 12, tzinfo=UTC) + t2 = datetime(2026, 6, 15, 12, 10, tzinfo=UTC) + write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=t1))]) + total = write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=t2))]) + assert total == 2 + assert len(read_cwop_cache("CW0875", 2026, 6)) == 2 + + +def test_merge_dedups_repeated_observation(): + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + row = _observation_row(_obs(ts=ts)) + write_cwop_cache("CW0875", 2026, 6, [row]) + # Re-persisting the same (station, observed_at) is idempotent. + total = write_cwop_cache("CW0875", 2026, 6, [row]) + assert total == 1 + + +def test_merge_keeps_first_seen_on_duplicate_key(): + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + first = _observation_row(_obs(ts=ts, temp=70.0)) + write_cwop_cache("CW0875", 2026, 6, [first]) + second = _observation_row(_obs(ts=ts, temp=99.0)) # same key, different temp + write_cwop_cache("CW0875", 2026, 6, [second]) + back = read_cwop_cache("CW0875", 2026, 6) + assert len(back) == 1 + assert back[0]["temp_f"] == 70.0 # first-seen wins + + +# --------------------------------------------------------------------------- +# invalidate +# --------------------------------------------------------------------------- +def test_invalidate_removes_partition(): + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=ts))]) + assert invalidate_cwop("CW0875", 2026, 6) is True + assert read_cwop_cache("CW0875", 2026, 6) is None + + +def test_invalidate_absent_returns_false(): + assert invalidate_cwop("CW0875", 2099, 1) is False + + +# --------------------------------------------------------------------------- +# window read +# --------------------------------------------------------------------------- +def 
test_window_spans_multiple_months(): + persist_observations( + [ + _obs(ts=datetime(2026, 5, 31, 23, tzinfo=UTC), temp=60.0), + _obs(ts=datetime(2026, 6, 1, 1, tzinfo=UTC), temp=61.0), + _obs(ts=datetime(2026, 7, 2, 12, tzinfo=UTC), temp=80.0), + ] + ) + rows = read_cwop_window("CW0875", date(2026, 5, 1), date(2026, 6, 30)) + temps = [r["temp_f"] for r in rows] + assert temps == [60.0, 61.0] # July excluded, sorted by observed_at + + +def test_window_inclusive_bounds(): + persist_observations([_obs(ts=datetime(2026, 6, 15, 0, 0, 0, tzinfo=UTC))]) + rows = read_cwop_window("CW0875", date(2026, 6, 15), date(2026, 6, 15)) + assert len(rows) == 1 # bare date covers the whole day + + +def test_window_excludes_out_of_range(): + persist_observations([_obs(ts=datetime(2026, 6, 20, 12, tzinfo=UTC))]) + assert read_cwop_window("CW0875", date(2026, 6, 1), date(2026, 6, 10)) == [] + + +def test_window_reversed_range_is_empty(): + persist_observations([_obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC))]) + assert read_cwop_window("CW0875", date(2026, 6, 30), date(2026, 6, 1)) == [] + + +def test_window_accepts_datetime_bounds(): + persist_observations( + [ + _obs(ts=datetime(2026, 6, 15, 9, tzinfo=UTC), temp=65.0), + _obs(ts=datetime(2026, 6, 15, 18, tzinfo=UTC), temp=85.0), + ] + ) + rows = read_cwop_window( + "CW0875", + datetime(2026, 6, 15, 12, tzinfo=UTC), + datetime(2026, 6, 15, 23, tzinfo=UTC), + ) + assert [r["temp_f"] for r in rows] == [85.0] + + +# --------------------------------------------------------------------------- +# persist_observations grouping +# --------------------------------------------------------------------------- +def test_persist_groups_by_station_and_month(): + written = persist_observations( + [ + _obs("CW0875", ts=datetime(2026, 6, 1, 12, tzinfo=UTC)), + _obs("CW0875", ts=datetime(2026, 7, 1, 12, tzinfo=UTC)), + _obs("DW1234", ts=datetime(2026, 6, 1, 12, tzinfo=UTC)), + ] + ) + assert written == 3 + assert read_cwop_cache("CW0875", 2026, 6) is not 
None + assert read_cwop_cache("CW0875", 2026, 7) is not None + assert read_cwop_cache("DW1234", 2026, 6) is not None + assert read_cwop_cache("DW1234", 2026, 7) is None + + +def test_persist_empty_returns_zero(): + assert persist_observations([]) == 0 + + +def test_persist_naive_datetime_treated_as_utc(): + naive = datetime(2026, 6, 15, 12) # no tzinfo + persist_observations([_obs(ts=naive)]) + assert read_cwop_cache("CW0875", 2026, 6) is not None + + +def test_persist_current_month_not_skipped(): + """CWOP is ephemeral — the current month MUST persist (no skip rule).""" + now = datetime.now(UTC) + persist_observations([_obs(ts=now)]) + assert read_cwop_cache("CW0875", now.year, now.month) is not None + + +def test_mixed_naive_and_aware_same_instant_dedups(): + """BL-01: a naive and an aware spelling of the same instant collapse to one.""" + aware = datetime(2026, 6, 15, 12, tzinfo=UTC) + naive = datetime(2026, 6, 15, 12) # same instant, no tzinfo + write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=aware, temp=70.0))]) + total = write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=naive, temp=99.0))]) + back = read_cwop_cache("CW0875", 2026, 6) + assert total == 1 + assert len(back) == 1 + assert back[0]["temp_f"] == 70.0 # first-seen wins, not double-counted + + +def test_persisted_timestamps_are_utc_aware(): + """BL-01: on-disk observed_at is uniformly tz-aware UTC, not tz-stripped.""" + write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=datetime(2026, 6, 15, 12)))]) + back = read_cwop_cache("CW0875", 2026, 6) + assert back[0]["observed_at"].tzinfo is not None + + +def test_non_utc_aware_normalized_to_utc(): + """BL-01: an aware non-UTC datetime is stored as its UTC instant.""" + from datetime import timedelta, timezone + + est = timezone(timedelta(hours=-5)) + ts = datetime(2026, 6, 15, 7, tzinfo=est) # == 12:00 UTC + write_cwop_cache("CW0875", 2026, 6, [_observation_row(_obs(ts=ts))]) + back = read_cwop_cache("CW0875", 2026, 6) + 
assert back[0]["observed_at"] == datetime(2026, 6, 15, 12, tzinfo=UTC) + + +def _observation_row(obs: CWOPObservation) -> dict: + return _cache._observation_to_row(obs) diff --git a/tests/test_cwop_history.py b/tests/test_cwop_history.py new file mode 100644 index 0000000..8bb359b --- /dev/null +++ b/tests/test_cwop_history.py @@ -0,0 +1,193 @@ +"""Tests for ``cwop.history()`` — backtest replay of persisted CWOP data. + +Covers the parity-safe access path: ``history()`` returns a ``schema.cwop.v1`` +DataFrame tagged ``source="cwop.cache"`` and never touches research/merge. +""" + +from __future__ import annotations + +from datetime import UTC, date, datetime + +import pytest +from mostlyright.weather import cwop +from mostlyright.weather.cwop import history +from mostlyright.weather.cwop._schema import ( + CWOP_CACHE_SOURCE, + CWOPObservation, + validate_cwop_dataframe, +) + + +@pytest.fixture(autouse=True) +def _isolated_cache(tmp_path, monkeypatch): + monkeypatch.setenv("MOSTLYRIGHT_CACHE_DIR", str(tmp_path)) + return tmp_path + + +def _obs(station="CW0875", *, ts: datetime, temp=72.0, qc_status="clean"): + return CWOPObservation( + station_id=station, + observed_at=ts, + knowledge_time=ts, + lat=40.7, + lon=-73.9, + raw_aprs=f"{station}>APRS:_{ts:%d%H%M}", + temp_f=temp, + humidity=55, + qc_status=qc_status, + qc_score=0.9 if qc_status == "clean" else 0.2, + ) + + +def test_history_returns_validated_cache_frame(): + cwop.persist_observations([_obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC))]) + df = history("CW0875", date(2026, 6, 1), date(2026, 6, 30)) + assert len(df) == 1 + assert df.attrs["source"] == CWOP_CACHE_SOURCE + assert (df["source"] == CWOP_CACHE_SOURCE).all() + validate_cwop_dataframe(df) # must not raise + + +def test_history_sorted_by_observed_at(): + cwop.persist_observations( + [ + _obs(ts=datetime(2026, 6, 15, 18, tzinfo=UTC), temp=85.0), + _obs(ts=datetime(2026, 6, 15, 6, tzinfo=UTC), temp=60.0), + _obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC), 
temp=72.0), + ] + ) + df = history("CW0875", date(2026, 6, 15), date(2026, 6, 15)) + assert list(df["temp_f"]) == [60.0, 72.0, 85.0] + + +def test_history_preserves_temp_and_qc(): + cwop.persist_observations([_obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC), temp=68.5)]) + df = history("CW0875", date(2026, 6, 1), date(2026, 6, 30)) + assert df["temp_f"].iloc[0] == 68.5 + assert df["qc_status"].iloc[0] == "clean" + + +def test_history_qc_status_filter(): + cwop.persist_observations( + [ + _obs(ts=datetime(2026, 6, 15, 6, tzinfo=UTC), qc_status="clean"), + _obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC), qc_status="flagged"), + ] + ) + df = history("CW0875", date(2026, 6, 15), date(2026, 6, 15), qc_status="clean") + assert len(df) == 1 + assert df["qc_status"].iloc[0] == "clean" + + +def test_history_empty_raises_no_cwop_data(): + with pytest.raises(cwop.NoCWOPDataError): + history("CW0875", date(2026, 6, 1), date(2026, 6, 30)) + + +def test_history_qc_filter_to_empty_raises(): + cwop.persist_observations([_obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC), qc_status="clean")]) + with pytest.raises(cwop.NoCWOPDataError, match="dropped"): + history("CW0875", date(2026, 6, 15), date(2026, 6, 15), qc_status="dropped") + + +def test_history_rejects_bad_qc_status(): + with pytest.raises(ValueError, match="qc_status"): + history("CW0875", date(2026, 6, 1), date(2026, 6, 30), qc_status="bogus") + + +def test_history_rejects_reversed_range(): + with pytest.raises(ValueError, match="after"): + history("CW0875", date(2026, 6, 30), date(2026, 6, 1)) + + +def test_history_excludes_other_station(): + cwop.persist_observations([_obs("DW1234", ts=datetime(2026, 6, 15, 12, tzinfo=UTC))]) + with pytest.raises(cwop.NoCWOPDataError): + history("CW0875", date(2026, 6, 1), date(2026, 6, 30)) + + +def test_history_accepts_asymmetric_intraday_range(): + """MD-01: (datetime mid-day, bare date same-day) is valid, must not ValueError.""" + cwop.persist_observations([_obs(ts=datetime(2026, 6, 15, 20, 
tzinfo=UTC))]) + df = history("CW0875", datetime(2026, 6, 15, 18, tzinfo=UTC), date(2026, 6, 15)) + assert len(df) == 1 + + +def test_history_cache_frame_logs_no_source_drift_audit(): + """MD-02: a legitimate cwop.cache read must NOT record a source-drift audit.""" + from mostlyright.core.validator import validate_dataframe + + cwop.persist_observations([_obs(ts=datetime(2026, 6, 15, 12, tzinfo=UTC))]) + df = history("CW0875", date(2026, 6, 15), date(2026, 6, 15)) + reg = validate_dataframe(df, "schema.cwop.v1") + events = [e.get("event") for e in reg.audit_log()] + assert "source_drift_allowed" not in events + + +def test_history_coerces_null_qc_status(monkeypatch): + """LW-01: a persisted row with qc_status=None reads back as 'unknown', not error.""" + from mostlyright.weather.cwop._cache import write_cwop_cache + + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + row = { + "station_id": "CW0875", + "observed_at": ts, + "knowledge_time": ts, + "temp_f": 70.0, + "raw_aprs": "x", + "qc_status": None, + } + write_cwop_cache("CW0875", 2026, 6, [row]) + df = history("CW0875", date(2026, 6, 15), date(2026, 6, 15)) + assert df["qc_status"].iloc[0] == "unknown" + + +def test_history_null_qc_reachable_via_unknown_filter(): + """qc_status='unknown' must surface persisted rows stored with a null verdict.""" + from mostlyright.weather.cwop._cache import write_cwop_cache + + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + write_cwop_cache( + "CW0875", + 2026, + 6, + [ + { + "station_id": "CW0875", + "observed_at": ts, + "knowledge_time": ts, + "temp_f": 70.0, + "raw_aprs": "x", + "qc_status": None, + } + ], + ) + df = history("CW0875", date(2026, 6, 15), date(2026, 6, 15), qc_status="unknown") + assert len(df) == 1 + assert df["qc_status"].iloc[0] == "unknown" + + +def test_history_round_trips_from_snapshot_persist(monkeypatch): + """snapshot(persist=True) then history() — the full collect→replay loop.""" + import mostlyright.weather.cwop._snapshot as snap + from 
mostlyright.weather._aprs import APRSPacket + + ts = datetime(2026, 6, 15, 12, tzinfo=UTC) + packet = APRSPacket( + raw="CW0875>APRS,TCPIP*:_06151200c000s000t072", + source_call="CW0875", + lat=40.7, + lon=-73.9, + timestamp=ts, + weather={"temp_f": 72.0, "humidity": 55}, + receive_time=ts, + ) + monkeypatch.setattr(snap, "_collect_packets", lambda *a, **k: [packet]) + + df_live = cwop.snapshot("CW0875", duration_seconds=1, qc=False, persist=True) + assert df_live.attrs["source"] == "cwop.live" # live frame unchanged + + df_hist = history("CW0875", date(2026, 6, 15), date(2026, 6, 15)) + assert df_hist.attrs["source"] == CWOP_CACHE_SOURCE + assert len(df_hist) == 1 + assert df_hist["temp_f"].iloc[0] == 72.0
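
The cache tests above (`test_merge_keeps_first_seen_on_duplicate_key` and the BL-01 cases) pin down one merge contract: timestamps are normalized to tz-aware UTC, and duplicates on `(station_id, observed_at)` resolve first-seen-wins. The following is a minimal standalone sketch of that contract, not the code in `_cache.py`. The helper names `as_utc` and `merge_first_seen` are hypothetical, and rows are assumed to be plain dicts that carry at least `station_id` and `observed_at`.

```python
from datetime import datetime, timedelta, timezone


def as_utc(ts: datetime) -> datetime:
    """Return ts as a tz-aware UTC datetime (hypothetical helper).

    Assumption mirrored from the tests: a naive datetime is treated as UTC,
    and an aware non-UTC datetime is converted to its UTC instant.
    """
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def merge_first_seen(existing: list[dict], incoming: list[dict]) -> list[dict]:
    """Merge rows, keeping the first row seen for each (station_id, observed_at)."""
    merged: dict[tuple[str, datetime], dict] = {}
    for row in [*existing, *incoming]:
        normalized = {**row, "observed_at": as_utc(row["observed_at"])}
        key = (normalized["station_id"], normalized["observed_at"])
        # setdefault leaves an already-present key untouched: first-seen wins.
        merged.setdefault(key, normalized)
    return sorted(merged.values(), key=lambda r: r["observed_at"])


# Illustrative data only: an aware row already on disk, then a naive spelling
# of the same instant and a distinct observation reported in UTC-5.
est = timezone(timedelta(hours=-5))
on_disk = [
    {"station_id": "CW0875",
     "observed_at": datetime(2026, 6, 15, 12, tzinfo=timezone.utc),
     "temp_f": 70.0},
]
new_rows = [
    {"station_id": "CW0875",
     "observed_at": datetime(2026, 6, 15, 12),
     "temp_f": 99.0},
    {"station_id": "CW0875",
     "observed_at": datetime(2026, 6, 15, 7, 30, tzinfo=est),
     "temp_f": 71.0},
]
merged = merge_first_seen(on_disk, new_rows)
```

Normalizing before the key is built is what lets a naive and an aware spelling of the same instant collapse into one row. Sorting the result by `observed_at` matches the ordering that the window-read tests expect.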