From e406b514610d10307f8c7bd37d2fe1f430d9a521 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Sun, 14 Jun 2026 16:11:55 +0300 Subject: [PATCH 1/2] feat(lifecycle): student_public snapshot-safety [LTV-Pn.4c] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third sub-PR of the split LTV-Pn.4. Implements the snapshot-safe public export for the lifecycle scheme, so student_public bundles can no longer leak the pLTV / churn targets via the relational tables. New schemes/lifecycle/render/relational_snapshot_safe.py: - to_dataframes_snapshot_safe(dfs, cutoff): event tables (subscription_events / health_signals / invoices) row-filtered to timestamp <= observation_date; subscriptions drops its stateful/terminal columns; accounts/customers pass through. - design §5 named only the THREE terminal fields (churn_at / churn_reason / subscription_end_at), but the FOUR stateful columns (subscription_status, current_mrr, renewal_count, expansion_count) also hold end-of-sim values that leak the targets (status reveals churn; current_mrr reflects post-cutoff expansion; the counts reveal future renewals/expansions). The banned set (validation.leakage_probes.LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS) therefore extends the spec; only the at-signing identity is retained. write_bundle: drops the student_public guard and wires the projection for snapshot-safe modes; records relational_snapshot_safe + structural_redactions in the manifest. build_manifest: gains a pass-through structural_redactions param (the last lead-scoring coupling in the manifest builder — it previously hardcoded the lead-scoring banned constants). Default None keeps lead-scoring behaviour; lifecycle passes its own. Lead-scoring bundles verified byte-identical (both modes) and the v6 contract test still passes. CLAUDE.md: new lifecycle snapshot-safety hard-constraint clause. 
Public task safety: the per-task single-target splits + cutoff-bounded features from LTV-Pn.4b already satisfy it; the trap is retained in all modes. Tests (23 new): event tables <= cutoff; subscriptions column set; no target columns in any public relational table; no metadata/; manifest flags + structural_redactions; public-task single-target + trap; public determinism; instructor unaffected; snapshot-safe unit behaviour (passthrough, empty-cutoff reject, no input mutation). Obsolete Pn.4a/4b stub tests updated/removed. Full suite 1872 passed / 51 skipped; ruff + mypy clean. Co-Authored-By: Claude Opus 4.8 --- .agent-plan.md | 11 +- CLAUDE.md | 1 + docs/ltv/roadmap.md | 24 ++- leadforge/render/manifests.py | 12 +- leadforge/schemes/lifecycle/__init__.py | 43 +++-- .../render/relational_snapshot_safe.py | 101 ++++++++++ leadforge/validation/leakage_probes.py | 33 ++++ tests/schemes/lifecycle/test_build_world.py | 6 - .../lifecycle/test_public_snapshot_safety.py | 172 ++++++++++++++++++ tests/schemes/lifecycle/test_write_bundle.py | 11 +- 10 files changed, 376 insertions(+), 38 deletions(-) create mode 100644 leadforge/schemes/lifecycle/render/relational_snapshot_safe.py create mode 100644 tests/schemes/lifecycle/test_public_snapshot_safety.py diff --git a/.agent-plan.md b/.agent-plan.md index 3b58f50..004fff1 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -86,10 +86,13 @@ motif sampling + population + sim + `LifecycleArtifacts`; lifecycle relational first on-disk lifecycle bundle: 6 relational tables + 8 task dirs (both regimes) + lifecycle dataset card + manifest extra_fields + hidden-truth metadata; difficulty_params threaded; student_public refused until 4c) opened -as **#126**. Next: `Pn.4c` (student_public snapshot-safety + CLAUDE.md + -recipe-driven difficulty resolution), `Pn.4d` (shared bundle orchestrator), -`LTV-Po` (recipe). Note: `validate_bundle` is lead-scoring-coupled — scheme- -aware validation is `LTV-Pp`. +as **#126** (merged). 
`LTV-Pn.4c` (student_public snapshot-safety — public +relational projection: event tables ≤ observation_date, subscriptions +stateful/terminal columns dropped; manifest flags; CLAUDE.md clause; +lead-scoring byte-identical) opened as **#127**. Next: `Pn.4d` (shared bundle +orchestrator), `LTV-Po` (recipe; also recipe-driven difficulty resolution). +Note: `validate_bundle` is lead-scoring-coupled — scheme-aware validation is +`LTV-Pp`. --- diff --git a/CLAUDE.md b/CLAUDE.md index d9b6464..519f3b5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -212,6 +212,7 @@ Key abstractions: `Recipe`, `GenerationConfig`, `WorldSpec`, `WorldBundle`, `Exp - Never use a single fixed hidden world (DGP must vary by motif family + rewiring). - Never leak post-snapshot-anchor data into flat task features. - **Never publish public relational tables that allow label reconstruction via joins.** Public relational exports must be snapshot-safe: every `*_timestamp` column in event tables (`touches.touch_timestamp`, `sessions.session_timestamp`, `sales_activities.activity_timestamp`) must satisfy `<= lead_created_at + snapshot_day`; `opportunities` must be filtered by `created_at <= lead_created_at + snapshot_day`; no terminal-state fields (`close_outcome`, `closed_at`, `converted_within_90_days`, `conversion_timestamp`) in public `leads`/`opportunities`; no conversion-conditional entities (`customers`, `subscriptions`) in public bundles. 
+- **(lifecycle / `b2b_saas_ltv_v1` scheme)** The public relational export is snapshot-safe against the absolute `observation_date` cutoff: every timestamp column in the public event tables (`subscription_events.event_timestamp`, `health_signals.period_start`, `invoices.invoice_date`) must satisfy `<= observation_date`; the public `subscriptions` table drops all stateful/terminal columns (`subscription_status`, `current_mrr`, `renewal_count`, `expansion_count`, `subscription_end_at`, `churn_at`, `churn_reason`), keeping only the at-signing identity (`subscription_id`, `customer_id`, `plan_name`, `subscription_start_at`, `contract_term_months`); no pLTV target (`ltv_revenue_*`) or churn label appears in any public relational table. Each task split carries only its own target (no cross-target leakage); the `mrr_change_full_period` trap is deliberately retained in all modes. - Never require external APIs for core generation. - Never publish hidden truth in `student_public` mode. - Never derive `converted_within_90_days` as a directly sampled label; it must emerge from simulated events. 
diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index bd65168..bad5518 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -46,7 +46,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | | `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) | | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl), #120 (Pm) | -| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3), #125 (Pn.4a), #126 (Pn.4b) | +| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3), #125 (Pn.4a), #126 (Pn.4b), #127 (Pn.4c) | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | | `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | | @@ -342,12 +342,22 @@ methods, then public-safety, then the carried orchestrator cleanup: coupled (applies lead-scoring FK/table/task checks) and errors on a lifecycle bundle; scheme-aware validation is `LTV-Pp`. - Labels: `type: feature`, `layer: api`, `layer: render` -- [ ] **`LTV-Pn.4c`** — `feat(lifecycle): student_public snapshot-safety`. - Public relational filtering (event tables ≤ cutoff; drop terminal - `churn_at`/`churn_reason`/`subscription_end_at`; no target columns); the - early-regime degenerate-column + dtype-preserving-missingness flags from - LTV-Pm. Extend `CLAUDE.md` hard constraints with the lifecycle - snapshot-safety clause + the `schemes/` layout. +- [x] **`LTV-Pn.4c`** — `feat(lifecycle): student_public snapshot-safety` + (**PR #127**). New `schemes/lifecycle/render/relational_snapshot_safe.py` + projects the public relational tables: event tables filtered to + `<= observation_date`; `subscriptions` drops its stateful/terminal columns. 
+ **Note:** design §5 named only the three terminal fields, but the four + *stateful* columns (`subscription_status`/`current_mrr`/`renewal_count`/ + `expansion_count`) also hold end-of-sim values that leak the targets, so the + banned set (`leakage_probes.LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS`) extends + the spec. `write_bundle` drops the public guard and wires the projection; + manifest records `relational_snapshot_safe` + `structural_redactions` + (`build_manifest` gained a pass-through `structural_redactions` param — the + last lead-scoring coupling in the manifest builder; lead-scoring + byte-identical). `CLAUDE.md` gains the lifecycle snapshot-safety clause. The + per-task single-target splits + cutoff-bounded features (LTV-Pn.4b) already + satisfy public task safety; the early-regime degenerate-column flags are + documented (LTV-Pm). - Labels: `type: feature`, `layer: exposure`, `layer: render`, `layer: docs` - [ ] **`LTV-Pn.4d`** — `refactor: shared bundle orchestrator`. With both schemes' `write_bundle` in hand, lift the shared orchestrator (mkdir → diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index fffa4f8..859182a 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -80,6 +80,7 @@ def build_manifest( relational_snapshot_safe: bool = False, motif_family: str | None = None, extra_fields: dict[str, Any] | None = None, + structural_redactions: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build the bundle manifest dict. @@ -116,6 +117,11 @@ def build_manifest( extra_fields: Optional scheme-specific top-level manifest keys merged into the result (e.g. the lifecycle scheme's ``observation_date`` and forward windows). Must not collide with a core manifest key. + structural_redactions: Optional scheme-supplied table-level redaction + record (``{"columns": {...}, "omitted_tables": [...]}``). 
When + ``None`` the lead-scoring default is computed from the + snapshot-safe flag (back-compat); schemes with a different public + relational shape (e.g. lifecycle) pass their own. Returns: A JSON-serialisable dict ready to be written as ``manifest.json``. @@ -164,7 +170,11 @@ def build_manifest( "motif_family": motif_family, "redacted_columns": redacted_columns_list, "relational_snapshot_safe": bool(relational_snapshot_safe), - "structural_redactions": _build_structural_redactions(bool(relational_snapshot_safe)), + "structural_redactions": ( + structural_redactions + if structural_redactions is not None + else _build_structural_redactions(bool(relational_snapshot_safe)) + ), "tables": tables, "tasks": tasks, } diff --git a/leadforge/schemes/lifecycle/__init__.py b/leadforge/schemes/lifecycle/__init__.py index 5a60b8a..1bb5393 100644 --- a/leadforge/schemes/lifecycle/__init__.py +++ b/leadforge/schemes/lifecycle/__init__.py @@ -122,28 +122,30 @@ def write_bundle( path: str, generation_timestamp: str | None = None, ) -> None: - """Serialise a lifecycle *bundle* to *path* (instructor mode). + """Serialise a lifecycle *bundle* to *path*. Writes the six relational tables, both observation regimes' snapshots split into 8 task directories (3 pLTV regression + 1 churn classification per regime, the early regime prefixed ``early_``), a dataset card, the feature dictionary, the hidden-truth ``metadata/`` - (via :meth:`write_metadata`), and the manifest (recording - ``generation_scheme`` + ``observation_date`` + the forward windows). + (instructor only, via :meth:`write_metadata`), and the manifest + (``generation_scheme`` + ``observation_date`` + forward windows). ``config.difficulty_params`` is threaded into both snapshot builders — when set (LTV-Po resolves it from the recipe profile), it drives the snapshot distortions. - Only ``research_instructor`` mode is supported here. 
The - ``student_public`` snapshot-safety projection (event-table cutoff - filtering, terminal-column drops, per-task target projection) lands in - LTV-Pn.4c; until then this refuses to write a public bundle rather than - emit one that is not snapshot-safe. + ``student_public`` bundles are projected snapshot-safe: the relational + event tables are filtered to ``<= observation_date`` and the + ``subscriptions`` table's stateful/terminal columns are dropped (see + :mod:`leadforge.schemes.lifecycle.render.relational_snapshot_safe`); no + ``metadata/`` is written; and the manifest records + ``relational_snapshot_safe`` + ``structural_redactions``. The per-task + splits are single-target and cutoff-bounded by construction. """ from pathlib import Path - from leadforge.core.enums import ExposureMode + from leadforge.exposure.filters import get_filter from leadforge.exposure.modes import apply_exposure from leadforge.render.manifests import build_manifest, write_manifest from leadforge.render.relational_io import write_relational_tables @@ -153,6 +155,10 @@ def write_bundle( from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES from leadforge.schemes.lifecycle.render.dataset_card import render_lifecycle_dataset_card from leadforge.schemes.lifecycle.render.relational import to_dataframes + from leadforge.schemes.lifecycle.render.relational_snapshot_safe import ( + LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS, + to_dataframes_snapshot_safe, + ) from leadforge.schemes.lifecycle.snapshots import ( FORWARD_WINDOWS_DAYS, build_customer_snapshot, @@ -171,12 +177,7 @@ def write_bundle( "Call Generator.generate() / build_world() first." 
) config = bundle.spec.config - if config.exposure_mode is not ExposureMode.research_instructor: - raise NotImplementedError( - f"lifecycle write_bundle currently supports only " - f"research_instructor; {config.exposure_mode.value!r} (snapshot-safe " - "public export) lands in LTV-Pn.4c" - ) + bundle_filter = get_filter(config.exposure_mode) population = artifacts.population sim = artifacts.simulation_result @@ -184,7 +185,17 @@ def write_bundle( root.mkdir(parents=True, exist_ok=True) # 1. Relational tables → tables/ + # student_public is projected snapshot-safe (event tables filtered to + # <= observation_date; subscriptions' stateful/terminal columns + # dropped). research_instructor keeps the full-horizon shape. dfs = to_dataframes(sim, population) + structural_redactions: dict[str, object] | None = None + if bundle_filter.relational_snapshot_safe: + dfs = to_dataframes_snapshot_safe(dfs, cutoff=population.observation_date) + structural_redactions = { + "columns": {"subscriptions": sorted(LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS)}, + "omitted_tables": [], + } table_row_counts = write_relational_tables(dfs, root / "tables") # 2. Both regime snapshots → 8 task directories. @@ -250,6 +261,8 @@ def write_bundle( "forward_windows_days": list(FORWARD_WINDOWS_DAYS), "early_tenure_weeks": config.early_tenure_weeks, }, + relational_snapshot_safe=bundle_filter.relational_snapshot_safe, + structural_redactions=structural_redactions, ) write_manifest(manifest, root) diff --git a/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py b/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py new file mode 100644 index 0000000..b3e44c2 --- /dev/null +++ b/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py @@ -0,0 +1,101 @@ +"""Snapshot-safe relational export for ``student_public`` lifecycle bundles. 
+ +:func:`to_dataframes_snapshot_safe` projects the full-horizon dict from +:func:`leadforge.schemes.lifecycle.render.relational.to_dataframes` onto the +shape published in public bundles, enforcing the design.md §5 contract against +the absolute calendar ``cutoff`` (the world ``observation_date``): + +* event tables (``subscription_events`` / ``health_signals`` / ``invoices``) + are row-filtered to ``timestamp <= cutoff`` — no post-cutoff events; +* ``subscriptions`` drops its stateful/terminal columns + (:data:`LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS`), keeping only the at-signing + identity (plan, term, start) — current MRR / status / counts / churn fields + all hold end-of-simulation values that leak the pLTV / churn targets; +* ``accounts`` / ``customers`` pass through (firmographic / at-signing, no + post-cutoff state). + +The public **task** parquets are already snapshot-safe by construction (their +features are computed at/before the cutoff and each carries only its own +target); this module only governs the relational ``tables/``. + +Caveat: the cutoff is the calendar regime's ``observation_date``. The +early-pLTV (tenure-anchored) task's snapshot-safe data is its own task +parquet; relational-table feature engineering aligns with the calendar regime. + +``research_instructor`` keeps the full-horizon +:func:`~leadforge.schemes.lifecycle.render.relational.to_dataframes`. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from leadforge.validation.leakage_probes import ( + LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS, + LIFECYCLE_SNAPSHOT_FILTERED_TABLES, +) + +if TYPE_CHECKING: + from collections.abc import Mapping + + import pandas as pd + +__all__ = [ + "LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS", + "LIFECYCLE_SNAPSHOT_FILTERED_TABLES", + "to_dataframes_snapshot_safe", +] + +# Canonical output order (parity with the full-horizon to_dataframes). 
+_OUTPUT_ORDER = ( + "accounts", + "customers", + "subscriptions", + "subscription_events", + "health_signals", + "invoices", +) + + +def to_dataframes_snapshot_safe( + dfs: Mapping[str, pd.DataFrame], + *, + cutoff: str, +) -> dict[str, pd.DataFrame]: + """Project the full-horizon lifecycle relational dict to its public shape. + + Args: + dfs: Output of + :func:`leadforge.schemes.lifecycle.render.relational.to_dataframes`. + Input frames are never mutated. + cutoff: Absolute ISO date (the world ``observation_date``); event rows + with a timestamp strictly after it are dropped. + + Returns: + A new dict in canonical order. ``subscriptions`` has its + stateful/terminal columns removed; the event tables are row-filtered to + ``<= cutoff``; ``accounts`` / ``customers`` pass through. + + Raises: + ValueError: if *cutoff* is empty. + """ + if not cutoff: + raise ValueError("cutoff (observation_date) must be a non-empty ISO date string") + + filtered_tables = dict(LIFECYCLE_SNAPSHOT_FILTERED_TABLES) + banned = set(LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS) + + out: dict[str, pd.DataFrame] = {} + for name in _OUTPUT_ORDER: + if name not in dfs: + continue + df = dfs[name] + if name == "subscriptions": + out[name] = df.drop(columns=[c for c in banned if c in df.columns]) + elif name in filtered_tables: + ts_col = filtered_tables[name] + # ISO date strings compare correctly lexicographically. 
+ out[name] = df[df[ts_col] <= cutoff].reset_index(drop=True) + else: + out[name] = df + return out diff --git a/leadforge/validation/leakage_probes.py b/leadforge/validation/leakage_probes.py index eaed773..8316b13 100644 --- a/leadforge/validation/leakage_probes.py +++ b/leadforge/validation/leakage_probes.py @@ -103,6 +103,37 @@ ("opportunities", "created_at"), ) + +# --------------------------------------------------------------------------- +# Lifecycle (pLTV) scheme snapshot-safety contract +# --------------------------------------------------------------------------- + +#: Columns dropped from the public ``subscriptions`` table. design.md §5 names +#: the three terminal fields (``churn_at`` / ``churn_reason`` / +#: ``subscription_end_at``); the four *stateful* columns below also hold +#: end-of-simulation values that leak the pLTV / churn targets (current MRR +#: reflects post-cutoff expansion; status reveals churn; the counts reveal +#: future renewals/expansions), so they are dropped too. The at-signing +#: identity columns (subscription_id, customer_id, plan_name, +#: subscription_start_at, contract_term_months) are retained. +LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS: Final[tuple[str, ...]] = ( + "churn_at", + "churn_reason", + "current_mrr", + "expansion_count", + "renewal_count", + "subscription_end_at", + "subscription_status", +) + +#: Lifecycle event tables filtered to ``<= observation_date`` (the public +#: calendar-anchored cutoff): every timestamp must be at or before the cutoff. +LIFECYCLE_SNAPSHOT_FILTERED_TABLES: Final[tuple[tuple[str, str], ...]] = ( + ("subscription_events", "event_timestamp"), + ("health_signals", "period_start"), + ("invoices", "invoice_date"), +) + #: Channel labels carried on :class:`LeakageFinding.channel`. Constants #: rather than an enum because findings serialise straight to JSON. 
CHANNEL_BANNED_COLUMN: Final[str] = "banned_column" @@ -1481,6 +1512,8 @@ def _import_sklearn() -> _SklearnHandles | None: "PROBE_REGISTRY", "ProbeSpec", "RelationalLeakageError", + "LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS", + "LIFECYCLE_SNAPSHOT_FILTERED_TABLES", "SNAPSHOT_FILTERED_TABLES", "deterministic_relational_reconstruction", "probe_banned_columns", diff --git a/tests/schemes/lifecycle/test_build_world.py b/tests/schemes/lifecycle/test_build_world.py index b443b83..5692a95 100644 --- a/tests/schemes/lifecycle/test_build_world.py +++ b/tests/schemes/lifecycle/test_build_world.py @@ -156,9 +156,3 @@ def test_empty_population_yields_typed_empty_tables() -> None: for name, df in dfs.items(): assert list(df.columns), f"{name} has no columns" - -def test_write_bundle_still_stubbed(tmp_path) -> None: - # build_world works; the on-disk write path lands in Pn.4b. - bundle = _build() - with pytest.raises(NotImplementedError): - get_scheme("lifecycle").write_bundle(bundle, str(tmp_path)) diff --git a/tests/schemes/lifecycle/test_public_snapshot_safety.py b/tests/schemes/lifecycle/test_public_snapshot_safety.py new file mode 100644 index 0000000..7337c31 --- /dev/null +++ b/tests/schemes/lifecycle/test_public_snapshot_safety.py @@ -0,0 +1,172 @@ +"""student_public snapshot-safety for the lifecycle scheme (LTV-Pn.4c).""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.core.models import GenerationConfig +from leadforge.schemes import get_scheme +from leadforge.schemes.lifecycle.render.relational import to_dataframes +from leadforge.schemes.lifecycle.render.relational_snapshot_safe import ( + LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS, + to_dataframes_snapshot_safe, +) + +_TS = "2026-01-01T00:00:00+00:00" +_EVENT_TS = { + "subscription_events": "event_timestamp", + "health_signals": "period_start", + "invoices": "invoice_date", +} +_AT_SIGNING_SUB_COLS = { + "subscription_id", + 
"customer_id", + "plan_name", + "subscription_start_at", + "contract_term_months", +} + + +def _public_bundle(tmp_path: Path, *, n_customers: int = 150) -> tuple[Path, str]: + scheme = get_scheme("lifecycle") + cfg = GenerationConfig( + seed=42, + n_customers=n_customers, + recipe_id="b2b_saas_ltv_v1", + exposure_mode="student_public", + ) + bundle = scheme.build_world(cfg, narrative=None) + out = tmp_path / "bundle" + scheme.write_bundle(bundle, str(out), generation_timestamp=_TS) + obs = json.loads((out / "manifest.json").read_text())["observation_date"] + return out, obs + + +# --------------------------------------------------------------------------- +# Relational snapshot-safety (the published tables) +# --------------------------------------------------------------------------- + + +def test_event_tables_filtered_to_observation_date(tmp_path) -> None: + out, obs = _public_bundle(tmp_path) + for table, ts_col in _EVENT_TS.items(): + df = pd.read_parquet(out / "tables" / f"{table}.parquet") + assert (df[ts_col] <= obs).all(), f"{table} has rows after {obs}" + + +def test_subscriptions_drops_stateful_columns(tmp_path) -> None: + out, _ = _public_bundle(tmp_path) + cols = set(pd.read_parquet(out / "tables" / "subscriptions.parquet").columns) + assert cols == _AT_SIGNING_SUB_COLS + assert not (cols & set(LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS)) + + +def test_no_target_columns_in_any_public_relational_table(tmp_path) -> None: + out, _ = _public_bundle(tmp_path) + banned_targets = { + "ltv_revenue_90d", + "ltv_revenue_365d", + "ltv_revenue_730d", + "churned_within_180d", + } + for parquet in (out / "tables").glob("*.parquet"): + cols = set(pd.read_parquet(parquet).columns) + assert not (cols & banned_targets), f"{parquet.name} leaks a target column" + + +def test_no_metadata_dir_in_public(tmp_path) -> None: + out, _ = _public_bundle(tmp_path) + assert not (out / "metadata").exists() + + +def test_manifest_records_snapshot_safe_and_redactions(tmp_path) -> None: + 
out, _ = _public_bundle(tmp_path) + m = json.loads((out / "manifest.json").read_text()) + assert m["relational_snapshot_safe"] is True + assert m["structural_redactions"] == { + "columns": {"subscriptions": sorted(LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS)}, + "omitted_tables": [], + } + + +def test_public_tasks_still_single_target_and_keep_trap(tmp_path) -> None: + out, _ = _public_bundle(tmp_path) + targets = {"ltv_revenue_90d", "ltv_revenue_365d", "ltv_revenue_730d", "churned_within_180d"} + for td in (out / "tasks").iterdir(): + manifest = json.loads((td / "task_manifest.json").read_text()) + df = pd.read_parquet(td / "train.parquet") + assert targets & set(df.columns) == {manifest["label_column"]} + assert "mrr_change_full_period" in df.columns # deliberate trap, all modes + + +def test_public_bundle_deterministic(tmp_path) -> None: + import hashlib + + def hashes(root: Path) -> dict[str, str]: + return { + str(p.relative_to(root)): hashlib.sha256(p.read_bytes()).hexdigest() + for p in sorted(root.rglob("*")) + if p.is_file() + } + + a, _ = _public_bundle(tmp_path / "a") + b, _ = _public_bundle(tmp_path / "b") + assert hashes(a) == hashes(b) + + +# --------------------------------------------------------------------------- +# Instructor mode is unaffected +# --------------------------------------------------------------------------- + + +def test_instructor_keeps_full_subscriptions_and_metadata(tmp_path) -> None: + scheme = get_scheme("lifecycle") + cfg = GenerationConfig( + seed=42, n_customers=80, recipe_id="b2b_saas_ltv_v1", exposure_mode="research_instructor" + ) + out = tmp_path / "inst" + scheme.write_bundle(scheme.build_world(cfg, narrative=None), str(out), generation_timestamp=_TS) + subs = set(pd.read_parquet(out / "tables" / "subscriptions.parquet").columns) + assert {"churn_at", "current_mrr", "subscription_status"} <= subs + assert (out / "metadata").is_dir() + m = json.loads((out / "manifest.json").read_text()) + assert m["relational_snapshot_safe"] 
is False + + +# --------------------------------------------------------------------------- +# to_dataframes_snapshot_safe unit behaviour +# --------------------------------------------------------------------------- + + +def test_snapshot_safe_passes_through_accounts_and_customers(tmp_path) -> None: + scheme = get_scheme("lifecycle") + cfg = GenerationConfig(seed=1, n_customers=60, recipe_id="b2b_saas_ltv_v1") + arts = scheme.build_world(cfg, narrative=None).artifacts + full = to_dataframes(arts.simulation_result, arts.population) + safe = to_dataframes_snapshot_safe(full, cutoff=arts.population.observation_date) + for name in ("accounts", "customers"): + assert full[name].equals(safe[name]) + + +def test_snapshot_safe_rejects_empty_cutoff(tmp_path) -> None: + scheme = get_scheme("lifecycle") + cfg = GenerationConfig(seed=1, n_customers=30, recipe_id="b2b_saas_ltv_v1") + arts = scheme.build_world(cfg, narrative=None).artifacts + full = to_dataframes(arts.simulation_result, arts.population) + with pytest.raises(ValueError, match="cutoff"): + to_dataframes_snapshot_safe(full, cutoff="") + + +def test_snapshot_safe_does_not_mutate_input(tmp_path) -> None: + scheme = get_scheme("lifecycle") + cfg = GenerationConfig(seed=1, n_customers=60, recipe_id="b2b_saas_ltv_v1") + arts = scheme.build_world(cfg, narrative=None).artifacts + full = to_dataframes(arts.simulation_result, arts.population) + before = {k: len(v) for k, v in full.items()} + to_dataframes_snapshot_safe(full, cutoff=arts.population.observation_date) + assert {k: len(v) for k, v in full.items()} == before + assert "churn_at" in full["subscriptions"].columns # original untouched diff --git a/tests/schemes/lifecycle/test_write_bundle.py b/tests/schemes/lifecycle/test_write_bundle.py index 73de03b..52e2d9f 100644 --- a/tests/schemes/lifecycle/test_write_bundle.py +++ b/tests/schemes/lifecycle/test_write_bundle.py @@ -226,11 +226,12 @@ def test_difficulty_params_thread_into_snapshots(tmp_path) -> None: # 
--------------------------------------------------------------------------- -def test_student_public_refused_until_pn4c(tmp_path) -> None: - scheme = get_scheme("lifecycle") - bundle = scheme.build_world(_config(exposure_mode="student_public"), narrative=None) - with pytest.raises(NotImplementedError, match="LTV-Pn.4c"): - scheme.write_bundle(bundle, str(tmp_path / "public"), generation_timestamp=_TS) +def test_student_public_writes_a_bundle(tmp_path) -> None: + # Public mode is implemented in LTV-Pn.4c (snapshot-safe); it must produce a + # bundle (full safety is asserted in test_public_snapshot_safety.py). + out = _write(tmp_path, config=_config(exposure_mode="student_public")) + assert (out / "manifest.json").is_file() + assert not (out / "metadata").exists() # no hidden truth in public def test_unpopulated_bundle_refused(tmp_path) -> None: From 3fbced2c677a90d76df020896e5009da1c642d03 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 15 Jun 2026 13:50:03 +0300 Subject: [PATCH 2/2] =?UTF-8?q?fix(lifecycle):=20omit=20early-pLTV=20from?= =?UTF-8?q?=20public=20=E2=80=94=20relational=20leak=20[LTV-Pn.4c]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review found a severe leak in the public export: the early-pLTV (tenure-anchored) task target is reconstructible from the public relational tables. Verified empirically — early ltv_revenue_90d was reconstructed EXACTLY for 52/60 customers by joining the public invoices (<= observation_date) onto customer_start_at and summing the (early_cutoff, early_cutoff+90d] window. Root cause: the public relational tables are cut at observation_date (calendar regime), but the early regime's forward window precedes observation_date — the invoices between the early cutoff and observation_date ARE the early target window. A single observation_date-anchored relational export cannot serve both regimes. 
Fix: omit the early-pLTV task family from snapshot-safe (student_public) bundles; it ships in research_instructor (full truth) only. The calendar family is published — its targets fall after observation_date and so are absent from the public relational tables (verified by a reconstruction probe: 0 calendar targets recoverable). Public bundles now carry the 4 calendar task dirs; instructor keeps all 8. Docs: relational_snapshot_safe docstring + CLAUDE.md clause now state the early-omitted-in-public rule and its reason; roadmap records the design decision and the tension with D8 (first-class early-pLTV), flagged for LTV-Po / a design-doc update if public early-pLTV is wanted (would need per-regime relational exports). Tests: test_public_omits_early_pltv_family; a reconstruction-probe leakage test (published calendar target NOT recoverable from public relational); instructor-keeps-both-regimes (8 tasks). Full suite 1874 passed / 51 skipped; ruff + mypy clean. Co-Authored-By: Claude Opus 4.8 --- CLAUDE.md | 2 +- docs/ltv/roadmap.md | 9 ++++ leadforge/schemes/lifecycle/__init__.py | 19 ++++++-- .../render/relational_snapshot_safe.py | 11 +++-- tests/schemes/lifecycle/test_build_world.py | 1 - .../lifecycle/test_public_snapshot_safety.py | 48 +++++++++++++++++++ 6 files changed, 81 insertions(+), 9 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 519f3b5..a83d955 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -212,7 +212,7 @@ Key abstractions: `Recipe`, `GenerationConfig`, `WorldSpec`, `WorldBundle`, `Exp - Never use a single fixed hidden world (DGP must vary by motif family + rewiring). - Never leak post-snapshot-anchor data into flat task features. 
- **Never publish public relational tables that allow label reconstruction via joins.** Public relational exports must be snapshot-safe: every `*_timestamp` column in event tables (`touches.touch_timestamp`, `sessions.session_timestamp`, `sales_activities.activity_timestamp`) must satisfy `<= lead_created_at + snapshot_day`; `opportunities` must be filtered by `created_at <= lead_created_at + snapshot_day`; no terminal-state fields (`close_outcome`, `closed_at`, `converted_within_90_days`, `conversion_timestamp`) in public `leads`/`opportunities`; no conversion-conditional entities (`customers`, `subscriptions`) in public bundles. -- **(lifecycle / `b2b_saas_ltv_v1` scheme)** The public relational export is snapshot-safe against the absolute `observation_date` cutoff: every timestamp column in the public event tables (`subscription_events.event_timestamp`, `health_signals.period_start`, `invoices.invoice_date`) must satisfy `<= observation_date`; the public `subscriptions` table drops all stateful/terminal columns (`subscription_status`, `current_mrr`, `renewal_count`, `expansion_count`, `subscription_end_at`, `churn_at`, `churn_reason`), keeping only the at-signing identity (`subscription_id`, `customer_id`, `plan_name`, `subscription_start_at`, `contract_term_months`); no pLTV target (`ltv_revenue_*`) or churn label appears in any public relational table. Each task split carries only its own target (no cross-target leakage); the `mrr_change_full_period` trap is deliberately retained in all modes. 
+- **(lifecycle / `b2b_saas_ltv_v1` scheme)** The public relational export is snapshot-safe against the absolute `observation_date` cutoff: every timestamp column in the public event tables (`subscription_events.event_timestamp`, `health_signals.period_start`, `invoices.invoice_date`) must satisfy `<= observation_date`; the public `subscriptions` table drops all stateful/terminal columns (`subscription_status`, `current_mrr`, `renewal_count`, `expansion_count`, `subscription_end_at`, `churn_at`, `churn_reason`), keeping only the at-signing identity (`subscription_id`, `customer_id`, `plan_name`, `subscription_start_at`, `contract_term_months`); no pLTV target (`ltv_revenue_*`) or churn label appears in any public relational table. Each task split carries only its own target (no cross-target leakage); the `mrr_change_full_period` trap is deliberately retained in all modes. The early-pLTV (tenure-anchored) task family is **omitted from `student_public` bundles** — its forward window precedes `observation_date`, so its targets would be reconstructible by joining the public event tables; it ships in `research_instructor` only. The calendar-anchored family is published (its targets fall after `observation_date`). - Never require external APIs for core generation. - Never publish hidden truth in `student_public` mode. - Never derive `converted_within_90_days` as a directly sampled label; it must emerge from simulated events. diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index bad5518..4307904 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -358,6 +358,15 @@ methods, then public-safety, then the carried orchestrator cleanup: per-task single-target splits + cutoff-bounded features (LTV-Pn.4b) already satisfy public task safety; the early-regime degenerate-column flags are documented (LTV-Pm). 
+ - **Design decision (self-review):** the early-pLTV (tenure-anchored) task + family is **omitted from `student_public` bundles** — its forward window + precedes `observation_date`, so its target is exactly reconstructible by + joining the public event tables (verified: 52/60 customers). One + `observation_date`-anchored relational export cannot serve both regimes, so + the early family is instructor-only for now. Revisit if public early-pLTV + is wanted (would need per-regime relational exports or a relational-free + public early task) — flag for `LTV-Po`/design-doc update; tension noted + against D8's "first-class early-pLTV". - Labels: `type: feature`, `layer: exposure`, `layer: render`, `layer: docs` - [ ] **`LTV-Pn.4d`** — `refactor: shared bundle orchestrator`. With both schemes' `write_bundle` in hand, lift the shared orchestrator (mkdir → diff --git a/leadforge/schemes/lifecycle/__init__.py b/leadforge/schemes/lifecycle/__init__.py index 1bb5393..31471bb 100644 --- a/leadforge/schemes/lifecycle/__init__.py +++ b/leadforge/schemes/lifecycle/__init__.py @@ -198,20 +198,31 @@ def write_bundle( } table_row_counts = write_relational_tables(dfs, root / "tables") - # 2. Both regime snapshots → 8 task directories. + # 2. Regime snapshots → task directories. # difficulty_params (None until LTV-Po resolves it) drives distortions. + # + # The early-pLTV (tenure-anchored) family is OMITTED from snapshot-safe + # public bundles: its forward window (start + early_tenure_weeks + Nd) + # precedes the relational cutoff (observation_date), so its targets are + # reconstructible by joining the public event tables (invoices between + # the early cutoff and observation_date *are* the early target window). + # One observation_date-anchored relational export cannot serve both + # regimes; the early family stays instructor-only. The calendar family + # is safe (its targets fall after observation_date, absent from the + # public relational tables). 
snapshots = { CALENDAR_REGIME: build_customer_snapshot( population, sim, difficulty_params=config.difficulty_params, seed=config.seed ), - EARLY_REGIME: build_early_pltv_snapshot( + } + if not bundle_filter.relational_snapshot_safe: + snapshots[EARLY_REGIME] = build_early_pltv_snapshot( population, sim, early_tenure_weeks=config.early_tenure_weeks, difficulty_params=config.difficulty_params, seed=config.seed, - ), - } + ) # Each task is a standalone single-target split: drop every OTHER # target column so a task's parquet cannot leak the answer's siblings # (e.g. ltv_revenue_730d ⊇ ltv_revenue_90d). The deliberate diff --git a/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py b/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py index b3e44c2..9ddc46a 100644 --- a/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py +++ b/leadforge/schemes/lifecycle/render/relational_snapshot_safe.py @@ -18,9 +18,14 @@ features are computed at/before the cutoff and each carries only its own target); this module only governs the relational ``tables/``. -Caveat: the cutoff is the calendar regime's ``observation_date``. The -early-pLTV (tenure-anchored) task's snapshot-safe data is its own task -parquet; relational-table feature engineering aligns with the calendar regime. +The cutoff is the calendar regime's ``observation_date``. The early-pLTV +(tenure-anchored) task family is therefore **omitted from public bundles** +(``LifecycleScheme.write_bundle``): its forward window precedes +``observation_date``, so its targets would be reconstructible by joining the +public event tables (the invoices between the early cutoff and +``observation_date`` *are* the early target window). A single +``observation_date``-anchored relational export cannot serve both regimes; the +early family stays instructor-only. ``research_instructor`` keeps the full-horizon :func:`~leadforge.schemes.lifecycle.render.relational.to_dataframes`. 
diff --git a/tests/schemes/lifecycle/test_build_world.py b/tests/schemes/lifecycle/test_build_world.py index 5692a95..db0cd06 100644 --- a/tests/schemes/lifecycle/test_build_world.py +++ b/tests/schemes/lifecycle/test_build_world.py @@ -155,4 +155,3 @@ def test_empty_population_yields_typed_empty_tables() -> None: assert len(dfs["customers"]) == 1 for name, df in dfs.items(): assert list(df.columns), f"{name} has no columns" - diff --git a/tests/schemes/lifecycle/test_public_snapshot_safety.py b/tests/schemes/lifecycle/test_public_snapshot_safety.py index 7337c31..7c9d2fe 100644 --- a/tests/schemes/lifecycle/test_public_snapshot_safety.py +++ b/tests/schemes/lifecycle/test_public_snapshot_safety.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +from datetime import date, timedelta from pathlib import Path import pandas as pd @@ -118,6 +119,49 @@ def hashes(root: Path) -> dict[str, str]: assert hashes(a) == hashes(b) +# --------------------------------------------------------------------------- +# Early-pLTV family is omitted from public (its target is relational-reconstructible) +# --------------------------------------------------------------------------- + + +def test_public_omits_early_pltv_family(tmp_path) -> None: + """The tenure-anchored early-pLTV tasks are NOT published in student_public: + their forward window precedes observation_date, so the public event tables + (<= observation_date) would let a join reconstruct the target.""" + out, _ = _public_bundle(tmp_path) + task_dirs = {p.name for p in (out / "tasks").iterdir() if p.is_dir()} + assert not any(t.startswith("early_") for t in task_dirs), task_dirs + assert task_dirs == { + "pltv_revenue_90d", + "pltv_revenue_365d", + "pltv_revenue_730d", + "churned_within_180d", + } + + +def test_published_calendar_target_not_reconstructible_from_public_relational(tmp_path) -> None: + """Leakage probe: the published pltv_revenue_90d target (revenue AFTER + observation_date) cannot be recovered from 
the public invoices table (which + is filtered to <= observation_date). Customers with a nonzero target must + reconstruct to a strictly smaller value (typically 0).""" + out, obs = _public_bundle(tmp_path, n_customers=200) + invoices = pd.read_parquet(out / "tables" / "invoices.parquet") + paid = invoices[invoices.payment_status.isin(["paid", "recovered"])] + bound = (date.fromisoformat(obs) + timedelta(days=90)).isoformat() + task = pd.read_parquet(out / "tasks" / "pltv_revenue_90d" / "train.parquet") + + nonzero = task[task["ltv_revenue_90d"] > 0] + assert len(nonzero) > 0, "fixture should have customers with forward revenue" + leaked = 0 + for _, row in nonzero.head(60).iterrows(): + inv = paid[paid.customer_id == row["customer_id"]] + # Public invoices are <= obs; the window is (obs, bound], so recon should be 0. + recon = float(inv[(inv.invoice_date > obs) & (inv.invoice_date <= bound)].amount_usd.sum()) + if abs(recon - float(row["ltv_revenue_90d"])) < 1e-6: + leaked += 1 + assert leaked == 0, f"{leaked} calendar targets reconstructible from public relational" + + # --------------------------------------------------------------------------- # Instructor mode is unaffected # --------------------------------------------------------------------------- @@ -135,6 +179,10 @@ def test_instructor_keeps_full_subscriptions_and_metadata(tmp_path) -> None: assert (out / "metadata").is_dir() m = json.loads((out / "manifest.json").read_text()) assert m["relational_snapshot_safe"] is False + # Instructor keeps BOTH regimes — the early family is full-truth here. + task_dirs = {p.name for p in (out / "tasks").iterdir() if p.is_dir()} + assert sum(t.startswith("early_") for t in task_dirs) == 4 + assert len(task_dirs) == 8 # ---------------------------------------------------------------------------