diff --git a/.agent-plan.md b/.agent-plan.md index 6c63ee0..3fc7c97 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -76,9 +76,12 @@ drops the lead-scoring `world_graph` param for `generation_scheme` / as **#121** (merged). `LTV-Pn.2` (scheme-agnostic `WorldBundle` — `artifacts: Any`; `apply_exposure` dispatches hidden truth to a `GenerationScheme.write_metadata` hook; cleanups #2 + #3 discharged; -lead-scoring byte-identical both modes) opened as **#122**. Next: `Pn.3` -(lifecycle config + regression task model), `Pn.4` (complete `LifecycleScheme` -+ shared bundle orchestrator + e2e bundle), `LTV-Po` (recipe). +lead-scoring byte-identical both modes) opened as **#122** (merged). `LTV-Pn.3` (lifecycle `GenerationConfig` fields + +validated regression `TaskManifest` `task_type` + shared `render/tasks.py` +split writer + `schemes/lifecycle/tasks.py` task families; discharges the +`LTV-Pc` regression-task-spec leftover) opened as **#124**. Next: `Pn.4` +(complete `LifecycleScheme.build_world`/`write_bundle` + shared bundle +orchestrator + e2e bundle), `LTV-Po` (recipe). --- diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index 88d5b87..56356ed 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -46,7 +46,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | | `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) | | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl), #120 (Pm) | -| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2) | +| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3) | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | | `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | | @@ -72,7 +72,7 @@ Total: ~19 PRs across 9 milestones. Lead-scoring catalog untouched. (These rows relocate into `schemes/lifecycle/` during `LTV-M2`.) - Labels: `type: feature`, `layer: schema` -- [~] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`. +- [x] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`. **Feature-catalog half discharged in `LTV-Pl` (#119):** `CUSTOMER_SNAPSHOT_FEATURES` (three `ltv_revenue_{90,365,730}d` targets, the secondary `churned_within_180d`, the `mrr_change_full_period` trap) is @@ -81,7 +81,8 @@ Total: ~19 PRs across 9 milestones. scope (folds into `LTV-Pn`):** regression task specs + a `task_type` (`regression` | `classification`) on the task model — they belong with the task-split writer's continuous-target path. - - Tests: feature-spec invariants ✓ (#119); regression task-spec shape → `LTV-Pn`. + - Tests: feature-spec invariants ✓ (#119); regression task-spec shape ✓ + (#124, `LTV-Pn.3`). **`LTV-Pc` fully discharged.** - Labels: `type: feature`, `layer: schema` --- @@ -297,11 +298,19 @@ pipeline + schema bump). Split into four sub-PRs in dependency order: scheme's `write_bundle` in hand*; building it now against one scheme would guess the hook shape. - Labels: `type: refactor`, `layer: api`, `layer: core`, `layer: render` -- [ ] **`LTV-Pn.3`** — `feat: lifecycle config + regression task model`. Add - `n_customers` + lifecycle config (forward windows, early-tenure, observation - anchor) to `GenerationConfig` (validated); add a regression `task_type` - (`regression` | `classification`) to `TaskManifest` + a continuous-target - split writer (the `LTV-Pc` / `LTV-Pl` / `LTV-Pm` deferral). No e2e yet. +- [x] **`LTV-Pn.3`** — `feat: lifecycle config + regression task model` + (**PR #124**). `GenerationConfig` gains validated lifecycle fields + (`n_customers`, `forward_windows_days`, `early_tenure_weeks`, + `observation_date`). `TaskManifest` gains a validated `task_type` + (`VALID_TASK_TYPES = {binary_classification, regression}`) and target-agnostic + docs. The deterministic split writer is lifted to the shared envelope + (`leadforge/render/tasks.py`, byte-identical; lead-scoring delegates) so it + serves continuous pLTV targets. `schemes/lifecycle/tasks.py` defines the + per-regime task families (3 `pltv_revenue_*` regression + `churned_within_180d` + classification, `early_`-prefixed for the tenure regime) — **completing the + `LTV-Pc` regression-task-spec deferral**. Data definitions only; wiring is + Pn.4. Lead-scoring data byte-identical (only `world_spec.json` gains the new + config fields, by design). - Labels: `type: feature`, `layer: api`, `layer: schema`, `layer: render` - [ ] **`LTV-Pn.4`** — `feat(lifecycle): complete LifecycleScheme + e2e bundle`. Implement `LifecycleScheme.build_world` (population → sim) and `write_bundle` diff --git a/leadforge/core/models.py b/leadforge/core/models.py index 6957c18..16bbba2 100644 --- a/leadforge/core/models.py +++ b/leadforge/core/models.py @@ -73,6 +73,32 @@ class GenerationConfig: package_version: str = field(default_factory=lambda: __version__) difficulty_params: DifficultyParams | None = None + # --- lifecycle scheme (b2b_saas_ltv_v1) config ------------------------- + # Consumed only by the lifecycle generation scheme; the lead-scoring scheme + # ignores these. They live on the shared config (like ``n_leads`` / + # ``snapshot_day`` do for lead-scoring) so recipe/CLI resolution stays + # uniform across schemes. A nested per-scheme config is a possible future + # refactor; kept flat here to match the existing precedent. + # + # NOTE: these are not threaded into the lifecycle pipeline yet — that wiring + # is LTV-Pn.4, at which point this config becomes the source of truth and + # overrides the scheme's module-level defaults. Until then the scheme's own + # constants are authoritative. ``forward_windows_days`` / ``early_tenure_weeks`` + # intentionally duplicate ``schemes.lifecycle.snapshots.FORWARD_WINDOWS_DAYS`` + # / ``DEFAULT_EARLY_TENURE_WEEKS`` (core must not import a scheme — see the + # LTV-Pn.2 layering cleanup), so a cross-layer test + # (tests/schemes/lifecycle/test_config_consistency.py) pins the defaults + # equal to guard against drift. + n_customers: int = 1500 + # pLTV forward-window targets, in days (D6): ltv_revenue_{90,365,730}d. + forward_windows_days: tuple[int, ...] = (90, 365, 730) + # Tenure anchor (whole weeks) for the early-pLTV regime (D8). + early_tenure_weeks: int = 4 + # Absolute calendar observation anchor (ISO date) for the calendar regime + # (D4). ``None`` lets the population builder derive it from the world + # calendar. + observation_date: str | None = None + def __post_init__(self) -> None: if isinstance(self.seed, bool) or not isinstance(self.seed, int): raise InvalidConfigError(f"seed must be an int, got {type(self.seed).__name__!r}") @@ -135,6 +161,44 @@ def __post_init__(self) -> None: f"difficulty has invalid value {self.difficulty!r}. " f"Valid values: {[d.value for d in DifficultyProfile]}" ) from exc + self._validate_lifecycle_fields() + + def _validate_lifecycle_fields(self) -> None: + """Validate the lifecycle-scheme config fields. + + Kept separate from the main body for readability; these constrain only + the lifecycle fields and never touch the lead-scoring path. + """ + _require_positive_int(self.n_customers, "n_customers") + _require_positive_int(self.early_tenure_weeks, "early_tenure_weeks") + + windows = self.forward_windows_days + if not isinstance(windows, tuple) or not windows: + raise InvalidConfigError( + f"forward_windows_days must be a non-empty tuple, got {windows!r}" + ) + for w in windows: + _require_positive_int(w, "forward_windows_days entry") + if list(windows) != sorted(set(windows)): + raise InvalidConfigError( + f"forward_windows_days must be strictly increasing and unique, got {windows!r}" + ) + + if self.observation_date is not None: + if not isinstance(self.observation_date, str): + raise InvalidConfigError( + f"observation_date must be an ISO date string or None, " + f"got {type(self.observation_date).__name__!r}" + ) + from datetime import date + + try: + date.fromisoformat(self.observation_date) + except ValueError as exc: + raise InvalidConfigError( + f"observation_date must be an ISO date (YYYY-MM-DD), " + f"got {self.observation_date!r}" + ) from exc @dataclass diff --git a/leadforge/render/tasks.py b/leadforge/render/tasks.py new file mode 100644 index 0000000..9e76df1 --- /dev/null +++ b/leadforge/render/tasks.py @@ -0,0 +1,87 @@ +"""Scheme-agnostic task export — deterministic train/valid/test split + Parquet. + +:func:`write_task_splits` shuffles a snapshot DataFrame deterministically, +splits it by the task manifest's ratios, and writes ``train``/``valid``/``test`` +Parquet files plus ``task_manifest.json`` into the task directory. + +The split logic is target-agnostic: it never inspects the label/target column, +so it serves both classification labels (lead-scoring ``converted_within_90_days``, +lifecycle secondary churn) and continuous regression targets (lifecycle pLTV +``ltv_revenue_*``). Each scheme passes its own :class:`~leadforge.schema.tasks.TaskManifest`. +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +from leadforge.core.rng import RNGRoot + +if TYPE_CHECKING: + from pathlib import Path + + import pandas as pd + + from leadforge.schema.tasks import TaskManifest + +__all__ = ["write_task_splits"] + + +def write_task_splits( + snapshot: pd.DataFrame, + out_dir: Path, + *, + seed: int, + task: TaskManifest, +) -> dict[str, int]: + """Shuffle, split, and write snapshot Parquet files for *task*. + + Files written under ``out_dir / task.task_id /``:: + + train.parquet + valid.parquet + test.parquet + task_manifest.json + + Args: + snapshot: The task's source snapshot DataFrame. + out_dir: Parent directory for task outputs (typically + ``bundle_root / "tasks"``). + seed: Seed used for the deterministic row shuffle. + task: Task manifest describing the split ratios, target column, and + task type. + + Returns: + Dict mapping split name (``"train"``, ``"valid"``, ``"test"``) to the + number of rows written. + """ + task_dir = out_dir / task.task_id + task_dir.mkdir(parents=True, exist_ok=True) + + # Deterministic shuffle via the project's RNG substream system. + rng = RNGRoot(seed).child("task_split_shuffle") + indices = list(range(len(snapshot))) + rng.shuffle(indices) + shuffled = snapshot.iloc[indices].reset_index(drop=True) + + n = len(shuffled) + n_train = int(n * task.split.train) + n_valid = int(n * task.split.valid) + + splits: dict[str, pd.DataFrame] = { + "train": shuffled.iloc[:n_train], + "valid": shuffled.iloc[n_train : n_train + n_valid], + "test": shuffled.iloc[n_train + n_valid :], # remainder avoids rounding off-by-one + } + + row_counts: dict[str, int] = {} + for split_name, df in splits.items(): + path = task_dir / f"{split_name}.parquet" + df.to_parquet(path, index=False, engine="pyarrow") + row_counts[split_name] = len(df) + + # Write task_manifest.json alongside the Parquet files. + manifest_path = task_dir / "task_manifest.json" + manifest_path.write_text(json.dumps(task.to_dict(), indent=2)) + + return row_counts diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py index 0a2d6ef..7ed58b5 100644 --- a/leadforge/schema/tasks.py +++ b/leadforge/schema/tasks.py @@ -47,22 +47,38 @@ def __post_init__(self) -> None: raise ValueError(f"SplitSpec fractions must sum to 1.0, got {total:.6f}") +#: ML task types a :class:`TaskManifest` may declare. ``binary_classification`` +#: covers the lead-scoring ``converted_within_90_days`` label and the lifecycle +#: secondary churn label; ``regression`` covers the continuous pLTV +#: ``ltv_revenue_*`` targets (D1). +VALID_TASK_TYPES: frozenset[str] = frozenset({"binary_classification", "regression"}) + + @dataclass(frozen=True) class TaskManifest: """Immutable descriptor for one ML task exported from a bundle. + Serves both classification and regression tasks; ``task_type`` distinguishes + them and ``label_column`` names the target either way. + Attributes: - task_id: Machine-readable task identifier. - label_column: Column name in the task Parquet files that holds the - binary label. - label_window_days: Number of days after the snapshot anchor date - within which the target event counts as positive. + task_id: Machine-readable task identifier (also the task directory name, + so it must be unique within a bundle). + label_column: Column in the task Parquet files holding the target — a + binary label for ``binary_classification`` or a continuous value + for ``regression``. + label_window_days: Forward window in days that defines the target — the + positive-event window for a classification label, or the + revenue-accumulation horizon for a pLTV regression target. primary_table: The relational table the snapshot rows are derived - from (usually ``"leads"``). + from (e.g. ``"leads"`` / ``"customers"``). split: Train/valid/test proportions. - task_type: ML task type string (``"binary_classification"`` for v1). + task_type: One of :data:`VALID_TASK_TYPES`. description: Human-readable description of the task, suitable for display in dataset cards and documentation. + + Raises: + ValueError: if ``task_type`` is not in :data:`VALID_TASK_TYPES`. """ task_id: str @@ -73,6 +89,12 @@ class TaskManifest: task_type: str = "binary_classification" description: str = "" + def __post_init__(self) -> None: + if self.task_type not in VALID_TASK_TYPES: + raise ValueError( + f"task_type must be one of {sorted(VALID_TASK_TYPES)}, got {self.task_type!r}" + ) + def to_dict(self) -> dict[str, object]: """Return a JSON-serializable representation.""" return { diff --git a/leadforge/schemes/lead_scoring/render/tasks.py b/leadforge/schemes/lead_scoring/render/tasks.py index 8a69882..a431189 100644 --- a/leadforge/schemes/lead_scoring/render/tasks.py +++ b/leadforge/schemes/lead_scoring/render/tasks.py @@ -1,20 +1,26 @@ -"""Task export — deterministic train/valid/test split and Parquet output. +"""Lead-scoring task export — thin wrapper over the shared split writer. -:func:`write_task_splits` takes the lead snapshot DataFrame, shuffles it -deterministically, splits it according to the task manifest ratios, and -writes the three Parquet files plus a ``task_manifest.json`` into the -tasks directory. +The deterministic shuffle/split/write logic is scheme-agnostic and lives in +:func:`leadforge.render.tasks.write_task_splits` (lifted there in LTV-Pn.3, +byte-identical for this scheme). This wrapper preserves the lead-scoring +default task so existing call sites are unchanged. """ from __future__ import annotations -import json -from pathlib import Path +from typing import TYPE_CHECKING -import pandas as pd +from leadforge.render.tasks import write_task_splits as _write_task_splits +from leadforge.schemes.lead_scoring.tasks import CONVERTED_WITHIN_90_DAYS -from leadforge.core.rng import RNGRoot -from leadforge.schemes.lead_scoring.tasks import CONVERTED_WITHIN_90_DAYS, TaskManifest +if TYPE_CHECKING: + from pathlib import Path + + import pandas as pd + + from leadforge.schema.tasks import TaskManifest + +__all__ = ["write_task_splits"] def write_task_splits( @@ -24,54 +30,8 @@ def write_task_splits( seed: int, task: TaskManifest = CONVERTED_WITHIN_90_DAYS, ) -> dict[str, int]: - """Shuffle, split, and write snapshot Parquet files for *task*. - - Files written under ``out_dir / task.task_id /``:: - - train.parquet - valid.parquet - test.parquet - task_manifest.json - - Args: - snapshot: Lead snapshot DataFrame from - :func:`~leadforge.schemes.lead_scoring.render.snapshots.build_snapshot`. - out_dir: Parent directory for task outputs (typically - ``bundle_root / "tasks"``). - seed: Seed used for deterministic row shuffle. - task: Task manifest describing the split ratios and label column. + """Write lead-scoring task splits (see :func:`leadforge.render.tasks.write_task_splits`). - Returns: - Dict mapping split name (``"train"``, ``"valid"``, ``"test"``) to - the number of rows written. + Defaults ``task`` to :data:`CONVERTED_WITHIN_90_DAYS` for this scheme. """ - task_dir = out_dir / task.task_id - task_dir.mkdir(parents=True, exist_ok=True) - - # Deterministic shuffle via the project's RNG substream system. - rng = RNGRoot(seed).child("task_split_shuffle") - indices = list(range(len(snapshot))) - rng.shuffle(indices) - shuffled = snapshot.iloc[indices].reset_index(drop=True) - - n = len(shuffled) - n_train = int(n * task.split.train) - n_valid = int(n * task.split.valid) - - splits: dict[str, pd.DataFrame] = { - "train": shuffled.iloc[:n_train], - "valid": shuffled.iloc[n_train : n_train + n_valid], - "test": shuffled.iloc[n_train + n_valid :], # remainder avoids rounding off-by-one - } - - row_counts: dict[str, int] = {} - for split_name, df in splits.items(): - path = task_dir / f"{split_name}.parquet" - df.to_parquet(path, index=False, engine="pyarrow") - row_counts[split_name] = len(df) - - # Write task_manifest.json alongside the Parquet files. - manifest_path = task_dir / "task_manifest.json" - manifest_path.write_text(json.dumps(task.to_dict(), indent=2)) - - return row_counts + return _write_task_splits(snapshot, out_dir, seed=seed, task=task) diff --git a/leadforge/schemes/lifecycle/tasks.py b/leadforge/schemes/lifecycle/tasks.py new file mode 100644 index 0000000..fe1086e --- /dev/null +++ b/leadforge/schemes/lifecycle/tasks.py @@ -0,0 +1,102 @@ +"""Lifecycle (``b2b_saas_ltv_v1``) task definitions — pLTV regression + churn. + +Each observation regime (design.md §3.1) exports one task family: + +- **calendar-anchored** (standard): task ids ``pltv_revenue_{90,365,730}d`` + + ``churned_within_180d``. +- **tenure-anchored** (early-pLTV, D8): the same set prefixed ``early_`` so the + two families occupy separate task directories within one bundle. + +The three ``pltv_revenue_*`` tasks are **regression** (continuous, ZILN-shaped +gross-revenue targets, D1); ``churned_within_180d`` is the secondary +**binary_classification** task (D9). Target columns and windows mirror the +snapshot catalog (:data:`~leadforge.schemes.lifecycle.snapshots.FORWARD_WINDOWS_DAYS` +/ :data:`~leadforge.schemes.lifecycle.snapshots.CHURN_WINDOW_DAYS`) so the task +specs and the snapshot columns can never drift. + +These are data definitions only; wiring them into the bundle writer is LTV-Pn.4. +""" + +from __future__ import annotations + +from leadforge.schema.tasks import SplitSpec, TaskManifest +from leadforge.schemes.lifecycle.snapshots import CHURN_WINDOW_DAYS, FORWARD_WINDOWS_DAYS + +__all__ = [ + "CALENDAR_REGIME", + "EARLY_REGIME", + "lifecycle_task_manifests", +] + +CALENDAR_REGIME = "calendar" +EARLY_REGIME = "early" + +# Shared split ratios across all lifecycle tasks (matches the lead-scoring task). +_SPLIT = SplitSpec(train=0.7, valid=0.15, test=0.15) + +# Per-regime task-id prefix. The calendar (standard) regime is unprefixed; the +# early-pLTV regime is ``early_`` so both families coexist in one bundle. +_REGIME_PREFIX = {CALENDAR_REGIME: "", EARLY_REGIME: "early_"} + +_PRIMARY_TABLE = "customers" + + +def lifecycle_task_manifests(regime: str) -> tuple[TaskManifest, ...]: + """Return the pLTV regression + churn task manifests for *regime*. + + Args: + regime: :data:`CALENDAR_REGIME` or :data:`EARLY_REGIME`. + + Returns: + One :class:`~leadforge.schema.tasks.TaskManifest` per forward window + (regression) plus the secondary churn classification task. + + Raises: + ValueError: if *regime* is not a known regime. + """ + if regime not in _REGIME_PREFIX: + raise ValueError(f"unknown regime {regime!r}; expected one of {sorted(_REGIME_PREFIX)}") + prefix = _REGIME_PREFIX[regime] + anchor = ( + "the fixed observation date" + if regime == CALENDAR_REGIME + else "each customer's tenure anchor (customer_start + early_tenure_weeks)" + ) + + tasks: list[TaskManifest] = [] + for window in FORWARD_WINDOWS_DAYS: + tasks.append( + TaskManifest( + task_id=f"{prefix}pltv_revenue_{window}d", + label_column=f"ltv_revenue_{window}d", + label_window_days=window, + primary_table=_PRIMARY_TABLE, + split=_SPLIT, + task_type="regression", + description=( + f"Predict gross revenue (paid + recovered invoices) in the " + f"{window} days after {anchor}. Continuous, zero-inflated, " + f"right-skewed pLTV regression target. All features are " + f"computed at or before the cutoff (leakage-free by " + f"construction, except the documented mrr_change_full_period " + f"trap)." + ), + ) + ) + + tasks.append( + TaskManifest( + task_id=f"{prefix}churned_within_180d", + label_column="churned_within_180d", + label_window_days=CHURN_WINDOW_DAYS, + primary_table=_PRIMARY_TABLE, + split=_SPLIT, + task_type="binary_classification", + description=( + f"Secondary task: whether the customer churns within " + f"{CHURN_WINDOW_DAYS} days after {anchor}. Doubles as the " + f"ZILN zero-inflation indicator for the pLTV targets." + ), + ) + ) + return tuple(tasks) diff --git a/tests/core/test_config_lifecycle_fields.py b/tests/core/test_config_lifecycle_fields.py new file mode 100644 index 0000000..0797147 --- /dev/null +++ b/tests/core/test_config_lifecycle_fields.py @@ -0,0 +1,73 @@ +"""Tests for the lifecycle-scheme fields on GenerationConfig (LTV-Pn.3).""" + +from __future__ import annotations + +import pytest + +from leadforge.core.exceptions import InvalidConfigError +from leadforge.core.models import GenerationConfig + + +def test_defaults() -> None: + c = GenerationConfig() + assert c.n_customers == 1500 + assert c.forward_windows_days == (90, 365, 730) + assert c.early_tenure_weeks == 4 + assert c.observation_date is None + + +def test_accepts_valid_overrides() -> None: + c = GenerationConfig( + n_customers=500, + forward_windows_days=(30, 90), + early_tenure_weeks=8, + observation_date="2026-06-01", + ) + assert c.n_customers == 500 + assert c.forward_windows_days == (30, 90) + assert c.early_tenure_weeks == 8 + assert c.observation_date == "2026-06-01" + + +@pytest.mark.parametrize("bad", [0, -1, True]) +def test_rejects_bad_n_customers(bad) -> None: + with pytest.raises(InvalidConfigError, match="n_customers"): + GenerationConfig(n_customers=bad) + + +@pytest.mark.parametrize("bad", [0, -4, True]) +def test_rejects_bad_early_tenure(bad) -> None: + with pytest.raises(InvalidConfigError, match="early_tenure_weeks"): + GenerationConfig(early_tenure_weeks=bad) + + +def test_rejects_empty_windows() -> None: + with pytest.raises(InvalidConfigError, match="non-empty tuple"): + GenerationConfig(forward_windows_days=()) + + +def test_rejects_nonpositive_window_entry() -> None: + with pytest.raises(InvalidConfigError, match="forward_windows_days entry"): + GenerationConfig(forward_windows_days=(90, 0, 365)) + + +def test_rejects_unsorted_or_duplicate_windows() -> None: + with pytest.raises(InvalidConfigError, match="strictly increasing"): + GenerationConfig(forward_windows_days=(365, 90)) + with pytest.raises(InvalidConfigError, match="strictly increasing"): + GenerationConfig(forward_windows_days=(90, 90, 365)) + + +def test_rejects_bad_observation_date() -> None: + with pytest.raises(InvalidConfigError, match="ISO date"): + GenerationConfig(observation_date="06/01/2026") + with pytest.raises(InvalidConfigError, match="observation_date"): + GenerationConfig(observation_date=20260601) # type: ignore[arg-type] + + +def test_lead_scoring_path_unaffected_by_defaults() -> None: + # The lead-scoring scheme ignores the lifecycle fields; a default config + # still constructs and the lifecycle fields carry their documented defaults. + c = GenerationConfig(n_leads=100, snapshot_day=30) + assert c.n_leads == 100 + assert c.forward_windows_days == (90, 365, 730) diff --git a/tests/render/test_shared_task_writer.py b/tests/render/test_shared_task_writer.py new file mode 100644 index 0000000..89a426b --- /dev/null +++ b/tests/render/test_shared_task_writer.py @@ -0,0 +1,67 @@ +"""Tests for the scheme-agnostic task-split writer (LTV-Pn.3).""" + +from __future__ import annotations + +import json + +import pandas as pd + +from leadforge.render.tasks import write_task_splits +from leadforge.schema.tasks import SplitSpec, TaskManifest + + +def _regression_task() -> TaskManifest: + return TaskManifest( + task_id="pltv_revenue_365d", + label_column="ltv_revenue_365d", + label_window_days=365, + primary_table="customers", + split=SplitSpec(0.7, 0.15, 0.15), + task_type="regression", + description="continuous target", + ) + + +def test_writes_splits_and_manifest_for_continuous_target(tmp_path) -> None: + df = pd.DataFrame( + { + "customer_id": [f"cust_{i:03d}" for i in range(100)], + "ltv_revenue_365d": [float(i) * 1.5 for i in range(100)], + } + ) + task = _regression_task() + counts = write_task_splits(df, tmp_path, seed=42, task=task) + + task_dir = tmp_path / "pltv_revenue_365d" + for split in ("train", "valid", "test"): + assert (task_dir / f"{split}.parquet").exists() + assert sum(counts.values()) == 100 + assert counts["train"] == 70 + + manifest = json.loads((task_dir / "task_manifest.json").read_text()) + assert manifest["task_type"] == "regression" + assert manifest["label_column"] == "ltv_revenue_365d" + + +def test_continuous_target_values_preserved(tmp_path) -> None: + # The writer is target-agnostic: it must not coerce/round the continuous + # target — the union of split values equals the input set. + df = pd.DataFrame({"id": range(50), "ltv_revenue_365d": [i + 0.25 for i in range(50)]}) + write_task_splits(df, tmp_path, seed=7, task=_regression_task()) + task_dir = tmp_path / "pltv_revenue_365d" + recombined = pd.concat( + [pd.read_parquet(task_dir / f"{s}.parquet") for s in ("train", "valid", "test")] + ) + assert set(recombined["ltv_revenue_365d"]) == set(df["ltv_revenue_365d"]) + + +def test_deterministic_given_seed(tmp_path) -> None: + df = pd.DataFrame({"id": range(40), "ltv_revenue_365d": [float(i) for i in range(40)]}) + a = tmp_path / "a" + b = tmp_path / "b" + write_task_splits(df, a, seed=11, task=_regression_task()) + write_task_splits(df, b, seed=11, task=_regression_task()) + for split in ("train", "valid", "test"): + left = pd.read_parquet(a / "pltv_revenue_365d" / f"{split}.parquet") + right = pd.read_parquet(b / "pltv_revenue_365d" / f"{split}.parquet") + pd.testing.assert_frame_equal(left, right) diff --git a/tests/schemes/lifecycle/test_config_consistency.py b/tests/schemes/lifecycle/test_config_consistency.py new file mode 100644 index 0000000..c9d9d8b --- /dev/null +++ b/tests/schemes/lifecycle/test_config_consistency.py @@ -0,0 +1,38 @@ +"""Pin the lifecycle config defaults to the scheme's canonical constants. + +``GenerationConfig`` lives in the shared ``core`` layer, which must not import a +scheme (the LTV-Pn.2 layering cleanup). So the lifecycle window / tenure +defaults are *duplicated* literals: one copy on ``GenerationConfig`` and the +authoritative copy in ``schemes.lifecycle``. Until LTV-Pn.4 threads the config +through, these must stay numerically equal — otherwise a bundle generated with +config defaults would carry windows/tenure that disagree with the columns the +snapshot builder actually produces. This test (which, unlike ``core``, *may* +import both layers) is the guard against that drift. +""" + +from __future__ import annotations + +import inspect + +from leadforge.core.models import GenerationConfig +from leadforge.schemes.lifecycle.engine import simulate_lifecycle +from leadforge.schemes.lifecycle.snapshots import ( + DEFAULT_EARLY_TENURE_WEEKS, + FORWARD_WINDOWS_DAYS, +) + + +def test_config_forward_windows_match_snapshot_constant() -> None: + assert GenerationConfig().forward_windows_days == FORWARD_WINDOWS_DAYS + + +def test_config_early_tenure_matches_snapshot_constant() -> None: + assert GenerationConfig().early_tenure_weeks == DEFAULT_EARLY_TENURE_WEEKS + + +def test_engine_early_tenure_default_matches_snapshot_constant() -> None: + # The engine carries its own early_tenure_weeks default (the horizon it + # simulates); it must agree with the snapshot anchor default so a + # default-config run is fully covered. + default = inspect.signature(simulate_lifecycle).parameters["early_tenure_weeks"].default + assert default == DEFAULT_EARLY_TENURE_WEEKS diff --git a/tests/schemes/lifecycle/test_tasks.py b/tests/schemes/lifecycle/test_tasks.py new file mode 100644 index 0000000..26a20b1 --- /dev/null +++ b/tests/schemes/lifecycle/test_tasks.py @@ -0,0 +1,101 @@ +"""Tests for the lifecycle task manifests + regression task model (LTV-Pn.3).""" + +from __future__ import annotations + +import pytest + +from leadforge.schema.tasks import VALID_TASK_TYPES, SplitSpec, TaskManifest +from leadforge.schemes.lifecycle.snapshots import CHURN_WINDOW_DAYS, FORWARD_WINDOWS_DAYS +from leadforge.schemes.lifecycle.tasks import ( + CALENDAR_REGIME, + EARLY_REGIME, + lifecycle_task_manifests, +) + +# --------------------------------------------------------------------------- +# TaskManifest regression support +# --------------------------------------------------------------------------- + + +def test_regression_is_a_valid_task_type() -> None: + assert "regression" in VALID_TASK_TYPES + t = TaskManifest( + task_id="x", + label_column="y", + label_window_days=365, + primary_table="customers", + split=SplitSpec(0.7, 0.15, 0.15), + task_type="regression", + ) + assert t.task_type == "regression" + assert t.to_dict()["task_type"] == "regression" + + +def test_invalid_task_type_rejected() -> None: + with pytest.raises(ValueError, match="task_type must be one of"): + TaskManifest( + task_id="x", + label_column="y", + label_window_days=1, + primary_table="t", + split=SplitSpec(0.7, 0.15, 0.15), + task_type="ranking", + ) + + +# --------------------------------------------------------------------------- +# Lifecycle task families +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("regime", [CALENDAR_REGIME, EARLY_REGIME]) +def test_family_shape(regime: str) -> None: + tasks = lifecycle_task_manifests(regime) + # One regression task per forward window + one churn classification. + assert len(tasks) == len(FORWARD_WINDOWS_DAYS) + 1 + regression = [t for t in tasks if t.task_type == "regression"] + classification = [t for t in tasks if t.task_type == "binary_classification"] + assert len(regression) == len(FORWARD_WINDOWS_DAYS) + assert len(classification) == 1 + + +@pytest.mark.parametrize("regime", [CALENDAR_REGIME, EARLY_REGIME]) +def test_targets_match_snapshot_columns(regime: str) -> None: + tasks = {t.task_id: t for t in lifecycle_task_manifests(regime)} + for window in FORWARD_WINDOWS_DAYS: + reg = next( + t + for t in tasks.values() + if t.label_window_days == window and t.task_type == "regression" + ) + assert reg.label_column == f"ltv_revenue_{window}d" + churn = next(t for t in tasks.values() if t.task_type == "binary_classification") + assert churn.label_column == "churned_within_180d" + assert churn.label_window_days == CHURN_WINDOW_DAYS + + +def test_all_target_customers_table() -> None: + for regime in (CALENDAR_REGIME, EARLY_REGIME): + for t in lifecycle_task_manifests(regime): + assert t.primary_table == "customers" + + +def test_task_ids_unique_across_regimes() -> None: + ids = [ + t.task_id + for regime in (CALENDAR_REGIME, EARLY_REGIME) + for t in lifecycle_task_manifests(regime) + ] + assert len(ids) == len(set(ids)), "task ids collide across regimes (would share a task dir)" + + +def test_early_regime_is_prefixed() -> None: + for t in lifecycle_task_manifests(EARLY_REGIME): + assert t.task_id.startswith("early_") + for t in lifecycle_task_manifests(CALENDAR_REGIME): + assert not t.task_id.startswith("early_") + + +def test_unknown_regime_raises() -> None: + with pytest.raises(ValueError, match="unknown regime"): + lifecycle_task_manifests("monthly") diff --git a/tests/schemes/test_module_layout.py b/tests/schemes/test_module_layout.py index 2f08e31..6120ab2 100644 --- a/tests/schemes/test_module_layout.py +++ b/tests/schemes/test_module_layout.py @@ -26,7 +26,11 @@ "leadforge.render.relational_snapshot_safe", "leadforge.schemes.lead_scoring.render.relational_snapshot_safe", ), - ("leadforge.render.tasks", "leadforge.schemes.lead_scoring.render.tasks"), + # NOTE: ``leadforge.render.tasks`` was vacated by LTV-Pf.2 but deliberately + # *repopulated* in LTV-Pn.3 as the shared, scheme-agnostic split writer + # (see test_render_envelope_package_stays). It is therefore no longer an + # "old path is gone" case — the lead-scoring writer at + # ``schemes.lead_scoring.render.tasks`` is now a thin wrapper over it. ] @@ -55,8 +59,15 @@ def test_render_envelope_package_stays() -> None: # relational.py assembler). import leadforge.render.manifests # noqa: F401 import leadforge.render.relational_io as shared_writer + import leadforge.render.tasks as shared_tasks assert hasattr(shared_writer, "write_relational_tables") + # LTV-Pn.3: the scheme-agnostic task-split writer lives in the shared + # envelope; the lead-scoring module is a thin wrapper that defaults the task. + assert hasattr(shared_tasks, "write_task_splits") + import leadforge.schemes.lead_scoring.render.tasks as ls_tasks + + assert hasattr(ls_tasks, "write_task_splits") def test_relational_split_to_dataframes_moved_to_scheme() -> None: