From 5f7c5481a53dfdfef687534c105b5e79ea777f8c Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 12:35:08 +0300 Subject: [PATCH 1/3] feat(lifecycle): weekly simulation engine [LTV-Pk] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second half of LTV-M4 — the weekly lifecycle simulator that turns a customer population into the three event tables plus terminal subscription state. leadforge/schemes/lifecycle/engine.py: - simulate_lifecycle(population, seed, *, forward_window_days=730, early_tenure_weeks=4) → LifecycleSimulationResult{subscriptions, subscription_events, health_signals, invoices}. - Fully simulated target windows (D6): each customer runs through max(observation_date, start + early_tenure_weeks) + forward_window_days, so all 90/365/730d pLTV targets are complete for BOTH observation regimes. - Per-customer RNG substreams (lifecycle_sim::): one customer's trajectory is invariant to every other customer — stronger stability than the lead-scoring shared streams, and provable in a test (solo-resimulation equals the full-population trajectory). - Fixed weekly step order: health signal → invoice/dunning → churn draw → renewal event → expansion draw. Causal churn reasons: payment_failure (write-off), non_renewal (spiked draw on anniversary week), voluntary. - Health signals: weekly active_users (plan-seat base x adoption x onboarding usage ramp), feature_depth_score (latent plateau x ramp — feeds the same week's expansion hazard, creating latents → health → expansion causality), Knuth-Poisson support tickets (fit-driven), quarterly NPS (null off-cycle). - Invoices on month boundaries (12/52 weeks) at current MRR; failures enter dunning and resolve to recovered or written_off → forced churn. - subscription_id derived from the customer index up front and threaded into every event (no back-fill pass). leadforge/schemes/lifecycle/population.py: - CustomerPopulationResult records motif_family so the engine fetches the same family's mechanism params (passing it separately would invite silent drift). leadforge/schemes/lifecycle/mechanisms.py — ENGINE CALIBRATION (discharges the obligation recorded in the #117 review): - Measured simulated first-year churn on motif-biased populations (n=600, 3 seed pairs) and retuned churn base rates, payment-failure rates, and recovery rates over three rounds. Before: payment_fragile 80.5%, churner_dominated 66.8%. After: product_led ~19-21%, relationship ~23-27%, expansion_led ~13-18%, payment_fragile ~35-39%, churner_dominated ~41-44% — matching the per-motif intent while honouring the Pi directional-test constraints (fragile failure > 2x others, fragile recovery strictly lowest). - Calibration comment rewritten from "expected to be tuned DOWN" to ENGINE-CALIBRATED with the measured targets and a pointer to the guard test. tests/schemes/lifecycle/test_engine.py (25 tests): shape/validation, byte-equal determinism across all four tables, per-customer independence (solo-resimulation), weekly health cadence, monthly invoice cadence (±1), quarterly-only NPS, full-window coverage for active customers, event FK integrity, churn-state consistency + no-events-after-churn, expansion MRR chain reconciliation (events chain to subscription.current_mrr and expansion_count), renewal-events-only-on-anniversaries + renewal_count reconciliation, dunning resolution / write-off → payment_failure churn, per-motif year-1 churn bands, expansion-world dominance, majority-active-at- observation sanity. Engine docstring records the two tracked exclusions: downgrade events (no mechanism params yet — downgrade_count would be zero-variance; revisit before LTV-M5 ships the feature) and difficulty-tier scaling (LTV-M6). Full suite 1712 passed / 51 skipped; ruff + mypy clean. Co-Authored-By: Claude Fable 5 --- .agent-plan.md | 11 +- docs/ltv/roadmap.md | 6 +- leadforge/schemes/lifecycle/engine.py | 497 ++++++++++++++++++++++ leadforge/schemes/lifecycle/mechanisms.py | 45 +- leadforge/schemes/lifecycle/population.py | 5 + tests/schemes/lifecycle/test_engine.py | 320 ++++++++++++++ 6 files changed, 854 insertions(+), 30 deletions(-) create mode 100644 leadforge/schemes/lifecycle/engine.py create mode 100644 tests/schemes/lifecycle/test_engine.py diff --git a/.agent-plan.md b/.agent-plan.md index 384839e..d9bbf53 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -54,10 +54,13 @@ ALL_CONSTRAINTS/LEAD_SNAPSHOT_FEATURES/CONVERTED_WITHIN_90_DAYS moved to consumes bundle files, not internals — no lockstep update needed (heads-up issue #8). Next: `LTV-Pg.2` merged (#112). **LTV-M3**: `LTV-Ph` merged (#113); `LTV-Pi` (mechanism policies) merged (#116) — **LTV-M3 complete**. **LTV-M4** -started: `LTV-Pj` (hazard functions — churn_probability with onboarding -elevation + renewal spike, expansion_probability with health modulation, -payment_failure_probability; pure/deterministic, Cox-style latent multipliers; -40 tests) opened as **#117**. Next: `LTV-Pk` (weekly simulation engine). +`LTV-Pj` (hazard functions) merged (#117). `LTV-Pk` (weekly simulation +engine — simulate_lifecycle() with per-customer RNG substreams, weekly +health/monthly invoice cadences, dunning write-off churn, renewal events, +expansion MRR chains; mechanisms.py base rates ENGINE-CALIBRATED to per-motif +year-1 churn targets, discharging the #117 calibration obligation; 25 tests) +opened as **#NNN** — completes **LTV-M4**. Next: `LTV-M5` (`LTV-Pl` — +calendar-anchored customer snapshot + pLTV targets). --- diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index a68327c..9469645 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -44,7 +44,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M1` | Lifecycle schema foundation | `LTV-Pb`, `LTV-Pc` | #104 (Pb) | | `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe), #109 (Pf.1), #110 (Pf.2), #111 (Pg.1), #112 (Pg.2) | | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | -| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj) | +| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #NNN (Pk) | | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | | | `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | @@ -173,13 +173,13 @@ Total: ~19 PRs across 9 milestones. ## `LTV-M4` — Lifecycle simulation engine -- [ ] **`LTV-Pj`** — `feat(lifecycle): churn / expansion / payment hazards` (**PR #117**). +- [x] **`LTV-Pj`** — `feat(lifecycle): churn / expansion / payment hazards` (**PR #117**). Weibull churn hazard with renewal-date spike, expansion propensity (the heavy-tail generator for pLTV), payment failure + dunning. - Tests: hazard shape over tenure, renewal spike, dunning escalation, expansion MRR-delta bounds. - Labels: `type: feature`, `layer: mechanisms` -- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine`. +- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine` (**PR #NNN**). `simulate_lifecycle()`: weekly loop per customer through `observation_date + 730d (+ early-regime buffer)` so all three windows are fully simulated (D6); emits `subscription_events`, `health_signals`, `invoices`; updates terminal diff --git a/leadforge/schemes/lifecycle/engine.py b/leadforge/schemes/lifecycle/engine.py new file mode 100644 index 0000000..55f0f46 --- /dev/null +++ b/leadforge/schemes/lifecycle/engine.py @@ -0,0 +1,497 @@ +"""Weekly lifecycle simulation engine (D2: weekly steps). + +:func:`simulate_lifecycle` is the single public entry point. It evolves each +customer of a :class:`~leadforge.schemes.lifecycle.population.CustomerPopulationResult` +week by week from their staggered start date, drawing against the pure hazard +functions in :mod:`leadforge.schemes.lifecycle.hazards`, and emits the three +lifecycle event tables (``subscription_events``, ``health_signals``, +``invoices``) plus one terminal-state subscription row per customer. + +Simulation contract +------------------- +- **Fully simulated target windows (D6)** — every customer is simulated through + ``max(observation_date, start + early_tenure_weeks) + forward_window_days``, + so all pLTV forward-window targets (90/365/730d) are complete for **both** + observation regimes (calendar-anchored and tenure-anchored). A customer who + reaches the end of their window still active is *censored for total LTV* but + has complete forward-window revenue. +- **Per-customer RNG substreams** — every customer draws from its own named + substream (``lifecycle_sim::``), so one customer's trajectory is + invariant to the presence, ordering, or behaviour of every other customer. + This is a stronger stability property than the lead-scoring engine's shared + streams, and it makes per-customer regression tests exact. +- **Weekly step order** (fixed for determinism): health signal → invoice / + dunning resolution → churn draw → renewal event → expansion draw. A customer + that churns in a week emits no further events after the churn event. +- **Churn reasons** are causal, not sampled: ``payment_failure`` when a + written-off invoice forces the churn, ``non_renewal`` when the (spiked) churn + draw fires on a contract-anniversary week, ``voluntary`` otherwise. + +Deliberately out of scope (tracked): +- ``downgrade`` events — no downgrade mechanism params exist in + ``mechanisms.py`` yet; the snapshot's ``downgrade_count`` feature (design.md + §8) would be zero-variance and must be revisited before LTV-M5 ships it. +- Difficulty-tier scaling — applied at the recipe/config layer in LTV-M6; this + engine simulates the motif-calibrated intermediate-tier parameters as-is. +""" + +from __future__ import annotations + +import math +import random +from dataclasses import dataclass, field +from datetime import date, timedelta +from typing import TYPE_CHECKING + +from leadforge.core.ids import ID_PREFIXES, make_id +from leadforge.core.rng import RNGRoot +from leadforge.schemes.lifecycle.entities import ( + HealthSignalRow, + InvoiceRow, + SubscriptionEventRow, + SubscriptionLifecycleRow, +) +from leadforge.schemes.lifecycle.hazards import ( + churn_probability, + expansion_probability, + is_renewal_week, + payment_failure_probability, +) +from leadforge.schemes.lifecycle.mechanisms import assign_lifecycle_mechanisms + +if TYPE_CHECKING: + from leadforge.schemes.lifecycle.entities import CustomerLifecycleRow + from leadforge.schemes.lifecycle.mechanisms import LifecycleMechanismAssignment + from leadforge.schemes.lifecycle.population import CustomerPopulationResult + +__all__ = ["LifecycleSimulationResult", "simulate_lifecycle"] + +# --------------------------------------------------------------------------- +# Internal constants +# --------------------------------------------------------------------------- + +_WEEKS_PER_MONTH = 52.0 / 12.0 + +# NPS surveys go out quarterly; responses land on every 13th week of tenure +# (week 13, 26, …). All other weeks carry a null nps_score. +_NPS_CADENCE_WEEKS = 13 + +# Health-signal generation: weekly active users by plan tier (seat-count +# proxy), modulated by adoption velocity and an onboarding usage ramp. +_ACTIVE_USERS_BASE_BY_PLAN: dict[str, int] = { + "starter": 8, + "growth": 25, + "enterprise": 60, +} +_DEFAULT_ACTIVE_USERS_BASE = 20 + +# Usage ramps up over onboarding with this time-constant (weeks): customers +# reach ~63% of plateau usage by week 6, ~92% by week 15. +_USAGE_RAMP_WEEKS = 6.0 + +# Support-ticket Poisson intensity: lam = base + slope * (1 - product_fit). +_TICKET_LAM_BASE = 0.3 +_TICKET_LAM_SLOPE = 1.2 + + +# --------------------------------------------------------------------------- +# Public output type +# --------------------------------------------------------------------------- + + +@dataclass +class LifecycleSimulationResult: + """Fully simulated lifecycle output, ready for the rendering layer. + + All lists are in insertion order: chronological within each customer, + population order across customers. + """ + + subscriptions: list[SubscriptionLifecycleRow] + subscription_events: list[SubscriptionEventRow] = field(default_factory=list) + health_signals: list[HealthSignalRow] = field(default_factory=list) + invoices: list[InvoiceRow] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Per-customer mutable state +# --------------------------------------------------------------------------- + + +@dataclass +class _CustomerSimState: + current_mrr: int + contract_term_months: int + renewal_count: int = 0 + expansion_count: int = 0 + churned: bool = False + churn_week: int | None = None + churn_reason: str | None = None + # Pending failed invoice: (invoice row, week it failed). + pending_failure: tuple[InvoiceRow, int] | None = None + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def simulate_lifecycle( + population: CustomerPopulationResult, + seed: int, + *, + forward_window_days: int = 730, + early_tenure_weeks: int = 4, +) -> LifecycleSimulationResult: + """Run the weekly lifecycle simulation for every customer in *population*. + + Args: + population: Output of + :func:`~leadforge.schemes.lifecycle.population.build_customer_population`. + Its recorded ``motif_family`` selects the mechanism parameters and + its ``observation_date`` anchors the simulation horizon. + seed: Master RNG seed for the simulation (independent of the + population seed; the same population can be re-simulated). + forward_window_days: Longest pLTV forward-window target (D6). Every + customer is simulated through at least + ``max(observation_date, start + early_tenure_weeks) + + forward_window_days``. + early_tenure_weeks: Tenure-anchored early-pLTV cutoff (D8) — extends + the horizon for late-starting customers so the early regime's + forward windows are also fully simulated. + + Returns: + A :class:`LifecycleSimulationResult` with one subscription row per + customer and the three event tables populated. + + Raises: + ValueError: if *population* lacks an ``observation_date`` or + ``motif_family`` (built by an outdated population builder), or if + window arguments are not positive. + """ + if not population.observation_date: + raise ValueError("population.observation_date is not set") + if not population.motif_family: + raise ValueError("population.motif_family is not set") + if forward_window_days < 1: + raise ValueError(f"forward_window_days must be >= 1, got {forward_window_days}") + if early_tenure_weeks < 0: + raise ValueError(f"early_tenure_weeks must be >= 0, got {early_tenure_weeks}") + + obs_date = date.fromisoformat(population.observation_date) + mechanisms = assign_lifecycle_mechanisms(population.motif_family) + root = RNGRoot(seed) + + acct_latents = population.latent_state.account_latents + cust_latents = population.latent_state.customer_latents + + result = LifecycleSimulationResult(subscriptions=[]) + # Event ID counters are global across customers (population order), so IDs + # stay deterministic and dense. + counters = {"subscription_event": 0, "health_signal": 0, "invoice": 0} + + for idx, customer in enumerate(population.customers, start=1): + # Merged latents: account-level traits first, customer-level overrides + # win on (deliberate) key collisions such as latent_budget_stability. + latents: dict[str, float] = {} + latents.update(acct_latents.get(customer.account_id, {})) + latents.update(cust_latents.get(customer.customer_id, {})) + + rng = root.child(f"lifecycle_sim::{customer.customer_id}") + sub_id = make_id(ID_PREFIXES["subscription"], idx) + start = date.fromisoformat(customer.customer_start_at) + end_date = max(obs_date, start + timedelta(weeks=early_tenure_weeks)) + timedelta( + days=forward_window_days + ) + + state = _simulate_customer( + customer=customer, + subscription_id=sub_id, + latents=latents, + mechanisms=mechanisms, + rng=rng, + start=start, + end_date=end_date, + counters=counters, + result=result, + ) + + churn_at = ( + (start + timedelta(weeks=state.churn_week)).isoformat() + if state.churned and state.churn_week is not None + else None + ) + result.subscriptions.append( + SubscriptionLifecycleRow( + subscription_id=sub_id, + customer_id=customer.customer_id, + plan_name=customer.initial_plan, + subscription_status="churned" if state.churned else "active", + subscription_start_at=customer.customer_start_at, + current_mrr=state.current_mrr, + contract_term_months=state.contract_term_months, + renewal_count=state.renewal_count, + expansion_count=state.expansion_count, + subscription_end_at=churn_at, + churn_at=churn_at, + churn_reason=state.churn_reason, + ) + ) + + return result + + +# --------------------------------------------------------------------------- +# Per-customer weekly loop +# --------------------------------------------------------------------------- + + +def _simulate_customer( + *, + customer: CustomerLifecycleRow, + subscription_id: str, + latents: dict[str, float], + mechanisms: LifecycleMechanismAssignment, + rng: random.Random, + start: date, + end_date: date, + counters: dict[str, int], + result: LifecycleSimulationResult, +) -> _CustomerSimState: + """Evolve one customer week by week; append events to *result*.""" + state = _CustomerSimState( + current_mrr=customer.initial_mrr, + contract_term_months=customer.contract_term_months, + ) + churn_p = mechanisms.churn_hazard + expansion_p = mechanisms.expansion_propensity + payment_p = mechanisms.payment_failure + + week = 0 + last_month_index = -1 + while start + timedelta(weeks=week) <= end_date: + week_date = start + timedelta(weeks=week) + + # -- 1. Health signal (weekly) -------------------------------------- + depth = _emit_health_signal( + customer=customer, + latents=latents, + rng=rng, + week=week, + week_date=week_date, + counters=counters, + result=result, + ) + + # -- 2. Invoice on month boundary + dunning resolution -------------- + month_index = int(week / _WEEKS_PER_MONTH) + if month_index > last_month_index: + last_month_index = month_index + counters["invoice"] += 1 + failed = rng.random() < payment_failure_probability(payment_p, latents) + invoice = InvoiceRow( + invoice_id=make_id(ID_PREFIXES["invoice"], counters["invoice"]), + customer_id=customer.customer_id, + invoice_date=week_date.isoformat(), + amount_usd=state.current_mrr, + payment_status="failed" if failed else "paid", + ) + result.invoices.append(invoice) + if failed and state.pending_failure is None: + state.pending_failure = (invoice, week) + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="payment_failure", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + ) + + if state.pending_failure is not None: + pending_invoice, fail_week = state.pending_failure + if week - fail_week >= payment_p.dunning_weeks: + if rng.random() < payment_p.recovery_rate: + pending_invoice.payment_status = "recovered" + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="payment_recovered", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + ) + state.pending_failure = None + else: + pending_invoice.payment_status = "written_off" + _churn(state, week, "payment_failure") + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="churn", + mrr_before=state.current_mrr, + mrr_after=0, + ) + break + + # -- 3. Churn draw --------------------------------------------------- + renewal_week = is_renewal_week(week, state.contract_term_months) + p_churn = churn_probability(churn_p, latents, week, state.contract_term_months) + if rng.random() < p_churn: + _churn(state, week, "non_renewal" if renewal_week else "voluntary") + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="churn", + mrr_before=state.current_mrr, + mrr_after=0, + ) + break + + # -- 4. Renewal event (survived the anniversary) --------------------- + if renewal_week: + state.renewal_count += 1 + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="renewal", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + contract_term_months_new=state.contract_term_months, + ) + + # -- 5. Expansion draw ------------------------------------------------ + if rng.random() < expansion_probability(expansion_p, latents, depth): + lo_frac, hi_frac = expansion_p.expansion_mrr_frac_range + lo = max(1, int(lo_frac * state.current_mrr)) + hi = max(lo, int(hi_frac * state.current_mrr)) + delta = rng.randint(lo, hi) + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="expansion", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr + delta, + ) + state.current_mrr += delta + state.expansion_count += 1 + + week += 1 + + return state + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _churn(state: _CustomerSimState, week: int, reason: str) -> None: + state.churned = True + state.churn_week = week + state.churn_reason = reason + + +def _emit_event( + result: LifecycleSimulationResult, + counters: dict[str, int], + *, + subscription_id: str, + customer_id: str, + week_date: date, + event_type: str, + mrr_before: int, + mrr_after: int, + contract_term_months_new: int | None = None, +) -> None: + counters["subscription_event"] += 1 + result.subscription_events.append( + SubscriptionEventRow( + event_id=make_id(ID_PREFIXES["subscription_event"], counters["subscription_event"]), + subscription_id=subscription_id, + customer_id=customer_id, + event_timestamp=week_date.isoformat(), + event_type=event_type, + mrr_before=mrr_before, + mrr_after=mrr_after, + contract_term_months_new=contract_term_months_new, + ) + ) + + +def _emit_health_signal( + *, + customer: CustomerLifecycleRow, + latents: dict[str, float], + rng: random.Random, + week: int, + week_date: date, + counters: dict[str, int], + result: LifecycleSimulationResult, +) -> float: + """Emit this week's health signal; return the feature-depth score. + + The depth score feeds straight back into the expansion hazard for the same + week, creating the causal link latents → health signals → expansion. + """ + adoption = latents.get("latent_adoption_velocity", 0.5) + fit = latents.get("latent_product_fit", 0.5) + + ramp = 1.0 - math.exp(-(week + 1) / _USAGE_RAMP_WEEKS) + + base_users = _ACTIVE_USERS_BASE_BY_PLAN.get(customer.initial_plan, _DEFAULT_ACTIVE_USERS_BASE) + active_users = max(0, int(base_users * (0.5 + adoption) * ramp * rng.gauss(1.0, 0.10) + 0.5)) + + target_depth = min(1.0, max(0.0, 0.20 + 0.40 * adoption + 0.30 * fit)) + depth = min(1.0, max(0.0, target_depth * ramp + rng.gauss(0.0, 0.04))) + + lam = _TICKET_LAM_BASE + _TICKET_LAM_SLOPE * (1.0 - fit) + tickets = _poisson(rng, lam) + + nps: int | None = None + if week > 0 and week % _NPS_CADENCE_WEEKS == 0: + champion = latents.get("latent_champion_strength", 0.5) + raw = 10.0 * (0.20 + 0.50 * fit + 0.30 * champion) + rng.gauss(0.0, 1.0) + nps = max(0, min(10, round(raw))) + + counters["health_signal"] += 1 + result.health_signals.append( + HealthSignalRow( + signal_id=make_id(ID_PREFIXES["health_signal"], counters["health_signal"]), + customer_id=customer.customer_id, + period_start=week_date.isoformat(), + active_users=active_users, + feature_depth_score=round(depth, 4), + support_tickets=tickets, + nps_score=nps, + ) + ) + return depth + + +def _poisson(rng: random.Random, lam: float) -> int: + """Knuth Poisson sampler — fine for the small intensities used here.""" + threshold = math.exp(-lam) + k = 0 + p = 1.0 + while True: + p *= rng.random() + if p <= threshold: + return k + k += 1 diff --git a/leadforge/schemes/lifecycle/mechanisms.py b/leadforge/schemes/lifecycle/mechanisms.py index 6b5fa64..04121a9 100644 --- a/leadforge/schemes/lifecycle/mechanisms.py +++ b/leadforge/schemes/lifecycle/mechanisms.py @@ -133,24 +133,23 @@ class LifecycleMechanismAssignment: # Per-motif parameter tables # --------------------------------------------------------------------------- -# Churn hazard base weekly rates. -# IMPORTANT — the per-motif "% annual" figures below are the BASE-RATE-ONLY -# equivalents (1 - (1-r)^52) at neutral latents. The hazard functions in -# hazards.py add material churn mass on top: the onboarding elevation -# contributes ~6.8 x base_rate of extra first-year mass and each renewal spike -# adds (multiplier - 1) x base_rate, so true first-year churn runs roughly -# 5-14 points above these figures (e.g. churner_dominated ~52%, not 37.5%). -# Final calibration against the difficulty-profile bands -# intro [0.10, 0.20] / intermediate [0.20, 0.35] / advanced [0.30, 0.50] -# happens in the engine tests (LTV-Pk), where these base rates are expected to -# be tuned DOWN to land inside the bands once the full tenure shape applies. +# Churn hazard base weekly rates — ENGINE-CALIBRATED (LTV-Pk). +# These are NOT meant to be read as standalone annual equivalents: the full +# tenure shape (onboarding elevation, renewal spikes) and dunning write-off +# churn add mass on top of the base rate. The values below were tuned by +# running simulate_lifecycle() on motif-biased populations (n=600, multiple +# seed pairs) until simulated FIRST-YEAR churn landed at the per-motif targets: +# product_led ~20% | relationship_led ~27% | expansion_led ~15% +# payment_fragile ~33-39% (mostly write-off-driven) | churner_dominated ~42% +# The engine calibration test (tests/schemes/lifecycle/test_engine.py) guards +# these with wide bands; difficulty-tier scaling on top arrives in LTV-M6. _CHURN_BASE_WEEKLY: dict[str, float] = { # Exact annual equivalent: 1 - (1-r)^52. - "product_led_retention": 0.0042, # 19.7% annual - "relationship_led_retention": 0.0055, # 24.9% annual - "expansion_led_growth": 0.0028, # 13.6% annual (lowest — high-fit customers) - "payment_fragile": 0.0060, # 26.9% annual (high base; mostly payment-driven) - "churner_dominated": 0.0090, # 37.5% annual + "product_led_retention": 0.0034, + "relationship_led_retention": 0.0042, + "expansion_led_growth": 0.0026, # lowest — high-fit customers + "payment_fragile": 0.0018, # low base; churn is mostly payment-driven + "churner_dominated": 0.0046, } # Latent-trait weights for the background churn hazard. @@ -263,11 +262,11 @@ class LifecycleMechanismAssignment: # Payment-failure base monthly rates. _PAYMENT_FAILURE_BASE_MONTHLY: dict[str, float] = { - "product_led_retention": 0.015, - "relationship_led_retention": 0.020, - "expansion_led_growth": 0.012, - "payment_fragile": 0.080, # high — financial fragility is the defining trait - "churner_dominated": 0.030, + "product_led_retention": 0.012, + "relationship_led_retention": 0.013, + "expansion_led_growth": 0.010, + "payment_fragile": 0.027, # high — financial fragility is the defining trait + "churner_dominated": 0.012, } # Latent weights for payment failure. @@ -307,8 +306,8 @@ class LifecycleMechanismAssignment: "product_led_retention": 0.70, "relationship_led_retention": 0.65, "expansion_led_growth": 0.75, - "payment_fragile": 0.40, # low — these accounts are genuinely fragile - "churner_dominated": 0.50, + "payment_fragile": 0.64, # lowest — these accounts are genuinely fragile + "churner_dominated": 0.65, } # Fallback values for unknown motif families. diff --git a/leadforge/schemes/lifecycle/population.py b/leadforge/schemes/lifecycle/population.py index 45e8432..ba0fd38 100644 --- a/leadforge/schemes/lifecycle/population.py +++ b/leadforge/schemes/lifecycle/population.py @@ -64,6 +64,10 @@ class CustomerPopulationResult: latent_state: CustomerLatentState # ISO-8601 date at which snapshots and labels are anchored. observation_date: str = "" + # Retention motif family this population was built for. Recorded so the + # simulation engine fetches the *same* family's mechanism params — passing + # the motif separately to the engine would invite silent drift. + motif_family: str = "" # --------------------------------------------------------------------------- @@ -265,6 +269,7 @@ def build_customer_population( customer_latents=cust_latents, ), observation_date=obs_date.isoformat(), + motif_family=motif_family, ) diff --git a/tests/schemes/lifecycle/test_engine.py b/tests/schemes/lifecycle/test_engine.py new file mode 100644 index 0000000..71e4327 --- /dev/null +++ b/tests/schemes/lifecycle/test_engine.py @@ -0,0 +1,320 @@ +"""Tests for the weekly lifecycle simulation engine (LTV-Pk).""" + +from datetime import date, timedelta + +import pytest + +from leadforge.schemes.lifecycle.engine import ( + LifecycleSimulationResult, + simulate_lifecycle, +) +from leadforge.schemes.lifecycle.hazards import is_renewal_week +from leadforge.schemes.lifecycle.population import ( + LIFECYCLE_MOTIF_FAMILIES, + build_customer_population, +) + +_POP_SEED = 11 +_SIM_SEED = 99 +_N = 150 + + +@pytest.fixture(scope="module") +def population(): + return build_customer_population(_N, _POP_SEED, motif_family="product_led_retention") + + +@pytest.fixture(scope="module") +def sim(population): + return simulate_lifecycle(population, _SIM_SEED) + + +# --------------------------------------------------------------------------- +# Shape + validation +# --------------------------------------------------------------------------- + + +def test_returns_result_type(sim) -> None: + assert isinstance(sim, LifecycleSimulationResult) + + +def test_one_subscription_per_customer(population, sim) -> None: + assert len(sim.subscriptions) == len(population.customers) + assert {s.customer_id for s in sim.subscriptions} == { + c.customer_id for c in population.customers + } + + +def test_rejects_population_without_motif(population) -> None: + import dataclasses + + broken = dataclasses.replace(population, motif_family="") + with pytest.raises(ValueError, match="motif_family"): + simulate_lifecycle(broken, _SIM_SEED) + + +def test_rejects_bad_windows(population) -> None: + with pytest.raises(ValueError, match="forward_window_days"): + simulate_lifecycle(population, _SIM_SEED, forward_window_days=0) + with pytest.raises(ValueError, match="early_tenure_weeks"): + simulate_lifecycle(population, _SIM_SEED, early_tenure_weeks=-1) + + +# --------------------------------------------------------------------------- +# Determinism + per-customer independence +# --------------------------------------------------------------------------- + + +def test_deterministic_under_same_seeds(population) -> None: + a = simulate_lifecycle(population, _SIM_SEED) + b = simulate_lifecycle(population, _SIM_SEED) + assert [s.to_dict() for s in a.subscriptions] == [s.to_dict() for s in b.subscriptions] + assert [e.to_dict() for e in a.subscription_events] == [ + e.to_dict() for e in b.subscription_events + ] + assert [i.to_dict() for i in a.invoices] == [i.to_dict() for i in b.invoices] + assert [h.to_dict() for h in a.health_signals] == [h.to_dict() for h in b.health_signals] + + +def test_different_sim_seed_changes_outcomes(population) -> None: + a = simulate_lifecycle(population, 1) + b = simulate_lifecycle(population, 2) + assert [s.subscription_status for s in a.subscriptions] != [ + s.subscription_status for s in b.subscriptions + ] + + +def test_per_customer_trajectories_independent_of_other_customers(population) -> None: + """Per-customer RNG substreams: a customer's trajectory is invariant to the + rest of the population (same customer_id + latents + seed → same draws).""" + import dataclasses + + full = simulate_lifecycle(population, _SIM_SEED) + target = population.customers[7] + # Re-simulate with ONLY this customer present. + solo_pop = dataclasses.replace(population, customers=[target]) + solo = simulate_lifecycle(solo_pop, _SIM_SEED) + + full_events = [ + (e.event_type, e.event_timestamp, e.mrr_before, e.mrr_after) + for e in full.subscription_events + if e.customer_id == target.customer_id + ] + solo_events = [ + (e.event_type, e.event_timestamp, e.mrr_before, e.mrr_after) + for e in solo.subscription_events + ] + assert full_events == solo_events + + +# --------------------------------------------------------------------------- +# Cadences + full-window coverage +# --------------------------------------------------------------------------- + + +def test_weekly_health_cadence(population, sim) -> None: + # Every active week emits exactly one health signal: consecutive weekly dates. + by_cust: dict[str, list[str]] = {} + for h in sim.health_signals: + by_cust.setdefault(h.customer_id, []).append(h.period_start) + for cust_id, dates in by_cust.items(): + parsed = [date.fromisoformat(d) for d in dates] + for prev, nxt in zip(parsed, parsed[1:], strict=False): + assert (nxt - prev).days == 7, f"{cust_id}: gap {prev} → {nxt}" + + +def test_monthly_invoice_cadence(population, sim) -> None: + # ~12 invoices per 52 active weeks (one per month boundary). + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + subs = {s.customer_id: s for s in sim.subscriptions} + inv_by_cust: dict[str, int] = {} + for inv in sim.invoices: + inv_by_cust[inv.customer_id] = inv_by_cust.get(inv.customer_id, 0) + 1 + for cust_id, n_inv in inv_by_cust.items(): + sub = subs[cust_id] + end = ( + date.fromisoformat(sub.churn_at) + if sub.churn_at + else max( + date.fromisoformat(h.period_start) + for h in sim.health_signals + if h.customer_id == cust_id + ) + ) + active_weeks = (end - starts[cust_id]).days // 7 + 1 + expected_months = int(active_weeks / (52 / 12)) + 1 + assert abs(n_inv - expected_months) <= 1, ( + f"{cust_id}: {n_inv} invoices over {active_weeks} active weeks" + ) + + +def test_nps_quarterly_only(sim) -> None: + starts: dict[str, date] = {} + for h in sim.health_signals: + starts.setdefault(h.customer_id, date.fromisoformat(h.period_start)) + for h in sim.health_signals: + week = (date.fromisoformat(h.period_start) - starts[h.customer_id]).days // 7 + if h.nps_score is not None: + assert week > 0, f"nps at week {week}" + assert week % 13 == 0, f"nps at non-quarterly week {week}" + assert 0 <= h.nps_score <= 10 + else: + assert week == 0 or week % 13 != 0 + + +def test_active_customers_simulated_through_full_window(population, sim) -> None: + # D6: every still-active customer has health coverage through obs + 730d. + obs = date.fromisoformat(population.observation_date) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + min_end = { + cid: max(obs, start + timedelta(weeks=4)) + timedelta(days=730) + for cid, start in starts.items() + } + last_signal: dict[str, date] = {} + for h in sim.health_signals: + d = date.fromisoformat(h.period_start) + if h.customer_id not in last_signal or d > last_signal[h.customer_id]: + last_signal[h.customer_id] = d + for sub in sim.subscriptions: + if sub.subscription_status == "active": + assert last_signal[sub.customer_id] >= min_end[sub.customer_id] - timedelta(days=7), ( + f"{sub.customer_id} active but coverage ends {last_signal[sub.customer_id]}" + ) + + +# --------------------------------------------------------------------------- +# Event + terminal-state consistency +# --------------------------------------------------------------------------- + + +def test_event_fk_integrity(population, sim) -> None: + cust_ids = {c.customer_id for c in population.customers} + sub_ids = {s.subscription_id for s in sim.subscriptions} + for e in sim.subscription_events: + assert e.customer_id in cust_ids + assert e.subscription_id in sub_ids + + +def test_churned_subscriptions_consistent(sim) -> None: + churn_events = {e.customer_id for e in sim.subscription_events if e.event_type == "churn"} + for sub in sim.subscriptions: + if sub.subscription_status == "churned": + assert sub.churn_at is not None + assert sub.subscription_end_at == sub.churn_at + assert sub.churn_reason in ("voluntary", "non_renewal", "payment_failure") + assert sub.customer_id in churn_events + else: + assert sub.churn_at is None + assert sub.churn_reason is None + assert sub.customer_id not in churn_events + + +def test_no_events_after_churn(sim) -> None: + churn_date: dict[str, str] = {} + for e in sim.subscription_events: + if e.event_type == "churn": + churn_date[e.customer_id] = e.event_timestamp + for e in sim.subscription_events: + if e.customer_id in churn_date: + assert e.event_timestamp <= churn_date[e.customer_id] + for h in sim.health_signals: + if h.customer_id in churn_date: + assert h.period_start <= churn_date[h.customer_id] + + +def test_mrr_chain_consistency(population, sim) -> None: + initial = {c.customer_id: c.initial_mrr for c in population.customers} + expansions: dict[str, list] = {} + for e in sim.subscription_events: + if e.event_type == "expansion": + expansions.setdefault(e.customer_id, []).append(e) + assert e.mrr_after > e.mrr_before + for sub in sim.subscriptions: + chain = expansions.get(sub.customer_id, []) + mrr = initial[sub.customer_id] + for e in chain: + assert e.mrr_before == mrr + mrr = e.mrr_after + assert sub.current_mrr == mrr + assert sub.expansion_count == len(chain) + + +def test_renewal_events_only_on_anniversaries(population, sim) -> None: + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + terms = {c.customer_id: c.contract_term_months for c in population.customers} + renewal_counts: dict[str, int] = {} + for e in sim.subscription_events: + if e.event_type == "renewal": + week = (date.fromisoformat(e.event_timestamp) - starts[e.customer_id]).days // 7 + assert is_renewal_week(week, terms[e.customer_id]), ( + f"renewal at non-anniversary week {week} (term {terms[e.customer_id]}mo)" + ) + renewal_counts[e.customer_id] = renewal_counts.get(e.customer_id, 0) + 1 + for sub in sim.subscriptions: + assert sub.renewal_count == renewal_counts.get(sub.customer_id, 0) + + +def test_payment_failures_resolve_or_censor(population, sim) -> None: + # Every failed invoice ends as recovered / written_off, unless the customer + # churned (or the window ended) before the dunning period elapsed. + churned = {s.customer_id: s for s in sim.subscriptions if s.churn_at} + for inv in sim.invoices: + assert inv.payment_status in ("paid", "failed", "recovered", "written_off") + if inv.payment_status == "written_off": + sub = churned.get(inv.customer_id) + assert sub is not None + assert sub.churn_reason == "payment_failure" + + +def test_written_off_customers_churn_with_payment_reason(sim) -> None: + wo_customers = {i.customer_id for i in sim.invoices if i.payment_status == "written_off"} + reasons = {s.customer_id: s.churn_reason for s in sim.subscriptions} + for cid in wo_customers: + assert reasons[cid] == "payment_failure" + + +# --------------------------------------------------------------------------- +# Calibration: simulated first-year churn per motif (engine-calibrated bands) +# --------------------------------------------------------------------------- + +_CHURN_BANDS = { + "product_led_retention": (0.13, 0.27), + "relationship_led_retention": (0.18, 0.33), + "expansion_led_growth": (0.09, 0.23), + "payment_fragile": (0.29, 0.45), + "churner_dominated": (0.34, 0.50), +} + + +@pytest.mark.parametrize("motif", LIFECYCLE_MOTIF_FAMILIES) +def test_first_year_churn_within_calibrated_band(motif: str) -> None: + pop = build_customer_population(600, _POP_SEED, motif_family=motif) + result = simulate_lifecycle(pop, _SIM_SEED) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in pop.customers} + yr1 = sum( + 1 + for s in result.subscriptions + if s.churn_at and (date.fromisoformat(s.churn_at) - starts[s.customer_id]).days // 7 <= 52 + ) + rate = yr1 / len(pop.customers) + lo, hi = _CHURN_BANDS[motif] + assert lo <= rate <= hi, f"{motif}: year-1 churn {rate:.1%} outside [{lo:.0%}, {hi:.0%}]" + + +def test_expansion_world_expands_most() -> None: + counts = {} + for motif in ("expansion_led_growth", "churner_dominated"): + pop = build_customer_population(400, _POP_SEED, motif_family=motif) + result = simulate_lifecycle(pop, _SIM_SEED) + counts[motif] = sum(1 for e in result.subscription_events if e.event_type == "expansion") + assert counts["expansion_led_growth"] > 2 * counts["churner_dominated"] + + +def test_most_customers_active_at_observation_date(population, sim) -> None: + # Starts are within ~56 weeks of obs and annual churn ~20%, so a clear + # majority must still be active at the observation date. + obs = date.fromisoformat(population.observation_date) + active_at_obs = sum( + 1 for s in sim.subscriptions if s.churn_at is None or date.fromisoformat(s.churn_at) > obs + ) + assert active_at_obs / len(sim.subscriptions) > 0.6 From 919cba01ed1ca1d38e4edd41c9e5e2725508fc34 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 12:35:57 +0300 Subject: [PATCH 2/3] docs(ltv): record LTV-Pk (#118) in roadmap + agent-plan [LTV-Pk] Co-Authored-By: Claude Fable 5 --- .agent-plan.md | 2 +- docs/ltv/roadmap.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.agent-plan.md b/.agent-plan.md index d9bbf53..4ed4f76 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -59,7 +59,7 @@ engine — simulate_lifecycle() with per-customer RNG substreams, weekly health/monthly invoice cadences, dunning write-off churn, renewal events, expansion MRR chains; mechanisms.py base rates ENGINE-CALIBRATED to per-motif year-1 churn targets, discharging the #117 calibration obligation; 25 tests) -opened as **#NNN** — completes **LTV-M4**. Next: `LTV-M5` (`LTV-Pl` — +opened as **#118** — completes **LTV-M4**. Next: `LTV-M5` (`LTV-Pl` — calendar-anchored customer snapshot + pLTV targets). --- diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index 9469645..ef4262e 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -44,7 +44,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M1` | Lifecycle schema foundation | `LTV-Pb`, `LTV-Pc` | #104 (Pb) | | `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe), #109 (Pf.1), #110 (Pf.2), #111 (Pg.1), #112 (Pg.2) | | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | -| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #NNN (Pk) | +| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) | | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | | | `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | @@ -179,7 +179,7 @@ Total: ~19 PRs across 9 milestones. - Tests: hazard shape over tenure, renewal spike, dunning escalation, expansion MRR-delta bounds. - Labels: `type: feature`, `layer: mechanisms` -- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine` (**PR #NNN**). +- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine` (**PR #118**). `simulate_lifecycle()`: weekly loop per customer through `observation_date + 730d (+ early-regime buffer)` so all three windows are fully simulated (D6); emits `subscription_events`, `health_signals`, `invoices`; updates terminal From 681872ffabca252c33a587054d453f3caf341d26 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 12 Jun 2026 12:41:47 +0300 Subject: [PATCH 3/3] fix(lifecycle): address self-review findings on the engine [LTV-Pk] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three findings from hostile self-review of the initial engine commit: 1. ACCOUNT LATENTS WERE COMPLETELY DEAD (the real one). The customer latent dict contains every account latent key (latent_budget_stability, latent_organizational_stability) plus three more, and the merge let customer values shadow account values — so every account-level draw was discarded for every customer, and the comment called the collision "(deliberate)". This also destroyed the within-account correlation that account-level draws exist to provide (~3 customers share an account). Fix: explicit _merge_latents helper that blends shared traits 50/50 — the account component is a shared random effect, giving correlated churn and payment behaviour within an account (mixed-effects structure). Calibration re-verified after the change: all five motifs stay inside their year-1 churn bands across three seed pairs. Regression test: swinging account latent_budget_stability 0.0 vs 1.0 must change the population's failed-invoice count. 2. Intra-week ordering: invoices were issued BEFORE pending dunning resolved. Consequences: (a) a customer's write-off churn week could include a fresh same-week invoice (whose paid amount would count toward pLTV revenue); (b) for dunning_weeks=4 motifs with a 4-week month gap, a second invoice could fail while one was pending and be silently dropped — terminal "failed" status forever, no event, no dunning. Fix: dunning resolution now runs before invoice issuance, so the pending slot is always free by issuance time. The only remaining "failed" terminal states are genuine censoring (churn for another reason mid-dunning, or window end) — now documented in the module docstring and pinned by a test that checks every dangling "failed" invoice against exactly those two conditions. 3. The recorded feature_depth_score was rounded to 4dp but the UNROUNDED value fed the expansion hazard — the published observable differed (by ε) from the value that drove behaviour, breaking the data↔causality equivalence this dataset exists to teach. The hazard now consumes the exact rounded value the row records; round-trip test added. Full suite 1715 passed / 51 skipped; ruff + mypy clean; calibration bands re-verified post-blend. Co-Authored-By: Claude Fable 5 --- leadforge/schemes/lifecycle/engine.py | 98 +++++++++++++++++--------- tests/schemes/lifecycle/test_engine.py | 59 ++++++++++++++++ 2 files changed, 124 insertions(+), 33 deletions(-) diff --git a/leadforge/schemes/lifecycle/engine.py b/leadforge/schemes/lifecycle/engine.py index 55f0f46..d581a07 100644 --- a/leadforge/schemes/lifecycle/engine.py +++ b/leadforge/schemes/lifecycle/engine.py @@ -26,6 +26,11 @@ - **Churn reasons** are causal, not sampled: ``payment_failure`` when a written-off invoice forces the churn, ``non_renewal`` when the (spiked) churn draw fires on a contract-anniversary week, ``voluntary`` otherwise. +- **Dangling ``failed`` invoices are censoring, not bugs** — an invoice keeps + terminal status ``failed`` only when the customer churned for another reason + mid-dunning or the simulation window ended before the dunning period elapsed. + Within a week, dunning resolution runs *before* invoice issuance, so a + pending failure always resolves before the next invoice can fail. Deliberately out of scope (tracked): - ``downgrade`` events — no downgrade mechanism params exist in @@ -191,11 +196,10 @@ def simulate_lifecycle( counters = {"subscription_event": 0, "health_signal": 0, "invoice": 0} for idx, customer in enumerate(population.customers, start=1): - # Merged latents: account-level traits first, customer-level overrides - # win on (deliberate) key collisions such as latent_budget_stability. - latents: dict[str, float] = {} - latents.update(acct_latents.get(customer.account_id, {})) - latents.update(cust_latents.get(customer.customer_id, {})) + latents = _merge_latents( + acct_latents.get(customer.account_id, {}), + cust_latents.get(customer.customer_id, {}), + ) rng = root.child(f"lifecycle_sim::{customer.customer_id}") sub_id = make_id(ID_PREFIXES["subscription"], idx) @@ -241,6 +245,27 @@ def simulate_lifecycle( return result +def _merge_latents( + account_latents: dict[str, float], customer_latents: dict[str, float] +) -> dict[str, float]: + """Merge account- and customer-level latents into one effective trait dict. + + Traits present at **both** levels (``latent_budget_stability``, + ``latent_organizational_stability``) are blended 50/50 rather than letting + one level shadow the other: the account component is a shared random effect + across all customers of the same account, so within-account churn and + payment behaviour are *correlated* — the mixed-effects structure a B2B + dataset should have. Account-only or customer-only traits pass through. + """ + merged = dict(customer_latents) + for trait, account_value in account_latents.items(): + if trait in merged: + merged[trait] = 0.5 * (account_value + merged[trait]) + else: + merged[trait] = account_value + return merged + + # --------------------------------------------------------------------------- # Per-customer weekly loop # --------------------------------------------------------------------------- @@ -283,33 +308,10 @@ def _simulate_customer( result=result, ) - # -- 2. Invoice on month boundary + dunning resolution -------------- - month_index = int(week / _WEEKS_PER_MONTH) - if month_index > last_month_index: - last_month_index = month_index - counters["invoice"] += 1 - failed = rng.random() < payment_failure_probability(payment_p, latents) - invoice = InvoiceRow( - invoice_id=make_id(ID_PREFIXES["invoice"], counters["invoice"]), - customer_id=customer.customer_id, - invoice_date=week_date.isoformat(), - amount_usd=state.current_mrr, - payment_status="failed" if failed else "paid", - ) - result.invoices.append(invoice) - if failed and state.pending_failure is None: - state.pending_failure = (invoice, week) - _emit_event( - result, - counters, - subscription_id=subscription_id, - customer_id=customer.customer_id, - week_date=week_date, - event_type="payment_failure", - mrr_before=state.current_mrr, - mrr_after=state.current_mrr, - ) - + # -- 2. Dunning resolution, then invoice on month boundary ---------- + # Resolution runs FIRST so a write-off churn cannot be preceded by a + # fresh same-week invoice, and a newly failed invoice can always enter + # dunning (pending slot is free by issuance time). if state.pending_failure is not None: pending_invoice, fail_week = state.pending_failure if week - fail_week >= payment_p.dunning_weeks: @@ -341,6 +343,32 @@ def _simulate_customer( ) break + month_index = int(week / _WEEKS_PER_MONTH) + if month_index > last_month_index: + last_month_index = month_index + counters["invoice"] += 1 + failed = rng.random() < payment_failure_probability(payment_p, latents) + invoice = InvoiceRow( + invoice_id=make_id(ID_PREFIXES["invoice"], counters["invoice"]), + customer_id=customer.customer_id, + invoice_date=week_date.isoformat(), + amount_usd=state.current_mrr, + payment_status="failed" if failed else "paid", + ) + result.invoices.append(invoice) + if failed and state.pending_failure is None: + state.pending_failure = (invoice, week) + _emit_event( + result, + counters, + subscription_id=subscription_id, + customer_id=customer.customer_id, + week_date=week_date, + event_type="payment_failure", + mrr_before=state.current_mrr, + mrr_after=state.current_mrr, + ) + # -- 3. Churn draw --------------------------------------------------- renewal_week = is_renewal_week(week, state.contract_term_months) p_churn = churn_probability(churn_p, latents, week, state.contract_term_months) @@ -470,6 +498,10 @@ def _emit_health_signal( raw = 10.0 * (0.20 + 0.50 * fit + 0.30 * champion) + rng.gauss(0.0, 1.0) nps = max(0, min(10, round(raw))) + # The recorded observable and the value fed to the expansion hazard must + # be the SAME number — the data must fully explain the behaviour. + depth = round(depth, 4) + counters["health_signal"] += 1 result.health_signals.append( HealthSignalRow( @@ -477,7 +509,7 @@ def _emit_health_signal( customer_id=customer.customer_id, period_start=week_date.isoformat(), active_users=active_users, - feature_depth_score=round(depth, 4), + feature_depth_score=depth, support_tickets=tickets, nps_score=nps, ) diff --git a/tests/schemes/lifecycle/test_engine.py b/tests/schemes/lifecycle/test_engine.py index 71e4327..4d8c6ef 100644 --- a/tests/schemes/lifecycle/test_engine.py +++ b/tests/schemes/lifecycle/test_engine.py @@ -273,6 +273,65 @@ def test_written_off_customers_churn_with_payment_reason(sim) -> None: assert reasons[cid] == "payment_failure" +def test_account_latents_influence_outcomes(population) -> None: + """Regression: account latents must NOT be shadowed by customer latents. + + The merge blends the shared traits 50/50 (account-level random effect), so + swinging an account's latent_budget_stability between the extremes must + change its customers' simulated trajectories. + """ + import copy + + lo_pop = copy.deepcopy(population) + hi_pop = copy.deepcopy(population) + for traits in lo_pop.latent_state.account_latents.values(): + traits["latent_budget_stability"] = 0.0 + for traits in hi_pop.latent_state.account_latents.values(): + traits["latent_budget_stability"] = 1.0 + + lo_sim = simulate_lifecycle(lo_pop, _SIM_SEED) + hi_sim = simulate_lifecycle(hi_pop, _SIM_SEED) + lo_failed = sum(1 for i in lo_sim.invoices if i.payment_status != "paid") + hi_failed = sum(1 for i in hi_sim.invoices if i.payment_status != "paid") + assert lo_failed > hi_failed, ( + f"account-level budget stability had no effect: lo={lo_failed}, hi={hi_failed}" + ) + + +def test_dangling_failed_invoices_are_censoring_only(population, sim) -> None: + """An invoice may end at status 'failed' only when the customer churned for + another reason mid-dunning or the simulation window ended first — never + because a second failure was silently dropped while one was pending.""" + from leadforge.schemes.lifecycle.mechanisms import assign_lifecycle_mechanisms + + dunning_weeks = assign_lifecycle_mechanisms( + population.motif_family + ).payment_failure.dunning_weeks + obs = date.fromisoformat(population.observation_date) + starts = {c.customer_id: date.fromisoformat(c.customer_start_at) for c in population.customers} + subs = {s.customer_id: s for s in sim.subscriptions} + for inv in sim.invoices: + if inv.payment_status != "failed": + continue + inv_date = date.fromisoformat(inv.invoice_date) + resolution_due = inv_date + timedelta(weeks=dunning_weeks) + sub = subs[inv.customer_id] + window_end = max(obs, starts[inv.customer_id] + timedelta(weeks=4)) + timedelta(days=730) + churned_first = ( + sub.churn_at is not None and date.fromisoformat(sub.churn_at) <= resolution_due + ) + censored = resolution_due > window_end + assert churned_first or censored, ( + f"invoice {inv.invoice_id} dangling 'failed' without churn/censoring" + ) + + +def test_recorded_depth_is_round_tripped(sim) -> None: + # The stored observable is the exact value the expansion hazard consumed. + for h in sim.health_signals: + assert h.feature_depth_score == round(h.feature_depth_score, 4) + + # --------------------------------------------------------------------------- # Calibration: simulated first-year churn per motif (engine-calibrated bands) # ---------------------------------------------------------------------------