From da96887b4a4981cdcce2a43c5cd6e62dfc900010 Mon Sep 17 00:00:00 2001
From: Shay Palachy
Date: Fri, 12 Jun 2026 18:01:34 +0300
Subject: [PATCH 1/2] feat(lifecycle): early-pLTV (tenure-anchored) snapshot [LTV-Pm]
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add the second observation regime (design.md §3.1 / D8): a tenure-anchored
snapshot that observes every customer at a fixed short tenure
(customer_start + early_tenure_weeks) — the genuine cold-start case for
acquisition-time value prediction (Voyantis framing).

- build_early_pltv_snapshot(population, sim, *, early_tenure_weeks=4, …) in
  schemes/lifecycle/snapshots.py.
- Unify both regimes on one per-customer-cutoff core. The calendar and early
  builders now feed a shared _assemble_snapshot() driven by a
  customer_id -> cutoff map; the three aggregation helpers take that map
  instead of a single date. Feature derivations, the mrr_change_full_period
  trap, target attribution, and difficulty distortions are defined exactly
  once. The calendar regime's output is unchanged — all LTV-Pl tests pass
  as-is, and the lead-scoring distorted-snapshot hash is still byte-identical
  (196bc45f…).

Semantics:
- Eligibility = survival to the anchor: drops onboarding churners (churned at
  or before start+anchor), keeps late starters and customers who churn after
  the anchor. The cohort therefore differs from the calendar regime's.
- Forward windows are fully simulated relative to each customer's OWN start
  (engine D6 runs through max(obs, start+et)+fwd), so the anchor may
  legitimately fall after observation_date — the builder does not require
  cutoff <= obs (unlike the calendar regime).
- Coverage guards: early_tenure_weeks must be >= 1 and <= the sim's recorded
  early_tenure_weeks (else per-customer forward windows would be censored),
  on top of the shared forward-window / population-mismatch /
  observation-date checks.
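
Illustration of the eligibility rule above (a minimal sketch, not code from
this patch: the toy cohort, the dates, and the helper names early_cutoff /
is_eligible are hypothetical; only the rule itself, cutoff = customer_start +
early_tenure_weeks with exclusion when churn falls at or before that cutoff,
is taken from the builder):

```python
from __future__ import annotations

from datetime import date, timedelta


def early_cutoff(start: date, early_tenure_weeks: int) -> date:
    """Per-customer relative cutoff: customer_start + early_tenure_weeks."""
    return start + timedelta(weeks=early_tenure_weeks)


def is_eligible(start: date, churn_at: date | None, early_tenure_weeks: int) -> bool:
    """Survival to the anchor: a churn at or before the cutoff excludes the row."""
    return churn_at is None or churn_at > early_cutoff(start, early_tenure_weeks)


# Hypothetical toy cohort: name -> (customer_start, churn_at or None).
observation_date = date(2026, 1, 1)
cohort = {
    "onboarding_churner": (date(2025, 1, 6), date(2025, 1, 20)),
    "churn_on_anchor": (date(2025, 1, 6), date(2025, 2, 3)),
    "post_anchor_churner": (date(2025, 1, 6), date(2025, 3, 3)),
    "late_starter": (date(2025, 12, 22), None),
}

# customer name -> cutoff, for eligible customers only (mirrors the cutoff map).
eligible = {
    name: early_cutoff(start, 4)
    for name, (start, churn_at) in cohort.items()
    if is_eligible(start, churn_at, 4)
}
```

In this toy cohort the two customers who churn at or before start + 4w drop
out, the post-anchor churner stays, and the late starter stays even though its
cutoff lands after observation_date.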

Known property: tenure_weeks is constant (= early_tenure_weeks) across the
early table — the defining property of the regime, not a feature. The
published-bundle no-zero-variance check must exempt it for this task family
(noted for the validation harness, LTV-Pp).

Tests (19): tenure constant at anchor; eligibility = survival to anchor;
onboarding churners excluded; cohort difference vs calendar (post-anchor,
pre-obs churners); per-customer censoring leakage probe (delete each
customer's post-anchor events, features unchanged); targets recomputed off the
per-customer cutoff vs the invoice table; cold-start sparsity (NPS all-null at
4w; health aggregates over pre-anchor signals only); anchor + horizon +
mismatch + missing-obs validation; distortions leave targets and trap intact.

Scope note: the actual early-pLTV *task directory* + split export
(render/tasks.py) folds into LTV-Pn with the bundle/task writer, matching how
LTV-Pl deferred the calendar task-split writer. This PR delivers the snapshot
builder + recomputed targets.

Full suite 1790 passed / 51 skipped; ruff + mypy clean.

Co-Authored-By: Claude Opus 4.8
---
 .agent-plan.md                             |  10 +-
 docs/ltv/roadmap.md                        |  33 ++-
 leadforge/schemes/lifecycle/snapshots.py   | 262 +++++++++++++++-----
 tests/schemes/lifecycle/test_early_pltv.py | 270 +++++++++++++++++++++
 4 files changed, 506 insertions(+), 69 deletions(-)
 create mode 100644 tests/schemes/lifecycle/test_early_pltv.py

diff --git a/.agent-plan.md b/.agent-plan.md
index 7abbe63..46dbc8e 100644
--- a/.agent-plan.md
+++ b/.agent-plan.md
@@ -64,8 +64,14 @@ merged (#118) — **LTV-M4 complete**. **LTV-M5**: `LTV-Pl`
 `CUSTOMER_SNAPSHOT_FEATURES` with the three `ltv_revenue_{90,365,730}d` targets,
 `churned_within_180d`, and the `mrr_change_full_period` trap; difficulty
 distortions extracted to scheme-agnostic `render/distortions.py`,
-lead-scoring byte-identical; 39 tests) opened as **#119**. Next: `LTV-Pm`
-(early-pLTV tenure-anchored task family).
+lead-scoring byte-identical) merged (#119). `LTV-Pm` (early-pLTV
+tenure-anchored snapshot — `build_early_pltv_snapshot()` with a per-customer
+relative cutoff at `customer_start + early_tenure_weeks`; calendar + early
+builders unified on one per-customer-cutoff core; 19 tests) opened as
+**#120** — **LTV-M5 complete** (both observation regimes). Next: `LTV-M6`
+(`LTV-Pn` — register LifecycleScheme + recipe + manifest/schema-v6, fold in
+the deferred task-split writer for both regimes + the carried layering
+cleanups).
 
 ---
 
diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md
index 92f6b2e..b834048 100644
--- a/docs/ltv/roadmap.md
+++ b/docs/ltv/roadmap.md
@@ -45,7 +45,7 @@ protocol + registry, with the package physically reorganized into
 | `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe), #109 (Pf.1), #110 (Pf.2), #111 (Pg.1), #112 (Pg.2) |
 | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) |
 | `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) |
-| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl) |
+| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl), #120 (Pm) |
 | `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | |
 | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | |
 | `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | |
@@ -232,12 +232,31 @@ Total: ~19 PRs across 9 milestones.
     and can pick the cleaner semantics when its parquet schemas are fixed
     (Copilot review suggestion on #119).
   - Labels: `type: feature`, `layer: render`
-- [ ] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`.
- Reuse the snapshot builder with a per-customer relative cutoff - (`customer_start + early_tenure_weeks`) to emit the cold-start snapshot + - recomputed targets (D8); separate task directory. - - Tests: per-customer cutoff correctness, short-tenure sparsity, target parity, - no post-cutoff leakage. +- [x] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) snapshot` + (**PR #120**). `build_early_pltv_snapshot(early_tenure_weeks=…)` in + `schemes/lifecycle/snapshots.py`: per-customer relative cutoff at + `customer_start + early_tenure_weeks` (D8). The calendar and early builders + now share one per-customer-cutoff core (`_assemble_snapshot` + cutoff-map + aggregation helpers), so feature derivations, the trap, target attribution, + and distortions are defined once; the calendar regime's output is unchanged + (LTV-Pl tests pass as-is). Eligibility = survival to the anchor (drops + onboarding churners, keeps late starters / post-anchor churners); forward + windows are fully simulated relative to each customer's own start, so the + anchor may legitimately land after `observation_date`. + - Tests (19): tenure constant at the anchor; eligibility = survival to + anchor; cohort difference vs calendar (post-anchor pre-obs churners); + per-customer censoring leakage probe; targets recomputed per-customer + cutoff vs the invoice table; cold-start sparsity (NPS all-null at 4w); + anchor-validation (`>= 1`, `<= sim.early_tenure_weeks`), short-window / + mismatch / missing-obs guards; distortions leave targets + trap intact. + - **Known property (deferred to `LTV-Pp` validation):** `tenure_weeks` is + constant (= `early_tenure_weeks`) across the early table by design — the + published-bundle no-zero-variance check must exempt it for this task + family. 
+  - **Deferred to `LTV-Pn` (bundle/task writer):** the actual early-pLTV
+    *task directory* + train/valid/test split export (`render/tasks.py`,
+    design.md §536) — this PR delivers the snapshot + recomputed targets only,
+    matching how `LTV-Pl` deferred the calendar task-split writer.
   - Labels: `type: feature`, `layer: render`
 
 ---
diff --git a/leadforge/schemes/lifecycle/snapshots.py b/leadforge/schemes/lifecycle/snapshots.py
index d20421f..c1d1521 100644
--- a/leadforge/schemes/lifecycle/snapshots.py
+++ b/leadforge/schemes/lifecycle/snapshots.py
@@ -1,26 +1,39 @@
-"""Customer snapshot builder — flatten the lifecycle simulation into an
-ML-ready pLTV table.
-
-:func:`build_customer_snapshot` produces one row per customer **active at the
-cutoff**, containing the features defined in
-:data:`~leadforge.schemes.lifecycle.features.CUSTOMER_SNAPSHOT_FEATURES`.
+"""Customer snapshot builders — flatten the lifecycle simulation into ML-ready
+pLTV tables, one per observation regime (design.md §3.1).
+
+Two public entry points, both producing the same
+:data:`~leadforge.schemes.lifecycle.features.CUSTOMER_SNAPSHOT_FEATURES`
+columns from the same simulated world, differing only in the **cutoff** each
+customer is anchored at:
+
+- :func:`build_customer_snapshot` — **calendar-anchored** (standard) regime: a
+  single absolute ``cutoff`` (the world ``observation_date``) shared by every
+  customer. Tenure at cutoff varies from cold to mature.
+- :func:`build_early_pltv_snapshot` — **tenure-anchored** (early-pLTV) regime
+  (D8): a per-customer relative cutoff at
+  ``customer_start + early_tenure_weeks``. Every row is observed at the same
+  short tenure — the genuine cold-start case (only a few weeks of health
+  signal exist at the cutoff).
+
+Both delegate to one per-customer-cutoff core (:func:`_assemble_snapshot`), so
+feature derivations, the leakage trap, target attribution, and difficulty
+distortions are defined exactly once.
 Snapshot-safety contract (design.md §5): every feature column is computed
-exclusively from events at or before the cutoff — with one deliberate
+exclusively from events at or before that row's cutoff — with one deliberate
 exception, the ``mrr_change_full_period`` leakage trap (design.md §7), which
 reads the end-of-simulation MRR. The targets (``ltv_revenue_{90,365,730}d``,
 ``churned_within_180d``) are forward-window aggregates by construction and are
 never published as features.
 
-Cutoff semantics
-----------------
-The calendar-anchored regime (this PR, LTV-Pl) snapshots every customer at the
-shared absolute ``observation_date``. The tenure-anchored early-pLTV regime
-(LTV-Pm) will reuse the same per-customer machinery with a relative cutoff.
-The cutoff must not exceed the population's ``observation_date``: the engine
-only guarantees full forward-window simulation up to
-``observation_date + forward_window_days`` (D6), so a later cutoff would
-silently censor the targets.
+Cutoff coverage
+---------------
+Forward-window targets are only meaningful if the simulation ran long enough to
+fill them. The engine (D6) simulates each customer through
+``max(observation_date, start + early_tenure_weeks) + forward_window_days`` and
+records that horizon on the result; both builders refuse to run unless the
+recorded horizon covers the 730d/180d target windows, rather than silently
+emitting censored targets.
Revenue attribution (D7) ------------------------ @@ -49,13 +62,19 @@ if TYPE_CHECKING: from leadforge.core.models import DifficultyParams from leadforge.schemes.lifecycle.engine import LifecycleSimulationResult + from leadforge.schemes.lifecycle.entities import ( + CustomerLifecycleRow, + SubscriptionLifecycleRow, + ) from leadforge.schemes.lifecycle.population import CustomerPopulationResult __all__ = [ "CHURN_WINDOW_DAYS", + "DEFAULT_EARLY_TENURE_WEEKS", "FORWARD_WINDOWS_DAYS", "HEALTH_WINDOW_WEEKS", "build_customer_snapshot", + "build_early_pltv_snapshot", ] # pLTV forward windows (D6) and the secondary churn-label window (D9). @@ -65,6 +84,9 @@ # Look-back window for the health aggregates (*_l12w columns). HEALTH_WINDOW_WEEKS = 12 +# Default tenure anchor for the early-pLTV regime (design.md §3.1: "e.g. 4w"). +DEFAULT_EARLY_TENURE_WEEKS = 4 + # Invoice terminal statuses that count as collected gross revenue (D7). _REVENUE_STATUSES = frozenset({"paid", "recovered"}) @@ -75,6 +97,14 @@ # total_touches_all trap): noise/missingness on it would muddy the lesson. _DISTORTION_EXEMPT_COLS: frozenset[str] = frozenset({"mrr_change_full_period"}) +# One eligible customer plus the cutoff its row is anchored at. +_Eligible = tuple["CustomerLifecycleRow", "SubscriptionLifecycleRow", date] + + +# --------------------------------------------------------------------------- +# Public entry points +# --------------------------------------------------------------------------- + def build_customer_snapshot( population: CustomerPopulationResult, @@ -84,7 +114,9 @@ def build_customer_snapshot( difficulty_params: DifficultyParams | None = None, seed: int = 42, ) -> pd.DataFrame: - """Build the calendar-anchored customer snapshot table. + """Build the **calendar-anchored** customer snapshot table. + + Every customer is anchored at the same absolute ``cutoff``. 
Args: population: Output of @@ -102,16 +134,14 @@ def build_customer_snapshot( Returns: One row per customer active at the cutoff (started at or before it, - not yet churned), with columns in catalog order. Customers who - started after the cutoff or churned at/before it are excluded. + not yet churned), with columns in catalog order. Raises: - ValueError: if the population lacks an ``observation_date`` or the - cutoff exceeds it. + ValueError: if the population lacks an ``observation_date``, the cutoff + exceeds it, the sim horizon cannot cover the target windows, or the + population and sim do not match. """ - if not population.observation_date: - raise ValueError("population.observation_date is not set") - obs_date = date.fromisoformat(population.observation_date) + obs_date, accounts, subscriptions = _validate_inputs(population, sim) if cutoff is None: cutoff = obs_date elif cutoff > obs_date: @@ -120,6 +150,109 @@ def build_customer_snapshot( f"{population.observation_date}; forward-window targets would be censored" ) + eligible: list[_Eligible] = [] + for customer in population.customers: + start = date.fromisoformat(customer.customer_start_at) + if start > cutoff: + continue + sub = subscriptions[customer.customer_id] + if sub.churn_at is not None and date.fromisoformat(sub.churn_at) <= cutoff: + continue + eligible.append((customer, sub, start)) + + cutoffs = {customer.customer_id: cutoff for customer, _, _ in eligible} + return _assemble_snapshot(sim, accounts, eligible, cutoffs, difficulty_params, seed) + + +def build_early_pltv_snapshot( + population: CustomerPopulationResult, + sim: LifecycleSimulationResult, + *, + early_tenure_weeks: int = DEFAULT_EARLY_TENURE_WEEKS, + difficulty_params: DifficultyParams | None = None, + seed: int = 42, +) -> pd.DataFrame: + """Build the **tenure-anchored** early-pLTV snapshot table (D8). 
+ + Each customer is anchored at ``customer_start + early_tenure_weeks`` — a + per-customer relative cutoff — so every row is observed at the same fixed, + short tenure. This is the cold-start regime: only a few weeks of health + signal exist at the cutoff, and ``last_nps_score`` is null for the whole + cohort when ``early_tenure_weeks`` precedes the first quarterly survey. + + Because the cutoff is constant *in tenure*, ``tenure_weeks`` is constant + across the whole table (= ``early_tenure_weeks``). That is the defining + property of the regime, not a feature — the published-bundle + no-zero-variance check must exempt ``tenure_weeks`` for this task family + (handled in the validation harness, LTV-Pp). + + Eligibility does **not** require the cutoff to fall on or before + ``observation_date``: each customer's forward windows are fully simulated + relative to its own start (the engine runs through + ``max(obs, start + early_tenure_weeks) + forward_window_days``), so a + late-starting customer whose tenure cutoff lands after ``observation_date`` + still has complete targets. The cohort therefore differs from the + calendar regime's (it drops onboarding churners but keeps late starters). + + Args: + population: Customer population. + sim: Simulation result for the same population. + early_tenure_weeks: Tenure (whole weeks) at which every customer is + observed. Must not exceed the sim's recorded ``early_tenure_weeks`` + (otherwise the per-customer forward windows are not fully covered). + difficulty_params: Optional difficulty knobs (see + :func:`build_customer_snapshot`). + seed: Seed for the distortion RNG substream. + + Returns: + One row per customer that survived to ``early_tenure_weeks`` of tenure. + + Raises: + ValueError: on the same input problems as + :func:`build_customer_snapshot`, plus a non-positive + ``early_tenure_weeks`` or one exceeding the sim's recorded anchor. 
+ """ + if early_tenure_weeks < 1: + raise ValueError(f"early_tenure_weeks must be >= 1, got {early_tenure_weeks}") + _obs_date, accounts, subscriptions = _validate_inputs(population, sim) + if early_tenure_weeks > sim.early_tenure_weeks: + raise ValueError( + f"early_tenure_weeks={early_tenure_weeks} exceeds the sim's recorded " + f"early_tenure_weeks={sim.early_tenure_weeks}; the per-customer forward " + "windows would be censored" + ) + + eligible: list[_Eligible] = [] + cutoffs: dict[str, date] = {} + for customer in population.customers: + start = date.fromisoformat(customer.customer_start_at) + cutoff = start + timedelta(weeks=early_tenure_weeks) + sub = subscriptions[customer.customer_id] + if sub.churn_at is not None and date.fromisoformat(sub.churn_at) <= cutoff: + continue + eligible.append((customer, sub, start)) + cutoffs[customer.customer_id] = cutoff + + return _assemble_snapshot(sim, accounts, eligible, cutoffs, difficulty_params, seed) + + +# --------------------------------------------------------------------------- +# Shared assembly (per-customer cutoff) +# --------------------------------------------------------------------------- + + +def _validate_inputs( + population: CustomerPopulationResult, sim: LifecycleSimulationResult +) -> tuple[date, dict[str, Any], dict[str, Any]]: + """Shared precondition checks for both regimes. + + Returns the parsed ``observation_date``, an ``account_id -> AccountRow`` + index, and a ``customer_id -> SubscriptionLifecycleRow`` index. + """ + if not population.observation_date: + raise ValueError("population.observation_date is not set") + obs_date = date.fromisoformat(population.observation_date) + required_days = max(*FORWARD_WINDOWS_DAYS, CHURN_WINDOW_DAYS) if sim.forward_window_days < required_days: raise ValueError( @@ -137,34 +270,34 @@ def build_customer_snapshot( f"{len(population.customers)} population customers (e.g. 
{missing[0]}); " "population/sim mismatch" ) + return obs_date, accounts, subscriptions - # Eligibility: started at or before the cutoff, still active at it. - eligible = [] - for customer in population.customers: - start = date.fromisoformat(customer.customer_start_at) - if start > cutoff: - continue - sub = subscriptions[customer.customer_id] - if sub.churn_at is not None and date.fromisoformat(sub.churn_at) <= cutoff: - continue - eligible.append((customer, sub, start)) +def _assemble_snapshot( + sim: LifecycleSimulationResult, + accounts: dict[str, Any], + eligible: list[_Eligible], + cutoffs: dict[str, date], + difficulty_params: DifficultyParams | None, + seed: int, +) -> pd.DataFrame: + """Build the snapshot frame from a per-customer ``customer_id -> cutoff`` map.""" if not eligible: return _empty_snapshot() - events = _event_aggregates(sim, cutoff) - health = _health_aggregates(sim, cutoff) - revenue = _forward_revenue(sim, cutoff) + events = _event_aggregates(sim, cutoffs) + health = _health_aggregates(sim, cutoffs) + revenue = _forward_revenue(sim, cutoffs) records: list[dict[str, object]] = [] for customer, sub, start in eligible: + cutoff = cutoffs[customer.customer_id] account = accounts[customer.account_id] tenure_weeks = (cutoff - start).days // 7 ev: Mapping[str, Any] = events.get(customer.customer_id, _EMPTY_EVENT_AGG) hl: Mapping[str, Any] = health.get(customer.customer_id, _EMPTY_HEALTH_AGG) rv = revenue.get(customer.customer_id, {}) - current_mrr = customer.initial_mrr + ev["mrr_delta"] churn_date = date.fromisoformat(sub.churn_at) if sub.churn_at else None records.append( { @@ -177,7 +310,7 @@ def build_customer_snapshot( "tenure_weeks": tenure_weeks, "initial_plan": customer.initial_plan, "initial_mrr": customer.initial_mrr, - "current_mrr": current_mrr, + "current_mrr": customer.initial_mrr + ev["mrr_delta"], "mrr_change_at_snapshot": ev["mrr_delta"], "renewal_count": ev["renewal_count"], "expansion_count": ev["expansion_count"], @@ -225,7 
+358,7 @@ def build_customer_snapshot( # --------------------------------------------------------------------------- -# Per-table aggregation helpers +# Per-table aggregation helpers (per-customer cutoff) # --------------------------------------------------------------------------- # Frozen (MappingProxyType): these are handed out as shared fallbacks for @@ -252,13 +385,15 @@ def build_customer_snapshot( ) -def _event_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict]: - """Aggregate subscription events at or before *cutoff*, per customer.""" +def _event_aggregates(sim: LifecycleSimulationResult, cutoffs: dict[str, date]) -> dict[str, dict]: + """Aggregate each customer's subscription events at or before its cutoff.""" + cutoffs_iso = {cid: c.isoformat() for cid, c in cutoffs.items()} out: dict[str, dict] = {} - cutoff_iso = cutoff.isoformat() for event in sim.subscription_events: + cutoff_iso = cutoffs_iso.get(event.customer_id) # ISO dates compare correctly as strings — avoids per-event parsing. - if event.event_timestamp > cutoff_iso: + # A None cutoff means the customer is not eligible (skip entirely). + if cutoff_iso is None or event.event_timestamp > cutoff_iso: continue agg = out.setdefault(event.customer_id, dict(_EMPTY_EVENT_AGG)) if event.event_type == "expansion": @@ -272,36 +407,39 @@ def _event_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, return out -def _health_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict]: +def _health_aggregates(sim: LifecycleSimulationResult, cutoffs: dict[str, date]) -> dict[str, dict]: """Aggregate health signals into the last-12-week window features. 
``last_nps_score`` looks back over the customer's whole history (NPS is quarterly — a 12-week window would miss most customers' latest response - purely by phase), while the ``*_l12w`` aggregates use the + purely by phase), while the ``*_l12w`` aggregates use each customer's ``(cutoff - 12w, cutoff]`` window. """ - window_start_iso = (cutoff - timedelta(weeks=HEALTH_WINDOW_WEEKS)).isoformat() - cutoff_iso = cutoff.isoformat() + cutoffs_iso = {cid: c.isoformat() for cid, c in cutoffs.items()} + window_start_iso = { + cid: (c - timedelta(weeks=HEALTH_WINDOW_WEEKS)).isoformat() for cid, c in cutoffs.items() + } users: dict[str, list[tuple[str, int]]] = {} depths: dict[str, list[float]] = {} tickets: dict[str, int] = {} last_nps: dict[str, int] = {} for signal in sim.health_signals: - ts = signal.period_start - if ts > cutoff_iso: + cutoff_iso = cutoffs_iso.get(signal.customer_id) + if cutoff_iso is None or signal.period_start > cutoff_iso: continue if signal.nps_score is not None: # Signals are chronological per customer — last write wins. 
last_nps[signal.customer_id] = signal.nps_score - if ts <= window_start_iso: + if signal.period_start <= window_start_iso[signal.customer_id]: continue - users.setdefault(signal.customer_id, []).append((ts, signal.active_users)) + users.setdefault(signal.customer_id, []).append((signal.period_start, signal.active_users)) depths.setdefault(signal.customer_id, []).append(signal.feature_depth_score) tickets[signal.customer_id] = tickets.get(signal.customer_id, 0) + signal.support_tickets out: dict[str, dict] = {} for customer_id, points in users.items(): + cutoff = cutoffs[customer_id] weeks = [(date.fromisoformat(ts) - cutoff).days / 7.0 for ts, _ in points] counts = [n for _, n in points] if len(points) >= 2: @@ -315,29 +453,33 @@ def _health_aggregates(sim: LifecycleSimulationResult, cutoff: date) -> dict[str "tickets": tickets[customer_id], "last_nps": last_nps.get(customer_id), } - # Customers with an NPS response but no in-window signals cannot occur - # (an active customer always has signals in the trailing window), but a + # Customers with an NPS response but no in-window signals cannot occur for + # an active customer (it always has a signal in the trailing window), but a # defensive merge keeps last_nps consistent if eligibility ever widens. 
for customer_id, nps in last_nps.items(): out.setdefault(customer_id, dict(_EMPTY_HEALTH_AGG))["last_nps"] = nps return out -def _forward_revenue(sim: LifecycleSimulationResult, cutoff: date) -> dict[str, dict[int, int]]: +def _forward_revenue( + sim: LifecycleSimulationResult, cutoffs: dict[str, date] +) -> dict[str, dict[int, int]]: """Sum collected gross revenue per customer per forward window (D7).""" - bounds = { - window: (cutoff + timedelta(days=window)).isoformat() for window in FORWARD_WINDOWS_DAYS + cutoffs_iso = {cid: c.isoformat() for cid, c in cutoffs.items()} + bounds_iso = { + cid: {window: (c + timedelta(days=window)).isoformat() for window in FORWARD_WINDOWS_DAYS} + for cid, c in cutoffs.items() } - cutoff_iso = cutoff.isoformat() out: dict[str, dict[int, int]] = {} for invoice in sim.invoices: - if invoice.payment_status not in _REVENUE_STATUSES: + cutoff_iso = cutoffs_iso.get(invoice.customer_id) + if cutoff_iso is None or invoice.payment_status not in _REVENUE_STATUSES: continue ts = invoice.invoice_date if ts <= cutoff_iso: continue sums = out.setdefault(invoice.customer_id, dict.fromkeys(FORWARD_WINDOWS_DAYS, 0)) - for window, bound in bounds.items(): + for window, bound in bounds_iso[invoice.customer_id].items(): if ts <= bound: sums[window] += invoice.amount_usd return out diff --git a/tests/schemes/lifecycle/test_early_pltv.py b/tests/schemes/lifecycle/test_early_pltv.py new file mode 100644 index 0000000..9d31040 --- /dev/null +++ b/tests/schemes/lifecycle/test_early_pltv.py @@ -0,0 +1,270 @@ +"""Tests for the tenure-anchored early-pLTV snapshot builder (LTV-Pm).""" + +from dataclasses import replace +from datetime import date, timedelta + +import pandas as pd +import pytest + +from leadforge.core.models import DifficultyParams +from leadforge.schemes.lifecycle.engine import ( + LifecycleSimulationResult, + simulate_lifecycle, +) +from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES +from 
leadforge.schemes.lifecycle.population import build_customer_population +from leadforge.schemes.lifecycle.snapshots import ( + DEFAULT_EARLY_TENURE_WEEKS, + FORWARD_WINDOWS_DAYS, + build_customer_snapshot, + build_early_pltv_snapshot, +) + +_POP_SEED = 11 +_SIM_SEED = 99 +_N = 200 +_ET = DEFAULT_EARLY_TENURE_WEEKS # 4 + +_FEATURE_COLS = [ + f.name for f in CUSTOMER_SNAPSHOT_FEATURES if not f.is_target and not f.leakage_risk +] +_TARGET_COLS = [f.name for f in CUSTOMER_SNAPSHOT_FEATURES if f.is_target] + + +@pytest.fixture(scope="module") +def population(): + return build_customer_population(_N, _POP_SEED, motif_family="expansion_led_growth") + + +@pytest.fixture(scope="module") +def sim(population): + return simulate_lifecycle(population, _SIM_SEED) + + +@pytest.fixture(scope="module") +def early(population, sim): + return build_early_pltv_snapshot(population, sim, early_tenure_weeks=_ET) + + +def _cutoff_for(customer) -> date: + return date.fromisoformat(customer.customer_start_at) + timedelta(weeks=_ET) + + +# --------------------------------------------------------------------------- +# Shape + the defining tenure-anchored property +# --------------------------------------------------------------------------- + + +def test_columns_and_dtypes_match_catalog(early) -> None: + assert list(early.columns) == [f.name for f in CUSTOMER_SNAPSHOT_FEATURES] + for f in CUSTOMER_SNAPSHOT_FEATURES: + assert str(early[f.name].dtype) == f.dtype, f.name + + +def test_tenure_is_constant_at_anchor(early) -> None: + # The defining property of the regime: every row observed at the same tenure. 
+ assert set(early["tenure_weeks"].unique()) == {_ET} + + +def test_deterministic(population, sim, early) -> None: + again = build_early_pltv_snapshot(population, sim, early_tenure_weeks=_ET) + pd.testing.assert_frame_equal(early, again) + + +# --------------------------------------------------------------------------- +# Per-customer cutoff correctness + eligibility cohort +# --------------------------------------------------------------------------- + + +def test_eligibility_is_survival_to_anchor(population, sim, early) -> None: + """Included iff the customer did not churn at or before start + anchor.""" + churn = {s.customer_id: s.churn_at for s in sim.subscriptions} + expected = { + c.customer_id + for c in population.customers + if churn[c.customer_id] is None or date.fromisoformat(churn[c.customer_id]) > _cutoff_for(c) + } + assert set(early["customer_id"]) == expected + + +def test_onboarding_churners_excluded(population, sim, early) -> None: + churn = {s.customer_id: s.churn_at for s in sim.subscriptions} + onboarding_churners = { + c.customer_id + for c in population.customers + if churn[c.customer_id] is not None + and date.fromisoformat(churn[c.customer_id]) <= _cutoff_for(c) + } + assert onboarding_churners, "fixture should have some onboarding churn" + assert onboarding_churners.isdisjoint(set(early["customer_id"])) + + +def test_cohort_differs_from_calendar_regime(population, sim, early) -> None: + """The early cohort keeps customers who churned *after* their tenure anchor + but before the calendar observation_date — they are cold-start customers + with a real (often low) forward value, dropped by the calendar regime.""" + cal = build_customer_snapshot(population, sim) + obs = date.fromisoformat(population.observation_date) + churn = {s.customer_id: s.churn_at for s in sim.subscriptions} + early_only_expected = { + c.customer_id + for c in population.customers + if churn[c.customer_id] is not None + and _cutoff_for(c) < 
date.fromisoformat(churn[c.customer_id]) <= obs + } + early_ids, cal_ids = set(early["customer_id"]), set(cal["customer_id"]) + assert early_only_expected, "fixture should have post-anchor pre-obs churn" + assert early_only_expected == early_ids - cal_ids + + +def test_late_starter_cutoff_may_exceed_observation_date(population, sim, early) -> None: + obs = date.fromisoformat(population.observation_date) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + anchored_after_obs = [ + cid for cid in early["customer_id"] if starts[cid] + timedelta(weeks=_ET) > obs + ] + # Valid because each customer's forward windows are fully simulated relative + # to its own start (engine D6), not the calendar anchor — so the builder + # does not require the tenure cutoff to fall on or before observation_date. + assert anchored_after_obs + + +# --------------------------------------------------------------------------- +# Short-tenure sparsity (the cold-start signal) +# --------------------------------------------------------------------------- + + +def test_nps_entirely_null_before_first_survey(early) -> None: + # First quarterly NPS lands at week 13; at a 4-week anchor nobody has one. 
+    assert early["last_nps_score"].isna().all()
+
+
+def test_health_aggregates_use_only_pre_anchor_signals(population, sim, early) -> None:
+    for _, row in early.head(15).iterrows():
+        customer = next(c for c in population.customers if c.customer_id == row["customer_id"])
+        cutoff = _cutoff_for(customer)
+        signals = [
+            h
+            for h in sim.health_signals
+            if h.customer_id == row["customer_id"] and date.fromisoformat(h.period_start) <= cutoff
+        ]
+        assert signals
+        assert row["avg_active_users_l12w"] == pytest.approx(
+            sum(h.active_users for h in signals) / len(signals)
+        )
+
+
+# ---------------------------------------------------------------------------
+# Snapshot safety: features see nothing after each customer's own cutoff
+# ---------------------------------------------------------------------------
+
+
+def test_features_identical_under_per_customer_censoring(population, sim, early) -> None:
+    """Delete every event after each customer's own tenure cutoff and rebuild;
+    non-target, non-trap features must be unchanged. Any feature that moves
+    leaks across the (per-customer) anchor."""
+    cutoff_iso = {c.customer_id: _cutoff_for(c).isoformat() for c in population.customers}
+    censored = LifecycleSimulationResult(
+        subscriptions=sim.subscriptions,
+        subscription_events=[
+            e for e in sim.subscription_events if e.event_timestamp <= cutoff_iso[e.customer_id]
+        ],
+        health_signals=[
+            h for h in sim.health_signals if h.period_start <= cutoff_iso[h.customer_id]
+        ],
+        invoices=[i for i in sim.invoices if i.invoice_date <= cutoff_iso[i.customer_id]],
+        forward_window_days=sim.forward_window_days,
+        early_tenure_weeks=sim.early_tenure_weeks,
+    )
+    rebuilt = build_early_pltv_snapshot(population, censored, early_tenure_weeks=_ET)
+    pd.testing.assert_frame_equal(early[_FEATURE_COLS], rebuilt[_FEATURE_COLS])
+
+
+# ---------------------------------------------------------------------------
+# Targets recomputed off the tenure anchor
+# ---------------------------------------------------------------------------
+
+
+def test_ltv_targets_match_invoice_table_per_customer_cutoff(population, sim, early) -> None:
+    starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers}
+    for _, row in early.iterrows():
+        cutoff = starts[row["customer_id"]] + timedelta(weeks=_ET)
+        for window in FORWARD_WINDOWS_DAYS:
+            bound = cutoff + timedelta(days=window)
+            expected = sum(
+                i.amount_usd
+                for i in sim.invoices
+                if i.customer_id == row["customer_id"]
+                and i.payment_status in ("paid", "recovered")
+                and cutoff < date.fromisoformat(i.invoice_date) <= bound
+            )
+            assert row[f"ltv_revenue_{window}d"] == float(expected)
+
+
+def test_ltv_windows_monotone(early) -> None:
+    assert (early["ltv_revenue_90d"] <= early["ltv_revenue_365d"]).all()
+    assert (early["ltv_revenue_365d"] <= early["ltv_revenue_730d"]).all()
+
+
+def test_targets_are_right_skewed(early) -> None:
+    for window in FORWARD_WINDOWS_DAYS:
+        col = early[f"ltv_revenue_{window}d"]
+        assert (col >= 0).all()
+
+        assert col.mean() > col.median()
+
+
+# ---------------------------------------------------------------------------
+# Validation
+# ---------------------------------------------------------------------------
+
+
+def test_rejects_nonpositive_anchor(population, sim) -> None:
+    with pytest.raises(ValueError, match="early_tenure_weeks must be >= 1"):
+        build_early_pltv_snapshot(population, sim, early_tenure_weeks=0)
+
+
+def test_rejects_anchor_beyond_simulated_tenure(population, sim) -> None:
+    with pytest.raises(ValueError, match="exceeds the sim's recorded"):
+        build_early_pltv_snapshot(population, sim, early_tenure_weeks=sim.early_tenure_weeks + 1)
+
+
+def test_rejects_short_forward_window(population, sim) -> None:
+    short = replace(sim, forward_window_days=365)
+    with pytest.raises(ValueError, match="forward_window_days"):
+        build_early_pltv_snapshot(population, short)
+
+
+def test_rejects_population_sim_mismatch(population, sim) -> None:
+    mismatched = replace(sim, subscriptions=sim.subscriptions[1:])
+    with pytest.raises(ValueError, match="population/sim mismatch"):
+        build_early_pltv_snapshot(population, mismatched)
+
+
+def test_rejects_missing_observation_date(population, sim) -> None:
+    broken = replace(population, observation_date="")
+    with pytest.raises(ValueError, match="observation_date"):
+        build_early_pltv_snapshot(broken, sim)
+
+
+# ---------------------------------------------------------------------------
+# Distortions reuse the shared machinery (targets/trap stay intact)
+# ---------------------------------------------------------------------------
+
+
+def test_distortions_leave_targets_and_trap_intact(population, sim, early) -> None:
+    params = DifficultyParams(
+        signal_strength=1.0,
+        noise_scale=0.5,
+        missing_rate=0.3,
+        outlier_rate=0.02,
+        conversion_rate_lo=0.02,
+        conversion_rate_hi=0.4,
+        committee_friction=0.5,
+    )
+    distorted = build_early_pltv_snapshot(
+        population, sim, early_tenure_weeks=_ET, difficulty_params=params, seed=7
+    )
+    pd.testing.assert_frame_equal(distorted[_TARGET_COLS], early[_TARGET_COLS])
+    pd.testing.assert_series_equal(
+        distorted["mrr_change_full_period"], early["mrr_change_full_period"]
+    )

From 2f7f08aad28df1e57f9538ee4655365c4caa607f Mon Sep 17 00:00:00 2001
From: Shay Palachy
Date: Fri, 12 Jun 2026 18:17:56 +0300
Subject: [PATCH 2/2] fix(lifecycle): disclose all degenerate early-pLTV columns [LTV-Pm]
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Findings from hostile self-review of the early-pLTV snapshot PR.

1. CALENDAR BYTE-IDENTITY: PROVEN, not just claimed. The "calendar output
   unchanged by the unification refactor" claim rested on derivation tests,
   which a subtle reordering could pass. Verified the refactored builder
   produces byte-identical calendar snapshots to main across all 5 motifs
   x 2 seeds, with and without difficulty distortions. No code change; the
   30 LTV-Pl derivation tests remain the permanent guard.

2. INCOMPLETE DISCLOSURE OF DEGENERATE COLUMNS (the real finding). The PR
   documented only tenure_weeks as constant in the early regime, but at a
   short anchor MULTIPLE feature columns are dead by construction —
   confirmed structural (every seed), not seed accidents:
   - renewal_count: constant 0 for any anchor < 52w (first anniversary wk 52)
   - last_nps_score: all-null for any anchor < 13w (first survey wk 13)
   - weeks_since_last_payment_failure: near-degenerate (<=1 distinct value)
   Shipping a builder while under-documenting that ~3 columns are dead in
   its primary (4-week) configuration would mislead consumers and the
   validation harness. Expanded the build_early_pltv_snapshot docstring and
   the roadmap note to enumerate all of them with the cadence reason, flag
   the shared-catalog design tension, and hand LTV-Pp the full exemption
   list / LTV-Pn the drop-or-keep decision. New parametrized test pins the
   structural set across seeds so reviving any column forces a conscious
   update.

3. Added an early-regime trap-divergence test: the mrr_change_full_period
   trap is *more* leaky here than in the calendar regime (at 4 weeks
   mrr_change_at_snapshot is ~0 for >80% of rows while the trap captures
   the whole future expansion path) — pinned so the pedagogically central
   column can't silently stop diverging.

Full suite 1794 passed / 51 skipped; ruff + mypy clean; lead-scoring
distorted-snapshot hash still byte-identical (196bc45f…).

Co-Authored-By: Claude Opus 4.8
---
 docs/ltv/roadmap.md                        | 13 +++++++---
 leadforge/schemes/lifecycle/snapshots.py   | 26 +++++++++++++++----
 tests/schemes/lifecycle/test_early_pltv.py | 29 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md
index b834048..16db972 100644
--- a/docs/ltv/roadmap.md
+++ b/docs/ltv/roadmap.md
@@ -249,10 +249,15 @@ Total: ~19 PRs across 9 milestones.
     cutoff vs the invoice table; cold-start sparsity (NPS all-null at 4w);
     anchor-validation (`>= 1`, `<= sim.early_tenure_weeks`), short-window /
     mismatch / missing-obs guards; distortions leave targets + trap intact.
-  - **Known property (deferred to `LTV-Pp` validation):** `tenure_weeks` is
-    constant (= `early_tenure_weeks`) across the early table by design — the
-    published-bundle no-zero-variance check must exempt it for this task
-    family.
+  - **Known degenerate columns at a short anchor (deferred to `LTV-Pp`
+    validation):** by cadence math, several catalog columns are structurally
+    dead in the early table — `tenure_weeks` (constant = anchor),
+    `renewal_count` (0 for anchor < 52w), `last_nps_score` (all-null for
+    anchor < 13w), and near-degenerate `weeks_since_last_payment_failure`.
+    The catalog is shared with the calendar regime by design, so the
+    no-zero-variance / no-all-null checks must exempt these for the early task
+    family; whether to drop them from the early feature set instead is open for
+    `LTV-Pn`.
   - **Deferred to `LTV-Pn` (bundle/task writer):** the actual early-pLTV
     *task directory* + train/valid/test split export (`render/tasks.py`,
     design.md §536) — this PR delivers the snapshot + recomputed targets only,
diff --git a/leadforge/schemes/lifecycle/snapshots.py b/leadforge/schemes/lifecycle/snapshots.py
index c1d1521..9eb0a43 100644
--- a/leadforge/schemes/lifecycle/snapshots.py
+++ b/leadforge/schemes/lifecycle/snapshots.py
@@ -180,11 +180,27 @@ def build_early_pltv_snapshot(
     signal exist at the cutoff, and ``last_nps_score`` is null for the whole
     cohort when ``early_tenure_weeks`` precedes the first quarterly survey.
 
-    Because the cutoff is constant *in tenure*, ``tenure_weeks`` is constant
-    across the whole table (= ``early_tenure_weeks``). That is the defining
-    property of the regime, not a feature — the published-bundle
-    no-zero-variance check must exempt ``tenure_weeks`` for this task family
-    (handled in the validation harness, LTV-Pp).
+    Degenerate columns at a short anchor. Several catalog columns are
+    structurally constant/empty when ``early_tenure_weeks`` is short, because
+    the events that would vary them have not happened yet (the cadence math,
+    not the seed, makes them dead):
+
+    - ``tenure_weeks`` — constant ``= early_tenure_weeks`` (the defining
+      property of the regime, not a feature).
+    - ``renewal_count`` — constant ``0`` for any anchor ``< 52`` weeks (the
+      first contract anniversary is at week 52).
+    - ``last_nps_score`` — entirely null for any anchor ``< 13`` weeks (the
+      first quarterly survey lands at week 13).
+    - ``weeks_since_last_payment_failure`` — near-degenerate (at most one
+      distinct value, often all-null): only the week-0 invoice precedes a
+      sub-month cutoff, so any failure shares the same recency.
+
+    The catalog is shared with the calendar regime by design (design.md §8),
+    so these columns are kept rather than dropped; the published-bundle
+    no-zero-variance / no-all-null checks must **exempt them for this task
+    family** (handled in the validation harness, LTV-Pp). Whether to instead
+    drop them from the early task's feature set is an open question for the
+    bundle/task writer (LTV-Pn).
 
     Eligibility does **not** require the cutoff to fall on or before
     ``observation_date``: each customer's forward windows are fully simulated
diff --git a/tests/schemes/lifecycle/test_early_pltv.py b/tests/schemes/lifecycle/test_early_pltv.py
index 9d31040..c255bfa 100644
--- a/tests/schemes/lifecycle/test_early_pltv.py
+++ b/tests/schemes/lifecycle/test_early_pltv.py
@@ -66,6 +66,24 @@ def test_tenure_is_constant_at_anchor(early) -> None:
     assert set(early["tenure_weeks"].unique()) == {_ET}
 
 
+@pytest.mark.parametrize("pop_seed", [1, 7, 42])
+def test_structurally_degenerate_columns_at_short_anchor(pop_seed: int) -> None:
+    """Pin the columns that are dead by *construction* at a sub-13-week anchor,
+    so the LTV-Pp validation harness has a tracked exemption list and a future
+    change that revives one of them forces a conscious update here.
+
+    These are cadence consequences, not seed accidents: first renewal at week
+    52, first NPS at week 13, tenure fixed at the anchor.
+    """
+    pop = build_customer_population(250, pop_seed, motif_family="payment_fragile")
+    sim = simulate_lifecycle(pop, pop_seed * 2 + 1)
+    snap = build_early_pltv_snapshot(pop, sim, early_tenure_weeks=_ET)
+
+    assert snap["tenure_weeks"].nunique(dropna=True) == 1  # constant = anchor
+    assert set(snap["renewal_count"].unique()) == {0}  # first renewal at week 52
+    assert snap["last_nps_score"].isna().all()  # first NPS at week 13
+
+
 def test_deterministic(population, sim, early) -> None:
     again = build_early_pltv_snapshot(population, sim, early_tenure_weeks=_ET)
     pd.testing.assert_frame_equal(early, again)
@@ -213,6 +231,17 @@ def test_targets_are_right_skewed(early) -> None:
         assert col.mean() > col.median()
 
 
+def test_trap_diverges_strongly_in_early_regime(early) -> None:
+    """The mrr_change_full_period trap is *more* leaky here than in the calendar
+    regime: at a 4-week anchor almost no expansion has happened, so the valid
+    mrr_change_at_snapshot is ~0 while the trap captures the whole future
+    expansion path that drives the targets."""
+    valid_zero = (early["mrr_change_at_snapshot"] == 0).mean()
+    diverges = (early["mrr_change_full_period"] != early["mrr_change_at_snapshot"]).mean()
+    assert valid_zero > 0.8  # cold start: little expansion yet
+    assert diverges > 0.10
+
+
 # ---------------------------------------------------------------------------
 # Validation
 # ---------------------------------------------------------------------------
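
Reviewer note: the per-customer cutoff and target attribution that the tests recompute can be sketched in isolation. This is a minimal illustration, not the code in `snapshots.py`: the `Invoice` dataclass and the helpers `tenure_cutoff` and `forward_revenue` are hypothetical names, and only the window rule (collected invoices dated strictly after the cutoff and on or before cutoff + window) is taken from `test_ltv_targets_match_invoice_table_per_customer_cutoff`.

```python
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass(frozen=True)
class Invoice:
    # Hypothetical stand-in for the sim's invoice records; field names follow
    # the attributes the tests read.
    customer_id: str
    invoice_date: str  # ISO date string
    amount_usd: float
    payment_status: str


def tenure_cutoff(customer_start: date, early_tenure_weeks: int) -> date:
    """Per-customer cutoff: a fixed tenure after the customer's own start."""
    if early_tenure_weeks < 1:
        raise ValueError("early_tenure_weeks must be >= 1")
    return customer_start + timedelta(weeks=early_tenure_weeks)


def forward_revenue(invoices, customer_id, cutoff, window_days):
    """Collected revenue dated in (cutoff, cutoff + window_days]."""
    bound = cutoff + timedelta(days=window_days)
    return float(
        sum(
            inv.amount_usd
            for inv in invoices
            if inv.customer_id == customer_id
            and inv.payment_status in ("paid", "recovered")
            and cutoff < date.fromisoformat(inv.invoice_date) <= bound
        )
    )


invoices = [
    Invoice("c1", "2026-01-01", 100.0, "paid"),       # week-0 invoice, before the cutoff
    Invoice("c1", "2026-01-29", 100.0, "paid"),       # exactly on the cutoff: excluded
    Invoice("c1", "2026-03-01", 100.0, "recovered"),  # inside the 90-day window
    Invoice("c1", "2026-04-01", 100.0, "failed"),     # never collected
    Invoice("c1", "2026-06-01", 100.0, "paid"),       # past 90 days, inside 365
]
cutoff = tenure_cutoff(date(2026, 1, 1), 4)
ltv_90 = forward_revenue(invoices, "c1", cutoff, 90)
ltv_365 = forward_revenue(invoices, "c1", cutoff, 365)
```

Because the lower bound is strict, an invoice dated on the cutoff itself counts as history rather than target revenue, which is what keeps the features and targets disjoint at the anchor.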