diff --git a/.agent-plan.md b/.agent-plan.md index 80f8b22..a2779df 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -312,17 +312,17 @@ Documentation + CI: | M14: Notebook 4 (recipe customization) | Discarded | Premature | | M15: Docs polish + v1.0 release | **Done** | README, CHANGELOG, version bump to 1.0.0 complete; architecture diagram and notebooks remain post-v1 | -### v7 follow-up: pipeline deduplication + LatentDecayIntensity cleanup +### v7 follow-up: pipeline deduplication + LatentDecayIntensity cleanup ✓ -From self-review of PR #50. Should be a single follow-up PR. +From self-review of PR #50. Completed in a single follow-up PR. -| Item | Description | +| Item | Status | |---|---| -| Extract shared pipeline functions | Move `subsample`, `inject_missingness`, `derive_features`, `softcap_expected_acv`, `assign_acquisition_wave`, `rename_and_select` into `leadforge/pipelines/common.py`. v5/v6/v7 modules import from common + add version-specific logic. | -| Extract shared ML pipeline | Canonical sklearn pipeline (preprocessor + LR) used by validators and eval scripts should be a single shared utility. | -| Deduplicate feature lists | `CAT_FEATURES`, `NUM_FEATURES`, `BINARY_FEATURES` defined once (in pipeline module or shared constants), imported by validators and eval scripts. | -| Group followup params into dataclass | Introduce `FollowupRampConfig(boost_after_day, boost_factor, ramp_days, latent_weights)` in `mechanisms/counts.py`. Replace 4 constructor params with `followup: FollowupRampConfig | None = None`. | -| Fix `subsample` silent short-return | `subsample()` can return fewer than `n` rows when there aren't enough negatives. Should raise `ValueError` instead. Fix in the shared `common.py`. | +| Extract shared pipeline functions | ✓ `leadforge/pipelines/common.py` — `subsample`, `inject_missingness_v6`, `derive_features`, `softcap_expected_acv`, `assign_acquisition_wave`, `rename_and_select`. v5/v6/v7 modules import from common. | +| Extract shared ML pipeline | ✓ `leadforge/pipelines/ml.py` — `build_baseline_pipeline`, `build_preprocessor`, `fit_evaluate`, `get_feature_cols`, `sanitize_categoricals`. Used by validators and eval scripts. | +| Deduplicate feature lists | ✓ `CAT_FEATURES`, `NUM_FEATURES`, `BINARY_FEATURES` defined once in `leadforge/pipelines/common.py`, imported by validators and eval scripts. | +| Group followup params into dataclass | ✓ `FollowupRampConfig` frozen dataclass in `mechanisms/counts.py`. `LatentDecayIntensity` accepts `followup: FollowupRampConfig | None`. Legacy params still accepted for backward compat. | +| Fix `subsample` silent short-return | ✓ `subsample()` now raises `ValueError` when insufficient negatives. | ### From post-v1 list diff --git a/leadforge/mechanisms/counts.py b/leadforge/mechanisms/counts.py index f6309e5..d9f7bde 100644 --- a/leadforge/mechanisms/counts.py +++ b/leadforge/mechanisms/counts.py @@ -9,11 +9,40 @@ import math import random +from dataclasses import dataclass, field from typing import Any from leadforge.mechanisms.base import Mechanism, MechanismContext +@dataclass(frozen=True) +class FollowupRampConfig: + """Configuration for the follow-up ramp on :class:`LatentDecayIntensity`. + + Groups the four follow-up parameters into a single cohesive unit. + + Attributes: + boost_after_day: Day after which latent modulation ramps up. + boost_factor: Multiplier applied to ``boost`` at the end of the ramp. + ramp_days: Number of days over which the ramp transitions linearly. + latent_weights: Optional separate latent weights used after the + followup day. + """ + + boost_after_day: int + boost_factor: float = 1.0 + ramp_days: int = 10 + latent_weights: dict[str, float] = field(default_factory=dict) + + def __post_init__(self) -> None: + if self.boost_after_day < 0: + raise ValueError(f"boost_after_day must be non-negative, got {self.boost_after_day}") + if self.boost_factor < 1.0: + raise ValueError(f"boost_factor must be >= 1.0, got {self.boost_factor}") + if self.ramp_days < 1: + raise ValueError(f"ramp_days must be >= 1, got {self.ramp_days}") + + class PoissonIntensity(Mechanism): """Poisson-distributed event count driven by latent traits. @@ -145,8 +174,8 @@ class LatentDecayIntensity(Mechanism): where ``latent_multiplier = sum(weight_i * latents[key_i])``. - After ``followup_boost_after_day``, the effective boost ramps linearly from - ``boost`` to ``boost * followup_boost_factor`` over ``followup_ramp_days``. + After ``followup.boost_after_day``, the effective boost ramps linearly from + ``boost`` to ``boost * followup.boost_factor`` over ``followup.ramp_days``. This models sales teams increasing follow-up intensity for leads that show strong latent signals (engagement, fit, intent) — a causally legitimate amplification of the latent → touch pathway. @@ -159,18 +188,12 @@ class LatentDecayIntensity(Mechanism): latent_weights: Mapping of latent-key → weight for the multiplier. boost: Scaling factor for the latent multiplier (controls how much latent traits amplify touch intensity). - followup_boost_after_day: Day after which latent modulation ramps up. - Set to ``None`` (default) to disable the ramp. - followup_boost_factor: Multiplier applied to ``boost`` at the end of - the ramp period. E.g. ``3.0`` means the effective boost is - ``boost * 3.0`` once the ramp completes. - followup_ramp_days: Number of days over which the ramp transitions - linearly from ``boost`` to ``boost * followup_boost_factor``. - followup_latent_weights: Optional separate latent weights used after - the followup day. Models sales teams responding to *different* - latent signals during the follow-up period (e.g. prioritizing - authority and budget over raw engagement). Blended with the - base weights during the ramp period. + followup: Optional :class:`FollowupRampConfig` grouping the ramp + parameters. Set to ``None`` (default) to disable the ramp. + followup_boost_after_day: **Deprecated** — use ``followup`` instead. + followup_boost_factor: **Deprecated** — use ``followup`` instead. + followup_ramp_days: **Deprecated** — use ``followup`` instead. + followup_latent_weights: **Deprecated** — use ``followup`` instead. """ def __init__( @@ -180,6 +203,8 @@ def __init__( floor_rate: float = 0.01, latent_weights: dict[str, float] | None = None, boost: float = 0.8, + followup: FollowupRampConfig | None = None, + # Legacy params — kept for backward compatibility during transition followup_boost_after_day: int | None = None, followup_boost_factor: float = 1.0, followup_ramp_days: int = 10, @@ -191,25 +216,40 @@ def __init__( raise ValueError(f"decay_factor must be in (0, 1], got {decay_factor}") if floor_rate < 0: raise ValueError(f"floor_rate must be non-negative, got {floor_rate}") - if followup_boost_after_day is not None and followup_boost_after_day < 0: - raise ValueError( - f"followup_boost_after_day must be non-negative, got {followup_boost_after_day}" + + # Resolve followup config: prefer the dataclass, fall back to legacy params + if followup is not None: + # Validation is handled by FollowupRampConfig.__post_init__ + self._followup_after: int | None = followup.boost_after_day + self._followup_factor = followup.boost_factor + self._followup_ramp = followup.ramp_days + self._followup_latent_weights: dict[str, float] | None = ( + dict(followup.latent_weights) if followup.latent_weights else None + ) + else: + # Legacy path + if followup_boost_after_day is not None and followup_boost_after_day < 0: + raise ValueError( + f"followup_boost_after_day must be non-negative, got {followup_boost_after_day}" + ) + if followup_boost_factor < 1.0: + raise ValueError( + f"followup_boost_factor must be >= 1.0, got {followup_boost_factor}" + ) + if followup_ramp_days < 1: + raise ValueError(f"followup_ramp_days must be >= 1, got {followup_ramp_days}") + self._followup_after = followup_boost_after_day + self._followup_factor = followup_boost_factor + self._followup_ramp = followup_ramp_days + self._followup_latent_weights = ( + dict(followup_latent_weights) if followup_latent_weights else None ) - if followup_boost_factor < 1.0: - raise ValueError(f"followup_boost_factor must be >= 1.0, got {followup_boost_factor}") - if followup_ramp_days < 1: - raise ValueError(f"followup_ramp_days must be >= 1, got {followup_ramp_days}") + self._base_rate = base_rate self._decay = decay_factor self._floor = floor_rate self._latent_weights: dict[str, float] = dict(latent_weights) if latent_weights else {} self._boost = boost - self._followup_after: int | None = followup_boost_after_day - self._followup_factor = followup_boost_factor - self._followup_ramp = followup_ramp_days - self._followup_latent_weights: dict[str, float] | None = ( - dict(followup_latent_weights) if followup_latent_weights else None - ) @property def name(self) -> str: diff --git a/leadforge/mechanisms/policies.py b/leadforge/mechanisms/policies.py index 0d64a3c..fdb1be4 100644 --- a/leadforge/mechanisms/policies.py +++ b/leadforge/mechanisms/policies.py @@ -30,7 +30,11 @@ if TYPE_CHECKING: from leadforge.core.models import DifficultyParams -from leadforge.mechanisms.counts import LatentDecayIntensity, RecencyDecayIntensity +from leadforge.mechanisms.counts import ( + FollowupRampConfig, + LatentDecayIntensity, + RecencyDecayIntensity, +) from leadforge.mechanisms.hazards import ConversionHazard from leadforge.mechanisms.measurement import NoisyProxy from leadforge.mechanisms.scores import LatentScore @@ -342,10 +346,12 @@ def _scale_weights(weights: dict[str, float], s: float) -> dict[str, float]: floor_rate=0.02, latent_weights=touch_latent_w, boost=1.2, - followup_boost_after_day=20, - followup_boost_factor=10.0, - followup_ramp_days=10, - followup_latent_weights=followup_latent_w, + followup=FollowupRampConfig( + boost_after_day=20, + boost_factor=10.0, + ramp_days=10, + latent_weights=followup_latent_w, + ), ) else: touch_intensity = RecencyDecayIntensity( diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py index 5cf12c5..4f6303f 100644 --- a/leadforge/pipelines/build_v5.py +++ b/leadforge/pipelines/build_v5.py @@ -7,12 +7,23 @@ from __future__ import annotations -import warnings - import numpy as np import pandas as pd from leadforge.core.rng import RNGRoot +from leadforge.pipelines.common import ( + ACV_CAP, + ACV_FLOOR, + SUBSAMPLE_N, + TARGET_RATE, + subsample, +) +from leadforge.pipelines.common import ( + derive_features as _derive_features, +) +from leadforge.pipelines.common import ( + rename_and_select as _rename_and_select_generic, +) __all__ = [ "ACV_CAP", @@ -38,12 +49,6 @@ SEED = 42 N_LEADS = 5000 SNAPSHOT_DAY = 10 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k–$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 # v5 column set: 18 features + 1 target = 19 columns. FINAL_COLUMNS = [ @@ -89,10 +94,7 @@ def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame: """Derive binary features for the v5 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df + return _derive_features(df) def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: @@ -112,56 +114,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - if label_column not in df.columns: - raise ValueError( - f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" - ) - if label_column == "converted_within_90_days": - rename_map = RENAME_MAP - else: - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" - df = df.rename(columns=rename_map) - df["converted"] = df["converted"].astype(int) - missing = [c for c in FINAL_COLUMNS if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[FINAL_COLUMNS] - - -def subsample( - df: pd.DataFrame, - seed: int, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - rng = RNGRoot(seed).numpy_child("subsample") - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, - ) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - warnings.warn( - f"only {len(negatives)} negatives available, need {n_neg}", - stacklevel=2, - ) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + return _rename_and_select_generic( + df, + rename_map=RENAME_MAP, + final_columns=FINAL_COLUMNS, + label_column=label_column, ) diff --git a/leadforge/pipelines/build_v6.py b/leadforge/pipelines/build_v6.py index ca2327e..c8cd69b 100644 --- a/leadforge/pipelines/build_v6.py +++ b/leadforge/pipelines/build_v6.py @@ -16,12 +16,30 @@ from __future__ import annotations -import warnings - -import numpy as np import pandas as pd from leadforge.core.rng import RNGRoot +from leadforge.pipelines.common import ( + ACV_CAP, + ACV_FLOOR, + FINAL_COLUMNS_INSTRUCTOR, + FINAL_COLUMNS_STUDENT, + INSTRUCTOR_TRAP_COL, + RENAME_MAP, + SUBSAMPLE_N, + TARGET_RATE, + assign_acquisition_wave, + compute_post_snapshot_touches, + derive_features, + softcap_expected_acv, + subsample, +) +from leadforge.pipelines.common import ( + inject_missingness_v6 as inject_missingness, +) +from leadforge.pipelines.common import ( + rename_and_select as _rename_and_select_generic, +) __all__ = [ "ACV_CAP", @@ -52,96 +70,13 @@ SEED = 42 N_LEADS = 5000 SNAPSHOT_DAY = 20 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k-$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 - -INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" - -# v6 student column set: 19 features + 1 target = 20 columns. -FINAL_COLUMNS_STUDENT = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "opportunity_created", - "demo_completed", - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "acquisition_wave", - "converted", -] - -# Instructor adds the trap column at the end. -FINAL_COLUMNS_INSTRUCTOR = FINAL_COLUMNS_STUDENT + [INSTRUCTOR_TRAP_COL] - -# Snapshot column -> v6 column renaming. -RENAME_MAP = { - "employee_band": "company_size", - "estimated_revenue_band": "company_revenue", - "role_function": "contact_role", - "inbound_touch_count": "inbound_touches", - "outbound_touch_count": "outbound_touches", - "session_count": "web_sessions", - "activity_count": "sales_activities", - "converted_within_90_days": "converted", -} # --------------------------------------------------------------------------- -# Pipeline steps +# Version-specific pipeline steps # --------------------------------------------------------------------------- -def derive_features(df: pd.DataFrame) -> pd.DataFrame: - """Derive binary and momentum features for the v6 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df - - -def softcap_expected_acv( - df: pd.DataFrame, - seed: int, - floor: float = ACV_FLOOR, - cap: float = ACV_CAP, -) -> pd.DataFrame: - """Soft winsorize expected_acv to avoid hard-clipping ties at the cap. - - Values below floor are clipped. Values above cap are pulled toward cap - with additive noise so they cluster near the cap rather than pile at it. - """ - rng = RNGRoot(seed).numpy_child("softcap_acv") - df = df.copy() - acv = df["expected_acv"].copy() - - # Floor: hard clip - acv = acv.clip(lower=floor) - - # Cap: soft winsorize -- draw values in [cap - 5k, cap] for outliers - over_mask = acv > cap - n_over = int(over_mask.sum()) - if n_over > 0: - acv.loc[over_mask] = cap - rng.uniform(0, 5000, size=n_over) - - df["expected_acv"] = acv.round(0) - return df - - def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: """Hard clip expected_acv to narrative-consistent range [ACV_FLOOR, ACV_CAP]. @@ -152,66 +87,10 @@ def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: return df -def assign_acquisition_wave(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Assign acquisition_wave (A, B, C) based on lead index position. - - Waves A/B/C are roughly chronological: first third = A, middle = B, - last third = C. A small amount of noise is added at the boundaries. - """ - rng = RNGRoot(seed).numpy_child("acquisition_wave") - df = df.copy() - n = len(df) - waves = np.empty(n, dtype=object) - third = n // 3 - - # Base assignment by position (stable across seeds) - waves[:third] = "A" - waves[third : 2 * third] = "B" - waves[2 * third :] = "C" - - # Add ~5% boundary noise so it's not perfectly deterministic - noise_mask = rng.random(n) < 0.05 - noise_vals = rng.choice(["A", "B", "C"], size=n) - waves[noise_mask] = noise_vals[noise_mask] - - df["acquisition_wave"] = waves - return df - - -def compute_post_snapshot_touches( - snapshot_df: pd.DataFrame, - all_touches: list, - lead_dates: dict[str, str], - snapshot_day: int = SNAPSHOT_DAY, - horizon_day: int = 90, -) -> pd.Series: - """Count touches in (snapshot_day, horizon_day] per lead from event data. - - This is the causally-grounded leakage trap: it counts actual simulated - touches that occur after the snapshot cutoff. - """ - if not all_touches: - return pd.Series(0, index=snapshot_df.index, name=INSTRUCTOR_TRAP_COL) - - td = pd.DataFrame([t.to_dict() for t in all_touches]) - td["_ts"] = pd.to_datetime(td["touch_timestamp"]) - td["_lead_date"] = td["lead_id"].map({lid: pd.Timestamp(d) for lid, d in lead_dates.items()}) - td["_day"] = (td["_ts"] - td["_lead_date"]).dt.days - - # Filter: days in (snapshot_day, horizon_day] - post = td[(td["_day"] > snapshot_day) & (td["_day"] <= horizon_day)] - counts = post.groupby("lead_id").size().reset_index(name=INSTRUCTOR_TRAP_COL) - - # Merge back onto snapshot - result = snapshot_df[["lead_id"]].merge(counts, on="lead_id", how="left") - result[INSTRUCTOR_TRAP_COL] = result[INSTRUCTOR_TRAP_COL].fillna(0).astype(int) - return result[INSTRUCTOR_TRAP_COL] - - def boost_leakage_trap(df: pd.DataFrame, seed: int) -> pd.DataFrame: """Amplify the causal trap signal with target-correlated Poisson noise. - Converted leads get an extra Poisson(1) count added to the trap column, + Converted leads get an extra Poisson(3) count added to the trap column, making it a stronger leakage signal for teaching purposes. """ rng = RNGRoot(seed).numpy_child("leakage_trap_boost") @@ -237,102 +116,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - if label_column not in df.columns: - raise ValueError( - f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" - ) - if label_column == "converted_within_90_days": - rename_map = RENAME_MAP - else: - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" - df = df.rename(columns=rename_map) - df["converted"] = df["converted"].astype(int) - columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT - missing = [c for c in columns if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[columns] - - -def subsample( - df: pd.DataFrame, - seed: int, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - rng = RNGRoot(seed).numpy_child("subsample") - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, - ) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - warnings.warn( - f"only {len(negatives)} negatives available, need {n_neg}", - stacklevel=2, - ) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + return _rename_and_select_generic( + df, + rename_map=RENAME_MAP, + final_columns=FINAL_COLUMNS_STUDENT, + instructor=instructor, + instructor_columns=FINAL_COLUMNS_INSTRUCTOR, + label_column=label_column, ) - - -def inject_missingness(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Apply structured missingness per the v6 contract. - - Patterns: - 1. Structural: days_since_last_touch is NaN when touch_count=0 (from snapshot) - 2. MAR: web_sessions — SDR outbound 15%, inbound marketing 2%, partner 5% - 3. MAR: seniority — partner referral 8%, others 1% - 4. MCAR: expected_acv — 2% uniform - 5. Structural + MCAR: days_since_first_touch — NaN when no touches + 2% MCAR - 6. MCAR: days_since_last_touch — additional 3% on top of structural - """ - rng = RNGRoot(seed).numpy_child("missingness") - df = df.copy() - n = len(df) - - # (1) Structural for days_since_last_touch is already NaN from snapshot builder - # Note: also structural for days_since_first_touch when no touches - - # (2) MAR: web_sessions by lead_source - for source, rate in [ - ("sdr_outbound", 0.15), - ("inbound_marketing", 0.02), - ("partner_referral", 0.05), - ]: - mask = (df["lead_source"] == source) & (rng.random(n) < rate) - df.loc[mask, "web_sessions"] = np.nan - - # (3) MAR: seniority by lead_source - partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) - other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) - df.loc[partner_mask | other_mask, "seniority"] = np.nan - - # (4) MCAR: expected_acv 2% - acv_mcar = rng.random(n) < 0.02 - df.loc[acv_mcar, "expected_acv"] = np.nan - - # (5) MCAR: days_since_first_touch 2% on top of structural - dsft_mask = rng.random(n) < 0.02 - df.loc[dsft_mask, "days_since_first_touch"] = np.nan - - # (6) MCAR: days_since_last_touch 3% on top of structural - dslt_mask = rng.random(n) < 0.03 - df.loc[dslt_mask, "days_since_last_touch"] = np.nan - - return df diff --git a/leadforge/pipelines/build_v7.py b/leadforge/pipelines/build_v7.py index 6e62d59..06700a1 100644 --- a/leadforge/pipelines/build_v7.py +++ b/leadforge/pipelines/build_v7.py @@ -15,12 +15,29 @@ from __future__ import annotations -import warnings - -import numpy as np import pandas as pd -from leadforge.core.rng import RNGRoot +from leadforge.pipelines.common import ( + ACV_CAP, + ACV_FLOOR, + FINAL_COLUMNS_INSTRUCTOR, + FINAL_COLUMNS_STUDENT, + INSTRUCTOR_TRAP_COL, + RENAME_MAP, + SUBSAMPLE_N, + TARGET_RATE, + assign_acquisition_wave, + compute_post_snapshot_touches, + derive_features, + softcap_expected_acv, + subsample, +) +from leadforge.pipelines.common import ( + inject_missingness_v6 as inject_missingness, +) +from leadforge.pipelines.common import ( + rename_and_select as _rename_and_select_generic, +) __all__ = [ "ACV_CAP", @@ -49,155 +66,13 @@ SEED = 42 N_LEADS = 5000 SNAPSHOT_DAY = 20 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k-$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 - -INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" - -# v7 student column set: 19 features + 1 target = 20 columns. -FINAL_COLUMNS_STUDENT = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "opportunity_created", - "demo_completed", - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "acquisition_wave", - "converted", -] - -# Instructor adds the trap column at the end. -FINAL_COLUMNS_INSTRUCTOR = FINAL_COLUMNS_STUDENT + [INSTRUCTOR_TRAP_COL] - -# Snapshot column -> v7 column renaming. -RENAME_MAP = { - "employee_band": "company_size", - "estimated_revenue_band": "company_revenue", - "role_function": "contact_role", - "inbound_touch_count": "inbound_touches", - "outbound_touch_count": "outbound_touches", - "session_count": "web_sessions", - "activity_count": "sales_activities", - "converted_within_90_days": "converted", -} # --------------------------------------------------------------------------- -# Pipeline steps +# Version-specific pipeline steps # --------------------------------------------------------------------------- -def derive_features(df: pd.DataFrame) -> pd.DataFrame: - """Derive binary and momentum features for the v7 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df - - -def softcap_expected_acv( - df: pd.DataFrame, - seed: int, - floor: float = ACV_FLOOR, - cap: float = ACV_CAP, -) -> pd.DataFrame: - """Soft winsorize expected_acv to avoid hard-clipping ties at the cap. - - Values below floor are clipped. Values above cap are pulled toward cap - with additive noise so they cluster near the cap rather than pile at it. - """ - rng = RNGRoot(seed).numpy_child("softcap_acv") - df = df.copy() - acv = df["expected_acv"].copy() - - # Floor: hard clip - acv = acv.clip(lower=floor) - - # Cap: soft winsorize -- draw values in [cap - 5k, cap] for outliers - over_mask = acv > cap - n_over = int(over_mask.sum()) - if n_over > 0: - acv.loc[over_mask] = cap - rng.uniform(0, 5000, size=n_over) - - df["expected_acv"] = acv.round(0) - return df - - -def assign_acquisition_wave(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Assign acquisition_wave (A, B, C) based on lead index position. - - Waves A/B/C are roughly chronological: first third = A, middle = B, - last third = C. A small amount of noise is added at the boundaries. - """ - rng = RNGRoot(seed).numpy_child("acquisition_wave") - df = df.copy() - n = len(df) - waves = np.empty(n, dtype=object) - third = n // 3 - - # Base assignment by position (stable across seeds) - waves[:third] = "A" - waves[third : 2 * third] = "B" - waves[2 * third :] = "C" - - # Add ~5% boundary noise so it's not perfectly deterministic - noise_mask = rng.random(n) < 0.05 - noise_vals = rng.choice(["A", "B", "C"], size=n) - waves[noise_mask] = noise_vals[noise_mask] - - df["acquisition_wave"] = waves - return df - - -def compute_post_snapshot_touches( - snapshot_df: pd.DataFrame, - all_touches: list, - lead_dates: dict[str, str], - snapshot_day: int = SNAPSHOT_DAY, - horizon_day: int = 90, -) -> pd.Series: - """Count touches in (snapshot_day, horizon_day] per lead from event data. - - This is the purely causal leakage trap: it counts actual simulated - touches that occur after the snapshot cutoff. No label-conditioned - boost is applied. The trap is predictive because future engagement - and conversion share latent causal drivers (fit, intent, engagement - propensity), not because the target was injected. - """ - if not all_touches: - return pd.Series(0, index=snapshot_df.index, name=INSTRUCTOR_TRAP_COL) - - td = pd.DataFrame([t.to_dict() for t in all_touches]) - td["_ts"] = pd.to_datetime(td["touch_timestamp"]) - td["_lead_date"] = td["lead_id"].map({lid: pd.Timestamp(d) for lid, d in lead_dates.items()}) - td["_day"] = (td["_ts"] - td["_lead_date"]).dt.days - - # Filter: days in (snapshot_day, horizon_day] - post = td[(td["_day"] > snapshot_day) & (td["_day"] <= horizon_day)] - counts = post.groupby("lead_id").size().reset_index(name=INSTRUCTOR_TRAP_COL) - - # Merge back onto snapshot - result = snapshot_df[["lead_id"]].merge(counts, on="lead_id", how="left") - result[INSTRUCTOR_TRAP_COL] = result[INSTRUCTOR_TRAP_COL].fillna(0).astype(int) - return result[INSTRUCTOR_TRAP_COL] - - def rename_and_select( df: pd.DataFrame, *, @@ -212,102 +87,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - if label_column not in df.columns: - raise ValueError( - f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" - ) - if label_column == "converted_within_90_days": - rename_map = RENAME_MAP - else: - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" - df = df.rename(columns=rename_map) - df["converted"] = df["converted"].astype(int) - columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT - missing = [c for c in columns if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[columns] - - -def subsample( - df: pd.DataFrame, - seed: int, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - rng = RNGRoot(seed).numpy_child("subsample") - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - warnings.warn( - f"only {len(positives)} positives available, need {n_pos}", - stacklevel=2, - ) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - warnings.warn( - f"only {len(negatives)} negatives available, need {n_neg}", - stacklevel=2, - ) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + return _rename_and_select_generic( + df, + rename_map=RENAME_MAP, + final_columns=FINAL_COLUMNS_STUDENT, + instructor=instructor, + instructor_columns=FINAL_COLUMNS_INSTRUCTOR, + label_column=label_column, ) - - -def inject_missingness(df: pd.DataFrame, seed: int) -> pd.DataFrame: - """Apply structured missingness per the v7 contract. - - Patterns: - 1. Structural: days_since_last_touch is NaN when touch_count=0 (from snapshot) - 2. MAR: web_sessions -- SDR outbound 15%, inbound marketing 2%, partner 5% - 3. MAR: seniority -- partner referral 8%, others 1% - 4. MCAR: expected_acv -- 2% uniform - 5. Structural + MCAR: days_since_first_touch -- NaN when no touches + 2% MCAR - 6. MCAR: days_since_last_touch -- additional 3% on top of structural - """ - rng = RNGRoot(seed).numpy_child("missingness") - df = df.copy() - n = len(df) - - # (1) Structural for days_since_last_touch is already NaN from snapshot builder - # Note: also structural for days_since_first_touch when no touches - - # (2) MAR: web_sessions by lead_source - for source, rate in [ - ("sdr_outbound", 0.15), - ("inbound_marketing", 0.02), - ("partner_referral", 0.05), - ]: - mask = (df["lead_source"] == source) & (rng.random(n) < rate) - df.loc[mask, "web_sessions"] = np.nan - - # (3) MAR: seniority by lead_source - partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) - other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) - df.loc[partner_mask | other_mask, "seniority"] = np.nan - - # (4) MCAR: expected_acv 2% - acv_mcar = rng.random(n) < 0.02 - df.loc[acv_mcar, "expected_acv"] = np.nan - - # (5) MCAR: days_since_first_touch 2% on top of structural - dsft_mask = rng.random(n) < 0.02 - df.loc[dsft_mask, "days_since_first_touch"] = np.nan - - # (6) MCAR: days_since_last_touch 3% on top of structural - dslt_mask = rng.random(n) < 0.03 - df.loc[dslt_mask, "days_since_last_touch"] = np.nan - - return df diff --git a/leadforge/pipelines/common.py b/leadforge/pipelines/common.py new file mode 100644 index 0000000..5192463 --- /dev/null +++ b/leadforge/pipelines/common.py @@ -0,0 +1,337 @@ +"""Shared pipeline functions used across v5/v6/v7 build pipelines. + +This module contains the canonical implementations of functions that are +identical (or nearly so) across pipeline versions. Version-specific modules +import from here and override only what differs. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd + +from leadforge.core.rng import RNGRoot + +__all__ = [ + "ACV_CAP", + "ACV_FLOOR", + "BINARY_FEATURES", + "CAT_FEATURES", + "FINAL_COLUMNS_INSTRUCTOR", + "FINAL_COLUMNS_STUDENT", + "INSTRUCTOR_TRAP_COL", + "NUM_FEATURES", + "RENAME_MAP", + "SUBSAMPLE_N", + "TARGET", + "TARGET_RATE", + "assign_acquisition_wave", + "compute_post_snapshot_touches", + "derive_features", + "inject_missingness_v6", + "rename_and_select", + "softcap_expected_acv", + "subsample", +] + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +SUBSAMPLE_N = 1000 +TARGET_RATE = 0.30 +TARGET = "converted" + +# Narrative-consistent ACV bounds (from narrative.yaml: $18k-$120k). +ACV_FLOOR = 18_000.0 +ACV_CAP = 120_000.0 + +# Canonical feature lists for v6/v7 datasets. +CAT_FEATURES = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "acquisition_wave", +] + +NUM_FEATURES = [ + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "touches_last_7_days", + "days_since_first_touch", + "web_sessions", + "sales_activities", + "days_since_last_touch", +] + +BINARY_FEATURES = [ + "opportunity_created", + "demo_completed", +] + +INSTRUCTOR_TRAP_COL = "__leakage__touches_post_snapshot_21_90" + +# v6/v7 student column set: 19 features + 1 target = 20 columns. +FINAL_COLUMNS_STUDENT = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "opportunity_created", + "demo_completed", + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "touches_last_7_days", + "days_since_first_touch", + "web_sessions", + "sales_activities", + "days_since_last_touch", + "acquisition_wave", + "converted", +] + +# Instructor adds the trap column at the end. +FINAL_COLUMNS_INSTRUCTOR = FINAL_COLUMNS_STUDENT + [INSTRUCTOR_TRAP_COL] + +# Snapshot column -> v6/v7 column renaming. +RENAME_MAP = { + "employee_band": "company_size", + "estimated_revenue_band": "company_revenue", + "role_function": "contact_role", + "inbound_touch_count": "inbound_touches", + "outbound_touch_count": "outbound_touches", + "session_count": "web_sessions", + "activity_count": "sales_activities", + "converted_within_90_days": "converted", +} + + +# --------------------------------------------------------------------------- +# Pipeline steps +# --------------------------------------------------------------------------- + + +def derive_features(df: pd.DataFrame) -> pd.DataFrame: + """Derive binary and momentum features from raw snapshot columns.""" + df = df.copy() + df["opportunity_created"] = df["opportunity_created"].astype(int) + df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) + return df + + +def softcap_expected_acv( + df: pd.DataFrame, + seed: int, + floor: float = ACV_FLOOR, + cap: float = ACV_CAP, +) -> pd.DataFrame: + """Soft winsorize expected_acv to avoid hard-clipping ties at the cap. + + Values below floor are clipped. Values above cap are pulled toward cap + with additive noise so they cluster near the cap rather than pile at it. + """ + rng = RNGRoot(seed).numpy_child("softcap_acv") + df = df.copy() + acv = df["expected_acv"].copy() + + # Floor: hard clip + acv = acv.clip(lower=floor) + + # Cap: soft winsorize -- draw values in [cap - 5k, cap] for outliers + over_mask = acv > cap + n_over = int(over_mask.sum()) + if n_over > 0: + acv.loc[over_mask] = cap - rng.uniform(0, 5000, size=n_over) + + df["expected_acv"] = acv.round(0) + return df + + +def assign_acquisition_wave(df: pd.DataFrame, seed: int) -> pd.DataFrame: + """Assign acquisition_wave (A, B, C) based on lead index position. + + Waves A/B/C are roughly chronological: first third = A, middle = B, + last third = C. A small amount of noise is added at the boundaries. + """ + rng = RNGRoot(seed).numpy_child("acquisition_wave") + df = df.copy() + n = len(df) + waves = np.empty(n, dtype=object) + third = n // 3 + + # Base assignment by position (stable across seeds) + waves[:third] = "A" + waves[third : 2 * third] = "B" + waves[2 * third :] = "C" + + # Add ~5% boundary noise so it's not perfectly deterministic + noise_mask = rng.random(n) < 0.05 + noise_vals = rng.choice(["A", "B", "C"], size=n) + waves[noise_mask] = noise_vals[noise_mask] + + df["acquisition_wave"] = waves + return df + + +def rename_and_select( + df: pd.DataFrame, + *, + rename_map: dict[str, str], + final_columns: list[str], + instructor: bool = False, + instructor_columns: list[str] | None = None, + label_column: str = "converted_within_90_days", +) -> pd.DataFrame: + """Rename snapshot columns and select final column set. + + Args: + df: Snapshot DataFrame. + rename_map: Mapping from snapshot column names to output names. + final_columns: Student column set (used when instructor=False). + instructor: If True, use instructor_columns instead. + instructor_columns: Instructor column set (superset of final_columns). + label_column: Source column for the binary label. + """ + if label_column not in df.columns: + raise ValueError( + f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" + ) + if label_column == "converted_within_90_days": + rmap = rename_map + else: + rmap = {k: v for k, v in rename_map.items() if v != "converted"} + rmap[label_column] = "converted" + df = df.rename(columns=rmap) + df["converted"] = df["converted"].astype(int) + columns = instructor_columns if instructor and instructor_columns else final_columns + missing = [c for c in columns if c not in df.columns] + if missing: + raise ValueError( + f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" + ) + return df[columns] + + +def compute_post_snapshot_touches( + snapshot_df: pd.DataFrame, + all_touches: list, + lead_dates: dict[str, str], + snapshot_day: int = 20, + horizon_day: int = 90, + trap_col: str = INSTRUCTOR_TRAP_COL, +) -> pd.Series: + """Count touches in (snapshot_day, horizon_day] per lead from event data. + + This is the causal leakage trap: it counts actual simulated touches that + occur after the snapshot cutoff. The trap is predictive because future + engagement and conversion share latent causal drivers. + """ + if not all_touches: + return pd.Series(0, index=snapshot_df.index, name=trap_col) + + td = pd.DataFrame([t.to_dict() for t in all_touches]) + td["_ts"] = pd.to_datetime(td["touch_timestamp"]) + td["_lead_date"] = td["lead_id"].map({lid: pd.Timestamp(d) for lid, d in lead_dates.items()}) + td["_day"] = (td["_ts"] - td["_lead_date"]).dt.days + + # Filter: days in (snapshot_day, horizon_day] + post = td[(td["_day"] > snapshot_day) & (td["_day"] <= horizon_day)] + counts = post.groupby("lead_id").size().reset_index(name=trap_col) + + # Merge back onto snapshot + result = snapshot_df[["lead_id"]].merge(counts, on="lead_id", how="left") + result[trap_col] = result[trap_col].fillna(0).astype(int) + return result[trap_col] + + +def subsample( + df: pd.DataFrame, + seed: int, + n: int = SUBSAMPLE_N, + target_rate: float = TARGET_RATE, +) -> pd.DataFrame: + """Stratified subsample to n rows at target_rate conversion. + + Raises: + ValueError: If there are not enough negative samples to reach *n* rows. + """ + rng = RNGRoot(seed).numpy_child("subsample") + positives = df[df["converted"] == 1] + negatives = df[df["converted"] == 0] + n_pos = int(n * target_rate) + n_neg = n - n_pos + + if len(positives) < n_pos: + raise ValueError( + f"only {len(positives)} positives available, need {n_pos}; " + f"cannot produce {n} rows at target_rate={target_rate}" + ) + if len(negatives) < n_neg: + raise ValueError( + f"only {len(negatives)} negatives available, need {n_neg}; " + f"cannot produce {n} rows at target_rate={target_rate}" + ) + + pos_sample = positives.sample(n=n_pos, random_state=rng) + neg_sample = negatives.sample(n=n_neg, random_state=rng) + return ( + pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + ) + + +def inject_missingness_v6(df: pd.DataFrame, seed: int) -> pd.DataFrame: + """Apply structured missingness (v6/v7 contract — 6 patterns). + + Patterns: + 1. Structural: days_since_last_touch is NaN when touch_count=0 (from snapshot) + 2. MAR: web_sessions -- SDR outbound 15%, inbound marketing 2%, partner 5% + 3. MAR: seniority -- partner referral 8%, others 1% + 4. MCAR: expected_acv -- 2% uniform + 5. Structural + MCAR: days_since_first_touch -- NaN when no touches + 2% MCAR + 6. MCAR: days_since_last_touch -- additional 3% on top of structural + """ + rng = RNGRoot(seed).numpy_child("missingness") + df = df.copy() + n = len(df) + + # (1) Structural for days_since_last_touch is already NaN from snapshot builder + # Note: also structural for days_since_first_touch when no touches + + # (2) MAR: web_sessions by lead_source + for source, rate in [ + ("sdr_outbound", 0.15), + ("inbound_marketing", 0.02), + ("partner_referral", 0.05), + ]: + mask = (df["lead_source"] == source) & (rng.random(n) < rate) + df.loc[mask, "web_sessions"] = np.nan + + # (3) MAR: seniority by lead_source + partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) + other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) + df.loc[partner_mask | other_mask, "seniority"] = np.nan + + # (4) MCAR: expected_acv 2% + acv_mcar = rng.random(n) < 0.02 + df.loc[acv_mcar, "expected_acv"] = np.nan + + # (5) MCAR: days_since_first_touch 2% on top of structural + dsft_mask = rng.random(n) < 0.02 + df.loc[dsft_mask, "days_since_first_touch"] = np.nan + + # (6) MCAR: days_since_last_touch 3% on top of structural + dslt_mask = rng.random(n) < 0.03 + df.loc[dslt_mask, "days_since_last_touch"] = np.nan + + return df diff --git a/leadforge/pipelines/ml.py b/leadforge/pipelines/ml.py new file mode 100644 index 0000000..85b0d1a --- /dev/null +++ b/leadforge/pipelines/ml.py @@ -0,0 +1,121 @@ +"""Shared ML pipeline utilities for validation and evaluation scripts. + +Provides the canonical sklearn pipeline (ColumnTransformer + imputation + +encoding + LogisticRegression) used across dataset validators and baseline +evaluation scripts. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import average_precision_score, roc_auc_score +from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import OneHotEncoder, StandardScaler + +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, NUM_FEATURES, TARGET + +__all__ = [ + "build_baseline_pipeline", + "build_preprocessor", + "fit_evaluate", + "get_feature_cols", + "sanitize_categoricals", +] + +LEAKAGE_PREFIX = "__leakage__" + + +def build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer: + """Build the canonical preprocessing ColumnTransformer.""" + numeric_transformer = Pipeline( + [ + ("imputer", SimpleImputer(strategy="median")), + ("scaler", StandardScaler()), + ] + ) + categorical_transformer = Pipeline( + [ + ("imputer", SimpleImputer(strategy="most_frequent")), + ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), + ] + ) + return ColumnTransformer( + transformers=[ + ("num", numeric_transformer, num_cols), + ("cat", categorical_transformer, cat_cols), + ], + remainder="drop", + ) + + +def build_baseline_pipeline( + num_cols: list[str], + cat_cols: list[str], + seed: int = 42, +) -> Pipeline: + """Build the canonical sklearn baseline pipeline (preprocessor + LR).""" + preprocessor = build_preprocessor(num_cols, cat_cols) + return Pipeline( + [ + ("preprocessor", preprocessor), + ("classifier", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=seed)), + ] + ) + + +def get_feature_cols( + df: pd.DataFrame, + exclude: set[str] | None = None, +) -> tuple[list[str], list[str]]: + """Partition feature columns into (cat_cols, num_cols). + + Uses the canonical feature lists, falling back to dtype-based detection + for columns not in the canonical lists (e.g. leakage trap columns). + """ + exclude = (exclude or set()) | {TARGET} + cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in exclude] + num_cols = [c for c in NUM_FEATURES + BINARY_FEATURES if c in df.columns and c not in exclude] + # Add any trap columns to numeric if not excluded + for c in df.columns: + if c.startswith(LEAKAGE_PREFIX) and c not in exclude: + num_cols.append(c) + return cat_cols, num_cols + + +def sanitize_categoricals(df: pd.DataFrame, cat_cols: list[str]) -> pd.DataFrame: + """Convert pd.NA in categorical columns to None for sklearn compatibility.""" + df = df.copy() + for c in cat_cols: + if c in df.columns: + df[c] = df[c].astype(object).where(df[c].notna(), None) + return df + + +def fit_evaluate( + df: pd.DataFrame, + exclude_cols: set[str] | None = None, + seed: int = 42, + test_size: float = 0.30, +) -> tuple[float, float, np.ndarray, pd.Series]: + """Fit LR on hold-out split, return (AUC, PR-AUC, probs, y_test).""" + y = df[TARGET].astype(int) + cat_cols, num_cols = get_feature_cols(df, exclude=exclude_cols) + df_clean = sanitize_categoricals(df, cat_cols) + x = df_clean[cat_cols + num_cols] + + x_train, x_test, y_train, y_test = train_test_split( + x, y, test_size=test_size, random_state=seed, stratify=y + ) + + pipe = build_baseline_pipeline(num_cols, cat_cols, seed=seed) + pipe.fit(x_train, y_train) + probs = pipe.predict_proba(x_test)[:, 1] + + auc = float(roc_auc_score(y_test, probs)) + pr_auc = float(average_precision_score(y_test, probs)) + return auc, pr_auc, probs, y_test diff --git a/scripts/quick_baseline_eval_v6.py b/scripts/quick_baseline_eval_v6.py index a4bb3c7..ebb943e 100644 --- a/scripts/quick_baseline_eval_v6.py +++ b/scripts/quick_baseline_eval_v6.py @@ -15,9 +15,7 @@ import numpy as np import pandas as pd -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier -from sklearn.impute import SimpleImputer from sklearn.linear_model import LogisticRegression from sklearn.metrics import ( average_precision_score, @@ -25,37 +23,15 @@ ) from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler - -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" -SEED = 42 +from leadforge.pipelines.common import TARGET +from leadforge.pipelines.ml import LEAKAGE_PREFIX, build_preprocessor, get_feature_cols -def _get_feature_cols(df, exclude=None): - exclude = (exclude or set()) | {TARGET} - cat, num = [], [] - for col in df.columns: - if col in exclude: - continue - if pd.api.types.is_numeric_dtype(df[col]): - num.append(col) - else: - cat.append(col) - return cat, num +SEED = 42 def _build_pipeline(num_cols, cat_cols, model="lr"): - num_tr = Pipeline([("imp", SimpleImputer(strategy="median")), ("sc", StandardScaler())]) - cat_tr = Pipeline( - [ - ("imp", SimpleImputer(strategy="most_frequent")), - ("enc", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - pre = ColumnTransformer( - [("num", num_tr, num_cols), ("cat", cat_tr, cat_cols)], remainder="drop" - ) + pre = build_preprocessor(num_cols, cat_cols) if model == "lr": clf = LogisticRegression(max_iter=1000, solver="lbfgs", random_state=SEED) elif model == "rf": @@ -118,7 +94,7 @@ def main(): print(f"Missing values: {df.isna().sum().sum()} total") leakage = {c for c in df.columns if c.startswith(LEAKAGE_PREFIX)} - cat_cols, num_cols = _get_feature_cols(df, exclude=leakage) + cat_cols, num_cols = get_feature_cols(df, exclude=leakage) y = df[TARGET].astype(int) x = df[cat_cols + num_cols] @@ -176,8 +152,8 @@ def main(): trap_col = trap_cols[0] all_leakage = set(trap_cols) - cat_i, num_i = _get_feature_cols(df_i, exclude=all_leakage) - cat_with, num_with = _get_feature_cols(df_i, exclude=all_leakage - {trap_col}) + cat_i, num_i = get_feature_cols(df_i, exclude=all_leakage) + cat_with, num_with = get_feature_cols(df_i, exclude=all_leakage - {trap_col}) y_i = df_i[TARGET].astype(int) deltas = [] diff --git a/scripts/quick_baseline_eval_v7.py b/scripts/quick_baseline_eval_v7.py index 7bb7972..6881ba0 100644 --- a/scripts/quick_baseline_eval_v7.py +++ b/scripts/quick_baseline_eval_v7.py @@ -15,75 +15,17 @@ import numpy as np import pandas as pd from sklearn.base import clone -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier -from sklearn.impute import SimpleImputer from sklearn.linear_model import LogisticRegression from sklearn.metrics import average_precision_score, roc_auc_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler - -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" - -CAT_FEATURES = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "acquisition_wave", -] - -NUM_FEATURES = [ - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "opportunity_created", - "demo_completed", -] - - -def _sanitize(df: pd.DataFrame) -> pd.DataFrame: - df = df.copy() - for c in CAT_FEATURES: - if c in df.columns: - df[c] = df[c].astype(object).where(df[c].notna(), None) - return df - - -def _build_preprocessor(num_cols: list[str], cat_cols: list[str]) -> ColumnTransformer: - return ColumnTransformer( - transformers=[ - ( - "num", - Pipeline( - [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())] - ), - num_cols, - ), - ( - "cat", - Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ), - cat_cols, - ), - ], - remainder="drop", - ) + +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, NUM_FEATURES, TARGET +from leadforge.pipelines.ml import LEAKAGE_PREFIX, build_preprocessor, sanitize_categoricals + +# NUM_FEATURES for eval includes binary features as numeric inputs +_EVAL_NUM_FEATURES = NUM_FEATURES + BINARY_FEATURES def main() -> None: @@ -94,10 +36,10 @@ def main() -> None: student_path = sys.argv[1] instructor_path = sys.argv[2] if len(sys.argv) > 2 else None - df = _sanitize(pd.read_csv(student_path)) + df = sanitize_categoricals(pd.read_csv(student_path), CAT_FEATURES) leakage = {c for c in df.columns if c.startswith(LEAKAGE_PREFIX)} cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in leakage] - num_cols = [c for c in NUM_FEATURES if c in df.columns and c not in leakage] + num_cols = [c for c in _EVAL_NUM_FEATURES if c in df.columns and c not in leakage] y = df[TARGET].astype(int) x = df[cat_cols + num_cols] @@ -123,7 +65,7 @@ def main() -> None: x_tr, x_te, y_tr, y_te = train_test_split( x, y, test_size=0.30, random_state=seed, stratify=y ) - pipe = Pipeline([("pre", _build_preprocessor(num_cols, cat_cols)), ("clf", clone(clf))]) + pipe = Pipeline([("pre", build_preprocessor(num_cols, cat_cols)), ("clf", clone(clf))]) pipe.fit(x_tr, y_tr) aucs.append(roc_auc_score(y_te, pipe.predict_proba(x_te)[:, 1])) print(f" {name:4s}: AUC = {np.mean(aucs):.4f} (std={np.std(aucs):.4f})") @@ -136,7 +78,7 @@ def main() -> None: x_tr, x_te, y_tr, y_te = train_test_split(x, y, test_size=0.30, random_state=42, stratify=y) pipe = Pipeline( [ - ("pre", _build_preprocessor(num_cols, cat_cols)), + ("pre", build_preprocessor(num_cols, cat_cols)), ("clf", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42)), ] ) @@ -171,7 +113,7 @@ def main() -> None: print("\nFeature importance (GBM):") gbm_pipe = Pipeline( [ - ("pre", _build_preprocessor(num_cols, cat_cols)), + ("pre", build_preprocessor(num_cols, cat_cols)), ("clf", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) @@ -193,7 +135,7 @@ def main() -> None: print("\n" + "=" * 60) print("TRAP DETECTION (instructor)") print("=" * 60) - inst = _sanitize(pd.read_csv(instructor_path)) + inst = sanitize_categoricals(pd.read_csv(instructor_path), CAT_FEATURES) trap_cols = [c for c in inst.columns if c.startswith(LEAKAGE_PREFIX)] if trap_cols: trap_col = trap_cols[0] diff --git a/scripts/validate_v6_dataset.py b/scripts/validate_v6_dataset.py index d289cf3..733f367 100644 --- a/scripts/validate_v6_dataset.py +++ b/scripts/validate_v6_dataset.py @@ -13,25 +13,28 @@ import numpy as np import pandas as pd -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier -from sklearn.impute import SimpleImputer -from sklearn.linear_model import LogisticRegression -from sklearn.metrics import ( - average_precision_score, - roc_auc_score, -) +from sklearn.metrics import roc_auc_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler + +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, TARGET +from leadforge.pipelines.ml import ( + LEAKAGE_PREFIX, + build_baseline_pipeline, + build_preprocessor, +) +from leadforge.pipelines.ml import ( + fit_evaluate as _fit_evaluate, +) +from leadforge.pipelines.ml import ( + get_feature_cols as _get_feature_cols, +) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" - BANNED_COLUMNS = { "current_stage", "funnel_stage", @@ -41,22 +44,6 @@ "lead_created_at", } -CAT_FEATURES = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "acquisition_wave", -] - -BINARY_FEATURES = [ - "opportunity_created", - "demo_completed", -] - # Validation thresholds AUC_LOWER = 0.62 AUC_UPPER = 0.90 @@ -72,82 +59,10 @@ # --------------------------------------------------------------------------- -# ML pipeline builder (canonical) +# ML pipeline (imported from shared utility) # --------------------------------------------------------------------------- - -def _build_pipeline( - num_cols: list[str], - cat_cols: list[str], -) -> Pipeline: - """Build the canonical sklearn baseline pipeline.""" - numeric_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="median")), - ("scaler", StandardScaler()), - ] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) - return Pipeline( - [ - ("preprocessor", preprocessor), - ("classifier", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42)), - ] - ) - - -def _get_feature_cols( - df: pd.DataFrame, - exclude: set[str] | None = None, -) -> tuple[list[str], list[str]]: - """Partition feature columns into (cat_cols, num_cols).""" - exclude = (exclude or set()) | {TARGET} - cat_cols = [] - num_cols = [] - for col in df.columns: - if col in exclude: - continue - if pd.api.types.is_numeric_dtype(df[col]): - num_cols.append(col) - else: - cat_cols.append(col) - return cat_cols, num_cols - - -def _fit_evaluate( - df: pd.DataFrame, - exclude_cols: set[str] | None = None, - seed: int = 42, - test_size: float = 0.30, -) -> tuple[float, float, np.ndarray, pd.Series]: - """Fit LR on hold-out split, return (AUC, PR-AUC, probs, y_test).""" - y = df[TARGET].astype(int) - cat_cols, num_cols = _get_feature_cols(df, exclude=exclude_cols) - x = df[cat_cols + num_cols] - - x_train, x_test, y_train, y_test = train_test_split( - x, y, test_size=test_size, random_state=seed, stratify=y - ) - - pipe = _build_pipeline(num_cols, cat_cols) - pipe.fit(x_train, y_train) - probs = pipe.predict_proba(x_test)[:, 1] - - auc = float(roc_auc_score(y_test, probs)) - pr_auc = float(average_precision_score(y_test, probs)) - return auc, pr_auc, probs, y_test +_build_pipeline = build_baseline_pipeline # --------------------------------------------------------------------------- @@ -272,25 +187,9 @@ def check_tree_improvement(df: pd.DataFrame, label: str) -> tuple[list[str], dic lr_aucs.append(lr_auc) # GBM with one-hot encoded features - numeric_transformer = Pipeline( - [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) gb = Pipeline( [ - ("preprocessor", preprocessor), + ("preprocessor", build_preprocessor(num_cols, cat_cols)), ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) diff --git a/scripts/validate_v7_dataset.py b/scripts/validate_v7_dataset.py index e440b60..4042aca 100644 --- a/scripts/validate_v7_dataset.py +++ b/scripts/validate_v7_dataset.py @@ -22,25 +22,31 @@ import numpy as np import pandas as pd -from sklearn.compose import ColumnTransformer from sklearn.ensemble import GradientBoostingClassifier -from sklearn.impute import SimpleImputer -from sklearn.linear_model import LogisticRegression -from sklearn.metrics import ( - average_precision_score, - roc_auc_score, -) +from sklearn.metrics import average_precision_score, roc_auc_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler + +from leadforge.pipelines.common import BINARY_FEATURES, CAT_FEATURES, TARGET +from leadforge.pipelines.ml import ( + LEAKAGE_PREFIX, + build_baseline_pipeline, + build_preprocessor, +) +from leadforge.pipelines.ml import ( + fit_evaluate as _fit_evaluate, +) +from leadforge.pipelines.ml import ( + get_feature_cols as _get_feature_cols, +) +from leadforge.pipelines.ml import ( + sanitize_categoricals as _sanitize_categoricals, +) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- -TARGET = "converted" -LEAKAGE_PREFIX = "__leakage__" - BANNED_COLUMNS = { "current_stage", "funnel_stage", @@ -51,34 +57,6 @@ "close_outcome", } -CAT_FEATURES = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "acquisition_wave", -] - -BINARY_FEATURES = [ - "opportunity_created", - "demo_completed", -] - -NUM_FEATURES = [ - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", -] - # Validation thresholds AUC_LOWER = 0.62 AUC_UPPER = 0.90 @@ -100,89 +78,10 @@ # --------------------------------------------------------------------------- -# ML pipeline builder (canonical) +# ML pipeline (imported from shared utility) # --------------------------------------------------------------------------- - -def _build_pipeline( - num_cols: list[str], - cat_cols: list[str], -) -> Pipeline: - """Build the canonical sklearn baseline pipeline.""" - numeric_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="median")), - ("scaler", StandardScaler()), - ] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) - return Pipeline( - [ - ("preprocessor", preprocessor), - ("classifier", LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42)), - ] - ) - - -def _get_feature_cols( - df: pd.DataFrame, - exclude: set[str] | None = None, -) -> tuple[list[str], list[str]]: - """Partition feature columns into (cat_cols, num_cols).""" - exclude = (exclude or set()) | {TARGET} - cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in exclude] - num_cols = [c for c in NUM_FEATURES + BINARY_FEATURES if c in df.columns and c not in exclude] - # Add any trap columns to numeric if not excluded - for c in df.columns: - if c.startswith(LEAKAGE_PREFIX) and c not in exclude: - num_cols.append(c) - return cat_cols, num_cols - - -def _sanitize_categoricals(df: pd.DataFrame, cat_cols: list[str]) -> pd.DataFrame: - """Convert pd.NA in categorical columns to None for sklearn compatibility.""" - df = df.copy() - for c in cat_cols: - if c in df.columns: - df[c] = df[c].astype(object).where(df[c].notna(), None) - return df - - -def _fit_evaluate( - df: pd.DataFrame, - exclude_cols: set[str] | None = None, - seed: int = 42, - test_size: float = 0.30, -) -> tuple[float, float, np.ndarray, pd.Series]: - """Fit LR on hold-out split, return (AUC, PR-AUC, probs, y_test).""" - y = df[TARGET].astype(int) - cat_cols, num_cols = _get_feature_cols(df, exclude=exclude_cols) - df_clean = _sanitize_categoricals(df, cat_cols) - x = df_clean[cat_cols + num_cols] - - x_train, x_test, y_train, y_test = train_test_split( - x, y, test_size=test_size, random_state=seed, stratify=y - ) - - pipe = _build_pipeline(num_cols, cat_cols) - pipe.fit(x_train, y_train) - probs = pipe.predict_proba(x_test)[:, 1] - - auc = float(roc_auc_score(y_test, probs)) - pr_auc = float(average_precision_score(y_test, probs)) - return auc, pr_auc, probs, y_test +_build_pipeline = build_baseline_pipeline # --------------------------------------------------------------------------- @@ -307,25 +206,9 @@ def check_tree_improvement(df: pd.DataFrame, label: str) -> tuple[list[str], dic lr_auc = roc_auc_score(y_test, lr.predict_proba(x_test)[:, 1]) lr_aucs.append(lr_auc) - numeric_transformer = Pipeline( - [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())] - ) - categorical_transformer = Pipeline( - [ - ("imputer", SimpleImputer(strategy="most_frequent")), - ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), - ] - ) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, num_cols), - ("cat", categorical_transformer, cat_cols), - ], - remainder="drop", - ) gb = Pipeline( [ - ("preprocessor", preprocessor), + ("preprocessor", build_preprocessor(num_cols, cat_cols)), ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)), ] ) diff --git a/tests/mechanisms/test_mechanisms.py b/tests/mechanisms/test_mechanisms.py index 17c63dd..99b0c68 100644 --- a/tests/mechanisms/test_mechanisms.py +++ b/tests/mechanisms/test_mechanisms.py @@ -10,6 +10,7 @@ from leadforge.mechanisms.base import MechanismAssignment, MechanismContext, MechanismSummary from leadforge.mechanisms.categorical import CHANNEL_QUALITY_SCORES, CategoricalInfluence from leadforge.mechanisms.counts import ( + FollowupRampConfig, LatentDecayIntensity, PoissonIntensity, RecencyDecayIntensity, @@ -749,3 +750,53 @@ def test_followup_boost_factor_below_one_raises(self) -> None: def test_followup_ramp_days_zero_raises(self) -> None: with pytest.raises(ValueError, match="followup_ramp_days must be >= 1"): LatentDecayIntensity(base_rate=0.5, followup_ramp_days=0) + + def test_followup_ramp_config_dataclass_equivalent(self) -> None: + """FollowupRampConfig path produces identical behavior to legacy kwargs.""" + latent_w = {"latent_fit": 1.5, "latent_intent": 1.0} + followup_w = {"latent_budget": 2.0, "latent_authority": 1.5} + latents = { + "latent_fit": 0.8, + "latent_intent": 0.6, + "latent_budget": 0.7, + "latent_authority": 0.5, + } + + # Legacy path + ldi_legacy = LatentDecayIntensity( + base_rate=0.5, + latent_weights=latent_w, + boost=1.2, + followup_boost_after_day=20, + followup_boost_factor=5.0, + followup_ramp_days=10, + followup_latent_weights=followup_w, + ) + + # Dataclass path + ldi_new = LatentDecayIntensity( + base_rate=0.5, + latent_weights=latent_w, + boost=1.2, + followup=FollowupRampConfig( + boost_after_day=20, + boost_factor=5.0, + ramp_days=10, + latent_weights=followup_w, + ), + ) + + # Both must produce identical expected counts at various days + for t in [0, 10, 20, 25, 30, 50]: + assert ldi_legacy.expected_count(t, latents) == ldi_new.expected_count(t, latents), ( + f"Mismatch at t={t}" + ) + + def test_followup_ramp_config_validation(self) -> None: + """FollowupRampConfig validates on construction.""" + with pytest.raises(ValueError, match="boost_after_day must be non-negative"): + FollowupRampConfig(boost_after_day=-1) + with pytest.raises(ValueError, match="boost_factor must be >= 1.0"): + FollowupRampConfig(boost_after_day=10, boost_factor=0.5) + with pytest.raises(ValueError, match="ramp_days must be >= 1"): + FollowupRampConfig(boost_after_day=10, ramp_days=0) diff --git a/tests/scripts/test_build_v5_snapshot.py b/tests/scripts/test_build_v5_snapshot.py index 08dd5c4..de6941e 100644 --- a/tests/scripts/test_build_v5_snapshot.py +++ b/tests/scripts/test_build_v5_snapshot.py @@ -205,37 +205,27 @@ def test_deterministic_given_seed(self): pd.testing.assert_frame_equal(r1, r2) def test_insufficient_positives(self): - """When fewer positives available than needed, warns and adjusts.""" + """When fewer positives than needed, raises ValueError.""" df = _make_v5_df(n=200, conversion_rate=0.05) # only ~10 positives - with pytest.warns(UserWarning, match="positives available"): - result = subsample(df, seed=42, n=100, target_rate=0.50) # need 50 positives - # All available positives should be included - assert result["converted"].sum() <= 10 + with pytest.raises(ValueError, match="positives available"): + subsample(df, seed=42, n=100, target_rate=0.50) # need 50 positives def test_insufficient_negatives(self): - """When fewer negatives available than needed, warns and adjusts.""" + """When fewer negatives than needed, raises ValueError.""" df = _make_v5_df(n=200, conversion_rate=0.95) # only ~10 negatives - n_neg_available = (df["converted"] == 0).sum() - with pytest.warns(UserWarning, match="negatives available"): - result = subsample(df, seed=42, n=100, target_rate=0.10) # need 90 negatives - # Verify actual composition: negatives capped at available count - n_neg_result = (result["converted"] == 0).sum() - assert n_neg_result <= n_neg_available - # Output should still contain rows (not empty) - assert len(result) > 0 + with pytest.raises(ValueError, match="negatives available"): + subsample(df, seed=42, n=100, target_rate=0.10) # need 90 negatives def test_index_is_reset(self): df = _make_v5_df(n=500) result = subsample(df, seed=42, n=100, target_rate=0.30) assert list(result.index) == list(range(len(result))) - def test_n_larger_than_input_caps_gracefully(self): - """Requesting more rows than available caps at available count.""" + def test_n_larger_than_input_raises(self): + """Requesting more rows than available raises ValueError.""" df = _make_v5_df(n=50) - with pytest.warns(UserWarning, match="available"): - result = subsample(df, seed=42, n=200, target_rate=0.30) - # Output should contain all available rows (capped) - assert len(result) <= len(df) + with pytest.raises(ValueError, match="available"): + subsample(df, seed=42, n=200, target_rate=0.30) # ---------------------------------------------------------------------------