Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .agent-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ drops the lead-scoring `world_graph` param for `generation_scheme` /
as **#121** (merged). `LTV-Pn.2` (scheme-agnostic `WorldBundle` — `artifacts:
Any`; `apply_exposure` dispatches hidden truth to a
`GenerationScheme.write_metadata` hook; cleanups #2 + #3 discharged;
lead-scoring byte-identical both modes) opened as **#122**. Next: `Pn.3`
(lifecycle config + regression task model), `Pn.4` (complete `LifecycleScheme`
+ shared bundle orchestrator + e2e bundle), `LTV-Po` (recipe).
lead-scoring byte-identical both modes) opened as **#122** (merged). `LTV-Pn.3` (lifecycle `GenerationConfig` fields +
validated regression `TaskManifest` `task_type` + shared `render/tasks.py`
split writer + `schemes/lifecycle/tasks.py` task families; discharges the
`LTV-Pc` regression-task-spec leftover) opened as **#124**. Next: `Pn.4`
(complete `LifecycleScheme.build_world`/`write_bundle` + shared bundle
orchestrator + e2e bundle), `LTV-Po` (recipe).

---

Expand Down
25 changes: 17 additions & 8 deletions docs/ltv/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protocol + registry, with the package physically reorganized into
| `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) |
| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) |
| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl), #120 (Pm) |
| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2) |
| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3) |
| `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | |
| `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | |

