diff --git a/.agent-plan.md b/.agent-plan.md index 384839e..4ed4f76 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -54,10 +54,13 @@ ALL_CONSTRAINTS/LEAD_SNAPSHOT_FEATURES/CONVERTED_WITHIN_90_DAYS moved to consumes bundle files, not internals — no lockstep update needed (heads-up issue #8). Next: `LTV-Pg.2` merged (#112). **LTV-M3**: `LTV-Ph` merged (#113); `LTV-Pi` (mechanism policies) merged (#116) — **LTV-M3 complete**. **LTV-M4** -started: `LTV-Pj` (hazard functions — churn_probability with onboarding -elevation + renewal spike, expansion_probability with health modulation, -payment_failure_probability; pure/deterministic, Cox-style latent multipliers; -40 tests) opened as **#117**. Next: `LTV-Pk` (weekly simulation engine). +`LTV-Pj` (hazard functions) merged (#117). `LTV-Pk` (weekly simulation +engine — simulate_lifecycle() with per-customer RNG substreams, weekly +health/monthly invoice cadences, dunning write-off churn, renewal events, +expansion MRR chains; mechanisms.py base rates ENGINE-CALIBRATED to per-motif +year-1 churn targets, discharging the #117 calibration obligation; 25 tests) +opened as **#118** — completes **LTV-M4**. Next: `LTV-M5` (`LTV-Pl` — +calendar-anchored customer snapshot + pLTV targets). --- diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index a68327c..ef4262e 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -44,7 +44,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M1` | Lifecycle schema foundation | `LTV-Pb`, `LTV-Pc` | #104 (Pb) | | `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe), #109 (Pf.1), #110 (Pf.2), #111 (Pg.1), #112 (Pg.2) | | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | -| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj) | +| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) | | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | | | `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | @@ -173,13 +173,13 @@ Total: ~19 PRs across 9 milestones. ## `LTV-M4` — Lifecycle simulation engine -- [ ] **`LTV-Pj`** — `feat(lifecycle): churn / expansion / payment hazards` (**PR #117**). +- [x] **`LTV-Pj`** — `feat(lifecycle): churn / expansion / payment hazards` (**PR #117**). Weibull churn hazard with renewal-date spike, expansion propensity (the heavy-tail generator for pLTV), payment failure + dunning. - Tests: hazard shape over tenure, renewal spike, dunning escalation, expansion MRR-delta bounds. - Labels: `type: feature`, `layer: mechanisms` -- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine`. +- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine` (**PR #118**). `simulate_lifecycle()`: weekly loop per customer through `observation_date + 730d (+ early-regime buffer)` so all three windows are fully simulated (D6); emits `subscription_events`, `health_signals`, `invoices`; updates terminal diff --git a/leadforge/schemes/lifecycle/engine.py b/leadforge/schemes/lifecycle/engine.py new file mode 100644 index 0000000..d581a07 --- /dev/null +++ b/leadforge/schemes/lifecycle/engine.py @@ -0,0 +1,529 @@ +"""Weekly lifecycle simulation engine (D2: weekly steps). + +:func:`simulate_lifecycle` is the single public entry point. It evolves each +customer of a :class:`~leadforge.schemes.lifecycle.population.CustomerPopulationResult` +week by week from their staggered start date, drawing against the pure hazard +functions in :mod:`leadforge.schemes.lifecycle.hazards`, and emits the three +lifecycle event tables (``subscription_events``, ``health_signals``, +``invoices``) plus one terminal-state subscription row per customer. + +Simulation contract +------------------- +- **Fully simulated target windows (D6)** — every customer is simulated through + ``max(observation_date, start + early_tenure_weeks) + forward_window_days``, + so all pLTV forward-window targets (90/365/730d) are complete for **both** + observation regimes (calendar-anchored and tenure-anchored). A customer who + reaches the end of their window still active is *censored for total LTV* but + has complete forward-window revenue. +- **Per-customer RNG substreams** — every customer draws from its own named + substream (``lifecycle_sim::``), so one customer's trajectory is + invariant to the presence, ordering, or behaviour of every other customer. + This is a stronger stability property than the lead-scoring engine's shared + streams, and it makes per-customer regression tests exact. +- **Weekly step order** (fixed for determinism): health signal → invoice / + dunning resolution → churn draw → renewal event → expansion draw. A customer + that churns in a week emits no further events after the churn event. +- **Churn reasons** are causal, not sampled: ``payment_failure`` when a + written-off invoice forces the churn, ``non_renewal`` when the (spiked) churn + draw fires on a contract-anniversary week, ``voluntary`` otherwise. +- **Dangling ``failed`` invoices are censoring, not bugs** — an invoice keeps + terminal status ``failed`` only when the customer churned for another reason + mid-dunning or the simulation window ended before the dunning period elapsed. + Within a week, dunning resolution runs *before* invoice issuance, so a + pending failure always resolves before the next invoice can fail. + +Deliberately out of scope (tracked): +- ``downgrade`` events — no downgrade mechanism params exist in + ``mechanisms.py`` yet; the snapshot's ``downgrade_count`` feature (design.md + §8) would be zero-variance and must be revisited before LTV-M5 ships it. +- Difficulty-tier scaling — applied at the recipe/config layer in LTV-M6; this + engine simulates the motif-calibrated intermediate-tier parameters as-is. +""" + +from __future__ import annotations + +import math +import random +from dataclasses import dataclass, field +from datetime import date, timedelta +from typing import TYPE_CHECKING + +from leadforge.core.ids import ID_PREFIXES, make_id +from leadforge.core.rng import RNGRoot +from leadforge.schemes.lifecycle.entities import ( + HealthSignalRow, + InvoiceRow, + SubscriptionEventRow, + SubscriptionLifecycleRow, +) +from leadforge.schemes.lifecycle.hazards import ( + churn_probability, + expansion_probability, + is_renewal_week, + payment_failure_probability, +) +from leadforge.schemes.lifecycle.mechanisms import assign_lifecycle_mechanisms + +if TYPE_CHECKING: + from leadforge.schemes.lifecycle.entities import CustomerLifecycleRow + from leadforge.schemes.lifecycle.mechanisms import LifecycleMechanismAssignment + from leadforge.schemes.lifecycle.population import CustomerPopulationResult + +__all__ = ["LifecycleSimulationResult", "simulate_lifecycle"] + +# --------------------------------------------------------------------------- +# Internal constants +# --------------------------------------------------------------------------- + +_WEEKS_PER_MONTH = 52.0 / 12.0 + +# NPS surveys go out quarterly; responses land on every 13th week of tenure +# (week 13, 26, …). All other weeks carry a null nps_score. +_NPS_CADENCE_WEEKS = 13 + +# Health-signal generation: weekly active users by plan tier (seat-count +# proxy), modulated by adoption velocity and an onboarding usage ramp. +_ACTIVE_USERS_BASE_BY_PLAN: dict[str, int] = { + "starter": 8, + "growth": 25, + "enterprise": 60, +} +_DEFAULT_ACTIVE_USERS_BASE = 20 + +# Usage ramps up over onboarding with this time-constant (weeks): customers +# reach ~63% of plateau usage by week 6, ~92% by week 15. +_USAGE_RAMP_WEEKS = 6.0 + +# Support-ticket Poisson intensity: lam = base + slope * (1 - product_fit). +_TICKET_LAM_BASE = 0.3 +_TICKET_LAM_SLOPE = 1.2 + + +# --------------------------------------------------------------------------- +# Public output type +# --------------------------------------------------------------------------- + + +@dataclass +class LifecycleSimulationResult: + """Fully simulated lifecycle output, ready for the rendering layer. + + All lists are in insertion order: chronological within each customer, + population order across customers. + """ + + subscriptions: list[SubscriptionLifecycleRow] + subscription_events: list[SubscriptionEventRow] = field(default_factory=list) + health_signals: list[HealthSignalRow] = field(default_factory=list) + invoices: list[InvoiceRow] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Per-customer mutable state +# --------------------------------------------------------------------------- + + +@dataclass +class _CustomerSimState: + current_mrr: int + contract_term_months: int + renewal_count: int = 0 + expansion_count: int = 0 + churned: bool = False + churn_week: int | None = None + churn_reason: str | None = None + # Pending failed invoice: (invoice row, week it failed). + pending_failure: tuple[InvoiceRow, int] | None = None + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def simulate_lifecycle( + population: CustomerPopulationResult, + seed: int, + *, + forward_window_days: int = 730, + early_tenure_weeks: int = 4, +) -> LifecycleSimulationResult: + """Run the weekly lifecycle simulation for every customer in *population*. + + Args: + population: Output of + :func:`~leadforge.schemes.lifecycle.population.build_customer_population`. + Its recorded ``motif_family`` selects the mechanism parameters and + its ``observation_date`` anchors the simulation horizon. + seed: Master RNG seed for the simulation (independent of the + population seed; the same population can be re-simulated). + forward_window_days: Longest pLTV forward-window target (D6). Every + customer is simulated through at least + ``max(observation_date, start + early_tenure_weeks) + + forward_window_days``. + early_tenure_weeks: Tenure-anchored early-pLTV cutoff (D8) — extends + the horizon for late-starting customers so the early regime's + forward windows are also fully simulated. + + Returns: + A :class:`LifecycleSimulationResult` with one subscription row per + customer and the three event tables populated. + + Raises: + ValueError: if *population* lacks an ``observation_date`` or + ``motif_family`` (built by an outdated population builder), or if + window arguments are not positive. + """ + if not population.observation_date: + raise ValueError("population.observation_date is not set") + if not population.motif_family: + raise ValueError("population.motif_family is not set") + if forward_window_days < 1: + raise ValueError(f"forward_window_days must be >= 1, got {forward_window_days}") + if early_tenure_weeks < 0: + raise ValueError(f"early_tenure_weeks must be >= 0, got {early_tenure_weeks}") + + obs_date = date.fromisoformat(population.observation_date) + mechanisms = assign_lifecycle_mechanisms(population.motif_family) + root = RNGRoot(seed) + + acct_latents = population.latent_state.account_latents + cust_latents = population.latent_state.customer_latents + + result = LifecycleSimulationResult(subscriptions=[]) + # Event ID counters are global across customers (population order), so IDs + # stay deterministic and dense. + counters = {"subscription_event": 0, "health_signal": 0, "invoice": 0} + + for idx, customer in enumerate(population.customers, start=1): + latents = _merge_latents( + acct_latents.get(customer.account_id, {}), + cust_latents.get(customer.customer_id, {}), + ) + + rng = root.child(f"lifecycle_sim::{customer.customer_id}") + sub_id = make_id(ID_PREFIXES["subscription"], idx) + start = date.fromisoformat(customer.customer_start_at) + end_date = max(obs_date, start + timedelta(weeks=early_tenure_weeks)) + timedelta( + days=forward_window_days + ) + + state = _simulate_customer( + customer=customer, + subscription_id=sub_id, + latents=latents, + mechanisms=mechanisms, + rng=rng, + start=start, + end_date=end_date, + counters=counters, + result=result, + ) + + churn_at = ( + (start + timedelta(weeks=state.churn_week)).isoformat() + if state.churned and state.churn_week is not None + else None + ) + result.subscriptions.append( + SubscriptionLifecycleRow( + subscription_id=sub_id, + customer_id=customer.customer_id, + plan_name=customer.initial_plan, + subscription_status="churned" if state.churned else "active", + subscription_start_at=customer.customer_start_at, + current_mrr=state.current_mrr, + contract_term_months=state.contract_term_months, + renewal_count=state.renewal_count, + expansion_count=state.expansion_count, + subscription_end_at=churn_at, + churn_at=churn_at, + churn_reason=state.churn_reason, + ) + ) + + return result + + +def _merge_latents( + account_latents: dict[str, float], customer_latents: dict[str, float] +) -> dict[str, float]: + """Merge account- and customer-level latents into one effective trait dict. + + Traits present at **both** levels (``latent_budget_stability``, + ``latent_organizational_stability``) are blended 50/50 rather than letting + one level shadow the other: the account component is a shared random effect + across all customers of the same account, so within-account churn and + payment behaviour are *correlated* — the mixed-effects structure a B2B + dataset should have. Account-only or customer-only traits pass through. + """ + merged = dict(customer_latents) + for trait, account_value in account_latents.items(): + if trait in merged: + merged[trait] = 0.5 * (account_value + merged[trait]) + else: + merged[trait] = account_value + return merged + + +# --------------------------------------------------------------------------- +# Per-customer weekly loop +# --------------------------------------------------------------------------- + + +def _simulate_customer( + *, + customer: CustomerLifecycleRow, + subscription_id: str, + latents: dict[str, float], + mechanisms: LifecycleMechanismAssignment, + rng: random.Random, + start: date, + end_date: date, + counters: dict[str, int], + result: LifecycleSimulationResult, +) -> _CustomerSimState: + """Evolve one customer week by week; append events to *result*.""" + state = _CustomerSimState( + current_mrr=customer.initial_mrr, + contract_term_months=customer.contract_term_months, + ) + churn_p = mechanisms.churn_hazard + expansion_p = mechanisms.expansion_propensity + payment_p = mechanisms.payment_failure + + week = 0 + last_month_index = -1 + while start + timedelta(weeks=week) <= end_date: + week_date = start + timedelta(weeks=week) + + # -- 1. Health signal (weekly) -------------------------------------- + depth = _emit_health_signal( + customer=customer, + latents=latents, + rng=rng, + week=week, + week_date=week_date, + counters=counters, + result=result, + ) + + # -- 2. Dunning resolution, then invoice on month boundary ---------- + # Resolution runs FIRST so a write-off churn cannot be preceded by a + # fresh same-week invoice, and a newly failed invoice can always enter + # dunning (pending slot is free by issuance time). + if state.pending_failure is not None: + pending_invoice, fail_week = state.pending_failure + if week - fail_week >= payment_p.dunning_weeks: + if rng.random() < payment_p.recovery_rate: + pending_invoice.payment_status = "recovered" + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="payment_recovered", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + ) + state.pending_failure = None + else: + pending_invoice.payment_status = "written_off" + _churn(state, week, "payment_failure") + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="churn", + mrr_before=state.current_mrr, + mrr_after=0, + ) + break + + month_index = int(week / _WEEKS_PER_MONTH) + if month_index > last_month_index: + last_month_index = month_index + counters["invoice"] += 1 + failed = rng.random() < payment_failure_probability(payment_p, latents) + invoice = InvoiceRow( + invoice_id=make_id(ID_PREFIXES["invoice"], counters["invoice"]), + customer_id=customer.customer_id, + invoice_date=week_date.isoformat(), + amount_usd=state.current_mrr, + payment_status="failed" if failed else "paid", + ) + result.invoices.append(invoice) + if failed and state.pending_failure is None: + state.pending_failure = (invoice, week) + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="payment_failure", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + ) + + # -- 3. Churn draw --------------------------------------------------- + renewal_week = is_renewal_week(week, state.contract_term_months) + p_churn = churn_probability(churn_p, latents, week, state.contract_term_months) + if rng.random() < p_churn: + _churn(state, week, "non_renewal" if renewal_week else "voluntary") + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="churn", + mrr_before=state.current_mrr, + mrr_after=0, + ) + break + + # -- 4. Renewal event (survived the anniversary) --------------------- + if renewal_week: + state.renewal_count += 1 + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="renewal", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + contract_term_months_new=state.contract_term_months, + ) + + # -- 5. Expansion draw ------------------------------------------------ + if rng.random() < expansion_probability(expansion_p, latents, depth): + lo_frac, hi_frac = expansion_p.expansion_mrr_frac_range + lo = max(1, int(lo_frac * state.current_mrr)) + hi = max(lo, int(hi_frac * state.current_mrr)) + delta = rng.randint(lo, hi) + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="expansion", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr + delta, + ) + state.current_mrr += delta + state.expansion_count += 1 + + week += 1 + + return state + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _churn(state: _CustomerSimState, week: int, reason: str) -> None: + state.churned = True + state.churn_week = week + state.churn_reason = reason + + +def _emit_event( + result: LifecycleSimulationResult, + counters: dict[str, int], + *, + subscription_id: str, + customer_id: str, + week_date: date, + event_type: str, + mrr_before: int, + mrr_after: int, + contract_term_months_new: int | None = None, +) -> None: + counters["subscription_event"] += 1 + result.subscription_events.append( + SubscriptionEventRow( + event_id=make_id(ID_PREFIXES["subscription_event"], counters["subscription_event"]), + subscription_id=subscription_id, + customer_id=customer_id, + event_timestamp=week_date.isoformat(), + event_type=event_type, + mrr_before=mrr_before, + mrr_after=mrr_after, + contract_term_months_new=contract_term_months_new, + ) + ) + + +def _emit_health_signal( + *, + customer: CustomerLifecycleRow, + latents: dict[str, float], + rng: random.Random, + week: int, + week_date: date, + counters: dict[str, int], + result: LifecycleSimulationResult, +) -> float: + """Emit this week's health signal; return the feature-depth score. + + The depth score feeds straight back into the expansion hazard for the same + week, creating the causal link latents → health signals → expansion. + """ + adoption = latents.get("latent_adoption_velocity", 0.5) + fit = latents.get("latent_product_fit", 0.5) + + ramp = 1.0 - math.exp(-(week + 1) / _USAGE_RAMP_WEEKS) + + base_users = _ACTIVE_USERS_BASE_BY_PLAN.get(customer.initial_plan, _DEFAULT_ACTIVE_USERS_BASE) + active_users = max(0, int(base_users * (0.5 + adoption) * ramp * rng.gauss(1.0, 0.10) + 0.5)) + + target_depth = min(1.0, max(0.0, 0.20 + 0.40 * adoption + 0.30 * fit)) + depth = min(1.0, max(0.0, target_depth * ramp + rng.gauss(0.0, 0.04))) + + lam = _TICKET_LAM_BASE + _TICKET_LAM_SLOPE * (1.0 - fit) + tickets = _poisson(rng, lam) + + nps: int | None = None + if week > 0 and week % _NPS_CADENCE_WEEKS == 0: + champion = latents.get("latent_champion_strength", 0.5) + raw = 10.0 * (0.20 + 0.50 * fit + 0.30 * champion) + rng.gauss(0.0, 1.0) + nps = max(0, min(10, round(raw))) + + # The recorded observable and the value fed to the expansion hazard must + # be the SAME number — the data must fully explain the behaviour. + depth = round(depth, 4) + + counters["health_signal"] += 1 + result.health_signals.append( + HealthSignalRow( + signal_id=make_id(ID_PREFIXES["health_signal"], counters["health_signal"]), + customer_id=customer.customer_id, + period_start=week_date.isoformat(), + active_users=active_users, + feature_depth_score=depth, + support_tickets=tickets, + nps_score=nps, + ) + ) + return depth + + +def _poisson(rng: random.Random, lam: float) -> int: + """Knuth Poisson sampler — fine for the small intensities used here.""" + threshold = math.exp(-lam) + k = 0 + p = 1.0 + while True: + p *= rng.random() + if p <= threshold: + return k + k += 1 diff --git a/leadforge/schemes/lifecycle/mechanisms.py b/leadforge/schemes/lifecycle/mechanisms.py index 6b5fa64..04121a9 100644 --- a/leadforge/schemes/lifecycle/mechanisms.py +++ b/leadforge/schemes/lifecycle/mechanisms.py @@ -133,24 +133,23 @@ class LifecycleMechanismAssignment: # Per-motif parameter tables # --------------------------------------------------------------------------- -# Churn hazard base weekly rates. -# IMPORTANT — the per-motif "% annual" figures below are the BASE-RATE-ONLY -# equivalents (1 - (1-r)^52) at neutral latents. The hazard functions in -# hazards.py add material churn mass on top: the onboarding elevation -# contributes ~6.8 x base_rate of extra first-year mass and each renewal spike -# adds (multiplier - 1) x base_rate, so true first-year churn runs roughly -# 5-14 points above these figures (e.g. churner_dominated ~52%, not 37.5%). -# Final calibration against the difficulty-profile bands -# intro [0.10, 0.20] / intermediate [0.20, 0.35] / advanced [0.30, 0.50] -# happens in the engine tests (LTV-Pk), where these base rates are expected to -# be tuned DOWN to land inside the bands once the full tenure shape applies. +# Churn hazard base weekly rates — ENGINE-CALIBRATED (LTV-Pk). +# These are NOT meant to be read as standalone annual equivalents: the full +# tenure shape (onboarding elevation, renewal spikes) and dunning write-off +# churn add mass on top of the base rate. The values below were tuned by +# running simulate_lifecycle() on motif-biased populations (n=600, multiple +# seed pairs) until simulated FIRST-YEAR churn landed at the per-motif targets: +# product_led ~20% | relationship_led ~27% | expansion_led ~15% +# payment_fragile ~33-39% (mostly write-off-driven) | churner_dominated ~42% +# The engine calibration test (tests/schemes/lifecycle/test_engine.py) guards +# these with wide bands; difficulty-tier scaling on top arrives in LTV-M6. _CHURN_BASE_WEEKLY: dict[str, float] = { # Exact annual equivalent: 1 - (1-r)^52. - "product_led_retention": 0.0042, # 19.7% annual - "relationship_led_retention": 0.0055, # 24.9% annual - "expansion_led_growth": 0.0028, # 13.6% annual (lowest — high-fit customers) - "payment_fragile": 0.0060, # 26.9% annual (high base; mostly payment-driven) - "churner_dominated": 0.0090, # 37.5% annual + "product_led_retention": 0.0034, + "relationship_led_retention": 0.0042, + "expansion_led_growth": 0.0026, # lowest — high-fit customers + "payment_fragile": 0.0018, # low base; churn is mostly payment-driven + "churner_dominated": 0.0046, } # Latent-trait weights for the background churn hazard. @@ -263,11 +262,11 @@ class LifecycleMechanismAssignment: # Payment-failure base monthly rates. _PAYMENT_FAILURE_BASE_MONTHLY: dict[str, float] = { - "product_led_retention": 0.015, - "relationship_led_retention": 0.020, - "expansion_led_growth": 0.012, - "payment_fragile": 0.080, # high — financial fragility is the defining trait - "churner_dominated": 0.030, + "product_led_retention": 0.012, + "relationship_led_retention": 0.013, + "expansion_led_growth": 0.010, + "payment_fragile": 0.027, # high — financial fragility is the defining trait + "churner_dominated": 0.012, } # Latent weights for payment failure. @@ -307,8 +306,8 @@ class LifecycleMechanismAssignment: "product_led_retention": 0.70, "relationship_led_retention": 0.65, "expansion_led_growth": 0.75, - "payment_fragile": 0.40, # low — these accounts are genuinely fragile - "churner_dominated": 0.50, + "payment_fragile": 0.64, # lowest — these accounts are genuinely fragile + "churner_dominated": 0.65, } # Fallback values for unknown motif families. diff --git a/leadforge/schemes/lifecycle/population.py b/leadforge/schemes/lifecycle/population.py index 45e8432..ba0fd38 100644 --- a/leadforge/schemes/lifecycle/population.py +++ b/leadforge/schemes/lifecycle/population.py @@ -64,6 +64,10 @@ class CustomerPopulationResult: latent_state: CustomerLatentState # ISO-8601 date at which snapshots and labels are anchored. observation_date: str = "" + # Retention motif family this population was built for. Recorded so the + # simulation engine fetches the *same* family's mechanism params — passing + # the motif separately to the engine would invite silent drift. + motif_family: str = "" # --------------------------------------------------------------------------- @@ -265,6 +269,7 @@ def build_customer_population( customer_latents=cust_latents, ), observation_date=obs_date.isoformat(), + motif_family=motif_family, ) diff --git a/tests/schemes/lifecycle/test_engine.py b/tests/schemes/lifecycle/test_engine.py new file mode 100644 index 0000000..4d8c6ef --- /dev/null +++ b/tests/schemes/lifecycle/test_engine.py @@ -0,0 +1,379 @@ +"""Tests for the weekly lifecycle simulation engine (LTV-Pk).""" + +from datetime import date, timedelta + +import pytest + +from leadforge.schemes.lifecycle.engine import ( + LifecycleSimulationResult, + simulate_lifecycle, +) +from leadforge.schemes.lifecycle.hazards import is_renewal_week +from leadforge.schemes.lifecycle.population import ( + LIFECYCLE_MOTIF_FAMILIES, + build_customer_population, +) + +_POP_SEED = 11 +_SIM_SEED = 99 +_N = 150 + + +@pytest.fixture(scope="module") +def population(): + return build_customer_population(_N, _POP_SEED, motif_family="product_led_retention") + + +@pytest.fixture(scope="module") +def sim(population): + return simulate_lifecycle(population, _SIM_SEED) + + +# --------------------------------------------------------------------------- +# Shape + validation +# --------------------------------------------------------------------------- + + +def test_returns_result_type(sim) -> None: + assert isinstance(sim, LifecycleSimulationResult) + + +def test_one_subscription_per_customer(population, sim) -> None: + assert len(sim.subscriptions) == len(population.customers) + assert {s.customer_id for s in sim.subscriptions} == { + c.customer_id for c in population.customers + } + + +def test_rejects_population_without_motif(population) -> None: + import dataclasses + + broken = dataclasses.replace(population, motif_family="") + with pytest.raises(ValueError, match="motif_family"): + simulate_lifecycle(broken, _SIM_SEED) + + +def test_rejects_bad_windows(population) -> None: + with pytest.raises(ValueError, match="forward_window_days"): + simulate_lifecycle(population, _SIM_SEED, forward_window_days=0) + with pytest.raises(ValueError, match="early_tenure_weeks"): + simulate_lifecycle(population, _SIM_SEED, early_tenure_weeks=-1) + + +# --------------------------------------------------------------------------- +# Determinism + per-customer independence +# --------------------------------------------------------------------------- + + +def test_deterministic_under_same_seeds(population) -> None: + a = simulate_lifecycle(population, _SIM_SEED) + b = simulate_lifecycle(population, _SIM_SEED) + assert [s.to_dict() for s in a.subscriptions] == [s.to_dict() for s in b.subscriptions] + assert [e.to_dict() for e in a.subscription_events] == [ + e.to_dict() for e in b.subscription_events + ] + assert [i.to_dict() for i in a.invoices] == [i.to_dict() for i in b.invoices] + assert [h.to_dict() for h in a.health_signals] == [h.to_dict() for h in b.health_signals] + + +def test_different_sim_seed_changes_outcomes(population) -> None: + a = simulate_lifecycle(population, 1) + b = simulate_lifecycle(population, 2) + assert [s.subscription_status for s in a.subscriptions] != [ + s.subscription_status for s in b.subscriptions + ] + + +def test_per_customer_trajectories_independent_of_other_customers(population) -> None: + """Per-customer RNG substreams: a customer's trajectory is invariant to the + rest of the population (same customer_id + latents + seed → same draws).""" + import dataclasses + + full = simulate_lifecycle(population, _SIM_SEED) + target = population.customers[7] + # Re-simulate with ONLY this customer present. + solo_pop = dataclasses.replace(population, customers=[target]) + solo = simulate_lifecycle(solo_pop, _SIM_SEED) + + full_events = [ + (e.event_type, e.event_timestamp, e.mrr_before, e.mrr_after) + for e in full.subscription_events + if e.customer_id == target.customer_id + ] + solo_events = [ + (e.event_type, e.event_timestamp, e.mrr_before, e.mrr_after) + for e in solo.subscription_events + ] + assert full_events == solo_events + + +# --------------------------------------------------------------------------- +# Cadences + full-window coverage +# --------------------------------------------------------------------------- + + +def test_weekly_health_cadence(population, sim) -> None: + # Every active week emits exactly one health signal: consecutive weekly dates. + by_cust: dict[str, list[str]] = {} + for h in sim.health_signals: + by_cust.setdefault(h.customer_id, []).append(h.period_start) + for cust_id, dates in by_cust.items(): + parsed = [date.fromisoformat(d) for d in dates] + for prev, nxt in zip(parsed, parsed[1:], strict=False): + assert (nxt - prev).days == 7, f"{cust_id}: gap {prev} → {nxt}" + + +def test_monthly_invoice_cadence(population, sim) -> None: + # ~12 invoices per 52 active weeks (one per month boundary). + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + subs = {s.customer_id: s for s in sim.subscriptions} + inv_by_cust: dict[str, int] = {} + for inv in sim.invoices: + inv_by_cust[inv.customer_id] = inv_by_cust.get(inv.customer_id, 0) + 1 + for cust_id, n_inv in inv_by_cust.items(): + sub = subs[cust_id] + end = ( + date.fromisoformat(sub.churn_at) + if sub.churn_at + else max( + date.fromisoformat(h.period_start) + for h in sim.health_signals + if h.customer_id == cust_id + ) + ) + active_weeks = (end - starts[cust_id]).days // 7 + 1 + expected_months = int(active_weeks / (52 / 12)) + 1 + assert abs(n_inv - expected_months) <= 1, ( + f"{cust_id}: {n_inv} invoices over {active_weeks} active weeks" + ) + + +def test_nps_quarterly_only(sim) -> None: + starts: dict[str, date] = {} + for h in sim.health_signals: + starts.setdefault(h.customer_id, date.fromisoformat(h.period_start)) + for h in sim.health_signals: + week = (date.fromisoformat(h.period_start) - starts[h.customer_id]).days // 7 + if h.nps_score is not None: + assert week > 0, f"nps at week {week}" + assert week % 13 == 0, f"nps at non-quarterly week {week}" + assert 0 <= h.nps_score <= 10 + else: + assert week == 0 or week % 13 != 0 + + +def test_active_customers_simulated_through_full_window(population, sim) -> None: + # D6: every still-active customer has health coverage through obs + 730d. + obs = date.fromisoformat(population.observation_date) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + min_end = { + cid: max(obs, start + timedelta(weeks=4)) + timedelta(days=730) + for cid, start in starts.items() + } + last_signal: dict[str, date] = {} + for h in sim.health_signals: + d = date.fromisoformat(h.period_start) + if h.customer_id not in last_signal or d > last_signal[h.customer_id]: + last_signal[h.customer_id] = d + for sub in sim.subscriptions: + if sub.subscription_status == "active": + assert last_signal[sub.customer_id] >= min_end[sub.customer_id] - timedelta(days=7), ( + f"{sub.customer_id} active but coverage ends {last_signal[sub.customer_id]}" + ) + + +# --------------------------------------------------------------------------- +# Event + terminal-state consistency +# --------------------------------------------------------------------------- + + +def test_event_fk_integrity(population, sim) -> None: + cust_ids = {c.customer_id for c in population.customers} + sub_ids = {s.subscription_id for s in sim.subscriptions} + for e in sim.subscription_events: + assert e.customer_id in cust_ids + assert e.subscription_id in sub_ids + + +def test_churned_subscriptions_consistent(sim) -> None: + churn_events = {e.customer_id for e in sim.subscription_events if e.event_type == "churn"} + for sub in sim.subscriptions: + if sub.subscription_status == "churned": + assert sub.churn_at is not None + assert sub.subscription_end_at == sub.churn_at + assert sub.churn_reason in ("voluntary", "non_renewal", "payment_failure") + assert sub.customer_id in churn_events + else: + assert sub.churn_at is None + assert sub.churn_reason is None + assert sub.customer_id not in churn_events + + +def test_no_events_after_churn(sim) -> None: + churn_date: dict[str, str] = {} + for e in sim.subscription_events: + if e.event_type == "churn": + churn_date[e.customer_id] = e.event_timestamp + for e in sim.subscription_events: + if e.customer_id in churn_date: + assert e.event_timestamp <= churn_date[e.customer_id] + for h in sim.health_signals: + if h.customer_id in churn_date: + assert h.period_start <= churn_date[h.customer_id] + + +def test_mrr_chain_consistency(population, sim) -> None: + initial = {c.customer_id: c.initial_mrr for c in population.customers} + expansions: dict[str, list] = {} + for e in sim.subscription_events: + if e.event_type == "expansion": + expansions.setdefault(e.customer_id, []).append(e) + assert e.mrr_after > e.mrr_before + for sub in sim.subscriptions: + chain = expansions.get(sub.customer_id, []) + mrr = initial[sub.customer_id] + for e in chain: + assert e.mrr_before == mrr + mrr = e.mrr_after + assert sub.current_mrr == mrr + assert sub.expansion_count == len(chain) + + +def test_renewal_events_only_on_anniversaries(population, sim) -> None: + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + terms = {c.customer_id: c.contract_term_months for c in population.customers} + renewal_counts: dict[str, int] = {} + for e in sim.subscription_events: + if e.event_type == "renewal": + week = (date.fromisoformat(e.event_timestamp) - starts[e.customer_id]).days // 7 + assert is_renewal_week(week, terms[e.customer_id]), ( + f"renewal at non-anniversary week {week} (term {terms[e.customer_id]}mo)" + ) + renewal_counts[e.customer_id] = renewal_counts.get(e.customer_id, 0) + 1 + for sub in sim.subscriptions: + assert sub.renewal_count == renewal_counts.get(sub.customer_id, 0) + + +def test_payment_failures_resolve_or_censor(population, sim) -> None: + # Every failed invoice ends as recovered / written_off, unless the customer + # churned (or the window ended) before the dunning period elapsed. + churned = {s.customer_id: s for s in sim.subscriptions if s.churn_at} + for inv in sim.invoices: + assert inv.payment_status in ("paid", "failed", "recovered", "written_off") + if inv.payment_status == "written_off": + sub = churned.get(inv.customer_id) + assert sub is not None + assert sub.churn_reason == "payment_failure" + + +def test_written_off_customers_churn_with_payment_reason(sim) -> None: + wo_customers = {i.customer_id for i in sim.invoices if i.payment_status == "written_off"} + reasons = {s.customer_id: s.churn_reason for s in sim.subscriptions} + for cid in wo_customers: + assert reasons[cid] == "payment_failure" + + +def test_account_latents_influence_outcomes(population) -> None: + """Regression: account latents must NOT be shadowed by customer latents. + + The merge blends the shared traits 50/50 (account-level random effect), so + swinging an account's latent_budget_stability between the extremes must + change its customers' simulated trajectories. + """ + import copy + + lo_pop = copy.deepcopy(population) + hi_pop = copy.deepcopy(population) + for traits in lo_pop.latent_state.account_latents.values(): + traits["latent_budget_stability"] = 0.0 + for traits in hi_pop.latent_state.account_latents.values(): + traits["latent_budget_stability"] = 1.0 + + lo_sim = simulate_lifecycle(lo_pop, _SIM_SEED) + hi_sim = simulate_lifecycle(hi_pop, _SIM_SEED) + lo_failed = sum(1 for i in lo_sim.invoices if i.payment_status != "paid") + hi_failed = sum(1 for i in hi_sim.invoices if i.payment_status != "paid") + assert lo_failed > hi_failed, ( + f"account-level budget stability had no effect: lo={lo_failed}, hi={hi_failed}" + ) + + +def test_dangling_failed_invoices_are_censoring_only(population, sim) -> None: + """An invoice may end at status 'failed' only when the customer churned for + another reason mid-dunning or the simulation window ended first — never + because a second failure was silently dropped while one was pending.""" + from leadforge.schemes.lifecycle.mechanisms import assign_lifecycle_mechanisms + + dunning_weeks = assign_lifecycle_mechanisms( + population.motif_family + ).payment_failure.dunning_weeks + obs = date.fromisoformat(population.observation_date) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + subs = {s.customer_id: s for s in sim.subscriptions} + for inv in sim.invoices: + if inv.payment_status != "failed": + continue + inv_date = date.fromisoformat(inv.invoice_date) + resolution_due = inv_date + timedelta(weeks=dunning_weeks) + sub = subs[inv.customer_id] + window_end = max(obs, starts[inv.customer_id] + timedelta(weeks=4)) + timedelta(days=730) + churned_first = ( + sub.churn_at is not None and date.fromisoformat(sub.churn_at) <= resolution_due + ) + censored = resolution_due > window_end + assert churned_first or censored, ( + f"invoice {inv.invoice_id} dangling 'failed' without churn/censoring" + ) + + +def test_recorded_depth_is_round_tripped(sim) -> None: + # The stored observable is the exact value the expansion hazard consumed. + for h in sim.health_signals: + assert h.feature_depth_score == round(h.feature_depth_score, 4) + + +# --------------------------------------------------------------------------- +# Calibration: simulated first-year churn per motif (engine-calibrated bands) +# --------------------------------------------------------------------------- + +_CHURN_BANDS = { + "product_led_retention": (0.13, 0.27), + "relationship_led_retention": (0.18, 0.33), + "expansion_led_growth": (0.09, 0.23), + "payment_fragile": (0.29, 0.45), + "churner_dominated": (0.34, 0.50), +} + + +@pytest.mark.parametrize("motif", LIFECYCLE_MOTIF_FAMILIES) +def test_first_year_churn_within_calibrated_band(motif: str) -> None: + pop = build_customer_population(600, _POP_SEED, motif_family=motif) + result = simulate_lifecycle(pop, _SIM_SEED) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in pop.customers} + yr1 = sum( + 1 + for s in result.subscriptions + if s.churn_at and (date.fromisoformat(s.churn_at) - starts[s.customer_id]).days // 7 <= 52 + ) + rate = yr1 / len(pop.customers) + lo, hi = _CHURN_BANDS[motif] + assert lo <= rate <= hi, f"{motif}: year-1 churn {rate:.1%} outside [{lo:.0%}, {hi:.0%}]" + + +def test_expansion_world_expands_most() -> None: + counts = {} + for motif in ("expansion_led_growth", "churner_dominated"): + pop = build_customer_population(400, _POP_SEED, motif_family=motif) + result = simulate_lifecycle(pop, _SIM_SEED) + counts[motif] = sum(1 for e in result.subscription_events if e.event_type == "expansion") + assert counts["expansion_led_growth"] > 2 * counts["churner_dominated"] + + +def test_most_customers_active_at_observation_date(population, sim) -> None: + # Starts are within ~56 weeks of obs and annual churn ~20%, so a clear + # majority must still be active at the observation date. + obs = date.fromisoformat(population.observation_date) + active_at_obs = sum( + 1 for s in sim.subscriptions if s.churn_at is None or date.fromisoformat(s.churn_at) > obs + ) + assert active_at_obs / len(sim.subscriptions) > 0.6