From 49e0dba9cc306035598d44f098e9fef40ccb8be7 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 13:26:03 +0300 Subject: [PATCH 1/3] feat(lifecycle): calendar-anchored customer snapshot [LTV-Pl] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Build the pLTV snapshot table: one row per customer active at the observation-date cutoff, flattened from the weekly lifecycle simulation. - schemes/lifecycle/features.py — CUSTOMER_SNAPSHOT_FEATURES catalog: firmographics, at-cutoff subscription state, last-12-week health aggregates, financial signals, the mrr_change_full_period leakage trap (leakage_risk=True, never redacted), and four targets: ltv_revenue_{90,365,730}d (Float64, gross revenue D7) + churned_within_180d (secondary / ZILN zero-inflation indicator D9). Discharges the feature-catalog half of LTV-Pc; the regression task specs move to LTV-Pn with the task-split writer. Deliberately omitted: current_plan (no plan-change mechanism — exact duplicate of initial_plan) and downgrade_count (no downgrade mechanism — zero-variance); re-add only together with the mechanism. - schemes/lifecycle/snapshots.py — build_customer_snapshot(cutoff=…): at-cutoff MRR/renewal/expansion state reconstructed from the event chain (the subscription row holds terminal state and would leak); health *_l12w window (cutoff-12w, cutoff] with OLS active-user trend; last_nps_score looks over the whole history (quarterly cadence would alias a 12-week window); ltv_revenue_* sums paid+recovered invoices attributed by issuance date in (cutoff, cutoff+window]; cutoff > observation_date rejected (forward windows would be silently censored). weeks_to_next_renewal uses the same anniversary boundary as is_renewal_week, so the feature and the hazard spike agree. - render/distortions.py — difficulty distortions extracted verbatim from the lead-scoring snapshot builder into a scheme-agnostic apply_difficulty_distortions(feature_specs=…, exempt_cols=…); lead-scoring delegates with its catalog + total_touches_all exemption. Verified BYTE-IDENTICAL on a distorted lead-scoring snapshot (sha256 196bc45f… before and after). The lifecycle trap mrr_change_full_period is distortion-exempt for the same pedagogical reason as total_touches_all. Tests (39 new): censoring leakage probe (every feature column identical after deleting all post-cutoff events); target derivation against the invoice table incl. failed/written-off exclusion; ZILN target shape (right-skew + heavy tail); trap-divergence invariant (>10% of rows); trap and targets exempt from distortion; renewal-boundary agreement; catalog invariants. Full suite 1754 passed / 51 skipped; ruff + mypy clean. Co-Authored-By: Claude Fable 5 --- .agent-plan.md | 9 +- docs/ltv/roadmap.md | 42 +- leadforge/render/distortions.py | 147 +++++++ .../schemes/lead_scoring/render/snapshots.py | 94 +---- leadforge/schemes/lifecycle/features.py | 260 +++++++++++++ leadforge/schemes/lifecycle/snapshots.py | 339 ++++++++++++++++ tests/schemes/lifecycle/test_features.py | 79 ++++ tests/schemes/lifecycle/test_snapshots.py | 366 ++++++++++++++++++ 8 files changed, 1244 insertions(+), 92 deletions(-) create mode 100644 leadforge/render/distortions.py create mode 100644 leadforge/schemes/lifecycle/features.py create mode 100644 leadforge/schemes/lifecycle/snapshots.py create mode 100644 tests/schemes/lifecycle/test_features.py create mode 100644 tests/schemes/lifecycle/test_snapshots.py diff --git a/.agent-plan.md b/.agent-plan.md index 4ed4f76..7abbe63 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -59,8 +59,13 @@ engine — simulate_lifecycle() with per-customer RNG substreams, weekly health/monthly invoice cadences, dunning write-off churn, renewal events, expansion MRR chains; mechanisms.py base rates ENGINE-CALIBRATED to per-motif year-1 churn targets, discharging the #117 calibration obligation; 25 tests) -opened as **#118** — completes **LTV-M4**. Next: `LTV-M5` (`LTV-Pl` — -calendar-anchored customer snapshot + pLTV targets). +merged (#118) — **LTV-M4 complete**. **LTV-M5**: `LTV-Pl` +(calendar-anchored customer snapshot — `build_customer_snapshot()` + +`CUSTOMER_SNAPSHOT_FEATURES` with the three `ltv_revenue_{90,365,730}d` +targets, `churned_within_180d`, and the `mrr_change_full_period` trap; +difficulty distortions extracted to scheme-agnostic `render/distortions.py`, +lead-scoring byte-identical; 39 tests) opened as **#119**. Next: `LTV-Pm` +(early-pLTV tenure-anchored task family). --- diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index ef4262e..d7c4dde 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -45,7 +45,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe), #109 (Pf.1), #110 (Pf.2), #111 (Pg.1), #112 (Pg.2) | | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | | `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) | -| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | | +| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl) | | `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | | `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | | @@ -72,11 +72,16 @@ Total: ~19 PRs across 9 milestones. Lead-scoring catalog untouched. (These rows relocate into `schemes/lifecycle/` during `LTV-M2`.) - Labels: `type: feature`, `layer: schema` -- [ ] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`. +- [~] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`. + **Feature-catalog half discharged in `LTV-Pl` (#119):** `CUSTOMER_SNAPSHOT_FEATURES` (three `ltv_revenue_{90,365,730}d` targets, the - secondary `churned_within_180d`, the `mrr_change_full_period` trap); regression - task specs + a `task_type` (`regression` | `classification`) on the task model. - - Tests: feature-spec invariants, regression task-spec shape. + secondary `churned_within_180d`, the `mrr_change_full_period` trap) is + authored in `schemes/lifecycle/features.py` (post-reorg home, per the + `LTV-M2` note above) because the snapshot builder needs it. **Remaining + scope (folds into `LTV-Pn`):** regression task specs + a `task_type` + (`regression` | `classification`) on the task model — they belong with the + task-split writer's continuous-target path. + - Tests: feature-spec invariants ✓ (#119); regression task-spec shape → `LTV-Pn`. - Labels: `type: feature`, `layer: schema` --- @@ -192,13 +197,26 @@ Total: ~19 PRs across 9 milestones. ## `LTV-M5` — Customer snapshots + pLTV targets (both regimes) -- [ ] **`LTV-Pl`** — `feat(lifecycle): calendar-anchored customer snapshot`. - `build_customer_snapshot(cutoff=observation_date)`: last-12-week health - aggregates; `mrr_change_at_snapshot` (valid) + `mrr_change_full_period` - (trap); the three `ltv_revenue_{90,365,730}d` gross-revenue targets + - `churned_within_180d`; difficulty distortions. - - Tests: no post-cutoff data in windowed columns; ZILN target shape; trap - invariant; target derivation; trap exempt from distortion. +- [x] **`LTV-Pl`** — `feat(lifecycle): calendar-anchored customer snapshot` + (**PR #119**). `schemes/lifecycle/snapshots.py`: + `build_customer_snapshot(cutoff=…)` — one row per active-at-cutoff customer; + at-cutoff subscription state reconstructed from the event chain (not the + terminal row); last-12-week health aggregates + whole-history `last_nps_score`; + `mrr_change_at_snapshot` (valid) + `mrr_change_full_period` (trap, all modes, + distortion-exempt); `ltv_revenue_{90,365,730}d` (gross = paid + recovered + invoices, attributed by issuance date) + `churned_within_180d`. + `CUSTOMER_SNAPSHOT_FEATURES` catalog in `schemes/lifecycle/features.py` + (discharges the `LTV-Pc` catalog half). Difficulty distortions extracted to + the scheme-agnostic `render/distortions.py` (lead-scoring delegates; + verified byte-identical). **Deliberately omitted from the catalog:** + `current_plan` (no plan-change mechanism → exact duplicate of + `initial_plan`) and `downgrade_count` (no downgrade mechanism → + zero-variance); re-add only with the mechanism. + - Tests (39): censoring-based leakage probe (features identical when all + post-cutoff events are deleted); target derivation vs the invoice table; + failed/written-off exclusion (D7); ZILN target shape; trap-divergence + invariant; trap + targets exempt from distortion; weeks_to_next_renewal + agrees with `is_renewal_week`. - Labels: `type: feature`, `layer: render` - [ ] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`. Reuse the snapshot builder with a per-customer relative cutoff diff --git a/leadforge/render/distortions.py b/leadforge/render/distortions.py new file mode 100644 index 0000000..1829b82 --- /dev/null +++ b/leadforge/render/distortions.py @@ -0,0 +1,147 @@ +"""Scheme-agnostic difficulty distortions for snapshot tables. + +:func:`apply_difficulty_distortions` injects Gaussian noise, MCAR missingness, +and outliers into the numeric feature columns of a snapshot DataFrame, +parameterized by a scheme's :class:`~leadforge.schema.features.FeatureSpec` +catalog. Extracted from the lead-scoring snapshot builder (verbatim op order +and RNG substream, so existing outputs stay byte-identical) so the lifecycle +scheme can share it. + +Column eligibility is derived from the feature catalog rather than runtime +dtype sniffing — categoricals, booleans, IDs, and target columns are never +distorted even if their runtime dtype happens to be numeric. Callers exempt +pedagogical leakage-trap columns explicitly (distorting a trap muddies the +lesson the trap exists to teach). +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np + +from leadforge.core.rng import RNGRoot + +if TYPE_CHECKING: + from collections.abc import Sequence + + import pandas as pd + + from leadforge.core.models import DifficultyParams + from leadforge.schema.features import FeatureSpec + +__all__ = ["apply_difficulty_distortions"] + +_FLOAT_DTYPES = ("Float64", "float64") +_NUMERIC_DTYPES = ("Float64", "float64", "Int64", "int64") + + +def apply_difficulty_distortions( + df: pd.DataFrame, + params: DifficultyParams, + seed: int, + *, + feature_specs: Sequence[FeatureSpec], + exempt_cols: frozenset[str] = frozenset(), + rng_substream: str = "snapshot_distortions", +) -> pd.DataFrame: + """Apply noise, missingness, and outliers to numeric snapshot features. + + Args: + df: The snapshot table. Not mutated — a new DataFrame is returned. + params: Difficulty knobs (``noise_scale``, ``missing_rate``, + ``outlier_rate``); a knob at 0 disables that distortion. + seed: Seed for the distortion RNG substream. Pass the generation + seed so distortions are deterministic per run. + feature_specs: The scheme's snapshot feature catalog. Float-dtyped, + non-target, non-exempt features receive noise and outliers; all + numeric non-target, non-exempt features receive missingness. + Targets are never distorted. + exempt_cols: Columns excluded from every distortion — deliberate + leakage traps whose signal must survive intact. + rng_substream: Name of the numpy child stream. Schemes with multiple + distortion call sites must use distinct names. + + Returns: + A distorted copy of *df*. + """ + float_distortion_cols = [ + f.name + for f in feature_specs + if f.dtype in _FLOAT_DTYPES and not f.is_target and f.name not in exempt_cols + ] + numeric_distortion_cols = [ + f.name + for f in feature_specs + if f.dtype in _NUMERIC_DTYPES and not f.is_target and f.name not in exempt_cols + ] + # Post-noise physical-range clamps, derived from FeatureSpec.non_negative + # so the lists stay in sync automatically when features are added/renamed. + nonneg_float_cols = frozenset( + f.name for f in feature_specs if f.dtype in _FLOAT_DTYPES and f.non_negative + ) + nonneg_int_cols = frozenset( + f.name for f in feature_specs if f.dtype in ("Int64", "int64") and f.non_negative + ) + + df = df.copy() + rng_root = RNGRoot(seed) + np_rng = rng_root.numpy_child(rng_substream) + + # Filter to columns actually present (guards against feature spec drift). + float_cols = [c for c in float_distortion_cols if c in df.columns] + all_numeric_cols = [c for c in numeric_distortion_cols if c in df.columns] + + # 1. Gaussian noise on float features only (avoids int casting issues). + if params.noise_scale > 0: + for col in float_cols: + valid_mask = df[col].notna() + if valid_mask.sum() == 0: + continue + col_std = float(df.loc[valid_mask, col].std()) + if col_std == 0 or np.isnan(col_std): + continue + noise = np_rng.normal(0, params.noise_scale * col_std, size=len(df)) + # Add noise only where values are valid. + values = df[col].copy() + values[valid_mask] = values[valid_mask] + noise[valid_mask.values] + df[col] = values + + # 1b. Post-noise clamp to physical ranges. + # Non-negative float columns: clip to >= 0. + for col in nonneg_float_cols: + if col in df.columns and df[col].notna().any(): + df[col] = df[col].clip(lower=0) + # Non-negative int columns: clip to >= 0. clip() preserves Int64 dtype. + for col in nonneg_int_cols: + if col in df.columns and df[col].notna().any(): + df[col] = df[col].clip(lower=0) + + # 2. MCAR missingness injection (all numeric columns). + if params.missing_rate > 0: + mask = np_rng.random(size=(len(df), len(all_numeric_cols))) < params.missing_rate + for i, col in enumerate(all_numeric_cols): + col_mask = mask[:, i] + if col_mask.any(): + # Convert int columns to float to support NaN. + if df[col].dtype in ("int64", "Int64"): + df[col] = df[col].astype("Float64") + df.loc[col_mask, col] = np.nan + + # 3. Outlier injection (float columns only). Uses 5σ to produce values + # clearly distinguishable from natural variation. + if params.outlier_rate > 0: + for col in float_cols: + valid_mask = df[col].notna() + col_std = float(df.loc[valid_mask, col].std()) + if col_std == 0 or np.isnan(col_std): + continue + col_median = float(df[col].median()) + outlier_mask = np_rng.random(size=len(df)) < params.outlier_rate + signs = np_rng.choice([-1, 1], size=len(df)).astype(float) + outlier_values = col_median + signs * 5 * col_std + combined = outlier_mask & valid_mask.values + if combined.any(): + df.loc[combined, col] = outlier_values[combined] + + return df diff --git a/leadforge/schemes/lead_scoring/render/snapshots.py b/leadforge/schemes/lead_scoring/render/snapshots.py index e905957..54544a6 100644 --- a/leadforge/schemes/lead_scoring/render/snapshots.py +++ b/leadforge/schemes/lead_scoring/render/snapshots.py @@ -13,10 +13,9 @@ from typing import TYPE_CHECKING -import numpy as np import pandas as pd -from leadforge.core.rng import RNGRoot +from leadforge.render.distortions import apply_difficulty_distortions from leadforge.schemes.lead_scoring.entities import ( OpportunityRow, SalesActivityRow, @@ -349,14 +348,16 @@ def build_snapshot( # Difficulty distortion helpers # --------------------------------------------------------------------------- -# Derive eligible columns from the feature spec rather than runtime dtype -# sniffing. This guarantees categoricals, booleans, IDs, and labels are -# never distorted even if their runtime dtype happens to be numeric. +# The distortion algorithm itself is scheme-agnostic and lives in +# leadforge.render.distortions (extracted in LTV-Pl, byte-identical for this +# scheme). The lead-scoring-specific knowledge kept here: which columns are +# exempt, and the catalog the column lists derive from. # total_touches_all is a pedagogical leakage trap — distorting it muddies the # lesson (up to 18% NaN on Advanced tier hides the trap). Exempt it explicitly. _DISTORTION_EXEMPT_COLS: frozenset[str] = frozenset({"total_touches_all"}) +# Derived column lists, retained for tests and introspection. _FLOAT_DISTORTION_COLS: list[str] = [ f.name for f in LEAD_SNAPSHOT_FEATURES @@ -372,17 +373,6 @@ def build_snapshot( and f.name not in _DISTORTION_EXEMPT_COLS ] -# Post-noise physical-range clamps. Applied after Gaussian noise to prevent -# non-physical values (e.g. negative durations, negative counts). -# Derived from FeatureSpec.non_negative so the lists stay in sync automatically -# when features are added or renamed — no manual maintenance required. -_NONNEG_FLOAT_COLS: frozenset[str] = frozenset( - f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype in ("Float64", "float64") and f.non_negative -) -_NONNEG_INT_COLS: frozenset[str] = frozenset( - f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype in ("Int64", "int64") and f.non_negative -) - def _apply_difficulty_distortions( df: pd.DataFrame, @@ -391,66 +381,14 @@ def _apply_difficulty_distortions( ) -> pd.DataFrame: """Apply noise, missingness, and outliers to numeric snapshot features. - Returns a new DataFrame — the input is not mutated. + Returns a new DataFrame — the input is not mutated. Delegates to the + shared scheme-agnostic implementation with this scheme's feature catalog + and exemptions. """ - df = df.copy() - rng_root = RNGRoot(seed) - np_rng = rng_root.numpy_child("snapshot_distortions") - - # Filter to columns actually present (guards against feature spec drift). - float_cols = [c for c in _FLOAT_DISTORTION_COLS if c in df.columns] - all_numeric_cols = [c for c in _NUMERIC_DISTORTION_COLS if c in df.columns] - - # 1. Gaussian noise on float features only (avoids int casting issues). - if params.noise_scale > 0: - for col in float_cols: - valid_mask = df[col].notna() - if valid_mask.sum() == 0: - continue - col_std = float(df.loc[valid_mask, col].std()) - if col_std == 0 or np.isnan(col_std): - continue - noise = np_rng.normal(0, params.noise_scale * col_std, size=len(df)) - # Add noise only where values are valid. - values = df[col].copy() - values[valid_mask] = values[valid_mask] + noise[valid_mask.values] - df[col] = values - - # 1b. Post-noise clamp to physical ranges. - # Non-negative float columns: clip to >= 0. - for col in _NONNEG_FLOAT_COLS: - if col in df.columns and df[col].notna().any(): - df[col] = df[col].clip(lower=0) - # Non-negative int columns: clip to >= 0. clip() preserves Int64 dtype. - for col in _NONNEG_INT_COLS: - if col in df.columns and df[col].notna().any(): - df[col] = df[col].clip(lower=0) - - # 2. MCAR missingness injection (all numeric columns). - if params.missing_rate > 0: - mask = np_rng.random(size=(len(df), len(all_numeric_cols))) < params.missing_rate - for i, col in enumerate(all_numeric_cols): - col_mask = mask[:, i] - if col_mask.any(): - # Convert int columns to float to support NaN. - if df[col].dtype in ("int64", "Int64"): - df[col] = df[col].astype("Float64") - df.loc[col_mask, col] = np.nan - - # 3. Outlier injection (float columns only). Uses 5σ to produce values - # clearly distinguishable from natural variation. - if params.outlier_rate > 0: - for col in float_cols: - valid_mask = df[col].notna() - col_std = float(df.loc[valid_mask, col].std()) - if col_std == 0 or np.isnan(col_std): - continue - col_median = float(df[col].median()) - outlier_mask = np_rng.random(size=len(df)) < params.outlier_rate - signs = np_rng.choice([-1, 1], size=len(df)).astype(float) - outlier_values = col_median + signs * 5 * col_std - combined = outlier_mask & valid_mask.values - if combined.any(): - df.loc[combined, col] = outlier_values[combined] - - return df + return apply_difficulty_distortions( + df, + params, + seed, + feature_specs=LEAD_SNAPSHOT_FEATURES, + exempt_cols=_DISTORTION_EXEMPT_COLS, + ) diff --git a/leadforge/schemes/lifecycle/features.py b/leadforge/schemes/lifecycle/features.py new file mode 100644 index 0000000..6bed41e --- /dev/null +++ b/leadforge/schemes/lifecycle/features.py @@ -0,0 +1,260 @@ +"""Customer snapshot feature catalog for the lifecycle (pLTV) scheme. + +:data:`CUSTOMER_SNAPSHOT_FEATURES` is the canonical column spec for the +customer snapshot table built by +:func:`~leadforge.schemes.lifecycle.snapshots.build_customer_snapshot` — +the lifecycle analogue of the lead-scoring catalog +(:data:`~leadforge.schemes.lead_scoring.features.LEAD_SNAPSHOT_FEATURES`). +The same catalog serves both observation regimes (calendar-anchored and +tenure-anchored early-pLTV); only the cutoff differs (design.md §4). + +Targets (design.md §8, D6/D7/D9): + +- ``ltv_revenue_{90,365,730}d`` — gross revenue (paid + recovered invoices) + in the forward window after the cutoff. Continuous, zero-inflated, + right-skewed (ZILN-shaped) regression targets. +- ``churned_within_180d`` — secondary churn label / ZILN zero-inflation + indicator. + +Leakage trap (design.md §7): ``mrr_change_full_period`` is computed through +the **end of simulation** — post-cutoff expansions, which directly drive the +pLTV targets, inflate it. ``leakage_risk=True`` but never redacted: it is a +deliberate pedagogical trap, retained in all modes and exempt from difficulty +distortions. + +Deliberately absent (vs. design.md §8's draft list): + +- ``current_plan`` — the engine has no plan-change mechanism; the column + would duplicate ``initial_plan`` exactly. +- ``downgrade_count`` — no downgrade mechanism exists (see the engine module + docstring); the column would be zero-variance, violating the published + no-zero-variance-features invariant. Revisit if a downgrade mechanism + lands. +""" + +from __future__ import annotations + +from leadforge.schema.features import FeatureSpec + +__all__ = ["CUSTOMER_SNAPSHOT_FEATURES"] + +CUSTOMER_SNAPSHOT_FEATURES: list[FeatureSpec] = [ + # -- identifiers -------------------------------------------------------- + FeatureSpec( + name="customer_id", + dtype="string", + description="Stable customer identifier (join key; not a feature).", + category="customer_meta", + ), + FeatureSpec( + name="account_id", + dtype="string", + description="Owning account identifier (join key; not a feature).", + category="customer_meta", + ), + # -- account firmographics ---------------------------------------------- + FeatureSpec( + name="industry", + dtype="string", + description="Account industry vertical.", + category="account", + ), + FeatureSpec( + name="region", + dtype="string", + description="Account geographic region.", + category="account", + ), + FeatureSpec( + name="employee_band", + dtype="string", + description="Account employee-count band.", + category="account", + ), + FeatureSpec( + name="estimated_revenue_band", + dtype="string", + description="Account estimated-annual-revenue band.", + category="account", + ), + # -- customer / subscription state at the cutoff ------------------------- + FeatureSpec( + name="tenure_weeks", + dtype="Int64", + description="Whole weeks from customer start to the snapshot cutoff.", + category="subscription", + non_negative=True, + ), + FeatureSpec( + name="initial_plan", + dtype="string", + description="Plan tier at signing (starter / growth / enterprise).", + category="subscription", + ), + FeatureSpec( + name="initial_mrr", + dtype="Int64", + description="Monthly recurring revenue (USD) at signing.", + category="subscription", + non_negative=True, + ), + FeatureSpec( + name="current_mrr", + dtype="Int64", + description="Monthly recurring revenue (USD) as of the cutoff.", + category="subscription", + non_negative=True, + ), + FeatureSpec( + name="mrr_change_at_snapshot", + dtype="Int64", + description=( + "current_mrr - initial_mrr, both measured at the cutoff. The " + "leakage-safe counterpart of mrr_change_full_period." + ), + category="subscription", + ), + FeatureSpec( + name="renewal_count", + dtype="Int64", + description="Contract renewals completed at or before the cutoff.", + category="subscription", + non_negative=True, + ), + FeatureSpec( + name="expansion_count", + dtype="Int64", + description="Expansion (upsell) events at or before the cutoff.", + category="subscription", + non_negative=True, + ), + FeatureSpec( + name="contract_term_months", + dtype="Int64", + description="Contract term length in months (12 or 24).", + category="subscription", + non_negative=True, + ), + FeatureSpec( + name="weeks_to_next_renewal", + dtype="Int64", + description="Weeks from the cutoff to the next contract anniversary.", + category="subscription", + non_negative=True, + ), + # -- health aggregates (last 12 weeks before the cutoff) ----------------- + FeatureSpec( + name="avg_active_users_l12w", + dtype="Float64", + description="Mean weekly active users over the 12 weeks before the cutoff.", + category="health", + non_negative=True, + ), + FeatureSpec( + name="active_user_trend_l12w", + dtype="Float64", + description=( + "OLS slope of weekly active users over the 12 weeks before the " + "cutoff (users per week; negative = declining usage)." + ), + category="health", + ), + FeatureSpec( + name="avg_feature_depth_l12w", + dtype="Float64", + description="Mean feature-depth score over the 12 weeks before the cutoff.", + category="health", + non_negative=True, + ), + FeatureSpec( + name="support_ticket_count_l12w", + dtype="Int64", + description="Support tickets filed in the 12 weeks before the cutoff.", + category="health", + non_negative=True, + ), + FeatureSpec( + name="last_nps_score", + dtype="Int64", + description=( + "Most recent NPS response (0-10) at or before the cutoff; null if " + "the customer has not yet answered a quarterly survey." + ), + category="health", + non_negative=True, + ), + # -- financial ----------------------------------------------------------- + FeatureSpec( + name="payment_failure_count", + dtype="Int64", + description="Invoice payment failures at or before the cutoff.", + category="financial", + non_negative=True, + ), + FeatureSpec( + name="weeks_since_last_payment_failure", + dtype="Int64", + description=( + "Weeks from the most recent payment failure to the cutoff; null " + "if the customer has never had one." + ), + category="financial", + non_negative=True, + ), + # -- leakage trap (deliberate, all modes; see module docstring) ----------- + FeatureSpec( + name="mrr_change_full_period", + dtype="Int64", + description=( + "MRR delta from signing to END OF SIMULATION. LEAKAGE TRAP: " + "post-cutoff expansions, which directly drive the pLTV targets, " + "inflate this column. Use mrr_change_at_snapshot instead." + ), + category="subscription", + leakage_risk=True, + ), + # -- targets (D6 windows; D7 gross revenue; D9 secondary churn) ---------- + FeatureSpec( + name="ltv_revenue_90d", + dtype="Float64", + description=( + "Gross revenue (USD, paid + recovered invoices) in the 90 days " + "after the cutoff. Primary pLTV regression target (warm-up)." + ), + category="target", + is_target=True, + non_negative=True, + ), + FeatureSpec( + name="ltv_revenue_365d", + dtype="Float64", + description=( + "Gross revenue (USD, paid + recovered invoices) in the 365 days " + "after the cutoff. Primary pLTV regression target (standard)." + ), + category="target", + is_target=True, + non_negative=True, + ), + FeatureSpec( + name="ltv_revenue_730d", + dtype="Float64", + description=( + "Gross revenue (USD, paid + recovered invoices) in the 730 days " + "after the cutoff. Primary pLTV regression target (hard)." + ), + category="target", + is_target=True, + non_negative=True, + ), + FeatureSpec( + name="churned_within_180d", + dtype="boolean", + description=( + "True iff the customer churned within 180 days after the cutoff. " + "Secondary task / ZILN zero-inflation indicator." + ), + category="target", + is_target=True, + ), +] diff --git a/leadforge/schemes/lifecycle/snapshots.py b/leadforge/schemes/lifecycle/snapshots.py new file mode 100644 index 0000000..6041c6d --- /dev/null +++ b/leadforge/schemes/lifecycle/snapshots.py @@ -0,0 +1,339 @@ +"""Customer snapshot builder — flatten the lifecycle simulation into an +ML-ready pLTV table. + +:func:`build_customer_snapshot` produces one row per customer **active at the +cutoff**, containing the features defined in +:data:`~leadforge.schemes.lifecycle.features.CUSTOMER_SNAPSHOT_FEATURES`. + +Snapshot-safety contract (design.md §5): every feature column is computed +exclusively from events at or before the cutoff — with one deliberate +exception, the ``mrr_change_full_period`` leakage trap (design.md §7), which +reads the end-of-simulation MRR. The targets (``ltv_revenue_{90,365,730}d``, +``churned_within_180d``) are forward-window aggregates by construction and are +never published as features. + +Cutoff semantics +---------------- +The calendar-anchored regime (this PR, LTV-Pl) snapshots every customer at the +shared absolute ``observation_date``. The tenure-anchored early-pLTV regime +(LTV-Pm) will reuse the same per-customer machinery with a relative cutoff. +The cutoff must not exceed the population's ``observation_date``: the engine +only guarantees full forward-window simulation up to +``observation_date + forward_window_days`` (D6), so a later cutoff would +silently censor the targets. + +Revenue attribution (D7) +------------------------ +``ltv_revenue_*`` sums ``amount_usd`` over invoices with +``cutoff < invoice_date <= cutoff + window`` whose **terminal** payment status +is ``paid`` or ``recovered`` (a recovered invoice is gross revenue collected +late; ``failed`` / ``written_off`` invoices never count). Attribution is by +issuance date, so a window-edge invoice recovered after the window still +counts toward the window it was issued in. +""" + +from __future__ import annotations + +from datetime import date, timedelta +from typing import TYPE_CHECKING + +import numpy as np +import pandas as pd + +from leadforge.render.distortions import apply_difficulty_distortions +from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES + +if TYPE_CHECKING: + from leadforge.core.models import DifficultyParams + from leadforge.schemes.lifecycle.engine import LifecycleSimulationResult + from leadforge.schemes.lifecycle.population import CustomerPopulationResult + +__all__ = [ + "CHURN_WINDOW_DAYS", + "FORWARD_WINDOWS_DAYS", + "HEALTH_WINDOW_WEEKS", + "build_customer_snapshot", +] + +# pLTV forward windows (D6) and the secondary churn-label window (D9). +FORWARD_WINDOWS_DAYS: tuple[int, ...] = (90, 365, 730) +CHURN_WINDOW_DAYS = 180 + +# Look-back window for the health aggregates (*_l12w columns). +HEALTH_WINDOW_WEEKS = 12 + +# Invoice terminal statuses that count as collected gross revenue (D7). +_REVENUE_STATUSES = frozenset({"paid", "recovered"}) + +_WEEKS_PER_MONTH = 52.0 / 12.0 + +_SNAPSHOT_COLUMNS = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES] +_SNAPSHOT_DTYPES = {f.name: f.dtype for f in CUSTOMER_SNAPSHOT_FEATURES} + +# The trap must survive distortion intact (same policy as the lead-scoring +# total_touches_all trap): noise/missingness on it would muddy the lesson. +_DISTORTION_EXEMPT_COLS: frozenset[str] = frozenset({"mrr_change_full_period"}) + + +def build_customer_snapshot( + population: CustomerPopulationResult, + sim: LifecycleSimulationResult, + *, + cutoff: date | None = None, + difficulty_params: DifficultyParams | None = None, + seed: int = 42, +) -> pd.DataFrame: + """Build the calendar-anchored customer snapshot table. + + Args: + population: Output of + :func:`~leadforge.schemes.lifecycle.population.build_customer_population`. + sim: Output of + :func:`~leadforge.schemes.lifecycle.engine.simulate_lifecycle` for + the same population. + cutoff: Snapshot anchor date. Defaults to the population's + ``observation_date``. Must not be later than it (the engine only + guarantees complete forward windows up to that anchor). + difficulty_params: Optional difficulty knobs; when given, noise / + missingness / outliers are applied to the numeric feature columns + (never to targets, identifiers, or the leakage trap). + seed: Seed for the distortion RNG substream. + + Returns: + One row per customer active at the cutoff (started at or before it, + not yet churned), with columns in catalog order. Customers who + started after the cutoff or churned at/before it are excluded. + + Raises: + ValueError: if the population lacks an ``observation_date`` or the + cutoff exceeds it. + """ + if not population.observation_date: + raise ValueError("population.observation_date is not set") + obs_date = date.fromisoformat(population.observation_date) + if cutoff is None: + cutoff = obs_date + elif cutoff > obs_date: + raise ValueError( + f"cutoff {cutoff.isoformat()} is after observation_date " + f"{population.observation_date}; forward-window targets would be censored" + ) + + accounts = {a.account_id: a for a in population.accounts} + subscriptions = {s.customer_id: s for s in sim.subscriptions} + + # Eligibility: started at or before the cutoff, still active at it. + eligible = [] + for customer in population.customers: + start = date.fromisoformat(customer.customer_start_at) + if start > cutoff: + continue + sub = subscriptions[customer.customer_id] + if sub.churn_at is not None and date.fromisoformat(sub.churn_at) <= cutoff: + continue + eligible.append((customer, sub, start)) + + if not eligible: + return _empty_snapshot() + + events = _event_aggregates(sim, cutoff) + health = _health_aggregates(sim, cutoff) + revenue = _forward_revenue(sim, cutoff) + + records: list[dict[str, object]] = [] + for customer, sub, start in eligible: + account = accounts[customer.account_id] + tenure_weeks = (cutoff - start).days // 7 + ev = events.get(customer.customer_id, _EMPTY_EVENT_AGG) + hl = health.get(customer.customer_id, _EMPTY_HEALTH_AGG) + rv = revenue.get(customer.customer_id, {}) + + current_mrr = customer.initial_mrr + ev["mrr_delta"] + churn_date = date.fromisoformat(sub.churn_at) if sub.churn_at else None + records.append( + { + "customer_id": customer.customer_id, + "account_id": customer.account_id, + "industry": account.industry, + "region": account.region, + "employee_band": account.employee_band, + "estimated_revenue_band": account.estimated_revenue_band, + "tenure_weeks": tenure_weeks, + "initial_plan": customer.initial_plan, + "initial_mrr": customer.initial_mrr, + "current_mrr": current_mrr, + "mrr_change_at_snapshot": ev["mrr_delta"], + "renewal_count": ev["renewal_count"], + "expansion_count": ev["expansion_count"], + "contract_term_months": customer.contract_term_months, + "weeks_to_next_renewal": _weeks_to_next_renewal( + tenure_weeks, customer.contract_term_months + ), + "avg_active_users_l12w": hl["avg_active_users"], + "active_user_trend_l12w": hl["trend"], + "avg_feature_depth_l12w": hl["avg_depth"], + "support_ticket_count_l12w": hl["tickets"], + "last_nps_score": hl["last_nps"], + "payment_failure_count": ev["payment_failure_count"], + "weeks_since_last_payment_failure": ( + (cutoff - ev["last_failure_date"]).days // 7 + if ev["last_failure_date"] is not None + else None + ), + "mrr_change_full_period": sub.current_mrr - customer.initial_mrr, + **{ + f"ltv_revenue_{window}d": float(rv.get(window, 0)) + for window in FORWARD_WINDOWS_DAYS + }, + "churned_within_180d": ( + churn_date is not None + and churn_date <= cutoff + timedelta(days=CHURN_WINDOW_DAYS) + ), + } + ) + + snapshot = pd.DataFrame.from_records(records)[_SNAPSHOT_COLUMNS] + for col, dtype in _SNAPSHOT_DTYPES.items(): + snapshot[col] = snapshot[col].astype(dtype) + + if difficulty_params is not None: + snapshot = apply_difficulty_distortions( + snapshot, + difficulty_params, + seed, + feature_specs=CUSTOMER_SNAPSHOT_FEATURES, + exempt_cols=_DISTORTION_EXEMPT_COLS, + ) + + return snapshot + + +# --------------------------------------------------------------------------- +# Per-table aggregation helpers +# --------------------------------------------------------------------------- + +_EMPTY_EVENT_AGG: dict = { + "mrr_delta": 0, + "renewal_count": 0, + "expansion_count": 0, + "payment_failure_count": 0, + "last_failure_date": None, +} + +_EMPTY_HEALTH_AGG: dict = { + "avg_active_users": None, + "trend": None, + "avg_depth": None, + "tickets": 0, + "last_nps": None, +} + + +def _event_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict]: + """Aggregate subscription events at or before *cutoff*, per customer.""" + out: dict[str, dict] = {} + cutoff_iso = cutoff.isoformat() + for event in sim.subscription_events: + # ISO dates compare correctly as strings — avoids per-event parsing. + if event.event_timestamp > cutoff_iso: + continue + agg = out.setdefault(event.customer_id, dict(_EMPTY_EVENT_AGG)) + if event.event_type == "expansion": + agg["mrr_delta"] += event.mrr_after - event.mrr_before + agg["expansion_count"] += 1 + elif event.event_type == "renewal": + agg["renewal_count"] += 1 + elif event.event_type == "payment_failure": + agg["payment_failure_count"] += 1 + agg["last_failure_date"] = date.fromisoformat(event.event_timestamp) + return out + + +def _health_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict]: + """Aggregate health signals into the last-12-week window features. + + ``last_nps_score`` looks back over the customer's whole history (NPS is + quarterly — a 12-week window would miss most customers' latest response + purely by phase), while the ``*_l12w`` aggregates use the + ``(cutoff - 12w, cutoff]`` window. + """ + window_start_iso = (cutoff - timedelta(weeks=HEALTH_WINDOW_WEEKS)).isoformat() + cutoff_iso = cutoff.isoformat() + + users: dict[str, list[tuple[str, int]]] = {} + depths: dict[str, list[float]] = {} + tickets: dict[str, int] = {} + last_nps: dict[str, int] = {} + for signal in sim.health_signals: + ts = signal.period_start + if ts > cutoff_iso: + continue + if signal.nps_score is not None: + # Signals are chronological per customer — last write wins. + last_nps[signal.customer_id] = signal.nps_score + if ts <= window_start_iso: + continue + users.setdefault(signal.customer_id, []).append((ts, signal.active_users)) + depths.setdefault(signal.customer_id, []).append(signal.feature_depth_score) + tickets[signal.customer_id] = tickets.get(signal.customer_id, 0) + signal.support_tickets + + out: dict[str, dict] = {} + for customer_id, points in users.items(): + weeks = [(date.fromisoformat(ts) - cutoff).days / 7.0 for ts, _ in points] + counts = [n for _, n in points] + if len(points) >= 2: + trend = float(np.polyfit(weeks, counts, 1)[0]) + else: + trend = None + out[customer_id] = { + "avg_active_users": float(np.mean(counts)), + "trend": trend, + "avg_depth": float(np.mean(depths[customer_id])), + "tickets": tickets[customer_id], + "last_nps": last_nps.get(customer_id), + } + # Customers with an NPS response but no in-window signals cannot occur + # (an active customer always has signals in the trailing window), but a + # defensive merge keeps last_nps consistent if eligibility ever widens. + for customer_id, nps in last_nps.items(): + out.setdefault(customer_id, dict(_EMPTY_HEALTH_AGG))["last_nps"] = nps + return out + + +def _forward_revenue(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict[int, int]]: + """Sum collected gross revenue per customer per forward window (D7).""" + bounds = { + window: (cutoff + timedelta(days=window)).isoformat() for window in FORWARD_WINDOWS_DAYS + } + cutoff_iso = cutoff.isoformat() + out: dict[str, dict[int, int]] = {} + for invoice in sim.invoices: + if invoice.payment_status not in _REVENUE_STATUSES: + continue + ts = invoice.invoice_date + if ts <= cutoff_iso: + continue + sums = out.setdefault(invoice.customer_id, dict.fromkeys(FORWARD_WINDOWS_DAYS, 0)) + for window, bound in bounds.items(): + if ts <= bound: + sums[window] += invoice.amount_usd + return out + + +def _weeks_to_next_renewal(tenure_weeks: int, contract_term_months: int) -> int: + """Weeks from the cutoff to the next contract anniversary. + + Anniversaries fall at ``round(k · term · 52/12)`` weeks — the same + boundary :func:`~leadforge.schemes.lifecycle.hazards.is_renewal_week` + uses, so the feature and the hazard spike agree exactly. + """ + term_weeks = contract_term_months * _WEEKS_PER_MONTH + k = 1 + while round(k * term_weeks) <= tenure_weeks: + k += 1 + return round(k * term_weeks) - tenure_weeks + + +def _empty_snapshot() -> pd.DataFrame: + df = pd.DataFrame({name: pd.Series(dtype=_SNAPSHOT_DTYPES[name]) for name in _SNAPSHOT_COLUMNS}) + return df[_SNAPSHOT_COLUMNS] diff --git a/tests/schemes/lifecycle/test_features.py b/tests/schemes/lifecycle/test_features.py new file mode 100644 index 0000000..e91b6ab --- /dev/null +++ b/tests/schemes/lifecycle/test_features.py @@ -0,0 +1,79 @@ +"""Tests for the lifecycle customer-snapshot feature catalog (LTV-Pl).""" + +from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES + +_VALID_DTYPES = {"string", "Int64", "Float64", "boolean"} +_VALID_CATEGORIES = {"customer_meta", "account", "subscription", "health", "financial", "target"} + + +def test_feature_names_unique() -> None: + names = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES] + assert len(names) == len(set(names)) + + +def test_dtypes_are_pandas_nullable() -> None: + for f in CUSTOMER_SNAPSHOT_FEATURES: + assert f.dtype in _VALID_DTYPES, f"{f.name}: {f.dtype}" + + +def test_categories_valid() -> None: + for f in CUSTOMER_SNAPSHOT_FEATURES: + assert f.category in _VALID_CATEGORIES, f"{f.name}: {f.category}" + + +def test_exactly_four_targets() -> None: + targets = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES if f.is_target] + assert targets == [ + "ltv_revenue_90d", + "ltv_revenue_365d", + "ltv_revenue_730d", + "churned_within_180d", + ] + + +def test_targets_are_in_target_category() -> None: + for f in CUSTOMER_SNAPSHOT_FEATURES: + assert f.is_target == (f.category == "target"), f.name + + +def test_ltv_targets_are_continuous() -> None: + for f in CUSTOMER_SNAPSHOT_FEATURES: + if f.name.startswith("ltv_revenue_"): + assert f.dtype == "Float64" + assert f.non_negative + + +def test_trap_is_descriptive_not_redacted() -> None: + """The mrr_change_full_period trap is flagged as leakage but retained in + every exposure mode (deliberate pedagogical trap, design.md §7).""" + (trap,) = [f for f in CUSTOMER_SNAPSHOT_FEATURES if f.name == "mrr_change_full_period"] + assert trap.leakage_risk + assert trap.redact_in_modes == frozenset() + assert not trap.is_target + + +def test_trap_is_only_leakage_risk_column() -> None: + risky = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES if f.leakage_risk] + assert risky == ["mrr_change_full_period"] + + +def test_identifiers_are_strings() -> None: + for name in ("customer_id", "account_id"): + (spec,) = [f for f in CUSTOMER_SNAPSHOT_FEATURES if f.name == name] + assert spec.dtype == "string" + assert spec.category == "customer_meta" + + +def test_no_mechanism_less_columns() -> None: + """current_plan and downgrade_count are deliberately absent: the engine + has no plan-change or downgrade mechanism, so they would be a duplicate + column and a zero-variance column respectively (see features.py + docstring). Re-add them only together with the mechanism.""" + names = {f.name for f in CUSTOMER_SNAPSHOT_FEATURES} + assert "current_plan" not in names + assert "downgrade_count" not in names + + +def test_valid_mrr_change_counterpart_present() -> None: + names = {f.name for f in CUSTOMER_SNAPSHOT_FEATURES} + assert "mrr_change_at_snapshot" in names diff --git a/tests/schemes/lifecycle/test_snapshots.py b/tests/schemes/lifecycle/test_snapshots.py new file mode 100644 index 0000000..450d8df --- /dev/null +++ b/tests/schemes/lifecycle/test_snapshots.py @@ -0,0 +1,366 @@ +"""Tests for the calendar-anchored customer snapshot builder (LTV-Pl).""" + +from dataclasses import replace +from datetime import date, timedelta + +import pandas as pd +import pytest + +from leadforge.core.models import DifficultyParams +from leadforge.schemes.lifecycle.engine import ( + LifecycleSimulationResult, + simulate_lifecycle, +) +from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES +from leadforge.schemes.lifecycle.hazards import is_renewal_week +from leadforge.schemes.lifecycle.population import build_customer_population +from leadforge.schemes.lifecycle.snapshots import ( + CHURN_WINDOW_DAYS, + FORWARD_WINDOWS_DAYS, + build_customer_snapshot, +) + +_POP_SEED = 11 +_SIM_SEED = 99 +_N = 150 + +_FEATURE_COLS = [ + f.name for f in CUSTOMER_SNAPSHOT_FEATURES if not f.is_target and not f.leakage_risk +] +_TARGET_COLS = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES if f.is_target] + + +@pytest.fixture(scope="module") +def population(): + return build_customer_population(_N, _POP_SEED, motif_family="expansion_led_growth") + + +@pytest.fixture(scope="module") +def sim(population): + return simulate_lifecycle(population, _SIM_SEED) + + +@pytest.fixture(scope="module") +def snapshot(population, sim): + return build_customer_snapshot(population, sim) + + +def _difficulty_params(**overrides) -> DifficultyParams: + defaults = { + "signal_strength": 1.0, + "noise_scale": 0.3, + "missing_rate": 0.10, + "outlier_rate": 0.02, + "conversion_rate_lo": 0.02, + "conversion_rate_hi": 0.4, + "committee_friction": 0.5, + } + defaults.update(overrides) + return DifficultyParams(**defaults) + + +# --------------------------------------------------------------------------- +# Shape + schema +# --------------------------------------------------------------------------- + + +def test_one_row_per_active_at_cutoff_customer(population, sim, snapshot) -> None: + cutoff = population.observation_date + active = {s.customer_id for s in sim.subscriptions if s.churn_at is None or s.churn_at > cutoff} + assert set(snapshot["customer_id"]) == active + assert len(snapshot) == len(active) + + +def test_churned_before_cutoff_excluded(population, sim, snapshot) -> None: + cutoff = population.observation_date + churned_early = { + s.customer_id for s in sim.subscriptions if s.churn_at is not None and s.churn_at <= cutoff + } + assert churned_early, "fixture world should have some pre-cutoff churn" + assert churned_early.isdisjoint(set(snapshot["customer_id"])) + + +def test_columns_match_catalog_order(snapshot) -> None: + assert list(snapshot.columns) == [f.name for f in CUSTOMER_SNAPSHOT_FEATURES] + + +def test_dtypes_match_catalog(snapshot) -> None: + for f in CUSTOMER_SNAPSHOT_FEATURES: + assert str(snapshot[f.name].dtype) == f.dtype, f.name + + +def test_deterministic(population, sim, snapshot) -> None: + again = build_customer_snapshot(population, sim) + pd.testing.assert_frame_equal(snapshot, again) + + +def test_empty_when_cutoff_precedes_all_starts(population, sim) -> None: + earliest = min(date.fromisoformat(c.customer_start_at) for c in population.customers) + snap = build_customer_snapshot(population, sim, cutoff=earliest - timedelta(days=1)) + assert len(snap) == 0 + assert list(snap.columns) == [f.name for f in CUSTOMER_SNAPSHOT_FEATURES] + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + + +def test_rejects_cutoff_after_observation_date(population, sim) -> None: + late = date.fromisoformat(population.observation_date) + timedelta(days=1) + with pytest.raises(ValueError, match="censored"): + build_customer_snapshot(population, sim, cutoff=late) + + +def test_rejects_population_without_observation_date(population, sim) -> None: + broken = replace(population, observation_date="") + with pytest.raises(ValueError, match="observation_date"): + build_customer_snapshot(broken, sim) + + +# --------------------------------------------------------------------------- +# Snapshot safety: features must not see past the cutoff +# --------------------------------------------------------------------------- + + +def test_features_identical_under_post_cutoff_censoring(population, sim, snapshot) -> None: + """Every non-target, non-trap column must be reproducible from a sim + result truncated at the cutoff — if deleting all post-cutoff events + changes a feature, that feature leaks.""" + cutoff = population.observation_date + censored = LifecycleSimulationResult( + subscriptions=sim.subscriptions, + subscription_events=[e for e in sim.subscription_events if e.event_timestamp <= cutoff], + health_signals=[h for h in sim.health_signals if h.period_start <= cutoff], + invoices=[i for i in sim.invoices if i.invoice_date <= cutoff], + ) + censored_snap = build_customer_snapshot(population, censored) + pd.testing.assert_frame_equal(snapshot[_FEATURE_COLS], censored_snap[_FEATURE_COLS]) + + +# --------------------------------------------------------------------------- +# Feature derivations +# --------------------------------------------------------------------------- + + +def test_mrr_chain_consistency(snapshot) -> None: + assert ( + snapshot["current_mrr"] - snapshot["initial_mrr"] == snapshot["mrr_change_at_snapshot"] + ).all() + no_expansion = snapshot["expansion_count"] == 0 + assert (snapshot.loc[no_expansion, "mrr_change_at_snapshot"] == 0).all() + + +def test_event_counts_match_event_table(population, sim, snapshot) -> None: + cutoff = population.observation_date + for _, row in snapshot.head(20).iterrows(): + events = [ + e + for e in sim.subscription_events + if e.customer_id == row["customer_id"] and e.event_timestamp <= cutoff + ] + by_type = { + t: sum(1 for e in events if e.event_type == t) for t in {e.event_type for e in events} + } + assert row["expansion_count"] == by_type.get("expansion", 0) + assert row["renewal_count"] == by_type.get("renewal", 0) + assert row["payment_failure_count"] == by_type.get("payment_failure", 0) + + +def test_tenure_weeks(population, snapshot) -> None: + cutoff = date.fromisoformat(population.observation_date) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + for _, row in snapshot.iterrows(): + assert row["tenure_weeks"] == (cutoff - starts[row["customer_id"]]).days // 7 + + +def test_weeks_to_next_renewal_lands_on_renewal_week(snapshot) -> None: + for _, row in snapshot.iterrows(): + weeks_to = int(row["weeks_to_next_renewal"]) + assert weeks_to >= 1 + assert is_renewal_week( + int(row["tenure_weeks"]) + weeks_to, int(row["contract_term_months"]) + ) + + +def test_health_aggregates_match_signal_table(population, sim, snapshot) -> None: + cutoff = date.fromisoformat(population.observation_date) + window_start = cutoff - timedelta(weeks=12) + for _, row in snapshot.head(10).iterrows(): + in_window = [ + h + for h in sim.health_signals + if h.customer_id == row["customer_id"] + and window_start < date.fromisoformat(h.period_start) <= cutoff + ] + assert in_window, "active customer must have signals in the trailing window" + assert row["avg_active_users_l12w"] == pytest.approx( + sum(h.active_users for h in in_window) / len(in_window) + ) + assert row["avg_feature_depth_l12w"] == pytest.approx( + sum(h.feature_depth_score for h in in_window) / len(in_window) + ) + assert row["support_ticket_count_l12w"] == sum(h.support_tickets for h in in_window) + + +def test_last_nps_is_latest_response_at_or_before_cutoff(population, sim, snapshot) -> None: + cutoff = population.observation_date + nps_history: dict[str, int] = {} + for h in sim.health_signals: # chronological per customer + if h.nps_score is not None and h.period_start <= cutoff: + nps_history[h.customer_id] = h.nps_score + for _, row in snapshot.iterrows(): + expected = nps_history.get(row["customer_id"]) + if expected is None: + assert pd.isna(row["last_nps_score"]) + else: + assert row["last_nps_score"] == expected + + +def test_young_customers_have_null_nps(snapshot) -> None: + young = snapshot[snapshot["tenure_weeks"] < 13] + assert young["last_nps_score"].isna().all() + + +# --------------------------------------------------------------------------- +# Leakage trap +# --------------------------------------------------------------------------- + + +def test_trap_equals_terminal_minus_initial_mrr(sim, snapshot) -> None: + terminal = {s.customer_id: s.current_mrr for s in sim.subscriptions} + for _, row in snapshot.iterrows(): + assert row["mrr_change_full_period"] == terminal[row["customer_id"]] - row["initial_mrr"] + + +def test_trap_diverges_for_nontrivial_fraction(snapshot) -> None: + """design.md §7: the trap must visibly differ from the valid counterpart + (post-cutoff expansions inflate it) for the lesson to exist at all.""" + diverges = snapshot["mrr_change_full_period"] != snapshot["mrr_change_at_snapshot"] + assert diverges.mean() > 0.10 + + +# --------------------------------------------------------------------------- +# Targets +# --------------------------------------------------------------------------- + + +def test_ltv_targets_match_invoice_table(population, sim, snapshot) -> None: + cutoff = date.fromisoformat(population.observation_date) + for _, row in snapshot.iterrows(): + for window in FORWARD_WINDOWS_DAYS: + bound = cutoff + timedelta(days=window) + expected = sum( + i.amount_usd + for i in sim.invoices + if i.customer_id == row["customer_id"] + and i.payment_status in ("paid", "recovered") + and cutoff < date.fromisoformat(i.invoice_date) <= bound + ) + assert row[f"ltv_revenue_{window}d"] == float(expected) + + +def test_ltv_windows_are_monotone(snapshot) -> None: + assert (snapshot["ltv_revenue_90d"] <= snapshot["ltv_revenue_365d"]).all() + assert (snapshot["ltv_revenue_365d"] <= snapshot["ltv_revenue_730d"]).all() + + +def test_failed_and_written_off_invoices_excluded_from_revenue(population) -> None: + """D7: only collected revenue counts. payment_fragile worlds have enough + write-offs that including them would inflate the targets measurably.""" + pf_pop = build_customer_population(_N, _POP_SEED, motif_family="payment_fragile") + pf_sim = simulate_lifecycle(pf_pop, _SIM_SEED) + cutoff = date.fromisoformat(pf_pop.observation_date) + snap = build_customer_snapshot(pf_pop, pf_sim) + bound = cutoff + timedelta(days=730) + uncollected = sum( + i.amount_usd + for i in pf_sim.invoices + if i.payment_status in ("failed", "written_off") + and cutoff < date.fromisoformat(i.invoice_date) <= bound + and i.customer_id in set(snap["customer_id"]) + ) + assert uncollected > 0, "fixture should produce post-cutoff uncollected invoices" + gross_all = sum( + i.amount_usd + for i in pf_sim.invoices + if cutoff < date.fromisoformat(i.invoice_date) <= bound + and i.customer_id in set(snap["customer_id"]) + ) + assert float(snap["ltv_revenue_730d"].sum()) == float(gross_all - uncollected) + + +def test_churn_label_matches_churn_dates(population, sim, snapshot) -> None: + cutoff = date.fromisoformat(population.observation_date) + bound = cutoff + timedelta(days=CHURN_WINDOW_DAYS) + churn_dates = { + s.customer_id: date.fromisoformat(s.churn_at) + for s in sim.subscriptions + if s.churn_at is not None + } + for _, row in snapshot.iterrows(): + churned = churn_dates.get(row["customer_id"]) + assert row["churned_within_180d"] == (churned is not None and churned <= bound) + + +def test_target_distribution_is_ziln_shaped(snapshot) -> None: + """Right-skewed with a heavy upper tail (the expansion world drives it).""" + for window in FORWARD_WINDOWS_DAYS: + col = snapshot[f"ltv_revenue_{window}d"] + assert (col >= 0).all() + assert col.mean() > col.median(), f"{window}d not right-skewed" + long_window = snapshot["ltv_revenue_730d"] + assert long_window.max() > 5 * long_window.median() + + +# --------------------------------------------------------------------------- +# Difficulty distortions +# --------------------------------------------------------------------------- + + +def test_distortions_perturb_float_features(population, sim, snapshot) -> None: + distorted = build_customer_snapshot( + population, sim, difficulty_params=_difficulty_params(), seed=7 + ) + assert not distorted["avg_active_users_l12w"].equals(snapshot["avg_active_users_l12w"]) + + +def test_distortions_inject_missingness(population, sim) -> None: + distorted = build_customer_snapshot( + population, sim, difficulty_params=_difficulty_params(missing_rate=0.25), seed=7 + ) + numeric_feature_cols = [ + f.name + for f in CUSTOMER_SNAPSHOT_FEATURES + if f.dtype in ("Int64", "Float64") and not f.is_target and not f.leakage_risk + ] + assert distorted[numeric_feature_cols].isna().sum().sum() > 0 + + +def test_distortions_never_touch_targets(population, sim, snapshot) -> None: + distorted = build_customer_snapshot( + population, sim, difficulty_params=_difficulty_params(), seed=7 + ) + pd.testing.assert_frame_equal(distorted[_TARGET_COLS], snapshot[_TARGET_COLS]) + + +def test_trap_exempt_from_distortion(population, sim, snapshot) -> None: + """Noise or missingness on the trap would hide the lesson it teaches.""" + distorted = build_customer_snapshot( + population, + sim, + difficulty_params=_difficulty_params(missing_rate=0.5, noise_scale=1.0), + seed=7, + ) + pd.testing.assert_series_equal( + distorted["mrr_change_full_period"], snapshot["mrr_change_full_period"] + ) + + +def test_distortions_deterministic_per_seed(population, sim) -> None: + params = _difficulty_params() + a = build_customer_snapshot(population, sim, difficulty_params=params, seed=7) + b = build_customer_snapshot(population, sim, difficulty_params=params, seed=7) + pd.testing.assert_frame_equal(a, b) + c = build_customer_snapshot(population, sim, difficulty_params=params, seed=8) + assert not a.equals(c) From 844fc1cced62faaab9507f3868912fb5bbed27c5 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 13:33:57 +0300 Subject: [PATCH 2/3] fix(lifecycle): address self-review findings on the snapshot builder [LTV-Pl] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five findings from hostile self-review of the snapshot PR: 1. SILENT TARGET CENSORING (the serious one). The builder validated cutoff <= observation_date but had no way to verify the sim was run with a horizon covering the target windows: simulate_lifecycle(..., forward_window_days=90) followed by build_customer_snapshot() would emit plausible-looking, deterministic, silently truncated ltv_revenue_365d/730d values — exactly the censoring failure the cutoff check exists to prevent. Fix: LifecycleSimulationResult now records forward_window_days/early_tenure_weeks, and the builder raises unless the recorded horizon covers max(730, 180) days. Regression tests on both sides. 2. The contract-anniversary boundary was COPIED, not shared — and the docstring bragged it was "the same boundary is_renewal_week uses". A claim, not a mechanism: a second copy of round(k*term*52/12) and a third copy of _WEEKS_PER_MONTH, where drift would desynchronize the published weeks_to_next_renewal feature from the hazard spike (the data<->causality agreement this dataset teaches). Fix: public hazards.next_renewal_week() beside is_renewal_week; the snapshot consumes it; duplicates deleted. Property test: the returned week is the FIRST week after the input satisfying is_renewal_week, for every week in [0, 160) across 12/13/24-month terms. 3. A mismatched (population, sim) pair died with a bare KeyError deep in the row loop — the one precondition the function depends on was the one it never checked. Fix: explicit validation with a population/sim-mismatch diagnostic naming the first missing customer. 4. The module-level _EMPTY_*_AGG fallback dicts were handed out SHARED via .get(cid, _EMPTY_...) — read-only today; the first accidental write corrupts every subsequent row. Fix: MappingProxyType (the mechanisms.py precedent), so mutation raises instead. 5. Documentation debts: the design.md §7 secondary advanced-tier trap (last_health_signal_post_obs) is now EXPLICITLY deferred to LTV-Pn in the roadmap (tier-conditional — belongs with difficulty wiring); features.py documents that MCAR missingness stacks on semantic nulls (last_nps_score, weeks_since_last_payment_failure); distortions.py documents the inherited byte-identity-locked Int64->Float64 dtype flip under missingness. Full suite 1768 passed / 51 skipped; ruff + mypy clean; lead-scoring distorted-snapshot hash still byte-identical (sha256 196bc45f...). Co-Authored-By: Claude Fable 5 --- docs/ltv/roadmap.md | 10 ++- leadforge/render/distortions.py | 6 ++ leadforge/schemes/lifecycle/engine.py | 11 ++- leadforge/schemes/lifecycle/features.py | 7 ++ leadforge/schemes/lifecycle/hazards.py | 24 +++++++ leadforge/schemes/lifecycle/snapshots.py | 81 +++++++++++++---------- tests/schemes/lifecycle/test_engine.py | 8 +++ tests/schemes/lifecycle/test_hazards.py | 40 +++++++++++ tests/schemes/lifecycle/test_snapshots.py | 16 +++++ 9 files changed, 165 insertions(+), 38 deletions(-) diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index d7c4dde..df54faa 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -212,11 +212,19 @@ Total: ~19 PRs across 9 milestones. `current_plan` (no plan-change mechanism → exact duplicate of `initial_plan`) and `downgrade_count` (no downgrade mechanism → zero-variance); re-add only with the mechanism. - - Tests (39): censoring-based leakage probe (features identical when all + - Tests (43): censoring-based leakage probe (features identical when all post-cutoff events are deleted); target derivation vs the invoice table; failed/written-off exclusion (D7); ZILN target shape; trap-divergence invariant; trap + targets exempt from distortion; weeks_to_next_renewal agrees with `is_renewal_week`. + - Self-review hardening: `LifecycleSimulationResult` records its + `forward_window_days`/`early_tenure_weeks` and the builder rejects sims + whose horizon cannot cover the 730d/180d target windows (silent-censoring + guard); anniversary boundary single-sourced via public + `hazards.next_renewal_week`; population/sim mismatch raises a real error. + - **Deferred to `LTV-Pn` (difficulty wiring):** the design.md §7 secondary + advanced-tier trap `last_health_signal_post_obs` — it is tier-conditional, + so it belongs with the difficulty-profile plumbing, not the builder. - Labels: `type: feature`, `layer: render` - [ ] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`. Reuse the snapshot builder with a per-customer relative cutoff diff --git a/leadforge/render/distortions.py b/leadforge/render/distortions.py index 1829b82..390f9e9 100644 --- a/leadforge/render/distortions.py +++ b/leadforge/render/distortions.py @@ -7,6 +7,12 @@ and RNG substream, so existing outputs stay byte-identical) so the lifecycle scheme can share it. +Known wart (inherited, locked by byte-identity with shipped lead-scoring +bundles): missingness injection converts an Int64 column to Float64 **only if +at least one of its cells is masked**, so the post-distortion dtype of integer +columns varies with seed and missing_rate. Consumers must not rely on +integer dtypes surviving distortion. + Column eligibility is derived from the feature catalog rather than runtime dtype sniffing — categoricals, booleans, IDs, and target columns are never distorted even if their runtime dtype happens to be numeric. Callers exempt diff --git a/leadforge/schemes/lifecycle/engine.py b/leadforge/schemes/lifecycle/engine.py index d581a07..e5916c8 100644 --- a/leadforge/schemes/lifecycle/engine.py +++ b/leadforge/schemes/lifecycle/engine.py @@ -116,6 +116,11 @@ class LifecycleSimulationResult: subscription_events: list[SubscriptionEventRow] = field(default_factory=list) health_signals: list[HealthSignalRow] = field(default_factory=list) invoices: list[InvoiceRow] = field(default_factory=list) + # Horizon the simulation was run with, recorded so downstream builders can + # verify their forward-window targets are fully covered (D6) instead of + # silently emitting censored values. + forward_window_days: int = 730 + early_tenure_weeks: int = 4 # --------------------------------------------------------------------------- @@ -190,7 +195,11 @@ def simulate_lifecycle( acct_latents = population.latent_state.account_latents cust_latents = population.latent_state.customer_latents - result = LifecycleSimulationResult(subscriptions=[]) + result = LifecycleSimulationResult( + subscriptions=[], + forward_window_days=forward_window_days, + early_tenure_weeks=early_tenure_weeks, + ) # Event ID counters are global across customers (population order), so IDs # stay deterministic and dense. counters = {"subscription_event": 0, "health_signal": 0, "invoice": 0} diff --git a/leadforge/schemes/lifecycle/features.py b/leadforge/schemes/lifecycle/features.py index 6bed41e..e889e89 100644 --- a/leadforge/schemes/lifecycle/features.py +++ b/leadforge/schemes/lifecycle/features.py @@ -22,6 +22,13 @@ deliberate pedagogical trap, retained in all modes and exempt from difficulty distortions. +Semantic nulls vs. injected missingness: ``last_nps_score`` and +``weeks_since_last_payment_failure`` are null *by meaning* (no survey response +yet / never failed). Difficulty-tier MCAR missingness stacks on top of these, +so at distorted tiers a null no longer distinguishes "never happened" from +"not recorded" — deliberate (real CRM exports have both kinds of missingness) +and to be documented in the feature dictionary at publication (LTV-M6). + Deliberately absent (vs. design.md §8's draft list): - ``current_plan`` — the engine has no plan-change mechanism; the column diff --git a/leadforge/schemes/lifecycle/hazards.py b/leadforge/schemes/lifecycle/hazards.py index a805863..d3cb599 100644 --- a/leadforge/schemes/lifecycle/hazards.py +++ b/leadforge/schemes/lifecycle/hazards.py @@ -58,6 +58,7 @@ "churn_probability", "expansion_probability", "is_renewal_week", + "next_renewal_week", "payment_failure_probability", ] @@ -139,6 +140,29 @@ def is_renewal_week(week_of_tenure: int, contract_term_months: int) -> bool: return k >= 1 and round(k * term_weeks) == week_of_tenure +def next_renewal_week(week_of_tenure: int, contract_term_months: int) -> int: + """Return the first contract-anniversary week strictly after *week_of_tenure*. + + Single source of the anniversary boundary for downstream consumers (the + snapshot builder's ``weeks_to_next_renewal`` feature): the returned week + always satisfies :func:`is_renewal_week`, so the published feature and the + hazard spike can never drift apart. + + Raises: + ValueError: if *week_of_tenure* is negative or *contract_term_months* + is not a positive integer. + """ + if week_of_tenure < 0: + raise ValueError(f"week_of_tenure must be >= 0, got {week_of_tenure}") + if contract_term_months < 1: + raise ValueError(f"contract_term_months must be >= 1, got {contract_term_months}") + term_weeks = contract_term_months * _WEEKS_PER_MONTH + k = max(1, int(week_of_tenure / term_weeks)) + while round(k * term_weeks) <= week_of_tenure: + k += 1 + return round(k * term_weeks) + + def churn_probability( params: ChurnHazardParams, latents: Mapping[str, float], diff --git a/leadforge/schemes/lifecycle/snapshots.py b/leadforge/schemes/lifecycle/snapshots.py index 6041c6d..d20421f 100644 --- a/leadforge/schemes/lifecycle/snapshots.py +++ b/leadforge/schemes/lifecycle/snapshots.py @@ -34,14 +34,17 @@ from __future__ import annotations +from collections.abc import Mapping from datetime import date, timedelta -from typing import TYPE_CHECKING +from types import MappingProxyType +from typing import TYPE_CHECKING, Any import numpy as np import pandas as pd from leadforge.render.distortions import apply_difficulty_distortions from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES +from leadforge.schemes.lifecycle.hazards import next_renewal_week if TYPE_CHECKING: from leadforge.core.models import DifficultyParams @@ -65,8 +68,6 @@ # Invoice terminal statuses that count as collected gross revenue (D7). _REVENUE_STATUSES = frozenset({"paid", "recovered"}) -_WEEKS_PER_MONTH = 52.0 / 12.0 - _SNAPSHOT_COLUMNS = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES] _SNAPSHOT_DTYPES = {f.name: f.dtype for f in CUSTOMER_SNAPSHOT_FEATURES} @@ -119,8 +120,23 @@ def build_customer_snapshot( f"{population.observation_date}; forward-window targets would be censored" ) + required_days = max(*FORWARD_WINDOWS_DAYS, CHURN_WINDOW_DAYS) + if sim.forward_window_days < required_days: + raise ValueError( + f"sim was run with forward_window_days={sim.forward_window_days}, which " + f"cannot cover the {required_days}-day target windows; the ltv/churn " + "targets would be silently censored" + ) + accounts = {a.account_id: a for a in population.accounts} subscriptions = {s.customer_id: s for s in sim.subscriptions} + missing = [c.customer_id for c in population.customers if c.customer_id not in subscriptions] + if missing: + raise ValueError( + f"sim result lacks subscriptions for {len(missing)} of " + f"{len(population.customers)} population customers (e.g. {missing[0]}); " + "population/sim mismatch" + ) # Eligibility: started at or before the cutoff, still active at it. eligible = [] @@ -144,8 +160,8 @@ def build_customer_snapshot( for customer, sub, start in eligible: account = accounts[customer.account_id] tenure_weeks = (cutoff - start).days // 7 - ev = events.get(customer.customer_id, _EMPTY_EVENT_AGG) - hl = health.get(customer.customer_id, _EMPTY_HEALTH_AGG) + ev: Mapping[str, Any] = events.get(customer.customer_id, _EMPTY_EVENT_AGG) + hl: Mapping[str, Any] = health.get(customer.customer_id, _EMPTY_HEALTH_AGG) rv = revenue.get(customer.customer_id, {}) current_mrr = customer.initial_mrr + ev["mrr_delta"] @@ -166,8 +182,8 @@ def build_customer_snapshot( "renewal_count": ev["renewal_count"], "expansion_count": ev["expansion_count"], "contract_term_months": customer.contract_term_months, - "weeks_to_next_renewal": _weeks_to_next_renewal( - tenure_weeks, customer.contract_term_months + "weeks_to_next_renewal": ( + next_renewal_week(tenure_weeks, customer.contract_term_months) - tenure_weeks ), "avg_active_users_l12w": hl["avg_active_users"], "active_user_trend_l12w": hl["trend"], @@ -212,21 +228,28 @@ def build_customer_snapshot( # Per-table aggregation helpers # --------------------------------------------------------------------------- -_EMPTY_EVENT_AGG: dict = { - "mrr_delta": 0, - "renewal_count": 0, - "expansion_count": 0, - "payment_failure_count": 0, - "last_failure_date": None, -} - -_EMPTY_HEALTH_AGG: dict = { - "avg_active_users": None, - "trend": None, - "avg_depth": None, - "tickets": 0, - "last_nps": None, -} +# Frozen (MappingProxyType): these are handed out as shared fallbacks for +# customers with no events/signals — mutating one would corrupt every +# subsequent row, so accidental writes must raise. +_EMPTY_EVENT_AGG: Mapping[str, Any] = MappingProxyType( + { + "mrr_delta": 0, + "renewal_count": 0, + "expansion_count": 0, + "payment_failure_count": 0, + "last_failure_date": None, + } +) + +_EMPTY_HEALTH_AGG: Mapping[str, Any] = MappingProxyType( + { + "avg_active_users": None, + "trend": None, + "avg_depth": None, + "tickets": 0, + "last_nps": None, + } +) def _event_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict]: @@ -320,20 +343,6 @@ def _forward_revenue(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, return out -def _weeks_to_next_renewal(tenure_weeks: int, contract_term_months: int) -> int: - """Weeks from the cutoff to the next contract anniversary. - - Anniversaries fall at ``round(k · term · 52/12)`` weeks — the same - boundary :func:`~leadforge.schemes.lifecycle.hazards.is_renewal_week` - uses, so the feature and the hazard spike agree exactly. - """ - term_weeks = contract_term_months * _WEEKS_PER_MONTH - k = 1 - while round(k * term_weeks) <= tenure_weeks: - k += 1 - return round(k * term_weeks) - tenure_weeks - - def _empty_snapshot() -> pd.DataFrame: df = pd.DataFrame({name: pd.Series(dtype=_SNAPSHOT_DTYPES[name]) for name in _SNAPSHOT_COLUMNS}) return df[_SNAPSHOT_COLUMNS] diff --git a/tests/schemes/lifecycle/test_engine.py b/tests/schemes/lifecycle/test_engine.py index 4d8c6ef..552bf53 100644 --- a/tests/schemes/lifecycle/test_engine.py +++ b/tests/schemes/lifecycle/test_engine.py @@ -53,6 +53,14 @@ def test_rejects_population_without_motif(population) -> None: simulate_lifecycle(broken, _SIM_SEED) +def test_result_records_simulation_horizon(population, sim) -> None: + # Downstream builders verify target-window coverage off these fields. + assert sim.forward_window_days == 730 + assert sim.early_tenure_weeks == 4 + custom = simulate_lifecycle(population, _SIM_SEED, forward_window_days=90) + assert custom.forward_window_days == 90 + + def test_rejects_bad_windows(population) -> None: with pytest.raises(ValueError, match="forward_window_days"): simulate_lifecycle(population, _SIM_SEED, forward_window_days=0) diff --git a/tests/schemes/lifecycle/test_hazards.py b/tests/schemes/lifecycle/test_hazards.py index dd53f4e..4a6da97 100644 --- a/tests/schemes/lifecycle/test_hazards.py +++ b/tests/schemes/lifecycle/test_hazards.py @@ -9,6 +9,7 @@ churn_probability, expansion_probability, is_renewal_week, + next_renewal_week, payment_failure_probability, ) from leadforge.schemes.lifecycle.mechanisms import ( @@ -93,6 +94,45 @@ def test_renewal_week_rejects_bad_term() -> None: is_renewal_week(10, 0) +# --------------------------------------------------------------------------- +# next_renewal_week +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ("week", "term", "expected"), + [ + (0, 12, 52), + (51, 12, 52), + (52, 12, 104), + (0, 24, 104), + (104, 24, 208), + (0, 13, 56), + (56, 13, 113), + ], +) +def test_next_renewal_week_known_anniversaries(week: int, term: int, expected: int) -> None: + assert next_renewal_week(week, term) == expected + + +@pytest.mark.parametrize("term", [12, 13, 24]) +def test_next_renewal_week_agrees_with_is_renewal_week(term: int) -> None: + """The returned week is the FIRST week after the input that satisfies + is_renewal_week — the two functions share one anniversary boundary.""" + for week in range(0, 160): + nxt = next_renewal_week(week, term) + assert nxt > week + assert is_renewal_week(nxt, term) + assert not any(is_renewal_week(w, term) for w in range(week + 1, nxt)) + + +def test_next_renewal_week_rejects_bad_inputs() -> None: + with pytest.raises(ValueError, match="week_of_tenure"): + next_renewal_week(-1, 12) + with pytest.raises(ValueError, match="contract_term_months"): + next_renewal_week(10, 0) + + # --------------------------------------------------------------------------- # churn_probability # --------------------------------------------------------------------------- diff --git a/tests/schemes/lifecycle/test_snapshots.py b/tests/schemes/lifecycle/test_snapshots.py index 450d8df..648a713 100644 --- a/tests/schemes/lifecycle/test_snapshots.py +++ b/tests/schemes/lifecycle/test_snapshots.py @@ -118,6 +118,22 @@ def test_rejects_population_without_observation_date(population, sim) -> None: build_customer_snapshot(broken, sim) +def test_rejects_sim_with_short_forward_window(population, sim) -> None: + """Regression: a sim run with a shorter horizon than the target windows + must be rejected, not silently produce censored ltv_revenue_* values.""" + short = replace(sim, forward_window_days=365) + with pytest.raises(ValueError, match="forward_window_days"): + build_customer_snapshot(population, short) + + +def test_rejects_population_sim_mismatch(population, sim) -> None: + """Regression: a sim missing subscriptions for population customers must + fail with a diagnostic, not a bare KeyError mid-build.""" + mismatched = replace(sim, subscriptions=sim.subscriptions[1:]) + with pytest.raises(ValueError, match="population/sim mismatch"): + build_customer_snapshot(population, mismatched) + + # --------------------------------------------------------------------------- # Snapshot safety: features must not see past the cutoff # --------------------------------------------------------------------------- From 93c720d327f6f85d99133883e82a4f1b11e50ed6 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 14:31:33 +0300 Subject: [PATCH 3/3] fix(render): exclude targets/exempt cols from distortion clamps [LTV-Pl] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address the two Copilot review threads on #119: 1. APPLIED — the post-noise non-negative clamp lists filtered only on dtype + non_negative, so target columns (and any future non-negative exempt trap) were clip(lower=0)-ed. Behaviorally a no-op today (those columns never receive noise, and non_negative raw values are >= 0 by definition), but the "targets are never distorted" contract must hold by construction, not coincidence. Clamp lists now exclude is_target and exempt_cols exactly like the distortion-eligible lists. Verified byte-identical for lead-scoring (sha256 196bc45f… unchanged). New tests prove a deliberately out-of-contract negative value in a target / exempt column survives distortion untouched. 2. DECLINED with rationale — pd.NA into nullable Int64 instead of the Float64 conversion under missingness: the current behavior is byte-identity-locked with the shipped lead-scoring bundles (v4/v6/v7 and their validation scripts assume Float64). Documented as a Known wart in the module docstring; an opt-in dtype-preserving mode for the lifecycle scheme (no shipped bundles yet) is recorded in the roadmap as deferred to LTV-Pn, where the bundle writer fixes the lifecycle parquet schemas. Full suite 1771 passed / 51 skipped; ruff + mypy clean. Co-Authored-By: Claude Fable 5 --- docs/ltv/roadmap.md | 6 +++ leadforge/render/distortions.py | 18 ++++++++- tests/render/test_distortions.py | 63 ++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 tests/render/test_distortions.py diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index df54faa..92f6b2e 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -225,6 +225,12 @@ Total: ~19 PRs across 9 milestones. - **Deferred to `LTV-Pn` (difficulty wiring):** the design.md §7 secondary advanced-tier trap `last_health_signal_post_obs` — it is tier-conditional, so it belongs with the difficulty-profile plumbing, not the builder. + - **Deferred to `LTV-Pn` (bundle writer):** an opt-in dtype-preserving + missingness mode for `render/distortions.py` (`pd.NA` into nullable + `Int64` instead of the Float64 conversion) — the lead-scoring default is + byte-identity-locked, but the lifecycle scheme has no shipped bundles yet + and can pick the cleaner semantics when its parquet schemas are fixed + (Copilot review suggestion on #119). - Labels: `type: feature`, `layer: render` - [ ] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`. Reuse the snapshot builder with a per-customer relative cutoff diff --git a/leadforge/render/distortions.py b/leadforge/render/distortions.py index 390f9e9..0a5d6db 100644 --- a/leadforge/render/distortions.py +++ b/leadforge/render/distortions.py @@ -83,11 +83,25 @@ def apply_difficulty_distortions( ] # Post-noise physical-range clamps, derived from FeatureSpec.non_negative # so the lists stay in sync automatically when features are added/renamed. + # Targets and exempt columns are excluded like the distortion lists above: + # they never receive noise, so clamping them was always a no-op — but the + # "targets are never distorted" contract should hold by construction, not + # by coincidence. nonneg_float_cols = frozenset( - f.name for f in feature_specs if f.dtype in _FLOAT_DTYPES and f.non_negative + f.name + for f in feature_specs + if f.dtype in _FLOAT_DTYPES + and f.non_negative + and not f.is_target + and f.name not in exempt_cols ) nonneg_int_cols = frozenset( - f.name for f in feature_specs if f.dtype in ("Int64", "int64") and f.non_negative + f.name + for f in feature_specs + if f.dtype in ("Int64", "int64") + and f.non_negative + and not f.is_target + and f.name not in exempt_cols ) df = df.copy() diff --git a/tests/render/test_distortions.py b/tests/render/test_distortions.py new file mode 100644 index 0000000..aa430c6 --- /dev/null +++ b/tests/render/test_distortions.py @@ -0,0 +1,63 @@ +"""Tests for the shared difficulty-distortion helper (render/distortions.py).""" + +import pandas as pd + +from leadforge.core.models import DifficultyParams +from leadforge.render.distortions import apply_difficulty_distortions +from leadforge.schema.features import FeatureSpec + +_SPECS = [ + FeatureSpec(name="feat_f", dtype="Float64", description="", category="x", non_negative=True), + FeatureSpec(name="trap", dtype="Float64", description="", category="x", non_negative=True), + FeatureSpec( + name="target", + dtype="Float64", + description="", + category="target", + is_target=True, + non_negative=True, + ), +] + +_PARAMS = DifficultyParams( + signal_strength=1.0, + noise_scale=1.0, + missing_rate=0.0, + outlier_rate=0.0, + conversion_rate_lo=0.02, + conversion_rate_hi=0.4, + committee_friction=0.5, +) + + +def _frame() -> pd.DataFrame: + # Deliberately out-of-contract negatives in the target and trap columns: + # the helper must not "repair" columns it is forbidden from touching. + return pd.DataFrame( + { + "feat_f": pd.array([1.0, 2.0, 3.0, 4.0], dtype="Float64"), + "trap": pd.array([-5.0, 1.0, 2.0, 3.0], dtype="Float64"), + "target": pd.array([-7.0, 0.0, 10.0, 20.0], dtype="Float64"), + } + ) + + +def test_targets_never_clamped_or_distorted() -> None: + """Regression (Copilot review on #119): the non-negative clamp lists must + exclude targets — 'targets are never distorted' has to hold by + construction, including the clip step.""" + out = apply_difficulty_distortions(_frame(), _PARAMS, seed=3, feature_specs=_SPECS) + pd.testing.assert_series_equal(out["target"], _frame()["target"]) + + +def test_exempt_columns_never_clamped_or_distorted() -> None: + out = apply_difficulty_distortions( + _frame(), _PARAMS, seed=3, feature_specs=_SPECS, exempt_cols=frozenset({"trap"}) + ) + pd.testing.assert_series_equal(out["trap"], _frame()["trap"]) + + +def test_nonneg_feature_columns_still_clamped() -> None: + out = apply_difficulty_distortions(_frame(), _PARAMS, seed=3, feature_specs=_SPECS) + assert (out["feat_f"].dropna() >= 0).all() + assert not out["feat_f"].equals(_frame()["feat_f"]) # noise actually applied