Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ This applies to every PLAN.md generated by `/gsd-plan-phase`, every PROJECT.md "

Standalone live adapter for Citizen Weather Observer Program (PWS) data over the APRS-IS TCP stream. **Deliberately isolated from the parity-critical core** — a hot-rooftop or indoor PWS must never corrupt a Kalshi NHIGH/NLOW settlement target.

- **Never wired into** `research()`, `merge/observations.py` (`SOURCE_PRIORITY`), or `live/_sources.py`. CWOP carries its own `schema.cwop.v1` — `schema.observation.v1`'s `observation_type` enum is untouched. These four files are the parity firewall; do not register "cwop" in any of them.
- **Public surface (sync unless noted):** `nearby()` (start-here ICAO→scan), `scan()`, `stream()` (async generator), `snapshot()` (→ `schema.cwop.v1` DataFrame), `latest()`. Source tag `"cwop.live"`. Errors raise `NoCWOPDataError` (subclass of `NoLiveDataError`) — never return `[]`/`None`.
- **File layout:** `weather/_aprs.py` (in-house MIT parser, no GPLv2 aprslib), `weather/_fetchers/_aprs_stream.py` (`_CWOPReaderThread` daemon owns socket → `threading.Queue` → sync/async consumers; `loop.call_soon_threadsafe` bridges async), `weather/cwop/*` (public API + `_schema.py` + `_registry.py`), `weather/qc/_cwop.py` (QC). Pure stdlib (`socket` + `re`); `[cwop]` extra is empty.
- **Never wired into** `research()`, `merge/observations.py` (`SOURCE_PRIORITY`), or `live/_sources.py`. CWOP carries its own `schema.cwop.v1` — `schema.observation.v1`'s `observation_type` enum is untouched. These four files are the parity firewall; do not register "cwop" in any of them. This holds in v0.2: persistence + `history()` add NO wiring into those four files.
- **Public surface (sync unless noted):** `nearby()` (start-here ICAO→scan), `scan()`, `stream()` (async generator), `snapshot()` (→ `schema.cwop.v1` DataFrame; `persist=True` also writes the backtest cache), `latest()`. v0.2 adds `history(station, from_date, to_date, *, qc_status=None)` (→ `schema.cwop.v1` DataFrame for backtest replay) and `persist_observations(observations)`. Source tag `"cwop.live"` (live) / `"cwop.cache"` (persisted-then-read); `schema.cwop.v1` accepts both via `_registered_sources`. Errors raise `NoCWOPDataError` (subclass of `NoLiveDataError`) — never return `[]`/`None`.
- **File layout:** `weather/_aprs.py` (in-house MIT parser, no GPLv2 aprslib), `weather/_fetchers/_aprs_stream.py` (`_CWOPReaderThread` daemon owns socket → `threading.Queue` → sync/async consumers; `loop.call_soon_threadsafe` bridges async), `weather/cwop/*` (public API + `_schema.py` + `_registry.py` + v0.2 `_cache.py`/`_history.py`), `weather/qc/_cwop.py` (QC). Pure stdlib (`socket` + `re`) for the live path; persistence pulls `pyarrow`/`filelock`. `[cwop]` extra pulls pandas (for `snapshot()`/`history()` DataFrames).
- **Persistence (v0.2):** monthly parquet at `$HOME/.mostlyright/cache/cwop/{station}/{year}/{month}.parquet` (honors `MOSTLYRIGHT_CACHE_DIR`). Standalone `weather/cwop/_cache.py` — does NOT import the parity-coupled `weather/cache.py`; `filelock`-guarded read-modify-write merge, dedup by `(station_id, observed_at)` first-seen-wins. **NO current-month skip** (CWOP is ephemeral — unlike re-fetchable AWC/IEM/GHCNh, the current month is the only chance to retain it). CWOP station ids (CW0875, ham callsigns) are NOT ICAO, so the cache uses its own `_CWOP_STATION_RE` path validator, not `STATION_CODE_RE`.
- **6-layer QC thresholds:** each layer scores 0.0–1.0; combined = weighted geometric mean. `< 0.3` → dropped, `0.3–0.7` → flagged, `> 0.7` → clean. Layers: range (0.20), temporal consistency (0.20), indoor detection (0.15), buddy/ASOS check (0.20), solar-radiation bias (0.10), reliability (0.15). Layers needing history (indoor ≥24h, solar ≥7d) pass (1.0) until enough data accumulates.
- **v0.1 limits:** live-only (no persistence/backtest), no `research()` integration. Persistence + research wiring are v0.2. See [`docs/cwop-adapter.md`](docs/cwop-adapter.md).
- **Why no `include_cwop` on `research()`:** `research()` aggregates `max/min(temp_f)` source-blind, so a hot-rooftop/indoor PWS would corrupt a Kalshi NHIGH/NLOW target. CWOP reaches models ONLY through `cwop.history()` (a DataFrame the strategy joins itself), never the settlement join. See [`docs/cwop-adapter.md`](docs/cwop-adapter.md).

## Testing

Expand Down
57 changes: 53 additions & 4 deletions docs/cwop-adapter.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ stations report every 5–15 minutes. The only real-time path is a TCP stream
> parity-frozen `schema.observation.v1`). A hot-rooftop or indoor PWS must never
> silently corrupt a Kalshi NHIGH/NLOW settlement target — so CWOP lives on its
> own island, gated by a 6-layer QC pipeline. There is no `research()`
> integration and no backtesting in v0.1.
> integration: CWOP reaches ML strategies only through its **own** access path
> (`cwop.history()`), which returns a `schema.cwop.v1` DataFrame the strategy
> joins itself — fully aware it is unofficial PWS data.

