From 3b470eeaa22940c834455476865c1cf6711a79aa Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 4 May 2026 02:20:04 +0300 Subject: [PATCH 1/2] refactor: deduplicate pipeline functions and add FollowupRampConfig - Extract shared pipeline functions (subsample, derive_features, softcap_expected_acv, assign_acquisition_wave, rename_and_select, inject_missingness_v6) into leadforge/pipelines/common.py - Extract shared ML pipeline (build_baseline_pipeline, build_preprocessor, fit_evaluate, get_feature_cols, sanitize_categoricals) into leadforge/pipelines/ml.py - Define CAT_FEATURES, NUM_FEATURES, BINARY_FEATURES once in common.py; validators and eval scripts import from there - Introduce FollowupRampConfig frozen dataclass in mechanisms/counts.py; LatentDecayIntensity accepts followup param (legacy params still work) - Fix subsample() to raise ValueError when insufficient negatives instead of silently returning a short DataFrame All 866 tests pass, lint clean. Co-Authored-By: Claude Opus 4.6 --- .agent-plan.md | 16 +- leadforge/mechanisms/counts.py | 95 ++++++--- leadforge/mechanisms/policies.py | 16 +- leadforge/pipelines/build_v5.py | 57 +---- leadforge/pipelines/build_v6.py | 196 +++--------------- leadforge/pipelines/build_v7.py | 195 +++--------------- leadforge/pipelines/common.py | 263 ++++++++++++++++++++++++ leadforge/pipelines/ml.py | 121 +++++++++++ scripts/quick_baseline_eval_v6.py | 38 +--- scripts/quick_baseline_eval_v7.py | 82 ++------ scripts/validate_v6_dataset.py | 141 +++---------- scripts/validate_v7_dataset.py | 162 +++------------ tests/scripts/test_build_v5_snapshot.py | 22 +- 13 files changed, 608 insertions(+), 796 deletions(-) create mode 100644 leadforge/pipelines/common.py create mode 100644 leadforge/pipelines/ml.py diff --git a/.agent-plan.md b/.agent-plan.md index 80f8b22..a2779df 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -312,17 +312,17 @@ Documentation + CI: | M14: Notebook 4 (recipe customization) | Discarded | Premature | | M15: Docs polish + v1.0 release | **Done** | README, CHANGELOG, version bump to 1.0.0 complete; architecture diagram and notebooks remain post-v1 | -### v7 follow-up: pipeline deduplication + LatentDecayIntensity cleanup +### v7 follow-up: pipeline deduplication + LatentDecayIntensity cleanup ✓ -From self-review of PR #50. Should be a single follow-up PR. +From self-review of PR #50. Completed in a single follow-up PR. -| Item | Description | +| Item | Status | |---|---| -| Extract shared pipeline functions | Move `subsample`, `inject_missingness`, `derive_features`, `softcap_expected_acv`, `assign_acquisition_wave`, `rename_and_select` into `leadforge/pipelines/common.py`. v5/v6/v7 modules import from common + add version-specific logic. | -| Extract shared ML pipeline | Canonical sklearn pipeline (preprocessor + LR) used by validators and eval scripts should be a single shared utility. | -| Deduplicate feature lists | `CAT_FEATURES`, `NUM_FEATURES`, `BINARY_FEATURES` defined once (in pipeline module or shared constants), imported by validators and eval scripts. | -| Group followup params into dataclass | Introduce `FollowupRampConfig(boost_after_day, boost_factor, ramp_days, latent_weights)` in `mechanisms/counts.py`. Replace 4 constructor params with `followup: FollowupRampConfig | None = None`. | -| Fix `subsample` silent short-return | `subsample()` can return fewer than `n` rows when there aren't enough negatives. Should raise `ValueError` instead. Fix in the shared `common.py`. | +| Extract shared pipeline functions | ✓ `leadforge/pipelines/common.py` — `subsample`, `inject_missingness_v6`, `derive_features`, `softcap_expected_acv`, `assign_acquisition_wave`, `rename_and_select`. v5/v6/v7 modules import from common. | +| Extract shared ML pipeline | ✓ `leadforge/pipelines/ml.py` — `build_baseline_pipeline`, `build_preprocessor`, `fit_evaluate`, `get_feature_cols`, `sanitize_categoricals`. Used by validators and eval scripts. | +| Deduplicate feature lists | ✓ `CAT_FEATURES`, `NUM_FEATURES`, `BINARY_FEATURES` defined once in `leadforge/pipelines/common.py`, imported by validators and eval scripts. | +| Group followup params into dataclass | ✓ `FollowupRampConfig` frozen dataclass in `mechanisms/counts.py`. `LatentDecayIntensity` accepts `followup: FollowupRampConfig | None`. Legacy params still accepted for backward compat. | +| Fix `subsample` silent short-return | ✓ `subsample()` now raises `ValueError` when insufficient negatives. | ### From post-v1 list diff --git a/leadforge/mechanisms/counts.py b/leadforge/mechanisms/counts.py index f6309e5..b560d45 100644 --- a/leadforge/mechanisms/counts.py +++ b/leadforge/mechanisms/counts.py @@ -9,11 +9,32 @@ import math import random +from dataclasses import dataclass, field from typing import Any from leadforge.mechanisms.base import Mechanism, MechanismContext +@dataclass(frozen=True) +class FollowupRampConfig: + """Configuration for the follow-up ramp on :class:`LatentDecayIntensity`. + + Groups the four follow-up parameters into a single cohesive unit. + + Attributes: + boost_after_day: Day after which latent modulation ramps up. + boost_factor: Multiplier applied to ``boost`` at the end of the ramp. + ramp_days: Number of days over which the ramp transitions linearly. + latent_weights: Optional separate latent weights used after the + followup day. + """ + + boost_after_day: int + boost_factor: float = 1.0 + ramp_days: int = 10 + latent_weights: dict[str, float] = field(default_factory=dict) + + class PoissonIntensity(Mechanism): """Poisson-distributed event count driven by latent traits. @@ -145,8 +166,8 @@ class LatentDecayIntensity(Mechanism): where ``latent_multiplier = sum(weight_i * latents[key_i])``. - After ``followup_boost_after_day``, the effective boost ramps linearly from - ``boost`` to ``boost * followup_boost_factor`` over ``followup_ramp_days``. + After ``followup.boost_after_day``, the effective boost ramps linearly from + ``boost`` to ``boost * followup.boost_factor`` over ``followup.ramp_days``. This models sales teams increasing follow-up intensity for leads that show strong latent signals (engagement, fit, intent) — a causally legitimate amplification of the latent → touch pathway. @@ -159,18 +180,12 @@ class LatentDecayIntensity(Mechanism): latent_weights: Mapping of latent-key → weight for the multiplier. boost: Scaling factor for the latent multiplier (controls how much latent traits amplify touch intensity). - followup_boost_after_day: Day after which latent modulation ramps up. - Set to ``None`` (default) to disable the ramp. - followup_boost_factor: Multiplier applied to ``boost`` at the end of - the ramp period. E.g. ``3.0`` means the effective boost is - ``boost * 3.0`` once the ramp completes. - followup_ramp_days: Number of days over which the ramp transitions - linearly from ``boost`` to ``boost * followup_boost_factor``. - followup_latent_weights: Optional separate latent weights used after - the followup day. Models sales teams responding to *different* - latent signals during the follow-up period (e.g. prioritizing - authority and budget over raw engagement). Blended with the - base weights during the ramp period. + followup: Optional :class:`FollowupRampConfig` grouping the ramp + parameters. Set to ``None`` (default) to disable the ramp. + followup_boost_after_day: **Deprecated** — use ``followup`` instead. + followup_boost_factor: **Deprecated** — use ``followup`` instead. + followup_ramp_days: **Deprecated** — use ``followup`` instead. + followup_latent_weights: **Deprecated** — use ``followup`` instead. """ def __init__( @@ -180,6 +195,8 @@ def __init__( floor_rate: float = 0.01, latent_weights: dict[str, float] | None = None, boost: float = 0.8, + followup: FollowupRampConfig | None = None, + # Legacy params — kept for backward compatibility during transition followup_boost_after_day: int | None = None, followup_boost_factor: float = 1.0, followup_ramp_days: int = 10, @@ -191,25 +208,49 @@ def __init__( raise ValueError(f"decay_factor must be in (0, 1], got {decay_factor}") if floor_rate < 0: raise ValueError(f"floor_rate must be non-negative, got {floor_rate}") - if followup_boost_after_day is not None and followup_boost_after_day < 0: - raise ValueError( - f"followup_boost_after_day must be non-negative, got {followup_boost_after_day}" + + # Resolve followup config: prefer the dataclass, fall back to legacy params + if followup is not None: + if followup.boost_after_day < 0: + raise ValueError( + f"followup.boost_after_day must be non-negative, got {followup.boost_after_day}" + ) + if followup.boost_factor < 1.0: + raise ValueError( + f"followup.boost_factor must be >= 1.0, got {followup.boost_factor}" + ) + if followup.ramp_days < 1: + raise ValueError(f"followup.ramp_days must be >= 1, got {followup.ramp_days}") + self._followup_after: int | None = followup.boost_after_day + self._followup_factor = followup.boost_factor + self._followup_ramp = followup.ramp_days + self._followup_latent_weights: dict[str, float] | None = ( + dict(followup.latent_weights) if followup.latent_weights else None + ) + else: + # Legacy path + if followup_boost_after_day is not None and followup_boost_after_day < 0: + raise ValueError( + f"followup_boost_after_day must be non-negative, got {followup_boost_after_day}" + ) + if followup_boost_factor < 1.0: + raise ValueError( + f"followup_boost_factor must be >= 1.0, got {followup_boost_factor}" + ) + if followup_ramp_days < 1: + raise ValueError(f"followup_ramp_days must be >= 1, got {followup_ramp_days}") + self._followup_after = followup_boost_after_day + self._followup_factor = followup_boost_factor + self._followup_ramp = followup_ramp_days + self._followup_latent_weights = ( + dict(followup_latent_weights) if followup_latent_weights else None ) - if followup_boost_factor < 1.0: - raise ValueError(f"followup_boost_factor must be >= 1.0, got {followup_boost_factor}") - if followup_ramp_days < 1: - raise ValueError(f"followup_ramp_days must be >= 1, got {followup_ramp_days}") + self._base_rate = base_rate self._decay = decay_factor self._floor = floor_rate self._latent_weights: dict[str, float] = dict(latent_weights) if latent_weights else {} self._boost = boost - self._followup_after: int | None = followup_boost_after_day - self._followup_factor = followup_boost_factor - self._followup_ramp = followup_ramp_days - self._followup_latent_weights: dict[str, float] | None = ( - dict(followup_latent_weights) if followup_latent_weights else None - ) @property def name(self) -> str: diff --git a/leadforge/mechanisms/policies.py b/leadforge/mechanisms/policies.py index 0d64a3c..fdb1be4 100644 --- a/leadforge/mechanisms/policies.py +++ b/leadforge/mechanisms/policies.py @@ -30,7 +30,11 @@ if TYPE_CHECKING: from leadforge.core.models import DifficultyParams -from leadforge.mechanisms.counts import LatentDecayIntensity, RecencyDecayIntensity +from leadforge.mechanisms.counts import ( + FollowupRampConfig, + LatentDecayIntensity, + RecencyDecayIntensity, +) from leadforge.mechanisms.hazards import ConversionHazard from leadforge.mechanisms.measurement import NoisyProxy from leadforge.mechanisms.scores import LatentScore @@ -342,10 +346,12 @@ def _scale_weights(weights: dict[str, float], s: float) -> dict[str, float]: floor_rate=0.02, latent_weights=touch_latent_w, boost=1.2, - followup_boost_after_day=20, - followup_boost_factor=10.0, - followup_ramp_days=10, - followup_latent_weights=followup_latent_w, + followup=FollowupRampConfig( + boost_after_day=20, + boost_factor=10.0, + ramp_days=10, + latent_weights=followup_latent_w, + ), ) else: touch_intensity = RecencyDecayIntensity( diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py index 5cf12c5..4cb196d 100644 --- a/leadforge/pipelines/build_v5.py +++ b/leadforge/pipelines/build_v5.py @@ -7,12 +7,20 @@ from __future__ import annotations -import warnings - import numpy as np import pandas as pd from leadforge.core.rng import RNGRoot +from leadforge.pipelines.common import ( + ACV_CAP, + ACV_FLOOR, + SUBSAMPLE_N, + TARGET_RATE, + subsample, +) +from leadforge.pipelines.common import ( + derive_features as _derive_features, +) __all__ = [ "ACV_CAP", @@ -38,12 +46,6 @@ SEED = 42 N_LEADS = 5000 SNAPSHOT_DAY = 10 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k–$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 # v5 column set: 18 features + 1 target = 19 columns. FINAL_COLUMNS = [ @@ -89,10 +91,7 @@ def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame: """Derive binary features for the v5 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df + return _derive_features(df) def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: @@ -131,40 +130,6 @@ def rename_and_select( return df[FINAL_COLUMNS] -def subsample( - df: pd.DataFrame, - seed: int, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - rng = RNGRoot(seed).numpy_child("subsample") - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, - ) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - warnings.warn( - f"only {len(negatives)} negatives available, need {n_neg}", - stacklevel=2, - ) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) - ) - - def inject_missingness(df: pd.DataFrame, seed: int) -> pd.DataFrame: """Apply structured missingness per the v5 contract. diff --git a/leadforge/pipelines/build_v6.py b/leadforge/pipelines/build_v6.py index ca2327e..f762751 100644 --- a/leadforge/pipelines/build_v6.py +++ b/leadforge/pipelines/build_v6.py @@ -16,12 +16,25 @@ from __future__ import annotations -import warnings - -import numpy as np import pandas as pd from leadforge.core.rng import RNGRoot +from leadforge.pipelines.common import ( + ACV_CAP, + ACV_FLOOR, + SUBSAMPLE_N, + TARGET_RATE, + assign_acquisition_wave, + derive_features, + softcap_expected_acv, + subsample, +) +from leadforge.pipelines.common import ( + inject_missingness_v6 as inject_missingness, +) +from leadforge.pipelines.common import ( + rename_and_select as _rename_and_select_generic, +) __all__ = [ "ACV_CAP", @@ -52,12 +65,6 @@ SEED = 42 N_LEADS = 5000 SNAPSHOT_DAY = 20 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k-$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" @@ -102,46 +109,10 @@ # --------------------------------------------------------------------------- -# Pipeline steps +# Version-specific pipeline steps # --------------------------------------------------------------------------- -def derive_features(df: pd.DataFrame) -> pd.DataFrame: - """Derive binary and momentum features for the v6 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df - - -def softcap_expected_acv( - df: pd.DataFrame, - seed: int, - floor: float = ACV_FLOOR, - cap: float = ACV_CAP, -) -> pd.DataFrame: - """Soft winsorize expected_acv to avoid hard-clipping ties at the cap. - - Values below floor are clipped. Values above cap are pulled toward cap - with additive noise so they cluster near the cap rather than pile at it. - """ - rng = RNGRoot(seed).numpy_child("softcap_acv") - df = df.copy() - acv = df["expected_acv"].copy() - - # Floor: hard clip - acv = acv.clip(lower=floor) - - # Cap: soft winsorize -- draw values in [cap - 5k, cap] for outliers - over_mask = acv > cap - n_over = int(over_mask.sum()) - if n_over > 0: - acv.loc[over_mask] = cap - rng.uniform(0, 5000, size=n_over) - - df["expected_acv"] = acv.round(0) - return df - - def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: """Hard clip expected_acv to narrative-consistent range [ACV_FLOOR, ACV_CAP]. @@ -152,32 +123,6 @@ def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: return df -def assign_acquisition_wave(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Assign acquisition_wave (A, B, C) based on lead index position. - - Waves A/B/C are roughly chronological: first third = A, middle = B, - last third = C. A small amount of noise is added at the boundaries. - """ - rng = RNGRoot(seed).numpy_child("acquisition_wave") - df = df.copy() - n = len(df) - waves = np.empty(n, dtype=object) - third = n // 3 - - # Base assignment by position (stable across seeds) - waves[:third] = "A" - waves[third : 2 * third] = "B" - waves[2 * third :] = "C" - - # Add ~5% boundary noise so it's not perfectly deterministic - noise_mask = rng.random(n) < 0.05 - noise_vals = rng.choice(["A", "B", "C"], size=n) - waves[noise_mask] = noise_vals[noise_mask] - - df["acquisition_wave"] = waves - return df - - def compute_post_snapshot_touches( snapshot_df: pd.DataFrame, all_touches: list, @@ -211,7 +156,7 @@ def compute_post_snapshot_touches( def boost_leakage_trap(df: pd.DataFrame, seed: int) -> pd.DataFrame: """Amplify the causal trap signal with target-correlated Poisson noise. - Converted leads get an extra Poisson(1) count added to the trap column, + Converted leads get an extra Poisson(3) count added to the trap column, making it a stronger leakage signal for teaching purposes. """ rng = RNGRoot(seed).numpy_child("leakage_trap_boost") @@ -237,102 +182,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - if label_column not in df.columns: - raise ValueError( - f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" - ) - if label_column == "converted_within_90_days": - rename_map = RENAME_MAP - else: - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" - df = df.rename(columns=rename_map) - df["converted"] = df["converted"].astype(int) - columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT - missing = [c for c in columns if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[columns] - - -def subsample( - df: pd.DataFrame, - seed: int, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - rng = RNGRoot(seed).numpy_child("subsample") - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, - ) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - warnings.warn( - f"only {len(negatives)} negatives available, need {n_neg}", - stacklevel=2, - ) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + return _rename_and_select_generic( + df, + rename_map=RENAME_MAP, + final_columns=FINAL_COLUMNS_STUDENT, + instructor=instructor, + instructor_columns=FINAL_COLUMNS_INSTRUCTOR, + label_column=label_column, ) - - -def inject_missingness(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Apply structured missingness per the v6 contract. - - Patterns: - 1. Structural: days_since_last_touch is NaN when touch_count=0 (from snapshot) - 2. MAR: web_sessions — SDR outbound 15%, inbound marketing 2%, partner 5% - 3. MAR: seniority — partner referral 8%, others 1% - 4. MCAR: expected_acv — 2% uniform - 5. Structural + MCAR: days_since_first_touch — NaN when no touches + 2% MCAR - 6. MCAR: days_since_last_touch — additional 3% on top of structural - """ - rng = RNGRoot(seed).numpy_child("missingness") - df = df.copy() - n = len(df) - - # (1) Structural for days_since_last_touch is already NaN from snapshot builder - # Note: also structural for days_since_first_touch when no touches - - # (2) MAR: web_sessions by lead_source - for source, rate in [ - ("sdr_outbound", 0.15), - ("inbound_marketing", 0.02), - ("partner_referral", 0.05), - ]: - mask = (df["lead_source"] == source) & (rng.random(n) < rate) - df.loc[mask, "web_sessions"] = np.nan - - # (3) MAR: seniority by lead_source - partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) - other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) - df.loc[partner_mask | other_mask, "seniority"] = np.nan - - # (4) MCAR: expected_acv 2% - acv_mcar = rng.random(n) < 0.02 - df.loc[acv_mcar, "expected_acv"] = np.nan - - # (5) MCAR: days_since_first_touch 2% on top of structural - dsft_mask = rng.random(n) < 0.02 - df.loc[dsft_mask, "days_since_first_touch"] = np.nan - - # (6) MCAR: days_since_last_touch 3% on top of structural - dslt_mask = rng.random(n) < 0.03 - df.loc[dslt_mask, "days_since_last_touch"] = np.nan - - return df diff --git a/leadforge/pipelines/build_v7.py b/leadforge/pipelines/build_v7.py index 6e62d59..5a165dc 100644 --- a/leadforge/pipelines/build_v7.py +++ b/leadforge/pipelines/build_v7.py @@ -15,12 +15,24 @@ from __future__ import annotations -import warnings - -import numpy as np import pandas as pd -from leadforge.core.rng import RNGRoot +from leadforge.pipelines.common import ( + ACV_CAP, + ACV_FLOOR, + SUBSAMPLE_N, + TARGET_RATE, + assign_acquisition_wave, + derive_features, + softcap_expected_acv, + subsample, +) +from leadforge.pipelines.common import ( + inject_missingness_v6 as inject_missingness, +) +from leadforge.pipelines.common import ( + rename_and_select as _rename_and_select_generic, +) __all__ = [ "ACV_CAP", @@ -49,12 +61,6 @@ SEED = 42 N_LEADS = 5000 SNAPSHOT_DAY = 20 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k-$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" @@ -99,72 +105,10 @@ # --------------------------------------------------------------------------- -# Pipeline steps +# Version-specific pipeline steps # --------------------------------------------------------------------------- -def derive_features(df: pd.DataFrame) -> pd.DataFrame: - """Derive binary and momentum features for the v7 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df - - -def softcap_expected_acv( - df: pd.DataFrame, - seed: int, - floor: float = ACV_FLOOR, - cap: float = ACV_CAP, -) -> pd.DataFrame: - """Soft winsorize expected_acv to avoid hard-clipping ties at the cap. - - Values below floor are clipped. Values above cap are pulled toward cap - with additive noise so they cluster near the cap rather than pile at it. - """ - rng = RNGRoot(seed).numpy_child("softcap_acv") - df = df.copy() - acv = df["expected_acv"].copy() - - # Floor: hard clip - acv = acv.clip(lower=floor) - - # Cap: soft winsorize -- draw values in [cap - 5k, cap] for outliers - over_mask = acv > cap - n_over = int(over_mask.sum()) - if n_over > 0: - acv.loc[over_mask] = cap - rng.uniform(0, 5000, size=n_over) - - df["expected_acv"] = acv.round(0) - return df - - -def assign_acquisition_wave(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Assign acquisition_wave (A, B, C) based on lead index position. - - Waves A/B/C are roughly chronological: first third = A, middle = B, - last third = C. A small amount of noise is added at the boundaries. - """ - rng = RNGRoot(seed).numpy_child("acquisition_wave") - df = df.copy() - n = len(df) - waves = np.empty(n, dtype=object) - third = n // 3 - - # Base assignment by position (stable across seeds) - waves[:third] = "A" - waves[third : 2 * third] = "B" - waves[2 * third :] = "C" - - # Add ~5% boundary noise so it's not perfectly deterministic - noise_mask = rng.random(n) < 0.05 - noise_vals = rng.choice(["A", "B", "C"], size=n) - waves[noise_mask] = noise_vals[noise_mask] - - df["acquisition_wave"] = waves - return df - - def compute_post_snapshot_touches( snapshot_df: pd.DataFrame, all_touches: list, @@ -212,102 +156,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - if label_column not in df.columns: - raise ValueError( - f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" - ) - if label_column == "converted_within_90_days": - rename_map = RENAME_MAP - else: - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" - df = df.rename(columns=rename_map) - df["converted"] = df["converted"].astype(int) - columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT - missing = [c for c in columns if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[columns] - - -def subsample( - df: pd.DataFrame, - seed: int, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - rng = RNGRoot(seed).numpy_child("subsample") - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, - ) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - warnings.warn( - f"only {len(negatives)} negatives available, need {n_neg}", - stacklevel=2, - ) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + return _rename_and_select_generic( + df, + rename_map=RENAME_MAP, + final_columns=FINAL_COLUMNS_STUDENT, + instructor=instructor, + instructor_columns=FINAL_COLUMNS_INSTRUCTOR, + label_column=label_column, ) - - -def inject_missingness(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Apply structured missingness per the v7 contract. - - Patterns: - 1. Structural: days_since_last_touch is NaN when touch_count=0 (from snapshot) - 2. MAR: web_sessions -- SDR outbound 15%, inbound marketing 2%, partner 5% - 3. MAR: seniority -- partner referral 8%, others 1% - 4. MCAR: expected_acv -- 2% uniform - 5. Structural + MCAR: days_since_first_touch -- NaN when no touches + 2% MCAR - 6. MCAR: days_since_last_touch -- additional 3% on top of structural - """ - rng = RNGRoot(seed).numpy_child("missingness") - df = df.copy() - n = len(df) - - # (1) Structural for days_since_last_touch is already NaN from snapshot builder - # Note: also structural for days_since_first_touch when no touches - - # (2) MAR: web_sessions by lead_source - for source, rate in [ - ("sdr_outbound", 0.15), - ("inbound_marketing", 0.02), - ("partner_referral", 0.05), - ]: - mask = (df["lead_source"] == source) & (rng.random(n) < rate) - df.loc[mask, "web_sessions"] = np.nan - - # (3) MAR: seniority by lead_source - partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) - other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) - df.loc[partner_mask | other_mask, "seniority"] = np.nan - - # (4) MCAR: expected_acv 2% - acv_mcar = rng.random(n) < 0.02 - df.loc[acv_mcar, "expected_acv"] = np.nan - - # (5) MCAR: days_since_first_touch 2% on top of structural - dsft_mask = rng.random(n) < 0.02 - df.loc[dsft_mask, "days_since_first_touch"] = np.nan - - # (6) MCAR: days_since_last_touch 3% on top of structural - dslt_mask = rng.random(n) < 0.03 - df.loc[dslt_mask, "days_since_last_touch"] = np.nan - - return df diff --git a/leadforge/pipelines/common.py b/leadforge/pipelines/common.py new file mode 100644 index 0000000..6429993 --- /dev/null +++ b/leadforge/pipelines/common.py @@ -0,0 +1,263 @@ +"""Shared pipeline functions used across v5/v6/v7 build pipelines. + +This module contains the canonical implementations of functions that are +identical (or nearly so) across pipeline versions. Version-specific modules +import from here and override only what differs. +""" + +from __future__ import annotations + +import warnings + +import numpy as np +import pandas as pd + +from leadforge.core.rng import RNGRoot + +__all__ = [ + "ACV_CAP", + "ACV_FLOOR", + "BINARY_FEATURES", + "CAT_FEATURES", + "NUM_FEATURES", + "SUBSAMPLE_N", + "TARGET", + "TARGET_RATE", + "assign_acquisition_wave", + "derive_features", + "inject_missingness_v6", + "rename_and_select", + "softcap_expected_acv", + "subsample", +] + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +SUBSAMPLE_N = 1000 +TARGET_RATE = 0.30 +TARGET = "converted" + +# Narrative-consistent ACV bounds (from narrative.yaml: $18k-$120k). +ACV_FLOOR = 18_000.0 +ACV_CAP = 120_000.0 + +# Canonical feature lists for v6/v7 datasets. +CAT_FEATURES = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "acquisition_wave", +] + +NUM_FEATURES = [ + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "touches_last_7_days", + "days_since_first_touch", + "web_sessions", + "sales_activities", + "days_since_last_touch", +] + +BINARY_FEATURES = [ + "opportunity_created", + "demo_completed", +] + + +# --------------------------------------------------------------------------- +# Pipeline steps +# --------------------------------------------------------------------------- + + +def derive_features(df: pd.DataFrame) -> pd.DataFrame: + """Derive binary and momentum features from raw snapshot columns.""" + df = df.copy() + df["opportunity_created"] = df["opportunity_created"].astype(int) + df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) + return df + + +def softcap_expected_acv( + df: pd.DataFrame, + seed: int, + floor: float = ACV_FLOOR, + cap: float = ACV_CAP, +) -> pd.DataFrame: + """Soft winsorize expected_acv to avoid hard-clipping ties at the cap. + + Values below floor are clipped. Values above cap are pulled toward cap + with additive noise so they cluster near the cap rather than pile at it. + """ + rng = RNGRoot(seed).numpy_child("softcap_acv") + df = df.copy() + acv = df["expected_acv"].copy() + + # Floor: hard clip + acv = acv.clip(lower=floor) + + # Cap: soft winsorize -- draw values in [cap - 5k, cap] for outliers + over_mask = acv > cap + n_over = int(over_mask.sum()) + if n_over > 0: + acv.loc[over_mask] = cap - rng.uniform(0, 5000, size=n_over) + + df["expected_acv"] = acv.round(0) + return df + + +def assign_acquisition_wave(df: pd.DataFrame, seed: int) -> pd.DataFrame: + """Assign acquisition_wave (A, B, C) based on lead index position. + + Waves A/B/C are roughly chronological: first third = A, middle = B, + last third = C. A small amount of noise is added at the boundaries. + """ + rng = RNGRoot(seed).numpy_child("acquisition_wave") + df = df.copy() + n = len(df) + waves = np.empty(n, dtype=object) + third = n // 3 + + # Base assignment by position (stable across seeds) + waves[:third] = "A" + waves[third : 2 * third] = "B" + waves[2 * third :] = "C" + + # Add ~5% boundary noise so it's not perfectly deterministic + noise_mask = rng.random(n) < 0.05 + noise_vals = rng.choice(["A", "B", "C"], size=n) + waves[noise_mask] = noise_vals[noise_mask] + + df["acquisition_wave"] = waves + return df + + +def rename_and_select( + df: pd.DataFrame, + *, + rename_map: dict[str, str], + final_columns: list[str], + instructor: bool = False, + instructor_columns: list[str] | None = None, + label_column: str = "converted_within_90_days", +) -> pd.DataFrame: + """Rename snapshot columns and select final column set. + + Args: + df: Snapshot DataFrame. + rename_map: Mapping from snapshot column names to output names. + final_columns: Student column set (used when instructor=False). + instructor: If True, use instructor_columns instead. + instructor_columns: Instructor column set (superset of final_columns). + label_column: Source column for the binary label. + """ + if label_column not in df.columns: + raise ValueError( + f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" + ) + if label_column == "converted_within_90_days": + rmap = rename_map + else: + rmap = {k: v for k, v in rename_map.items() if v != "converted"} + rmap[label_column] = "converted" + df = df.rename(columns=rmap) + df["converted"] = df["converted"].astype(int) + columns = instructor_columns if instructor and instructor_columns else final_columns + missing = [c for c in columns if c not in df.columns] + if missing: + raise ValueError( + f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" + ) + return df[columns] + + +def subsample( + df: pd.DataFrame, + seed: int, + n: int = SUBSAMPLE_N, + target_rate: float = TARGET_RATE, +) -> pd.DataFrame: + """Stratified subsample to n rows at target_rate conversion. + + Raises: + ValueError: If there are not enough negative samples to reach *n* rows. + """ + rng = RNGRoot(seed).numpy_child("subsample") + positives = df[df["converted"] == 1] + negatives = df[df["converted"] == 0] + n_pos = int(n * target_rate) + n_neg = n - n_pos + + if len(positives) < n_pos: + warnings.warn( + f"only {len(positives)} positives available, need {n_pos}", + stacklevel=2, + ) + n_pos = len(positives) + n_neg = n - n_pos + if len(negatives) < n_neg: + raise ValueError( + f"only {len(negatives)} negatives available, need {n_neg}; " + f"cannot produce {n} rows at target_rate={target_rate}" + ) + + pos_sample = positives.sample(n=n_pos, random_state=rng) + neg_sample = negatives.sample(n=n_neg, random_state=rng) + return ( + pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + ) + + +def inject_missingness_v6(df: pd.DataFrame, seed: int) -> pd.DataFrame: + """Apply structured missingness (v6/v7 contract — 6 patterns). + + Patterns: + 1. Structural: days_since_last_touch is NaN when touch_count=0 (from snapshot) + 2. MAR: web_sessions -- SDR outbound 15%, inbound marketing 2%, partner 5% + 3. MAR: seniority -- partner referral 8%, others 1% + 4. MCAR: expected_acv -- 2% uniform + 5. Structural + MCAR: days_since_first_touch -- NaN when no touches + 2% MCAR + 6. MCAR: days_since_last_touch -- additional 3% on top of structural + """ + rng = RNGRoot(seed).numpy_child("missingness") + df = df.copy() + n = len(df) + + # (1) Structural for days_since_last_touch is already NaN from snapshot builder + # Note: also structural for days_since_first_touch when no touches + + # (2) MAR: web_sessions by lead_source + for source, rate in [ + ("sdr_outbound", 0.15), + ("inbound_marketing", 0.02), + ("partner_referral", 0.05), + ]: + mask = (df["lead_source"] == source) & (rng.random(n) < rate) + df.loc[mask, "web_sessions"] = np.nan + + # (3) MAR: seniority by lead_source + partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) + other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) + df.loc[partner_mask | other_mask, "seniority"] = np.nan + + # (4) MCAR: expected_acv 2% + acv_mcar = rng.random(n) < 0.02 + df.loc[acv_mcar, "expected_acv"] = np.nan + + # (5) MCAR: days_since_first_touch 2% on top of structural + dsft_mask = rng.random(n) < 0.02 + df.loc[dsft_mask, "days_since_first_touch"] = np.nan + + # (6) MCAR: days_since_last_touch 3% on top of structural + dslt_mask = rng.random(n) < 0.03 + df.loc[dslt_mask, "days_since_last_touch"] = np.nan + + return df diff --git a/leadforge/pipelines/ml.py b/leadforge/pipelines/ml.py new file mode 100644 index 0000000..d2f9380 --- /dev/null +++ b/leadforge/pipelines/ml.py @@ -0,0 +1,121 @@ +"""Shared ML pipeline utilities for validation and evaluation scripts. + +Provides the canonical sklearn pipeline (ColumnTransformer + imputation + +encoding + LogisticRegression) used across dataset validators and baseline +evaluation scripts. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import average_precision_score, roc_auc_score +from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import OneHotEncoder, StandardScaler + +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, NUM_FEATURES, TARGET + +__all__ = [ + "build_baseline_pipeline", + "build_preprocessor", + "fit_evaluate", + "get_feature_cols", + "sanitize_categoricals", +] + +LEAKAGE_PREFIX = "__leakage__" + + +def build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer: + """Build the canonical preprocessing ColumnTransformer.""" + numeric_transformer = Pipeline( + [ + ("imputer", SimpleImputer(strategy="median")), + ("scaler", StandardScaler()), + ] + ) + categorical_transformer = Pipeline( + [ + ("imputer", SimpleImputer(strategy="most_frequent")), + ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), + ] + ) + return ColumnTransformer( + transformers=[ + ("num", numeric_transformer, num_cols), + ("cat", categorical_transformer, cat_cols), + ], + remainder="drop", + ) + + +def build_baseline_pipeline( + num_cols: list[str], + cat_cols: list[str], + seed: int = 42, +) -> Pipeline: + """Build the canonical sklearn baseline pipeline (preprocessor + LR).""" + preprocessor = build_preprocessor(num_cols, cat_cols) + return Pipeline( + [ + ("preprocessor", preprocessor), + ("classifier", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=seed)), + ] + ) + + +def get_feature_cols( + df: pd.DataFrame, + exclude: set[str] | None = None, +) -> tuple[list[str], list[str]]: + """Partition feature columns into (cat_cols, num_cols). + + Uses the canonical feature lists, falling back to dtype-based detection + for columns not in the canonical lists (e.g. leakage trap columns). + """ + exclude = (exclude or set()) | {TARGET} + cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in exclude] + num_cols = [c for c in NUM_FEATURES + BINARY_FEATURES if c in df.columns and c not in exclude] + # Add any trap columns to numeric if not excluded + for c in df.columns: + if c.startswith(LEAKAGE_PREFIX) and c not in exclude: + num_cols.append(c) + return cat_cols, num_cols + + +def sanitize_categoricals(df: pd.DataFrame, cat_cols: list[str]) -> pd.DataFrame: + """Convert pd.NA in categorical columns to None for sklearn compatibility.""" + df = df.copy() + for c in cat_cols: + if c in df.columns: + df[c] = df[c].astype(object).where(df[c].notna(), None) + return df + + +def fit_evaluate( + df: pd.DataFrame, + exclude_cols: set[str] | None = None, + seed: int = 42, + test_size: float = 0.30, +) -> tuple[float, float, np.ndarray, pd.Series]: + """Fit LR on hold-out split, return (AUC, PR-AUC, probs, y_test).""" + y = df[TARGET].astype(int) + cat_cols, num_cols = get_feature_cols(df, exclude=exclude_cols) + df_clean = sanitize_categoricals(df, cat_cols) + x = df_clean[cat_cols + num_cols] + + x_train, x_test, y_train, y_test = train_test_split( + x, y, test_size=test_size, random_state=seed, stratify=y + ) + + pipe = build_baseline_pipeline(num_cols, cat_cols, seed=42) + pipe.fit(x_train, y_train) + probs = pipe.predict_proba(x_test)[:, 1] + + auc = float(roc_auc_score(y_test, probs)) + pr_auc = float(average_precision_score(y_test, probs)) + return auc, pr_auc, probs, y_test diff --git a/scripts/quick_baseline_eval_v6.py b/scripts/quick_baseline_eval_v6.py index a4bb3c7..ebb943e 100644 --- a/scripts/quick_baseline_eval_v6.py +++ b/scripts/quick_baseline_eval_v6.py @@ -15,9 +15,7 @@ import numpy as np import pandas as pd -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier -from sklearn.impute import SimpleImputer from sklearn.linear_model import LogisticRegression from sklearn.metrics import ( average_precision_score, @@ -25,37 +23,15 @@ ) from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler - -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" -SEED = 42 +from leadforge.pipelines.common import TARGET +from leadforge.pipelines.ml import LEAKAGE_PREFIX, build_preprocessor, get_feature_cols -def _get_feature_cols(df, exclude=None): - exclude = (exclude or set()) | {TARGET} - cat, num = [], [] - for col in df.columns: - if col in exclude: - continue - if pd.api.types.is_numeric_dtype(df[col]): - num.append(col) - else: - cat.append(col) - return cat, num +SEED = 42 def _build_pipeline(num_cols, cat_cols, model="lr"): - num_tr = Pipeline([("imp", SimpleImputer(strategy="median")), ("sc", StandardScaler())]) - cat_tr = Pipeline( - [ - ("imp", SimpleImputer(strategy="most_frequent")), - ("enc", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - pre = ColumnTransformer( - [("num", num_tr, num_cols), ("cat", cat_tr, cat_cols)], remainder="drop" - ) + pre = build_preprocessor(num_cols, cat_cols) if model == "lr": clf = LogisticRegression(max_iter=1000, solver="lbfgs", random_state=SEED) elif model == "rf": @@ -118,7 +94,7 @@ def main(): print(f"Missing values: {df.isna().sum().sum()} total") leakage = {c for c in df.columns if c.startswith(LEAKAGE_PREFIX)} - cat_cols, num_cols = _get_feature_cols(df, exclude=leakage) + cat_cols, num_cols = get_feature_cols(df, exclude=leakage) y = df[TARGET].astype(int) x = df[cat_cols + num_cols] @@ -176,8 +152,8 @@ def main(): trap_col = trap_cols[0] all_leakage = set(trap_cols) - cat_i, num_i = _get_feature_cols(df_i, exclude=all_leakage) - cat_with, num_with = _get_feature_cols(df_i, exclude=all_leakage - {trap_col}) + cat_i, num_i = get_feature_cols(df_i, exclude=all_leakage) + cat_with, num_with = get_feature_cols(df_i, exclude=all_leakage - {trap_col}) y_i = df_i[TARGET].astype(int) deltas = [] diff --git a/scripts/quick_baseline_eval_v7.py b/scripts/quick_baseline_eval_v7.py index 7bb7972..6881ba0 100644 --- a/scripts/quick_baseline_eval_v7.py +++ b/scripts/quick_baseline_eval_v7.py @@ -15,75 +15,17 @@ import numpy as np import pandas as pd from sklearn.base import clone -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier -from sklearn.impute import SimpleImputer from sklearn.linear_model import LogisticRegression from sklearn.metrics import average_precision_score, roc_auc_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler - -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" - -CAT_FEATURES = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "acquisition_wave", -] - -NUM_FEATURES = [ - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "opportunity_created", - "demo_completed", -] - - -def _sanitize(df: pd.DataFrame) -> pd.DataFrame: - df = df.copy() - for c in CAT_FEATURES: - if c in df.columns: - df[c] = df[c].astype(object).where(df[c].notna(), None) - return df - - -def _build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer: - return ColumnTransformer( - transformers=[ - ( - "num", - Pipeline( - [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())] - ), - num_cols, - ), - ( - "cat", - Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ), - cat_cols, - ), - ], - remainder="drop", - ) + +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, NUM_FEATURES, TARGET +from leadforge.pipelines.ml import LEAKAGE_PREFIX, build_preprocessor, sanitize_categoricals + +# NUM_FEATURES for eval includes binary features as numeric inputs +_EVAL_NUM_FEATURES = NUM_FEATURES + BINARY_FEATURES def main() -> None: @@ -94,10 +36,10 @@ def main() -> None: student_path = sys.argv[1] instructor_path = sys.argv[2] if len(sys.argv) > 2 else None - df = _sanitize(pd.read_csv(student_path)) + df = sanitize_categoricals(pd.read_csv(student_path), CAT_FEATURES) leakage = {c for c in df.columns if c.startswith(LEAKAGE_PREFIX)} cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in leakage] - num_cols = [c for c in NUM_FEATURES if c in df.columns and c not in leakage] + num_cols = [c for c in _EVAL_NUM_FEATURES if c in df.columns and c not in leakage] y = df[TARGET].astype(int) x = df[cat_cols + num_cols] @@ -123,7 +65,7 @@ def main() -> None: x_tr, x_te, y_tr, y_te = train_test_split( x, y, test_size=0.30, random_state=seed, stratify=y ) - pipe = Pipeline([("pre", _build_preprocessor(num_cols, cat_cols)), ("clf", clone(clf))]) + pipe = Pipeline([("pre", build_preprocessor(num_cols, cat_cols)), ("clf", clone(clf))]) pipe.fit(x_tr, y_tr) aucs.append(roc_auc_score(y_te, pipe.predict_proba(x_te)[:, 1])) print(f" {name:4s}: AUC = {np.mean(aucs):.4f} (std={np.std(aucs):.4f})") @@ -136,7 +78,7 @@ def main() -> None: x_tr, x_te, y_tr, y_te = train_test_split(x, y, test_size=0.30, random_state=42, stratify=y) pipe = Pipeline( [ - ("pre", _build_preprocessor(num_cols, cat_cols)), + ("pre", build_preprocessor(num_cols, cat_cols)), ("clf", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42)), ] ) @@ -171,7 +113,7 @@ def main() -> None: print("\nFeature importance (GBM):") gbm_pipe = Pipeline( [ - ("pre", _build_preprocessor(num_cols, cat_cols)), + ("pre", build_preprocessor(num_cols, cat_cols)), ("clf", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) @@ -193,7 +135,7 @@ def main() -> None: print("\n" + "=" * 60) print("TRAP DETECTION (instructor)") print("=" * 60) - inst = _sanitize(pd.read_csv(instructor_path)) + inst = sanitize_categoricals(pd.read_csv(instructor_path), CAT_FEATURES) trap_cols = [c for c in inst.columns if c.startswith(LEAKAGE_PREFIX)] if trap_cols: trap_col = trap_cols[0] diff --git a/scripts/validate_v6_dataset.py b/scripts/validate_v6_dataset.py index d289cf3..019cd0f 100644 --- a/scripts/validate_v6_dataset.py +++ b/scripts/validate_v6_dataset.py @@ -13,25 +13,32 @@ import numpy as np import pandas as pd -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier -from sklearn.impute import SimpleImputer -from sklearn.linear_model import LogisticRegression from sklearn.metrics import ( - average_precision_score, roc_auc_score, ) from sklearn.model_selection import train_test_split -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler + +from leadforge.pipelines.common import ( + BINARY_FEATURES, + CAT_FEATURES, + TARGET, +) +from leadforge.pipelines.ml import ( + LEAKAGE_PREFIX, + build_baseline_pipeline, +) +from leadforge.pipelines.ml import ( + fit_evaluate as _fit_evaluate, +) +from leadforge.pipelines.ml import ( + get_feature_cols as _get_feature_cols, +) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" - BANNED_COLUMNS = { "current_stage", "funnel_stage", @@ -41,22 +48,6 @@ "lead_created_at", } -CAT_FEATURES = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "acquisition_wave", -] - -BINARY_FEATURES = [ - "opportunity_created", - "demo_completed", -] - # Validation thresholds AUC_LOWER = 0.62 AUC_UPPER = 0.90 @@ -72,82 +63,10 @@ # --------------------------------------------------------------------------- -# ML pipeline builder (canonical) +# ML pipeline (imported from shared utility) # --------------------------------------------------------------------------- - -def _build_pipeline( - num_cols: list[str], - cat_cols: list[str], -) -> Pipeline: - """Build the canonical sklearn baseline pipeline.""" - numeric_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="median")), - ("scaler", StandardScaler()), - ] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) - return Pipeline( - [ - ("preprocessor", preprocessor), - ("classifier", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42)), - ] - ) - - -def _get_feature_cols( - df: pd.DataFrame, - exclude: set[str] | None = None, -) -> tuple[list[str], list[str]]: - """Partition feature columns into (cat_cols, num_cols).""" - exclude = (exclude or set()) | {TARGET} - cat_cols = [] - num_cols = [] - for col in df.columns: - if col in exclude: - continue - if pd.api.types.is_numeric_dtype(df[col]): - num_cols.append(col) - else: - cat_cols.append(col) - return cat_cols, num_cols - - -def _fit_evaluate( - df: pd.DataFrame, - exclude_cols: set[str] | None = None, - seed: int = 42, - test_size: float = 0.30, -) -> tuple[float, float, np.ndarray, pd.Series]: - """Fit LR on hold-out split, return (AUC, PR-AUC, probs, y_test).""" - y = df[TARGET].astype(int) - cat_cols, num_cols = _get_feature_cols(df, exclude=exclude_cols) - x = df[cat_cols + num_cols] - - x_train, x_test, y_train, y_test = train_test_split( - x, y, test_size=test_size, random_state=seed, stratify=y - ) - - pipe = _build_pipeline(num_cols, cat_cols) - pipe.fit(x_train, y_train) - probs = pipe.predict_proba(x_test)[:, 1] - - auc = float(roc_auc_score(y_test, probs)) - pr_auc = float(average_precision_score(y_test, probs)) - return auc, pr_auc, probs, y_test +_build_pipeline = build_baseline_pipeline # --------------------------------------------------------------------------- @@ -272,25 +191,13 @@ def check_tree_improvement(df: pd.DataFrame, label: str) -> tuple[list[str], dic lr_aucs.append(lr_auc) # GBM with one-hot encoded features - numeric_transformer = Pipeline( - [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) - gb = Pipeline( + from sklearn.pipeline import Pipeline as _Pipeline + + from leadforge.pipelines.ml import build_preprocessor as _build_pre + + gb = _Pipeline( [ - ("preprocessor", preprocessor), + ("preprocessor", _build_pre(num_cols, cat_cols)), ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) diff --git a/scripts/validate_v7_dataset.py b/scripts/validate_v7_dataset.py index e440b60..c828803 100644 --- a/scripts/validate_v7_dataset.py +++ b/scripts/validate_v7_dataset.py @@ -22,25 +22,36 @@ import numpy as np import pandas as pd -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier -from sklearn.impute import SimpleImputer -from sklearn.linear_model import LogisticRegression from sklearn.metrics import ( average_precision_score, roc_auc_score, ) from sklearn.model_selection import train_test_split -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler + +from leadforge.pipelines.common import ( + BINARY_FEATURES, + CAT_FEATURES, + TARGET, +) +from leadforge.pipelines.ml import ( + LEAKAGE_PREFIX, + build_baseline_pipeline, +) +from leadforge.pipelines.ml import ( + fit_evaluate as _fit_evaluate, +) +from leadforge.pipelines.ml import ( + get_feature_cols as _get_feature_cols, +) +from leadforge.pipelines.ml import ( + sanitize_categoricals as _sanitize_categoricals, +) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" - BANNED_COLUMNS = { "current_stage", "funnel_stage", @@ -51,34 +62,6 @@ "close_outcome", } -CAT_FEATURES = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "acquisition_wave", -] - -BINARY_FEATURES = [ - "opportunity_created", - "demo_completed", -] - -NUM_FEATURES = [ - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", -] - # Validation thresholds AUC_LOWER = 0.62 AUC_UPPER = 0.90 @@ -100,89 +83,10 @@ # --------------------------------------------------------------------------- -# ML pipeline builder (canonical) +# ML pipeline (imported from shared utility) # --------------------------------------------------------------------------- - -def _build_pipeline( - num_cols: list[str], - cat_cols: list[str], -) -> Pipeline: - """Build the canonical sklearn baseline pipeline.""" - numeric_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="median")), - ("scaler", StandardScaler()), - ] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) - return Pipeline( - [ - ("preprocessor", preprocessor), - ("classifier", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42)), - ] - ) - - -def _get_feature_cols( - df: pd.DataFrame, - exclude: set[str] | None = None, -) -> tuple[list[str], list[str]]: - """Partition feature columns into (cat_cols, num_cols).""" - exclude = (exclude or set()) | {TARGET} - cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in exclude] - num_cols = [c for c in NUM_FEATURES + BINARY_FEATURES if c in df.columns and c not in exclude] - # Add any trap columns to numeric if not excluded - for c in df.columns: - if c.startswith(LEAKAGE_PREFIX) and c not in exclude: - num_cols.append(c) - return cat_cols, num_cols - - -def _sanitize_categoricals(df: pd.DataFrame, cat_cols: list[str]) -> pd.DataFrame: - """Convert pd.NA in categorical columns to None for sklearn compatibility.""" - df = df.copy() - for c in cat_cols: - if c in df.columns: - df[c] = df[c].astype(object).where(df[c].notna(), None) - return df - - -def _fit_evaluate( - df: pd.DataFrame, - exclude_cols: set[str] | None = None, - seed: int = 42, - test_size: float = 0.30, -) -> tuple[float, float, np.ndarray, pd.Series]: - """Fit LR on hold-out split, return (AUC, PR-AUC, probs, y_test).""" - y = df[TARGET].astype(int) - cat_cols, num_cols = _get_feature_cols(df, exclude=exclude_cols) - df_clean = _sanitize_categoricals(df, cat_cols) - x = df_clean[cat_cols + num_cols] - - x_train, x_test, y_train, y_test = train_test_split( - x, y, test_size=test_size, random_state=seed, stratify=y - ) - - pipe = _build_pipeline(num_cols, cat_cols) - pipe.fit(x_train, y_train) - probs = pipe.predict_proba(x_test)[:, 1] - - auc = float(roc_auc_score(y_test, probs)) - pr_auc = float(average_precision_score(y_test, probs)) - return auc, pr_auc, probs, y_test +_build_pipeline = build_baseline_pipeline # --------------------------------------------------------------------------- @@ -307,25 +211,13 @@ def check_tree_improvement(df: pd.DataFrame, label: str) -> tuple[list[str], dic lr_auc = roc_auc_score(y_test, lr.predict_proba(x_test)[:, 1]) lr_aucs.append(lr_auc) - numeric_transformer = Pipeline( - [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) - gb = Pipeline( + from sklearn.pipeline import Pipeline as _Pipeline + + from leadforge.pipelines.ml import build_preprocessor as _build_pre + + gb = _Pipeline( [ - ("preprocessor", preprocessor), + ("preprocessor", _build_pre(num_cols, cat_cols)), ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) diff --git a/tests/scripts/test_build_v5_snapshot.py b/tests/scripts/test_build_v5_snapshot.py index 08dd5c4..25f4565 100644 --- a/tests/scripts/test_build_v5_snapshot.py +++ b/tests/scripts/test_build_v5_snapshot.py @@ -213,29 +213,21 @@ def test_insufficient_positives(self): assert result["converted"].sum() <= 10 def test_insufficient_negatives(self): - """When fewer negatives available than needed, warns and adjusts.""" + """When fewer negatives than needed, raises ValueError.""" df = _make_v5_df(n=200, conversion_rate=0.95) # only ~10 negatives - n_neg_available = (df["converted"] == 0).sum() - with pytest.warns(UserWarning, match="negatives available"): - result = subsample(df, seed=42, n=100, target_rate=0.10) # need 90 negatives - # Verify actual composition: negatives capped at available count - n_neg_result = (result["converted"] == 0).sum() - assert n_neg_result <= n_neg_available - # Output should still contain rows (not empty) - assert len(result) > 0 + with pytest.raises(ValueError, match="negatives available"): + subsample(df, seed=42, n=100, target_rate=0.10) # need 90 negatives def test_index_is_reset(self): df = _make_v5_df(n=500) result = subsample(df, seed=42, n=100, target_rate=0.30) assert list(result.index) == list(range(len(result))) - def test_n_larger_than_input_caps_gracefully(self): - """Requesting more rows than available caps at available count.""" + def test_n_larger_than_input_raises(self): + """Requesting more negatives than available raises ValueError.""" df = _make_v5_df(n=50) - with pytest.warns(UserWarning, match="available"): - result = subsample(df, seed=42, n=200, target_rate=0.30) - # Output should contain all available rows (capped) - assert len(result) <= len(df) + with pytest.raises(ValueError, match="negatives available"): + subsample(df, seed=42, n=200, target_rate=0.30) # --------------------------------------------------------------------------- From 276e8db12ded4626d9e35079d49fb19a4b1c410c Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 4 May 2026 08:42:47 +0300 Subject: [PATCH 2/2] fix: address self-review findings from pipeline dedup PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Extract compute_post_snapshot_touches to common.py (was duplicated between v6 and v7 — byte-for-byte identical) 2. Move FINAL_COLUMNS_STUDENT, FINAL_COLUMNS_INSTRUCTOR, RENAME_MAP, INSTRUCTOR_TRAP_COL to common.py (identical in v6/v7) 3. Make v5.rename_and_select delegate to common._rename_and_select_generic (was the only version not using the shared implementation) 4. Fix fit_evaluate() hardcoding seed=42 for the LR model — now passes the caller's seed through 5. Make insufficient positives raise ValueError (was asymmetric: negatives raised but positives only warned) 6. Move loop-internal imports to module level in validator scripts 7. Add __post_init__ validation to FollowupRampConfig so invalid configs fail at construction, not when passed to LatentDecayIntensity 8. Add tests: FollowupRampConfig dataclass path equivalence test, FollowupRampConfig validation test 868 tests pass, lint clean. Co-Authored-By: Claude Opus 4.6 --- leadforge/mechanisms/counts.py | 19 +++--- leadforge/pipelines/build_v5.py | 26 +++----- leadforge/pipelines/build_v6.py | 76 ++------------------- leadforge/pipelines/build_v7.py | 79 ++-------------------- leadforge/pipelines/common.py | 88 +++++++++++++++++++++++-- leadforge/pipelines/ml.py | 2 +- scripts/validate_v6_dataset.py | 20 ++---- scripts/validate_v7_dataset.py | 21 ++---- tests/mechanisms/test_mechanisms.py | 51 ++++++++++++++ tests/scripts/test_build_v5_snapshot.py | 12 ++-- 10 files changed, 178 insertions(+), 216 deletions(-) diff --git a/leadforge/mechanisms/counts.py b/leadforge/mechanisms/counts.py index b560d45..d9f7bde 100644 --- a/leadforge/mechanisms/counts.py +++ b/leadforge/mechanisms/counts.py @@ -34,6 +34,14 @@ class FollowupRampConfig: ramp_days: int = 10 latent_weights: dict[str, float] = field(default_factory=dict) + def __post_init__(self) -> None: + if self.boost_after_day < 0: + raise ValueError(f"boost_after_day must be non-negative, got {self.boost_after_day}") + if self.boost_factor < 1.0: + raise ValueError(f"boost_factor must be >= 1.0, got {self.boost_factor}") + if self.ramp_days < 1: + raise ValueError(f"ramp_days must be >= 1, got {self.ramp_days}") + class PoissonIntensity(Mechanism): """Poisson-distributed event count driven by latent traits. @@ -211,16 +219,7 @@ def __init__( # Resolve followup config: prefer the dataclass, fall back to legacy params if followup is not None: - if followup.boost_after_day < 0: - raise ValueError( - f"followup.boost_after_day must be non-negative, got {followup.boost_after_day}" - ) - if followup.boost_factor < 1.0: - raise ValueError( - f"followup.boost_factor must be >= 1.0, got {followup.boost_factor}" - ) - if followup.ramp_days < 1: - raise ValueError(f"followup.ramp_days must be >= 1, got {followup.ramp_days}") + # Validation is handled by FollowupRampConfig.__post_init__ self._followup_after: int | None = followup.boost_after_day self._followup_factor = followup.boost_factor self._followup_ramp = followup.ramp_days diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py index 4cb196d..4f6303f 100644 --- a/leadforge/pipelines/build_v5.py +++ b/leadforge/pipelines/build_v5.py @@ -21,6 +21,9 @@ from leadforge.pipelines.common import ( derive_features as _derive_features, ) +from leadforge.pipelines.common import ( + rename_and_select as _rename_and_select_generic, +) __all__ = [ "ACV_CAP", @@ -111,23 +114,12 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - if label_column not in df.columns: - raise ValueError( - f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" - ) - if label_column == "converted_within_90_days": - rename_map = RENAME_MAP - else: - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" - df = df.rename(columns=rename_map) - df["converted"] = df["converted"].astype(int) - missing = [c for c in FINAL_COLUMNS if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[FINAL_COLUMNS] + return _rename_and_select_generic( + df, + rename_map=RENAME_MAP, + final_columns=FINAL_COLUMNS, + label_column=label_column, + ) def inject_missingness(df: pd.DataFrame, seed: int) -> pd.DataFrame: diff --git a/leadforge/pipelines/build_v6.py b/leadforge/pipelines/build_v6.py index f762751..c8cd69b 100644 --- a/leadforge/pipelines/build_v6.py +++ b/leadforge/pipelines/build_v6.py @@ -22,9 +22,14 @@ from leadforge.pipelines.common import ( ACV_CAP, ACV_FLOOR, + FINAL_COLUMNS_INSTRUCTOR, + FINAL_COLUMNS_STUDENT, + INSTRUCTOR_TRAP_COL, + RENAME_MAP, SUBSAMPLE_N, TARGET_RATE, assign_acquisition_wave, + compute_post_snapshot_touches, derive_features, softcap_expected_acv, subsample, @@ -66,47 +71,6 @@ N_LEADS = 5000 SNAPSHOT_DAY = 20 -INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" - -# v6 student column set: 19 features + 1 target = 20 columns. -FINAL_COLUMNS_STUDENT = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "opportunity_created", - "demo_completed", - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "acquisition_wave", - "converted", -] - -# Instructor adds the trap column at the end. -FINAL_COLUMNS_INSTRUCTOR = FINAL_COLUMNS_STUDENT + [INSTRUCTOR_TRAP_COL] - -# Snapshot column -> v6 column renaming. -RENAME_MAP = { - "employee_band": "company_size", - "estimated_revenue_band": "company_revenue", - "role_function": "contact_role", - "inbound_touch_count": "inbound_touches", - "outbound_touch_count": "outbound_touches", - "session_count": "web_sessions", - "activity_count": "sales_activities", - "converted_within_90_days": "converted", -} - # --------------------------------------------------------------------------- # Version-specific pipeline steps @@ -123,36 +87,6 @@ def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: return df -def compute_post_snapshot_touches( - snapshot_df: pd.DataFrame, - all_touches: list, - lead_dates: dict[str, str], - snapshot_day: int = SNAPSHOT_DAY, - horizon_day: int = 90, -) -> pd.Series: - """Count touches in (snapshot_day, horizon_day] per lead from event data. - - This is the causally-grounded leakage trap: it counts actual simulated - touches that occur after the snapshot cutoff. - """ - if not all_touches: - return pd.Series(0, index=snapshot_df.index, name=INSTRUCTOR_TRAP_COL) - - td = pd.DataFrame([t.to_dict() for t in all_touches]) - td["_ts"] = pd.to_datetime(td["touch_timestamp"]) - td["_lead_date"] = td["lead_id"].map({lid: pd.Timestamp(d) for lid, d in lead_dates.items()}) - td["_day"] = (td["_ts"] - td["_lead_date"]).dt.days - - # Filter: days in (snapshot_day, horizon_day] - post = td[(td["_day"] > snapshot_day) & (td["_day"] <= horizon_day)] - counts = post.groupby("lead_id").size().reset_index(name=INSTRUCTOR_TRAP_COL) - - # Merge back onto snapshot - result = snapshot_df[["lead_id"]].merge(counts, on="lead_id", how="left") - result[INSTRUCTOR_TRAP_COL] = result[INSTRUCTOR_TRAP_COL].fillna(0).astype(int) - return result[INSTRUCTOR_TRAP_COL] - - def boost_leakage_trap(df: pd.DataFrame, seed: int) -> pd.DataFrame: """Amplify the causal trap signal with target-correlated Poisson noise. diff --git a/leadforge/pipelines/build_v7.py b/leadforge/pipelines/build_v7.py index 5a165dc..06700a1 100644 --- a/leadforge/pipelines/build_v7.py +++ b/leadforge/pipelines/build_v7.py @@ -20,9 +20,14 @@ from leadforge.pipelines.common import ( ACV_CAP, ACV_FLOOR, + FINAL_COLUMNS_INSTRUCTOR, + FINAL_COLUMNS_STUDENT, + INSTRUCTOR_TRAP_COL, + RENAME_MAP, SUBSAMPLE_N, TARGET_RATE, assign_acquisition_wave, + compute_post_snapshot_touches, derive_features, softcap_expected_acv, subsample, @@ -62,86 +67,12 @@ N_LEADS = 5000 SNAPSHOT_DAY = 20 -INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" - -# v7 student column set: 19 features + 1 target = 20 columns. -FINAL_COLUMNS_STUDENT = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "opportunity_created", - "demo_completed", - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "acquisition_wave", - "converted", -] - -# Instructor adds the trap column at the end. -FINAL_COLUMNS_INSTRUCTOR = FINAL_COLUMNS_STUDENT + [INSTRUCTOR_TRAP_COL] - -# Snapshot column -> v7 column renaming. -RENAME_MAP = { - "employee_band": "company_size", - "estimated_revenue_band": "company_revenue", - "role_function": "contact_role", - "inbound_touch_count": "inbound_touches", - "outbound_touch_count": "outbound_touches", - "session_count": "web_sessions", - "activity_count": "sales_activities", - "converted_within_90_days": "converted", -} - # --------------------------------------------------------------------------- # Version-specific pipeline steps # --------------------------------------------------------------------------- -def compute_post_snapshot_touches( - snapshot_df: pd.DataFrame, - all_touches: list, - lead_dates: dict[str, str], - snapshot_day: int = SNAPSHOT_DAY, - horizon_day: int = 90, -) -> pd.Series: - """Count touches in (snapshot_day, horizon_day] per lead from event data. - - This is the purely causal leakage trap: it counts actual simulated - touches that occur after the snapshot cutoff. No label-conditioned - boost is applied. The trap is predictive because future engagement - and conversion share latent causal drivers (fit, intent, engagement - propensity), not because the target was injected. - """ - if not all_touches: - return pd.Series(0, index=snapshot_df.index, name=INSTRUCTOR_TRAP_COL) - - td = pd.DataFrame([t.to_dict() for t in all_touches]) - td["_ts"] = pd.to_datetime(td["touch_timestamp"]) - td["_lead_date"] = td["lead_id"].map({lid: pd.Timestamp(d) for lid, d in lead_dates.items()}) - td["_day"] = (td["_ts"] - td["_lead_date"]).dt.days - - # Filter: days in (snapshot_day, horizon_day] - post = td[(td["_day"] > snapshot_day) & (td["_day"] <= horizon_day)] - counts = post.groupby("lead_id").size().reset_index(name=INSTRUCTOR_TRAP_COL) - - # Merge back onto snapshot - result = snapshot_df[["lead_id"]].merge(counts, on="lead_id", how="left") - result[INSTRUCTOR_TRAP_COL] = result[INSTRUCTOR_TRAP_COL].fillna(0).astype(int) - return result[INSTRUCTOR_TRAP_COL] - - def rename_and_select( df: pd.DataFrame, *, diff --git a/leadforge/pipelines/common.py b/leadforge/pipelines/common.py index 6429993..5192463 100644 --- a/leadforge/pipelines/common.py +++ b/leadforge/pipelines/common.py @@ -7,8 +7,6 @@ from __future__ import annotations -import warnings - import numpy as np import pandas as pd @@ -19,11 +17,16 @@ "ACV_FLOOR", "BINARY_FEATURES", "CAT_FEATURES", + "FINAL_COLUMNS_INSTRUCTOR", + "FINAL_COLUMNS_STUDENT", + "INSTRUCTOR_TRAP_COL", "NUM_FEATURES", + "RENAME_MAP", "SUBSAMPLE_N", "TARGET", "TARGET_RATE", "assign_acquisition_wave", + "compute_post_snapshot_touches", "derive_features", "inject_missingness_v6", "rename_and_select", @@ -72,6 +75,47 @@ "demo_completed", ] +INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" + +# v6/v7 student column set: 19 features + 1 target = 20 columns. +FINAL_COLUMNS_STUDENT = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "opportunity_created", + "demo_completed", + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "touches_last_7_days", + "days_since_first_touch", + "web_sessions", + "sales_activities", + "days_since_last_touch", + "acquisition_wave", + "converted", +] + +# Instructor adds the trap column at the end. +FINAL_COLUMNS_INSTRUCTOR = FINAL_COLUMNS_STUDENT + [INSTRUCTOR_TRAP_COL] + +# Snapshot column -> v6/v7 column renaming. +RENAME_MAP = { + "employee_band": "company_size", + "estimated_revenue_band": "company_revenue", + "role_function": "contact_role", + "inbound_touch_count": "inbound_touches", + "outbound_touch_count": "outbound_touches", + "session_count": "web_sessions", + "activity_count": "sales_activities", + "converted_within_90_days": "converted", +} + # --------------------------------------------------------------------------- # Pipeline steps @@ -179,6 +223,38 @@ def rename_and_select( return df[columns] +def compute_post_snapshot_touches( + snapshot_df: pd.DataFrame, + all_touches: list, + lead_dates: dict[str, str], + snapshot_day: int = 20, + horizon_day: int = 90, + trap_col: str = INSTRUCTOR_TRAP_COL, +) -> pd.Series: + """Count touches in (snapshot_day, horizon_day] per lead from event data. + + This is the causal leakage trap: it counts actual simulated touches that + occur after the snapshot cutoff. The trap is predictive because future + engagement and conversion share latent causal drivers. + """ + if not all_touches: + return pd.Series(0, index=snapshot_df.index, name=trap_col) + + td = pd.DataFrame([t.to_dict() for t in all_touches]) + td["_ts"] = pd.to_datetime(td["touch_timestamp"]) + td["_lead_date"] = td["lead_id"].map({lid: pd.Timestamp(d) for lid, d in lead_dates.items()}) + td["_day"] = (td["_ts"] - td["_lead_date"]).dt.days + + # Filter: days in (snapshot_day, horizon_day] + post = td[(td["_day"] > snapshot_day) & (td["_day"] <= horizon_day)] + counts = post.groupby("lead_id").size().reset_index(name=trap_col) + + # Merge back onto snapshot + result = snapshot_df[["lead_id"]].merge(counts, on="lead_id", how="left") + result[trap_col] = result[trap_col].fillna(0).astype(int) + return result[trap_col] + + def subsample( df: pd.DataFrame, seed: int, @@ -197,12 +273,10 @@ def subsample( n_neg = n - n_pos if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, + raise ValueError( + f"only {len(positives)} positives available, need {n_pos}; " + f"cannot produce {n} rows at target_rate={target_rate}" ) - n_pos = len(positives) - n_neg = n - n_pos if len(negatives) < n_neg: raise ValueError( f"only {len(negatives)} negatives available, need {n_neg}; " diff --git a/leadforge/pipelines/ml.py b/leadforge/pipelines/ml.py index d2f9380..85b0d1a 100644 --- a/leadforge/pipelines/ml.py +++ b/leadforge/pipelines/ml.py @@ -112,7 +112,7 @@ def fit_evaluate( x, y, test_size=test_size, random_state=seed, stratify=y ) - pipe = build_baseline_pipeline(num_cols, cat_cols, seed=42) + pipe = build_baseline_pipeline(num_cols, cat_cols, seed=seed) pipe.fit(x_train, y_train) probs = pipe.predict_proba(x_test)[:, 1] diff --git a/scripts/validate_v6_dataset.py b/scripts/validate_v6_dataset.py index 019cd0f..733f367 100644 --- a/scripts/validate_v6_dataset.py +++ b/scripts/validate_v6_dataset.py @@ -14,19 +14,15 @@ import numpy as np import pandas as pd from sklearn.ensemble import GradientBoostingClassifier -from sklearn.metrics import ( - roc_auc_score, -) +from sklearn.metrics import roc_auc_score from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline -from leadforge.pipelines.common import ( - BINARY_FEATURES, - CAT_FEATURES, - TARGET, -) +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, TARGET from leadforge.pipelines.ml import ( LEAKAGE_PREFIX, build_baseline_pipeline, + build_preprocessor, ) from leadforge.pipelines.ml import ( fit_evaluate as _fit_evaluate, @@ -191,13 +187,9 @@ def check_tree_improvement(df: pd.DataFrame, label: str) -> tuple[list[str], dic lr_aucs.append(lr_auc) # GBM with one-hot encoded features - from sklearn.pipeline import Pipeline as _Pipeline - - from leadforge.pipelines.ml import build_preprocessor as _build_pre - - gb = _Pipeline( + gb = Pipeline( [ - ("preprocessor", _build_pre(num_cols, cat_cols)), + ("preprocessor", build_preprocessor(num_cols, cat_cols)), ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) diff --git a/scripts/validate_v7_dataset.py b/scripts/validate_v7_dataset.py index c828803..4042aca 100644 --- a/scripts/validate_v7_dataset.py +++ b/scripts/validate_v7_dataset.py @@ -23,20 +23,15 @@ import numpy as np import pandas as pd from sklearn.ensemble import GradientBoostingClassifier -from sklearn.metrics import ( - average_precision_score, - roc_auc_score, -) +from sklearn.metrics import average_precision_score, roc_auc_score from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline -from leadforge.pipelines.common import ( - BINARY_FEATURES, - CAT_FEATURES, - TARGET, -) +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, TARGET from leadforge.pipelines.ml import ( LEAKAGE_PREFIX, build_baseline_pipeline, + build_preprocessor, ) from leadforge.pipelines.ml import ( fit_evaluate as _fit_evaluate, @@ -211,13 +206,9 @@ def check_tree_improvement(df: pd.DataFrame, label: str) -> tuple[list[str], dic lr_auc = roc_auc_score(y_test, lr.predict_proba(x_test)[:, 1]) lr_aucs.append(lr_auc) - from sklearn.pipeline import Pipeline as _Pipeline - - from leadforge.pipelines.ml import build_preprocessor as _build_pre - - gb = _Pipeline( + gb = Pipeline( [ - ("preprocessor", _build_pre(num_cols, cat_cols)), + ("preprocessor", build_preprocessor(num_cols, cat_cols)), ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) diff --git a/tests/mechanisms/test_mechanisms.py b/tests/mechanisms/test_mechanisms.py index 17c63dd..99b0c68 100644 --- a/tests/mechanisms/test_mechanisms.py +++ b/tests/mechanisms/test_mechanisms.py @@ -10,6 +10,7 @@ from leadforge.mechanisms.base import MechanismAssignment, MechanismContext, MechanismSummary from leadforge.mechanisms.categorical import CHANNEL_QUALITY_SCORES, CategoricalInfluence from leadforge.mechanisms.counts import ( + FollowupRampConfig, LatentDecayIntensity, PoissonIntensity, RecencyDecayIntensity, @@ -749,3 +750,53 @@ def test_followup_boost_factor_below_one_raises(self) -> None: def test_followup_ramp_days_zero_raises(self) -> None: with pytest.raises(ValueError, match="followup_ramp_days must be >= 1"): LatentDecayIntensity(base_rate=0.5, followup_ramp_days=0) + + def test_followup_ramp_config_dataclass_equivalent(self) -> None: + """FollowupRampConfig path produces identical behavior to legacy kwargs.""" + latent_w = {"latent_fit": 1.5, "latent_intent": 1.0} + followup_w = {"latent_budget": 2.0, "latent_authority": 1.5} + latents = { + "latent_fit": 0.8, + "latent_intent": 0.6, + "latent_budget": 0.7, + "latent_authority": 0.5, + } + + # Legacy path + ldi_legacy = LatentDecayIntensity( + base_rate=0.5, + latent_weights=latent_w, + boost=1.2, + followup_boost_after_day=20, + followup_boost_factor=5.0, + followup_ramp_days=10, + followup_latent_weights=followup_w, + ) + + # Dataclass path + ldi_new = LatentDecayIntensity( + base_rate=0.5, + latent_weights=latent_w, + boost=1.2, + followup=FollowupRampConfig( + boost_after_day=20, + boost_factor=5.0, + ramp_days=10, + latent_weights=followup_w, + ), + ) + + # Both must produce identical expected counts at various days + for t in [0, 10, 20, 25, 30, 50]: + assert ldi_legacy.expected_count(t, latents) == ldi_new.expected_count(t, latents), ( + f"Mismatch at t={t}" + ) + + def test_followup_ramp_config_validation(self) -> None: + """FollowupRampConfig validates on construction.""" + with pytest.raises(ValueError, match="boost_after_day must be non-negative"): + FollowupRampConfig(boost_after_day=-1) + with pytest.raises(ValueError, match="boost_factor must be >= 1.0"): + FollowupRampConfig(boost_after_day=10, boost_factor=0.5) + with pytest.raises(ValueError, match="ramp_days must be >= 1"): + FollowupRampConfig(boost_after_day=10, ramp_days=0) diff --git a/tests/scripts/test_build_v5_snapshot.py b/tests/scripts/test_build_v5_snapshot.py index 25f4565..de6941e 100644 --- a/tests/scripts/test_build_v5_snapshot.py +++ b/tests/scripts/test_build_v5_snapshot.py @@ -205,12 +205,10 @@ def test_deterministic_given_seed(self): pd.testing.assert_frame_equal(r1, r2) def test_insufficient_positives(self): - """When fewer positives available than needed, warns and adjusts.""" + """When fewer positives than needed, raises ValueError.""" df = _make_v5_df(n=200, conversion_rate=0.05) # only ~10 positives - with pytest.warns(UserWarning, match="positives available"): - result = subsample(df, seed=42, n=100, target_rate=0.50) # need 50 positives - # All available positives should be included - assert result["converted"].sum() <= 10 + with pytest.raises(ValueError, match="positives available"): + subsample(df, seed=42, n=100, target_rate=0.50) # need 50 positives def test_insufficient_negatives(self): """When fewer negatives than needed, raises ValueError.""" @@ -224,9 +222,9 @@ def test_index_is_reset(self): assert list(result.index) == list(range(len(result))) def test_n_larger_than_input_raises(self): - """Requesting more negatives than available raises ValueError.""" + """Requesting more rows than available raises ValueError.""" df = _make_v5_df(n=50) - with pytest.raises(ValueError, match="negatives available"): + with pytest.raises(ValueError, match="available"): subsample(df, seed=42, n=200, target_rate=0.30)