Expand All @@ -72,7 +72,7 @@ Total: ~19 PRs across 9 milestones.
Lead-scoring catalog untouched. (These rows relocate into
`schemes/lifecycle/` during `LTV-M2`.)
- Labels: `type: feature`, `layer: schema`
- [~] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`.
- [x] **`LTV-Pc`** — `feat(schema): pLTV feature spec + regression task specs`.
**Feature-catalog half discharged in `LTV-Pl` (#119):**
`CUSTOMER_SNAPSHOT_FEATURES` (three `ltv_revenue_{90,365,730}d` targets, the
secondary `churned_within_180d`, the `mrr_change_full_period` trap) is
Expand All @@ -81,7 +81,8 @@ Total: ~19 PRs across 9 milestones.
scope (folds into `LTV-Pn`):** regression task specs + a `task_type`
(`regression` | `classification`) on the task model — they belong with the
task-split writer's continuous-target path.
- Tests: feature-spec invariants ✓ (#119); regression task-spec shape → `LTV-Pn`.
- Tests: feature-spec invariants ✓ (#119); regression task-spec shape ✓
(#124, `LTV-Pn.3`). **`LTV-Pc` fully discharged.**
- Labels: `type: feature`, `layer: schema`

---
Expand Down Expand Up @@ -297,11 +298,19 @@ pipeline + schema bump). Split into four sub-PRs in dependency order:
scheme's `write_bundle` in hand*; building it now against one scheme would
guess the hook shape.
- Labels: `type: refactor`, `layer: api`, `layer: core`, `layer: render`
- [ ] **`LTV-Pn.3`** — `feat: lifecycle config + regression task model`. Add
`n_customers` + lifecycle config (forward windows, early-tenure, observation
anchor) to `GenerationConfig` (validated); add a regression `task_type`
(`regression` | `classification`) to `TaskManifest` + a continuous-target
split writer (the `LTV-Pc` / `LTV-Pl` / `LTV-Pm` deferral). No e2e yet.
- [x] **`LTV-Pn.3`** — `feat: lifecycle config + regression task model`
(**PR #124**). `GenerationConfig` gains validated lifecycle fields
(`n_customers`, `forward_windows_days`, `early_tenure_weeks`,
`observation_date`). `TaskManifest` gains a validated `task_type`
(`VALID_TASK_TYPES = {binary_classification, regression}`) and target-agnostic
docs. The deterministic split writer is lifted to the shared envelope
(`leadforge/render/tasks.py`, byte-identical; lead-scoring delegates) so it
serves continuous pLTV targets. `schemes/lifecycle/tasks.py` defines the
per-regime task families (3 `pltv_revenue_*` regression + `churned_within_180d`
classification, `early_`-prefixed for the tenure regime) — **completing the
`LTV-Pc` regression-task-spec deferral**. Data definitions only; wiring is
Pn.4. Lead-scoring data byte-identical (only `world_spec.json` gains the new
config fields, by design).
- Labels: `type: feature`, `layer: api`, `layer: schema`, `layer: render`
- [ ] **`LTV-Pn.4`** — `feat(lifecycle): complete LifecycleScheme + e2e bundle`.
Implement `LifecycleScheme.build_world` (population → sim) and `write_bundle`
Expand Down
64 changes: 64 additions & 0 deletions leadforge/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,32 @@ class GenerationConfig:
package_version: str = field(default_factory=lambda: __version__)
difficulty_params: DifficultyParams | None = None

# --- lifecycle scheme (b2b_saas_ltv_v1) config -------------------------
# Consumed only by the lifecycle generation scheme; the lead-scoring scheme
# ignores these. They live on the shared config (like ``n_leads`` /
# ``snapshot_day`` do for lead-scoring) so recipe/CLI resolution stays
# uniform across schemes. A nested per-scheme config is a possible future
# refactor; kept flat here to match the existing precedent.
#
# NOTE: these are not threaded into the lifecycle pipeline yet — that wiring
# is LTV-Pn.4, at which point this config becomes the source of truth and
# overrides the scheme's module-level defaults. Until then the scheme's own
# constants are authoritative. ``forward_windows_days`` / ``early_tenure_weeks``
# intentionally duplicate ``schemes.lifecycle.snapshots.FORWARD_WINDOWS_DAYS``
# / ``DEFAULT_EARLY_TENURE_WEEKS`` (core must not import a scheme — see the
# LTV-Pn.2 layering cleanup), so a cross-layer test
# (tests/schemes/lifecycle/test_config_consistency.py) pins the defaults
# equal to guard against drift.
n_customers: int = 1500
# pLTV forward-window targets, in days (D6): ltv_revenue_{90,365,730}d.
forward_windows_days: tuple[int, ...] = (90, 365, 730)
# Tenure anchor (whole weeks) for the early-pLTV regime (D8).
early_tenure_weeks: int = 4
# Absolute calendar observation anchor (ISO date) for the calendar regime
# (D4). ``None`` lets the population builder derive it from the world
# calendar.
observation_date: str | None = None

def __post_init__(self) -> None:
if isinstance(self.seed, bool) or not isinstance(self.seed, int):
raise InvalidConfigError(f"seed must be an int, got {type(self.seed).__name__!r}")
Expand Down Expand Up @@ -135,6 +161,44 @@ def __post_init__(self) -> None:
f"difficulty has invalid value {self.difficulty!r}. "
f"Valid values: {[d.value for d in DifficultyProfile]}"
) from exc
self._validate_lifecycle_fields()

def _validate_lifecycle_fields(self) -> None:
"""Validate the lifecycle-scheme config fields.

Kept separate from the main body for readability; these constrain only
the lifecycle fields and never touch the lead-scoring path.
"""
_require_positive_int(self.n_customers, "n_customers")
_require_positive_int(self.early_tenure_weeks, "early_tenure_weeks")

windows = self.forward_windows_days
if not isinstance(windows, tuple) or not windows:
raise InvalidConfigError(
f"forward_windows_days must be a non-empty tuple, got {windows!r}"
)
for w in windows:
_require_positive_int(w, "forward_windows_days entry")
if list(windows) != sorted(set(windows)):
raise InvalidConfigError(
f"forward_windows_days must be strictly increasing and unique, got {windows!r}"
)

if self.observation_date is not None:
if not isinstance(self.observation_date, str):
raise InvalidConfigError(
f"observation_date must be an ISO date string or None, "
f"got {type(self.observation_date).__name__!r}"
)
from datetime import date

try:
date.fromisoformat(self.observation_date)
except ValueError as exc:
raise InvalidConfigError(
f"observation_date must be an ISO date (YYYY-MM-DD), "
f"got {self.observation_date!r}"
) from exc


@dataclass
Expand Down
87 changes: 87 additions & 0 deletions leadforge/render/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Scheme-agnostic task export — deterministic train/valid/test split + Parquet.

:func:`write_task_splits` shuffles a snapshot DataFrame deterministically,
splits it by the task manifest's ratios, and writes ``train``/``valid``/``test``
Parquet files plus ``task_manifest.json`` into the task directory.

The split logic is target-agnostic: it never inspects the label/target column,
so it serves both classification labels (lead-scoring ``converted_within_90_days``,
lifecycle secondary churn) and continuous regression targets (lifecycle pLTV
``ltv_revenue_*``). Each scheme passes its own :class:`~leadforge.schema.tasks.TaskManifest`.
"""

from __future__ import annotations

import json
from typing import TYPE_CHECKING

from leadforge.core.rng import RNGRoot

if TYPE_CHECKING:
from pathlib import Path

import pandas as pd

from leadforge.schema.tasks import TaskManifest

__all__ = ["write_task_splits"]


def write_task_splits(
snapshot: pd.DataFrame,
out_dir: Path,
*,
seed: int,
task: TaskManifest,
) -> dict[str, int]:
"""Shuffle, split, and write snapshot Parquet files for *task*.

Files written under ``out_dir / task.task_id /``::

train.parquet
valid.parquet
test.parquet
task_manifest.json

Args:
snapshot: The task's source snapshot DataFrame.
out_dir: Parent directory for task outputs (typically
``bundle_root / "tasks"``).
seed: Seed used for the deterministic row shuffle.
task: Task manifest describing the split ratios, target column, and
task type.

Returns:
Dict mapping split name (``"train"``, ``"valid"``, ``"test"``) to the
number of rows written.
"""
task_dir = out_dir / task.task_id
task_dir.mkdir(parents=True, exist_ok=True)

# Deterministic shuffle via the project's RNG substream system.
rng = RNGRoot(seed).child("task_split_shuffle")
indices = list(range(len(snapshot)))
rng.shuffle(indices)
shuffled = snapshot.iloc[indices].reset_index(drop=True)

n = len(shuffled)
n_train = int(n * task.split.train)
n_valid = int(n * task.split.valid)

splits: dict[str, pd.DataFrame] = {
"train": shuffled.iloc[:n_train],
"valid": shuffled.iloc[n_train : n_train + n_valid],
"test": shuffled.iloc[n_train + n_valid :], # remainder avoids rounding off-by-one
}

row_counts: dict[str, int] = {}
for split_name, df in splits.items():
path = task_dir / f"{split_name}.parquet"
df.to_parquet(path, index=False, engine="pyarrow")
row_counts[split_name] = len(df)

# Write task_manifest.json alongside the Parquet files.
manifest_path = task_dir / "task_manifest.json"
manifest_path.write_text(json.dumps(task.to_dict(), indent=2))

return row_counts
36 changes: 29 additions & 7 deletions leadforge/schema/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,38 @@ def __post_init__(self) -> None:
raise ValueError(f"SplitSpec fractions must sum to 1.0, got {total:.6f}")


#: ML task types a :class:`TaskManifest` may declare. ``binary_classification``
#: covers the lead-scoring ``converted_within_90_days`` label and the lifecycle
#: secondary churn label; ``regression`` covers the continuous pLTV
#: ``ltv_revenue_*`` targets (D1).
VALID_TASK_TYPES: frozenset[str] = frozenset({"binary_classification", "regression"})


@dataclass(frozen=True)
class TaskManifest:
"""Immutable descriptor for one ML task exported from a bundle.

Serves both classification and regression tasks; ``task_type`` distinguishes
them and ``label_column`` names the target either way.

Attributes:
task_id: Machine-readable task identifier.
label_column: Column name in the task Parquet files that holds the
binary label.
label_window_days: Number of days after the snapshot anchor date
within which the target event counts as positive.
task_id: Machine-readable task identifier (also the task directory name,
so it must be unique within a bundle).
label_column: Column in the task Parquet files holding the target — a
binary label for ``binary_classification`` or a continuous value
for ``regression``.
label_window_days: Forward window in days that defines the target — the
positive-event window for a classification label, or the
revenue-accumulation horizon for a pLTV regression target.
primary_table: The relational table the snapshot rows are derived
from (usually ``"leads"``).
from (e.g. ``"leads"`` / ``"customers"``).
split: Train/valid/test proportions.
task_type: ML task type string (``"binary_classification"`` for v1).
task_type: One of :data:`VALID_TASK_TYPES`.
description: Human-readable description of the task, suitable for
display in dataset cards and documentation.

Raises:
ValueError: if ``task_type`` is not in :data:`VALID_TASK_TYPES`.
"""

task_id: str
Expand All @@ -73,6 +89,12 @@ class TaskManifest:
task_type: str = "binary_classification"
description: str = ""

def __post_init__(self) -> None:
if self.task_type not in VALID_TASK_TYPES:
raise ValueError(
f"task_type must be one of {sorted(VALID_TASK_TYPES)}, got {self.task_type!r}"
)

def to_dict(self) -> dict[str, object]:
"""Return a JSON-serializable representation."""
return {
Expand Down
78 changes: 19 additions & 59 deletions leadforge/schemes/lead_scoring/render/tasks.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
"""Task export — deterministic train/valid/test split and Parquet output.
"""Lead-scoring task export — thin wrapper over the shared split writer.

:func:`write_task_splits` takes the lead snapshot DataFrame, shuffles it
deterministically, splits it according to the task manifest ratios, and
writes the three Parquet files plus a ``task_manifest.json`` into the
tasks directory.
The deterministic shuffle/split/write logic is scheme-agnostic and lives in
:func:`leadforge.render.tasks.write_task_splits` (lifted there in LTV-Pn.3,
byte-identical for this scheme). This wrapper preserves the lead-scoring
default task so existing call sites are unchanged.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING

import pandas as pd
from leadforge.render.tasks import write_task_splits as _write_task_splits
from leadforge.schemes.lead_scoring.tasks import CONVERTED_WITHIN_90_DAYS

from leadforge.core.rng import RNGRoot
from leadforge.schemes.lead_scoring.tasks import CONVERTED_WITHIN_90_DAYS, TaskManifest
if TYPE_CHECKING:
from pathlib import Path

import pandas as pd

from leadforge.schema.tasks import TaskManifest

__all__ = ["write_task_splits"]


def write_task_splits(
Expand All @@ -24,54 +30,8 @@ def write_task_splits(
seed: int,
task: TaskManifest = CONVERTED_WITHIN_90_DAYS,
) -> dict[str, int]:
"""Shuffle, split, and write snapshot Parquet files for *task*.

Files written under ``out_dir / task.task_id /``::

train.parquet
valid.parquet
test.parquet
task_manifest.json

Args:
snapshot: Lead snapshot DataFrame from
:func:`~leadforge.schemes.lead_scoring.render.snapshots.build_snapshot`.
out_dir: Parent directory for task outputs (typically
``bundle_root / "tasks"``).
seed: Seed used for deterministic row shuffle.
task: Task manifest describing the split ratios and label column.
"""Write lead-scoring task splits (see :func:`leadforge.render.tasks.write_task_splits`).

Returns:
Dict mapping split name (``"train"``, ``"valid"``, ``"test"``) to
the number of rows written.
Defaults ``task`` to :data:`CONVERTED_WITHIN_90_DAYS` for this scheme.
"""
task_dir = out_dir / task.task_id
task_dir.mkdir(parents=True, exist_ok=True)

# Deterministic shuffle via the project's RNG substream system.
rng = RNGRoot(seed).child("task_split_shuffle")
indices = list(range(len(snapshot)))
rng.shuffle(indices)
shuffled = snapshot.iloc[indices].reset_index(drop=True)

n = len(shuffled)
n_train = int(n * task.split.train)
n_valid = int(n * task.split.valid)

splits: dict[str, pd.DataFrame] = {
"train": shuffled.iloc[:n_train],
"valid": shuffled.iloc[n_train : n_train + n_valid],
"test": shuffled.iloc[n_train + n_valid :], # remainder avoids rounding off-by-one
}

row_counts: dict[str, int] = {}
for split_name, df in splits.items():
path = task_dir / f"{split_name}.parquet"
df.to_parquet(path, index=False, engine="pyarrow")
row_counts[split_name] = len(df)

# Write task_manifest.json alongside the Parquet files.
manifest_path = task_dir / "task_manifest.json"
manifest_path.write_text(json.dumps(task.to_dict(), indent=2))

return row_counts
return _write_task_splits(snapshot, out_dir, seed=seed, task=task)
Loading
Loading