## Install

Expand Down Expand Up @@ -47,6 +49,48 @@ async for obs in cwop.stream("CW0875"):
print(obs.observed_at, obs.temp_f)
```

## Persistence + backtest (v0.2)

CWOP data is ephemeral — the APRS-IS stream has no REST backfill, so the only
way to build a history for model training is to **persist what you collect**.
v0.2 adds a monthly parquet cache and a `history()` replay query, both behind
the same firewall as the live surface (CWOP never enters `research()`/merge).

```python
from datetime import date
from mostlyright.weather import cwop

# Collect AND persist in one call (side effect — the returned live frame is
# unchanged and still tagged source="cwop.live").
cwop.snapshot("CW0875", duration_seconds=120, persist=True)

# Or persist the output of any collection path (e.g. a long stream run).
obs = [o async for o in cwop.stream("CW0875")] # illustrative
cwop.persist_observations(obs)

# Replay persisted history for backtesting — returns a schema.cwop.v1 frame
# tagged source="cwop.cache" that your strategy joins itself.
hist = cwop.history("CW0875", date(2026, 1, 1), date(2026, 6, 30))

# Drop QC-rejected PWS rows up front:
clean = cwop.history("CW0875", date(2026, 1, 1), date(2026, 6, 30), qc_status="clean")
```

- **Cache layout:** `$HOME/.mostlyright/cache/cwop/{station}/{year}/{month}.parquet`
(honors `MOSTLYRIGHT_CACHE_DIR`). `filelock`-guarded read-modify-write merge;
dedup by `(station_id, observed_at)`, first-seen wins.
- **No current-month skip.** Unlike the parity observation cache (which skips
the mutable current month because AWC/IEM/GHCNh are re-fetchable), CWOP
persists everything — the current month is exactly what a live collector
needs to keep.
- **Source identity:** live pulls are `"cwop.live"`; anything read back from the
cache is `"cwop.cache"`. `schema.cwop.v1` accepts both.
- **Why no `include_cwop=True` on `research()`?** `research()` aggregates
`max(temp_f)/min(temp_f)` source-blind, so a single hot-rooftop or indoor PWS
would raise `obs_high_f` / lower `obs_low_f` and corrupt a Kalshi NHIGH/NLOW
training target. CWOP therefore stays on its own `history()` path; the
strategy decides how (and whether) to fold it in.

Every failure mode raises `NoCWOPDataError` (a `NoLiveDataError` subclass) with
the requested station/area and a `reason` — the API never returns `[]`/`None`.

Expand Down Expand Up @@ -92,10 +136,15 @@ geometric mean. Status: `<0.3` dropped, `0.3–0.7` flagged, `>0.7` clean.
Layers without enough history pass (1.0) until data accumulates, so a
freshly-seen station is scored mostly on the range + temporal layers.

## Known limitations (v0.1)
## Known limitations (v0.2)

- **Live-only** — no persistence, no backtesting. CWOP data is ephemeral.
- **No `research()` integration** — CWOP cannot be used for model training yet.
- **Persistence is collect-then-replay** — CWOP has no REST backfill, so
`history()` only returns what you previously persisted (via `snapshot(persist=True)`
or `persist_observations()`). There is no way to retrieve CWOP data for a date
before you started collecting.
- **No `research()` integration** — by design. CWOP reaches models only through
the standalone `history()` path (see *Persistence + backtest* above); it is
never folded into the parity-critical settlement join.
- **Buddy check needs an ASOS** within 50km; degraded scoring otherwise.
- **Solar-bias / indoor detection need 7d / 24h** of history (the live session
rarely has it), so they pass until enough data accumulates.
Expand Down
20 changes: 18 additions & 2 deletions packages/weather/src/mostlyright/weather/cwop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
coordinates and scan around it.
- :func:`scan` — discover stations near a lat/lon by listening on APRS-IS.
- :func:`stream` — async generator of fresh observations.
- :func:`snapshot` — collect a fixed window into a ``schema.cwop.v1`` DataFrame.
- :func:`snapshot` — collect a fixed window into a ``schema.cwop.v1`` DataFrame
(pass ``persist=True`` to also write it to the backtest cache).
- :func:`latest` — the most-recent observation for one station.

Persistence + backtest (v0.2):

- :func:`history` — replay persisted CWOP observations for a date range as a
``schema.cwop.v1`` DataFrame (``source="cwop.cache"``). The parity-safe access
path for ML strategies — CWOP is NEVER wired into ``research()`` / merge /
``live._sources``; a strategy joins this frame itself.
- :func:`persist_observations` — write a list of observations to the monthly
parquet cache (the collection seam for ``snapshot()`` / ``stream()`` output).

Data classes: :class:`CWOPStation`, :class:`CWOPObservation`.
Errors: :class:`NoCWOPDataError`.

Expand All @@ -29,17 +39,23 @@

from mostlyright.core.exceptions import NoCWOPDataError

from ._cache import persist_observations
from ._history import history
from ._scan import CWOPStation, nearby, scan
from ._schema import CWOPObservation
from ._schema import CWOP_CACHE_SOURCE, CWOP_SOURCE, CWOPObservation
from ._snapshot import latest, snapshot
from ._stream import stream

__all__ = [
"CWOP_CACHE_SOURCE",
"CWOP_SOURCE",
"CWOPObservation",
"CWOPStation",
"NoCWOPDataError",
"history",
"latest",
"nearby",
"persist_observations",
"scan",
"snapshot",
"stream",
Expand Down
Loading
Loading