9 changes: 7 additions & 2 deletions .agent-plan.md
@@ -59,8 +59,13 @@ engine — simulate_lifecycle() with per-customer RNG substreams, weekly
health/monthly invoice cadences, dunning write-off churn, renewal events,
expansion MRR chains; mechanisms.py base rates ENGINE-CALIBRATED to per-motif
year-1 churn targets, discharging the #117 calibration obligation; 25 tests)
opened as **#118** — completes **LTV-M4**. Next: `LTV-M5` (`LTV-Pl` —
calendar-anchored customer snapshot + pLTV targets).
merged (#118) — **LTV-M4 complete**. **LTV-M5**: `LTV-Pl`
(calendar-anchored customer snapshot — `build_customer_snapshot()` +
`CUSTOMER_SNAPSHOT_FEATURES` with the three `ltv_revenue_{90,365,730}d`
targets, `churned_within_180d`, and the `mrr_change_full_period` trap;
difficulty distortions extracted to scheme-agnostic `render/distortions.py`,
lead-scoring byte-identical; 39 tests) opened as **#119**. Next: `LTV-Pm`
(early-pLTV tenure-anchored task family).

---

56 changes: 44 additions & 12 deletions docs/ltv/roadmap.md
@@ -45,7 +45,7 @@ protocol + registry, with the package physically reorganized into
| `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe), #109 (Pf.1), #110 (Pf.2), #111 (Pg.1), #112 (Pg.2) |
| `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) |
| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) |
| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | |
| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl) |
| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | |
| `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | |
| `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | |
@@ -72,11 +72,16 @@ Total: ~19 PRs across 9 milestones.
Lead-scoring catalog untouched. (These rows relocate into
`schemes/lifecycle/` during `LTV-M2`.)
- Labels: `type: feature`, `layer: schema`
- [ ] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`.
- [~] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`.
**Feature-catalog half discharged in `LTV-Pl` (#119):**
`CUSTOMER_SNAPSHOT_FEATURES` (three `ltv_revenue_{90,365,730}d` targets, the
secondary `churned_within_180d`, the `mrr_change_full_period` trap); regression
task specs + a `task_type` (`regression` | `classification`) on the task model.
- Tests: feature-spec invariants, regression task-spec shape.
secondary `churned_within_180d`, the `mrr_change_full_period` trap) is
authored in `schemes/lifecycle/features.py` (post-reorg home, per the
`LTV-M2` note above) because the snapshot builder needs it. **Remaining
scope (folds into `LTV-Pn`):** regression task specs + a `task_type`
(`regression` | `classification`) on the task model — they belong with the
task-split writer's continuous-target path.
- Tests: feature-spec invariants ✓ (#119); regression task-spec shape → `LTV-Pn`.
- Labels: `type: feature`, `layer: schema`

---
@@ -192,13 +197,40 @@ Total: ~19 PRs across 9 milestones.

## `LTV-M5` — Customer snapshots + pLTV targets (both regimes)

- [ ] **`LTV-Pl`** — `feat(lifecycle): calendar-anchored customer snapshot`.
`build_customer_snapshot(cutoff=observation_date)`: last-12-week health
aggregates; `mrr_change_at_snapshot` (valid) + `mrr_change_full_period`
(trap); the three `ltv_revenue_{90,365,730}d` gross-revenue targets +
`churned_within_180d`; difficulty distortions.
- Tests: no post-cutoff data in windowed columns; ZILN target shape; trap
invariant; target derivation; trap exempt from distortion.
- [x] **`LTV-Pl`** — `feat(lifecycle): calendar-anchored customer snapshot`
(**PR #119**). `schemes/lifecycle/snapshots.py`:
`build_customer_snapshot(cutoff=…)` — one row per active-at-cutoff customer;
at-cutoff subscription state reconstructed from the event chain (not the
terminal row); last-12-week health aggregates + whole-history `last_nps_score`;
`mrr_change_at_snapshot` (valid) + `mrr_change_full_period` (trap, all modes,
distortion-exempt); `ltv_revenue_{90,365,730}d` (gross = paid + recovered
invoices, attributed by issuance date) + `churned_within_180d`.
`CUSTOMER_SNAPSHOT_FEATURES` catalog in `schemes/lifecycle/features.py`
(discharges the `LTV-Pc` catalog half). Difficulty distortions extracted to
the scheme-agnostic `render/distortions.py` (lead-scoring delegates;
verified byte-identical). **Deliberately omitted from the catalog:**
`current_plan` (no plan-change mechanism → exact duplicate of
`initial_plan`) and `downgrade_count` (no downgrade mechanism →
zero-variance); re-add only with the mechanism.
- Tests (43): censoring-based leakage probe (features identical when all
post-cutoff events are deleted); target derivation vs the invoice table;
failed/written-off exclusion (D7); ZILN target shape; trap-divergence
invariant; trap + targets exempt from distortion; weeks_to_next_renewal
agrees with `is_renewal_week`.
- Self-review hardening: `LifecycleSimulationResult` records its
`forward_window_days`/`early_tenure_weeks` and the builder rejects sims
whose horizon cannot cover the 730d/180d target windows (silent-censoring
guard); anniversary boundary single-sourced via public
`hazards.next_renewal_week`; population/sim mismatch raises a real error.
- **Deferred to `LTV-Pn` (difficulty wiring):** the design.md §7 secondary
advanced-tier trap `last_health_signal_post_obs` — it is tier-conditional,
so it belongs with the difficulty-profile plumbing, not the builder.
- **Deferred to `LTV-Pn` (bundle writer):** an opt-in dtype-preserving
missingness mode for `render/distortions.py` (`pd.NA` into nullable
`Int64` instead of the Float64 conversion) — the lead-scoring default is
byte-identity-locked, but the lifecycle scheme has no shipped bundles yet
and can pick the cleaner semantics when its parquet schemas are fixed
(Copilot review suggestion on #119).
- Labels: `type: feature`, `layer: render`
- [ ] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`.
Reuse the snapshot builder with a per-customer relative cutoff
167 changes: 167 additions & 0 deletions leadforge/render/distortions.py
@@ -0,0 +1,167 @@
"""Scheme-agnostic difficulty distortions for snapshot tables.

:func:`apply_difficulty_distortions` injects Gaussian noise, MCAR missingness,
and outliers into the numeric feature columns of a snapshot DataFrame,
parameterized by a scheme's :class:`~leadforge.schema.features.FeatureSpec`
catalog. Extracted from the lead-scoring snapshot builder (verbatim op order
and RNG substream, so existing outputs stay byte-identical) so the lifecycle
scheme can share it.

Known wart (inherited, locked by byte-identity with shipped lead-scoring
bundles): missingness injection converts an Int64 column to Float64 **only if
at least one of its cells is masked**, so the post-distortion dtype of integer
columns varies with seed and missing_rate. Consumers must not rely on
integer dtypes surviving distortion.

Column eligibility is derived from the feature catalog rather than runtime
dtype sniffing — categoricals, booleans, IDs, and target columns are never
distorted even if their runtime dtype happens to be numeric. Callers exempt
pedagogical leakage-trap columns explicitly (distorting a trap muddies the
lesson the trap exists to teach).
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np

from leadforge.core.rng import RNGRoot

if TYPE_CHECKING:
    from collections.abc import Sequence

    import pandas as pd

    from leadforge.core.models import DifficultyParams
    from leadforge.schema.features import FeatureSpec

__all__ = ["apply_difficulty_distortions"]

_FLOAT_DTYPES = ("Float64", "float64")
_NUMERIC_DTYPES = ("Float64", "float64", "Int64", "int64")


def apply_difficulty_distortions(
    df: pd.DataFrame,
    params: DifficultyParams,
    seed: int,
    *,
    feature_specs: Sequence[FeatureSpec],
    exempt_cols: frozenset[str] = frozenset(),
    rng_substream: str = "snapshot_distortions",
) -> pd.DataFrame:
    """Apply noise, missingness, and outliers to numeric snapshot features.

    Args:
        df: The snapshot table. Not mutated — a new DataFrame is returned.
        params: Difficulty knobs (``noise_scale``, ``missing_rate``,
            ``outlier_rate``); a knob at 0 disables that distortion.
        seed: Seed for the distortion RNG substream. Pass the generation
            seed so distortions are deterministic per run.
        feature_specs: The scheme's snapshot feature catalog. Float-dtyped,
            non-target, non-exempt features receive noise and outliers; all
            numeric non-target, non-exempt features receive missingness.
            Targets are never distorted.
        exempt_cols: Columns excluded from every distortion — deliberate
            leakage traps whose signal must survive intact.
        rng_substream: Name of the numpy child stream. Schemes with multiple
            distortion call sites must use distinct names.

    Returns:
        A distorted copy of *df*.
    """
    float_distortion_cols = [
        f.name
        for f in feature_specs
        if f.dtype in _FLOAT_DTYPES and not f.is_target and f.name not in exempt_cols
    ]
    numeric_distortion_cols = [
        f.name
        for f in feature_specs
        if f.dtype in _NUMERIC_DTYPES and not f.is_target and f.name not in exempt_cols
    ]
    # Post-noise physical-range clamps, derived from FeatureSpec.non_negative
    # so the lists stay in sync automatically when features are added/renamed.
    # Targets and exempt columns are excluded like the distortion lists above:
    # they never receive noise, so clamping them was always a no-op — but the
    # "targets are never distorted" contract should hold by construction, not
    # by coincidence.
    nonneg_float_cols = frozenset(
        f.name
        for f in feature_specs
        if f.dtype in _FLOAT_DTYPES
        and f.non_negative
        and not f.is_target
        and f.name not in exempt_cols
    )
    nonneg_int_cols = frozenset(
        f.name
        for f in feature_specs
        if f.dtype in ("Int64", "int64")
        and f.non_negative
        and not f.is_target
        and f.name not in exempt_cols
    )

    df = df.copy()
    rng_root = RNGRoot(seed)
    np_rng = rng_root.numpy_child(rng_substream)

    # Filter to columns actually present (guards against feature spec drift).
    float_cols = [c for c in float_distortion_cols if c in df.columns]
    all_numeric_cols = [c for c in numeric_distortion_cols if c in df.columns]

    # 1. Gaussian noise on float features only (avoids int casting issues).
    if params.noise_scale > 0:
        for col in float_cols:
            valid_mask = df[col].notna()
            if valid_mask.sum() == 0:
                continue
            col_std = float(df.loc[valid_mask, col].std())
            if col_std == 0 or np.isnan(col_std):
                continue
            noise = np_rng.normal(0, params.noise_scale * col_std, size=len(df))
            # Add noise only where values are valid.
            values = df[col].copy()
            values[valid_mask] = values[valid_mask] + noise[valid_mask.values]
            df[col] = values

    # 1b. Post-noise clamp to physical ranges.
    # Non-negative float columns: clip to >= 0.
    for col in nonneg_float_cols:
        if col in df.columns and df[col].notna().any():
            df[col] = df[col].clip(lower=0)
    # Non-negative int columns: clip to >= 0. clip() preserves Int64 dtype.
    for col in nonneg_int_cols:
        if col in df.columns and df[col].notna().any():
            df[col] = df[col].clip(lower=0)

    # 2. MCAR missingness injection (all numeric columns).
    if params.missing_rate > 0:
        mask = np_rng.random(size=(len(df), len(all_numeric_cols))) < params.missing_rate
        for i, col in enumerate(all_numeric_cols):
            col_mask = mask[:, i]
            if col_mask.any():
                # Convert int columns to float to support NaN.
                if df[col].dtype in ("int64", "Int64"):
                    df[col] = df[col].astype("Float64")
                df.loc[col_mask, col] = np.nan

    # 3. Outlier injection (float columns only). Uses 5σ to produce values
    # clearly distinguishable from natural variation.
    if params.outlier_rate > 0:
        for col in float_cols:
            valid_mask = df[col].notna()
            col_std = float(df.loc[valid_mask, col].std())
            if col_std == 0 or np.isnan(col_std):
                continue
            col_median = float(df[col].median())
            outlier_mask = np_rng.random(size=len(df)) < params.outlier_rate
            signs = np_rng.choice([-1, 1], size=len(df)).astype(float)
            outlier_values = col_median + signs * 5 * col_std
            combined = outlier_mask & valid_mask.values
            if combined.any():
                df.loc[combined, col] = outlier_values[combined]

    return df
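
The catalog-driven eligibility rule used by `apply_difficulty_distortions` can be sketched in isolation. This is a minimal illustration under stated assumptions, not the project's code: `FeatureSpecSketch` is a hypothetical stand-in for `FeatureSpec` that carries only the fields the function reads, and every feature name except `mrr_change_full_period` and `ltv_revenue_365d` is invented for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureSpecSketch:
    """Hypothetical stand-in for leadforge's FeatureSpec (only the fields used here)."""

    name: str
    dtype: str
    is_target: bool = False
    non_negative: bool = False


FLOAT_DTYPES = ("Float64", "float64")
NUMERIC_DTYPES = ("Float64", "float64", "Int64", "int64")


def eligible_columns(specs, exempt_cols=frozenset()):
    """Return (float_cols, numeric_cols) chosen from the catalog, not from runtime dtypes."""

    def distortable(spec):
        # Targets and explicitly exempted columns (leakage traps) are never distorted.
        return not spec.is_target and spec.name not in exempt_cols

    float_cols = [s.name for s in specs if s.dtype in FLOAT_DTYPES and distortable(s)]
    numeric_cols = [s.name for s in specs if s.dtype in NUMERIC_DTYPES and distortable(s)]
    return float_cols, numeric_cols


catalog = [
    FeatureSpecSketch("avg_health_score", "Float64", non_negative=True),  # invented name
    FeatureSpecSketch("login_count", "Int64", non_negative=True),  # invented name
    FeatureSpecSketch("plan_tier", "category"),  # invented name; never numeric-eligible
    FeatureSpecSketch("mrr_change_full_period", "Float64"),  # trap, exempted by the caller
    FeatureSpecSketch("ltv_revenue_365d", "Float64", is_target=True),  # target
]

float_cols, numeric_cols = eligible_columns(
    catalog, exempt_cols=frozenset({"mrr_change_full_period"})
)
print(float_cols)
print(numeric_cols)
```

In this sketch the float list feeds noise and outliers while the wider numeric list feeds missingness, mirroring the split described in the docstring; the trap and the target drop out of both lists regardless of their dtype.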
94 changes: 16 additions & 78 deletions leadforge/schemes/lead_scoring/render/snapshots.py
@@ -13,10 +13,9 @@

from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from leadforge.core.rng import RNGRoot
from leadforge.render.distortions import apply_difficulty_distortions
from leadforge.schemes.lead_scoring.entities import (
OpportunityRow,
SalesActivityRow,
@@ -349,14 +348,16 @@ def build_snapshot(
# Difficulty distortion helpers
# ---------------------------------------------------------------------------

# Derive eligible columns from the feature spec rather than runtime dtype
# sniffing. This guarantees categoricals, booleans, IDs, and labels are
# never distorted even if their runtime dtype happens to be numeric.
# The distortion algorithm itself is scheme-agnostic and lives in
# leadforge.render.distortions (extracted in LTV-Pl, byte-identical for this
# scheme). The lead-scoring-specific knowledge kept here: which columns are
# exempt, and the catalog the column lists derive from.

# total_touches_all is a pedagogical leakage trap — distorting it muddies the
# lesson (up to 18% NaN on Advanced tier hides the trap). Exempt it explicitly.
_DISTORTION_EXEMPT_COLS: frozenset[str] = frozenset({"total_touches_all"})

# Derived column lists, retained for tests and introspection.
_FLOAT_DISTORTION_COLS: list[str] = [
f.name
for f in LEAD_SNAPSHOT_FEATURES
@@ -372,17 +373,6 @@ def build_snapshot(
and f.name not in _DISTORTION_EXEMPT_COLS
]

# Post-noise physical-range clamps. Applied after Gaussian noise to prevent
# non-physical values (e.g. negative durations, negative counts).
# Derived from FeatureSpec.non_negative so the lists stay in sync automatically
# when features are added or renamed — no manual maintenance required.
_NONNEG_FLOAT_COLS: frozenset[str] = frozenset(
f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype in ("Float64", "float64") and f.non_negative
)
_NONNEG_INT_COLS: frozenset[str] = frozenset(
f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype in ("Int64", "int64") and f.non_negative
)


def _apply_difficulty_distortions(
df: pd.DataFrame,
@@ -391,66 +381,14 @@ def _apply_difficulty_distortions(
) -> pd.DataFrame:
"""Apply noise, missingness, and outliers to numeric snapshot features.

Returns a new DataFrame — the input is not mutated.
Returns a new DataFrame — the input is not mutated. Delegates to the
shared scheme-agnostic implementation with this scheme's feature catalog
and exemptions.
"""
df = df.copy()
rng_root = RNGRoot(seed)
np_rng = rng_root.numpy_child("snapshot_distortions")

# Filter to columns actually present (guards against feature spec drift).
float_cols = [c for c in _FLOAT_DISTORTION_COLS if c in df.columns]
all_numeric_cols = [c for c in _NUMERIC_DISTORTION_COLS if c in df.columns]

# 1. Gaussian noise on float features only (avoids int casting issues).
if params.noise_scale > 0:
for col in float_cols:
valid_mask = df[col].notna()
if valid_mask.sum() == 0:
continue
col_std = float(df.loc[valid_mask, col].std())
if col_std == 0 or np.isnan(col_std):
continue
noise = np_rng.normal(0, params.noise_scale * col_std, size=len(df))
# Add noise only where values are valid.
values = df[col].copy()
values[valid_mask] = values[valid_mask] + noise[valid_mask.values]
df[col] = values

# 1b. Post-noise clamp to physical ranges.
# Non-negative float columns: clip to >= 0.
for col in _NONNEG_FLOAT_COLS:
if col in df.columns and df[col].notna().any():
df[col] = df[col].clip(lower=0)
# Non-negative int columns: clip to >= 0. clip() preserves Int64 dtype.
for col in _NONNEG_INT_COLS:
if col in df.columns and df[col].notna().any():
df[col] = df[col].clip(lower=0)

# 2. MCAR missingness injection (all numeric columns).
if params.missing_rate > 0:
mask = np_rng.random(size=(len(df), len(all_numeric_cols))) < params.missing_rate
for i, col in enumerate(all_numeric_cols):
col_mask = mask[:, i]
if col_mask.any():
# Convert int columns to float to support NaN.
if df[col].dtype in ("int64", "Int64"):
df[col] = df[col].astype("Float64")
df.loc[col_mask, col] = np.nan

# 3. Outlier injection (float columns only). Uses 5σ to produce values
# clearly distinguishable from natural variation.
if params.outlier_rate > 0:
for col in float_cols:
valid_mask = df[col].notna()
col_std = float(df.loc[valid_mask, col].std())
if col_std == 0 or np.isnan(col_std):
continue
col_median = float(df[col].median())
outlier_mask = np_rng.random(size=len(df)) < params.outlier_rate
signs = np_rng.choice([-1, 1], size=len(df)).astype(float)
outlier_values = col_median + signs * 5 * col_std
combined = outlier_mask & valid_mask.values
if combined.any():
df.loc[combined, col] = outlier_values[combined]

return df
return apply_difficulty_distortions(
df,
params,
seed,
feature_specs=LEAD_SNAPSHOT_FEATURES,
exempt_cols=_DISTORTION_EXEMPT_COLS,
)
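
One way to check a byte-identity claim for a refactor like this is to run the pre-extraction path and the delegating path with the same seed and compare digests of the exact output. The sketch below is hypothetical: the diff does not show how byte-identity was verified, and `legacy_distort`, `shared_distort`, and `wrapper_distort` are toy stand-ins built on the standard-library `random` module rather than leadforge's `RNGRoot`.

```python
import hashlib
import random


def legacy_distort(values, seed, noise_scale):
    """Toy stand-in for the old inlined implementation."""
    rng = random.Random(seed)
    return [v + rng.gauss(0.0, noise_scale) for v in values]


def shared_distort(values, seed, *, noise_scale):
    """Toy stand-in for the extracted shared implementation (same draw order)."""
    rng = random.Random(seed)
    return [v + rng.gauss(0.0, noise_scale) for v in values]


def wrapper_distort(values, seed, noise_scale):
    """Delegating wrapper: keeps the old signature, forwards to the shared code."""
    return shared_distort(values, seed, noise_scale=noise_scale)


def digest(values):
    """SHA-256 over exact float representations (float.hex() loses no bits)."""
    payload = "\n".join(v.hex() for v in values)
    return hashlib.sha256(payload.encode("ascii")).hexdigest()


values = [1.0, 2.5, 4.0, 8.25]
before = digest(legacy_distort(values, seed=7, noise_scale=0.1))
after = digest(wrapper_distort(values, seed=7, noise_scale=0.1))
identical = before == after
print(identical)
```

The comparison only holds if the refactor preserves both the RNG stream and the order of draws, which is why the extracted module keeps the original operation order and substream name as its defaults.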