diff --git a/.agent-plan.md b/.agent-plan.md index 8b5873e..0a55f90 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -395,6 +395,19 @@ From self-review of PR #50. Completed in a single follow-up PR. | Group followup params into dataclass | ✓ `FollowupRampConfig` frozen dataclass in `mechanisms/counts.py`. `LatentDecayIntensity` accepts `followup: FollowupRampConfig | None`. Legacy params still accepted for backward compat. | | Fix `subsample` silent short-return | ✓ `subsample()` now raises `ValueError` when insufficient negatives. | +### mid-project: generation framework for 3-week pair mid-project dataset ✓ + +Generator code only — no dataset artifacts committed here (public repo). +Dataset artifacts live in `leadforge-datasets-private/lead_scoring_midproject/`. + +| Item | Status | +|---|---| +| `leadforge/pipelines/build_midproject.py` | ✓ Pipeline module (seed=100, SUBSAMPLE_N=1200) | +| `scripts/build_midproject_lead_scoring.py` | ✓ Build CLI | +| `scripts/validate_midproject_lead_scoring.py` | ✓ Validation script | +| `scripts/quick_baseline_eval_midproject.py` | ✓ Baseline evaluation script | +| Dataset artifacts | ✓ In `leadforge-datasets-private` (private repo) | + ### From post-v1 list - Second vertical diff --git a/leadforge/pipelines/build_midproject.py b/leadforge/pipelines/build_midproject.py new file mode 100644 index 0000000..f1a1d22 --- /dev/null +++ b/leadforge/pipelines/build_midproject.py @@ -0,0 +1,79 @@ +"""Pipeline functions for building the mid-project lead scoring dataset. + +Produces a single student-safe CSV with 1,200 rows at ~30% conversion rate. +No leakage trap column — this dataset is published directly to students. 
def rename_and_select(
    df: pd.DataFrame,
    *,
    label_column: str = "converted_within_90_days",
) -> pd.DataFrame:
    """Map snapshot columns onto the mid-project column names and column set.

    Thin wrapper around the shared ``rename_and_select`` helper from
    ``leadforge.pipelines.common`` with the settings fixed for this dataset:
    ``RENAME_MAP`` as the rename map, ``FINAL_COLUMNS_STUDENT`` as the final
    column list, and ``instructor=False`` (this module has no instructor
    variant).

    Args:
        df: Snapshot frame to rename and select from.
        label_column: Name of the label column in ``df``; forwarded unchanged
            to the shared helper.

    Returns:
        The frame produced by the shared helper for these settings.
    """
    midproject_settings = {
        "rename_map": RENAME_MAP,
        "final_columns": FINAL_COLUMNS_STUDENT,
        "instructor": False,
        "label_column": label_column,
    }
    return _rename_and_select_generic(df, **midproject_settings)
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------


def _log(message: str) -> None:
    """Write a progress line to stderr so stdout stays free for data."""
    print(message, file=sys.stderr)


def generate_bundle(seed: int = SEED, n_leads: int = N_LEADS):
    """Run the generator and snapshot the result.

    Args:
        seed: Seed handed to ``Generator.from_recipe``.
        n_leads: Number of leads requested from the generator.

    Returns:
        A ``(snapshot, bundle)`` pair: the snapshot built at ``SNAPSHOT_DAY``
        from the bundle's simulation result and population, and the bundle
        itself.
    """
    generator = Generator.from_recipe(
        "b2b_saas_procurement_v1",
        seed=seed,
        exposure_mode="research_instructor",
        n_leads=n_leads,
        difficulty="intro",
    )
    bundle = generator.generate(latent_touch_intensity=True)
    return (
        build_snapshot(
            bundle.simulation_result,
            bundle.population,
            snapshot_day=SNAPSHOT_DAY,
        ),
        bundle,
    )


def build_midproject_dataset(seed: int = SEED) -> pd.DataFrame:
    """Run the full pipeline: generate → derive → process → subsample → missingness.

    Progress is reported on stderr. The same ``seed`` is passed to every
    seeded step.

    Args:
        seed: Seed for generation and for each seeded post-processing step.

    Returns:
        The subsampled frame after missingness injection.
    """
    _log("Generating bundle...")
    snapshot, _unused_bundle = generate_bundle(seed=seed)
    raw_rate = snapshot["converted_within_90_days"].mean()
    _log(f" Raw snapshot: {len(snapshot)} rows, conversion={raw_rate:.1%}")

    frame = derive_features(snapshot)
    # Both steps take (frame, seed); their order matches the original pipeline.
    for seeded_step in (softcap_expected_acv, assign_acquisition_wave):
        frame = seeded_step(frame, seed)
    frame = rename_and_select(frame)

    _log(f"Subsampling to {SUBSAMPLE_N} rows...")
    frame = subsample(frame, seed, n=SUBSAMPLE_N)
    sampled_rate = frame["converted"].mean()
    _log(f" Subsampled: {len(frame)} rows, conversion={sampled_rate:.1%}")

    _log("Injecting missingness...")
    return inject_missingness(frame, seed)


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------


def main() -> None:
    """Build the dataset and write it as ``lead_scoring_midproject.csv``.

    Expects the output directory as the first command-line argument; prints a
    usage line and exits with status 1 when it is absent. The directory is
    created if needed.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        _log(f"Usage: {sys.argv[0]} OUTPUT_DIR")
        sys.exit(1)

    target_dir = Path(cli_args[0])
    target_dir.mkdir(parents=True, exist_ok=True)

    dataset = build_midproject_dataset()

    csv_file = target_dir / "lead_scoring_midproject.csv"
    dataset.to_csv(csv_file, index=False)
    _log(f"Midproject: {len(dataset)} rows x {len(dataset.columns)} cols → {csv_file}")


if __name__ == "__main__":
    main()
def _banner(title: str) -> None:
    """Print *title* framed by the 60-character rules used between sections."""
    print("\n" + "=" * 60)
    print(title)
    print("=" * 60)


def _fit_pipeline(clf, num_cols, cat_cols, x_tr, y_tr):
    """Fit a fresh preprocessor + *clf* pipeline on the training split."""
    pipe = Pipeline([("pre", build_preprocessor(num_cols, cat_cols)), ("clf", clf)])
    pipe.fit(x_tr, y_tr)
    return pipe


def _compare_models(x, y, num_cols, cat_cols) -> None:
    """Print mean/std ROC-AUC of LR, RF and GBM over five 70/30 stratified splits."""
    models = {
        "LR": LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42),
        "RF": RandomForestClassifier(n_estimators=100, random_state=42),
        "GBM": GradientBoostingClassifier(n_estimators=100, random_state=42),
    }
    for name, clf in models.items():
        aucs = []
        for seed in range(42, 47):
            x_tr, x_te, y_tr, y_te = train_test_split(
                x, y, test_size=0.30, random_state=seed, stratify=y
            )
            # clone() gives every split an unfitted copy of the estimator.
            pipe = _fit_pipeline(clone(clf), num_cols, cat_cols, x_tr, y_tr)
            aucs.append(roc_auc_score(y_te, pipe.predict_proba(x_te)[:, 1]))
        print(f" {name:4s}: AUC = {np.mean(aucs):.4f} (std={np.std(aucs):.4f})")


def _report_ranking_metrics(probs, y_te) -> None:
    """Print AUC, PR-AUC, base rate and precision/recall/lift at K=25/50/100."""
    auc = roc_auc_score(y_te, probs)
    pr_auc = average_precision_score(y_te, probs)
    base = y_te.mean()
    print(f" AUC: {auc:.4f}")
    print(f" PR-AUC: {pr_auc:.4f}")
    print(f" Base rate: {base:.1%}")

    # Stable sort: tied scores keep row order, so top-K is reproducible and
    # agrees with the validation script's baseline check.
    order = np.argsort(-probs, kind="stable")
    y_sorted = y_te.values[order]
    for k in [25, 50, 100]:
        if k <= len(y_te):
            prec = y_sorted[:k].mean()
            rec = y_sorted[:k].sum() / y_te.sum()
            lift = prec / base
            print(f" P@{k}={prec:.3f} R@{k}={rec:.3f} Lift@{k}={lift:.2f}x")


def _report_value_aware(df, test_index, probs, y_te) -> None:
    """Compare ACV captured by ranking on probability vs. probability * ACV."""
    print("\nValue-aware ranking:")
    if "expected_acv" not in df.columns:
        # Every other column lookup in this script is presence-filtered; do the
        # same here instead of failing with a KeyError.
        print(" Skipped (expected_acv column missing)")
        return

    # Unparseable or missing ACV counts as 0 so those rows add no captured value.
    test_acv = pd.to_numeric(df.loc[test_index, "expected_acv"], errors="coerce").fillna(0).values
    test_conv = y_te.values
    ev = probs * test_acv
    by_prob = np.argsort(-probs, kind="stable")
    by_ev = np.argsort(-ev, kind="stable")
    for k in [25, 50]:
        top_prob = by_prob[:k]
        cap_prob = np.sum(test_acv[top_prob] * test_conv[top_prob])
        conv_prob = int(test_conv[top_prob].sum())
        top_ev = by_ev[:k]
        cap_ev = np.sum(test_acv[top_ev] * test_conv[top_ev])
        conv_ev = int(test_conv[top_ev].sum())
        uplift = (cap_ev - cap_prob) / cap_prob * 100 if cap_prob > 0 else 0.0
        print(
            f" K={k}: prob=${cap_prob:,.0f} (conv={conv_prob}) "
            f"ev=${cap_ev:,.0f} (conv={conv_ev}) uplift={uplift:+.1f}%"
        )


def _report_feature_importance(x_tr, y_tr, num_cols, cat_cols) -> None:
    """Fit a GBM on the training split and print its 15 largest importances."""
    print("\nFeature importance (GBM):")
    gbm_pipe = _fit_pipeline(
        GradientBoostingClassifier(n_estimators=100, random_state=42),
        num_cols,
        cat_cols,
        x_tr,
        y_tr,
    )
    importances = gbm_pipe.named_steps["clf"].feature_importances_
    ohe = gbm_pipe.named_steps["pre"].named_transformers_["cat"].named_steps["encoder"]
    cat_names = list(ohe.get_feature_names_out(cat_cols))
    # NOTE(review): assumes build_preprocessor emits the numeric columns first,
    # then the one-hot columns, with nothing dropped — confirm against
    # leadforge.pipelines.ml.build_preprocessor.
    feature_names = num_cols + cat_names
    imp_df = pd.DataFrame({"feature": feature_names, "importance": importances})
    imp_df = imp_df.sort_values("importance", ascending=False)
    for _, row in imp_df.head(15).iterrows():
        print(f" {row['feature']:40s} {row['importance']:.4f}")


def _report_missingness(df) -> None:
    """Print count and share of missing values for each column that has any."""
    print("\nMissingness summary:")
    for col in df.columns:
        n_miss = df[col].isna().sum()
        if n_miss > 0:
            print(f" {col}: {n_miss} ({n_miss / len(df):.1%})")


def main() -> None:
    """Run the quick baseline evaluation on the CSV named on the command line.

    Prints, in order: dataset summary, a 5-seed LR/RF/GBM AUC comparison,
    detailed LR metrics on the seed-42 split, value-aware ranking (skipped when
    ``expected_acv`` is absent), GBM feature importances, and a missingness
    summary. Exits with status 1 and a usage line when no path is given.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} CSV_PATH", file=sys.stderr)
        sys.exit(1)

    df = sanitize_categoricals(pd.read_csv(sys.argv[1]), CAT_FEATURES)
    # Columns carrying the leakage prefix are never used as features.
    leakage = {c for c in df.columns if c.startswith(LEAKAGE_PREFIX)}
    cat_cols = [c for c in CAT_FEATURES if c in df.columns and c not in leakage]
    num_cols = [c for c in _EVAL_NUM_FEATURES if c in df.columns and c not in leakage]

    y = df[TARGET].astype(int)
    x = df[cat_cols + num_cols]

    print(f"Dataset: {len(df)} rows, {len(df.columns)} cols")
    print(f"Conversion rate: {y.mean():.1%}")
    print(f"Features: {len(cat_cols)} cat + {len(num_cols)} num = {len(cat_cols) + len(num_cols)}")

    _banner("MODEL COMPARISON (5-seed average, 70/30 stratified)")
    _compare_models(x, y, num_cols, cat_cols)

    _banner("DETAILED METRICS (seed 42)")
    x_tr, x_te, y_tr, y_te = train_test_split(x, y, test_size=0.30, random_state=42, stratify=y)
    lr_pipe = _fit_pipeline(
        LogisticRegression(max_iter=1000, solver="lbfgs", random_state=42),
        num_cols,
        cat_cols,
        x_tr,
        y_tr,
    )
    probs = lr_pipe.predict_proba(x_te)[:, 1]
    _report_ranking_metrics(probs, y_te)
    _report_value_aware(df, x_te.index, probs, y_te)
    _report_feature_importance(x_tr, y_tr, num_cols, cat_cols)
    _report_missingness(df)


if __name__ == "__main__":
    main()
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
AUC_LOWER = 0.62
AUC_UPPER = 0.80
PR_AUC_LOWER = 0.35
MAX_COL_MISSING_RATE = 0.10
MAX_DUPLICATE_RATE = 0.005
MIN_CONVERSION_RATE = 0.25
MAX_CONVERSION_RATE = 0.35
MIN_GROUP_SIZE = 50
RATE_LOWER = 0.02
RATE_UPPER = 0.98
ACV_MIN = 18_000.0
ACV_MAX = 120_000.0
ACV_PILE_UP_WARN = 0.05

BANNED_COLUMNS = {
    "current_stage",
    "funnel_stage",
    "conversion_timestamp",
    "is_sql",
    "is_mql",
    "lead_created_at",
    "close_outcome",
    "converted_within_90_days",
}


# ---------------------------------------------------------------------------
# Checks
# ---------------------------------------------------------------------------


def check_basic(df: pd.DataFrame) -> list[str]:
    """Check row count, target sanity, forbidden columns and duplicate rows.

    Args:
        df: The student CSV as loaded by pandas.

    Returns:
        One message per violated rule. When the target column is absent, only
        the row-count and missing-target messages are returned, since the
        remaining checks start from the target.
    """
    errors: list[str] = []

    n = len(df)
    if not (1000 <= n <= 1500):
        errors.append(f"Row count {n} outside acceptable range [1000, 1500]")

    if TARGET not in df.columns:
        errors.append(f"Missing target column '{TARGET}'")
        return errors

    target_vals = set(df[TARGET].dropna().unique())
    if not target_vals <= {0, 1}:
        errors.append(f"Target values not binary: {target_vals}")
    if df[TARGET].isna().any():
        errors.append("Target has missing values")

    # Coerce before averaging: a non-numeric target was reported just above and
    # must not raise here. Numeric targets give the same mean as before.
    conv_rate = pd.to_numeric(df[TARGET], errors="coerce").mean()
    if conv_rate < MIN_CONVERSION_RATE:
        errors.append(f"Conversion rate {conv_rate:.2%} < {MIN_CONVERSION_RATE:.0%}")
    if conv_rate > MAX_CONVERSION_RATE:
        errors.append(f"Conversion rate {conv_rate:.2%} > {MAX_CONVERSION_RATE:.0%}")

    leakage = [c for c in df.columns if c.startswith(LEAKAGE_PREFIX)]
    if leakage:
        errors.append(f"Leakage columns must not appear in student CSV: {leakage}")

    banned = BANNED_COLUMNS & set(df.columns)
    if banned:
        errors.append(f"Banned columns present: {sorted(banned)}")

    id_cols = [c for c in df.columns if c.endswith("_id")]
    if id_cols:
        errors.append(f"ID columns found (should not appear): {sorted(id_cols)}")

    n_dupes = df.duplicated().sum()
    dupe_rate = n_dupes / n if n > 0 else 0.0
    if dupe_rate > MAX_DUPLICATE_RATE:
        errors.append(f"{n_dupes} duplicate rows ({dupe_rate:.1%}) > {MAX_DUPLICATE_RATE:.1%}")

    return errors


def check_schema(df: pd.DataFrame) -> list[str]:
    """Compare the frame's columns, including their order, with the expected list.

    Returns:
        Messages naming missing and/or extra columns, or a single order message
        when the names match but the sequence differs. Empty when identical.
    """
    expected = list(EXPECTED_COLUMNS)
    actual = list(df.columns)
    if actual == expected:
        return []

    errors: list[str] = []
    missing = [c for c in expected if c not in df.columns]
    extra = [c for c in df.columns if c not in expected]
    if missing:
        errors.append(f"Missing expected columns: {missing}")
    if extra:
        errors.append(f"Extra unexpected columns: {extra}")
    if not missing and not extra:
        errors.append("Column order differs from v7 student schema")
    return errors


def check_missingness(df: pd.DataFrame) -> tuple[list[str], dict]:
    """Check per-column missing rates and the presence of expected gaps.

    Every non-target column with at least one missing value is added to the
    report; a rate above ``MAX_COL_MISSING_RATE`` is an error. The target must
    have no missing values, and ``web_sessions`` / ``days_since_last_touch``
    must have at least one when present.

    Returns:
        ``(errors, report)`` where ``report`` maps column name to
        ``{"count": int, "rate": float}`` (rate rounded to 4 decimals).
    """
    errors: list[str] = []
    report: dict[str, dict] = {}
    for col in df.columns:
        if col == TARGET:
            continue
        n_miss = int(df[col].isna().sum())
        if n_miss > 0:
            rate = n_miss / len(df)
            report[col] = {"count": n_miss, "rate": round(rate, 4)}
            if rate > MAX_COL_MISSING_RATE:
                errors.append(f"{col}: {rate:.1%} missing > {MAX_COL_MISSING_RATE:.0%} limit")
    # A missing target column is check_basic's finding; it must not surface
    # here as a KeyError.
    if TARGET in df.columns and df[TARGET].isna().any():
        errors.append("Target column has missing values")
    # Check structural missingness exists in expected columns
    for col in ["web_sessions", "days_since_last_touch"]:
        if col in df.columns and df[col].isna().sum() == 0:
            errors.append(f"{col} has zero missing values (expected structured missingness)")
    return errors, report


def check_determinism(df: pd.DataFrame) -> list[str]:
    """Flag category values whose conversion rate is almost 0% or almost 100%.

    Only categorical/binary feature columns present in ``df`` are inspected,
    and only groups with at least ``MIN_GROUP_SIZE`` target values. A group
    rate below ``RATE_LOWER`` or above ``RATE_UPPER`` is reported.

    Returns:
        One ``DETERMINISTIC: ...`` message per offending group. Empty when the
        target column is absent, because no rates can be computed.
    """
    errors: list[str] = []
    if TARGET not in df.columns:
        return errors

    check_cols = [c for c in CAT_FEATURES + BINARY_FEATURES if c in df.columns]
    for col in check_cols:
        stats = df.groupby(col)[TARGET].agg(["mean", "count"])
        for val, row in stats[stats["count"] >= MIN_GROUP_SIZE].iterrows():
            if row["mean"] < RATE_LOWER or row["mean"] > RATE_UPPER:
                errors.append(
                    f"DETERMINISTIC: {col}={val}: {row['mean']:.1%} (n={int(row['count'])})"
                )
    return errors


def check_acv(df: pd.DataFrame) -> tuple[list[str], dict]:
    """Check the range of ``expected_acv`` and how much of it sits at the maximum.

    Values are parsed with ``pd.to_numeric(errors="coerce")`` and missing or
    unparseable entries are dropped before any statistic is computed.

    Returns:
        ``(errors, stats)``. ``stats`` holds min, mean, median, p95, p99, max
        and ``at_max_frac`` (share of values within 1 of the observed
        maximum). It is empty when the column is absent or has no numeric
        values.
    """
    if "expected_acv" not in df.columns:
        return ["expected_acv column missing"], {}
    acv = pd.to_numeric(df["expected_acv"], errors="coerce").dropna()
    if acv.empty:
        return ["expected_acv has no non-null values"], {}

    errors: list[str] = []
    stats = {
        "min": float(acv.min()),
        "mean": float(acv.mean()),
        "median": float(acv.median()),
        "p95": float(acv.quantile(0.95)),
        "p99": float(acv.quantile(0.99)),
        "max": float(acv.max()),
        "at_max_frac": float((acv >= acv.max() - 1).sum() / len(acv)),
    }
    # The 1-unit slack tolerates rounding at the range bounds.
    if acv.min() < ACV_MIN - 1:
        errors.append(f"expected_acv min {acv.min():.0f} < {ACV_MIN:.0f}")
    if acv.max() > ACV_MAX + 1:
        errors.append(f"expected_acv max {acv.max():.0f} > {ACV_MAX:.0f}")
    # NOTE(review): the threshold is named *_WARN but the finding is returned
    # as an error and therefore fails validation — confirm that is intended.
    if stats["at_max_frac"] > ACV_PILE_UP_WARN:
        errors.append(f"{stats['at_max_frac']:.1%} of expected_acv at max — possible pile-up")
    return errors, stats
def check_baseline(df: pd.DataFrame) -> tuple[list[str], dict]:
    """Evaluate the baseline model and check its scores against the thresholds.

    Scores come from ``fit_evaluate(df)``, unpacked here as
    ``(auc, pr_auc, probs, y_test)``. AUC must lie in
    ``[AUC_LOWER, AUC_UPPER]`` and PR-AUC must be at least ``PR_AUC_LOWER``.

    Returns:
        ``(errors, metrics)``. ``metrics`` holds auc, pr_auc, base_rate and,
        for each K in 25/50/100 not larger than the test set, precision,
        recall, lift, conversions and the expected random conversions.
    """
    auc, pr_auc, probs, y_test = fit_evaluate(df)
    errors: list[str] = []
    if auc < AUC_LOWER:
        errors.append(f"Baseline AUC {auc:.3f} < {AUC_LOWER}")
    if auc > AUC_UPPER:
        errors.append(f"Baseline AUC {auc:.3f} > {AUC_UPPER}")
    if pr_auc < PR_AUC_LOWER:
        errors.append(f"Baseline PR-AUC {pr_auc:.3f} < {PR_AUC_LOWER}")

    base_rate = float(y_test.mean())
    n_pos = int(y_test.sum())
    # Stable sort keeps tied scores in row order, so top-K is reproducible.
    order = np.argsort(-probs, kind="stable")
    y_sorted = y_test.values[order]

    metrics: dict[str, float] = {"auc": auc, "pr_auc": pr_auc, "base_rate": base_rate}
    for k in [25, 50, 100]:
        if k > len(y_test):
            continue
        hits = y_sorted[:k]
        prec = float(hits.mean())
        metrics[f"precision@{k}"] = prec
        metrics[f"recall@{k}"] = float(hits.sum() / n_pos) if n_pos > 0 else 0.0
        metrics[f"lift@{k}"] = prec / base_rate if base_rate > 0 else 0.0
        metrics[f"conversions@{k}"] = int(hits.sum())
        metrics[f"random_conversions@{k}"] = round(k * base_rate, 1)

    return errors, metrics


def check_value_aware(df: pd.DataFrame) -> tuple[list[str], list[dict]]:
    """Compare ACV captured when ranking by probability vs. probability * ACV.

    Fits the baseline pipeline on a 70/30 stratified split (random_state=42)
    and, for K in 25/50, sums the ACV of converted leads among the top K under
    each ranking.

    Returns:
        ``(errors, results)``. ``errors`` is non-empty only when
        ``expected_acv`` is absent. ``results`` has one dict per K with the
        captured ACV, conversion counts and percentage uplift.
    """
    if "expected_acv" not in df.columns:
        return ["expected_acv column missing"], []

    cat_cols, num_cols = get_feature_cols(df)
    df_clean = sanitize_categoricals(df, cat_cols)
    y = df[TARGET].astype(int)
    x = df_clean[cat_cols + num_cols]

    x_tr, x_te, y_tr, y_te = train_test_split(x, y, test_size=0.30, random_state=42, stratify=y)
    pipe = build_baseline_pipeline(num_cols, cat_cols)
    pipe.fit(x_tr, y_tr)
    probs = pipe.predict_proba(x_te)[:, 1]

    # Unparseable or missing ACV counts as 0 so those rows add no captured value.
    test_acv = pd.to_numeric(df.loc[x_te.index, "expected_acv"], errors="coerce").fillna(0).values
    test_conv = y_te.values
    ev = probs * test_acv

    # Stable sorts, as in check_baseline, so ties cannot reorder the top K
    # between runs or platforms.
    by_prob = np.argsort(-probs, kind="stable")
    by_ev = np.argsort(-ev, kind="stable")

    results = []
    for k in [25, 50]:
        if k > len(y_te):
            continue
        top_prob_idx = by_prob[:k]
        cap_prob = float(np.sum(test_acv[top_prob_idx] * test_conv[top_prob_idx]))
        conv_prob = int(test_conv[top_prob_idx].sum())

        top_ev_idx = by_ev[:k]
        cap_ev = float(np.sum(test_acv[top_ev_idx] * test_conv[top_ev_idx]))
        conv_ev = int(test_conv[top_ev_idx].sum())

        uplift = (cap_ev - cap_prob) / cap_prob * 100 if cap_prob > 0 else 0.0
        results.append(
            {
                "k": k,
                "captured_prob": cap_prob,
                "captured_ev": cap_ev,
                "conversions_prob": conv_prob,
                "conversions_ev": conv_ev,
                "uplift_pct": uplift,
            }
        )

    return [], results


def check_cohort(df: pd.DataFrame) -> dict | None:
    """Compare a random split with a train-on-waves-A+B / test-on-wave-C split.

    ``acquisition_wave`` is excluded from the features. Both models use the
    baseline pipeline.

    Returns:
        A dict with AUC and PR-AUC for both splits and the AUC drop, or
        ``None`` when ``acquisition_wave`` is absent, wave C has fewer than 30
        rows, or waves A+B have fewer than 100.
    """
    if "acquisition_wave" not in df.columns:
        return None

    # Decide whether the cohort split is possible before fitting anything.
    train_mask = df["acquisition_wave"].isin(["A", "B"])
    test_mask = df["acquisition_wave"] == "C"
    if test_mask.sum() < 30 or train_mask.sum() < 100:
        return None

    cat_cols, num_cols = get_feature_cols(df, exclude={"acquisition_wave"})
    df_clean = sanitize_categoricals(df, cat_cols)
    y = df[TARGET].astype(int)
    x = df_clean[cat_cols + num_cols]

    x_tr, x_te, y_tr, y_te = train_test_split(x, y, test_size=0.30, random_state=42, stratify=y)
    pipe_r = build_baseline_pipeline(num_cols, cat_cols)
    pipe_r.fit(x_tr, y_tr)
    random_probs = pipe_r.predict_proba(x_te)[:, 1]
    random_auc = roc_auc_score(y_te, random_probs)
    random_pr = average_precision_score(y_te, random_probs)

    pipe_c = build_baseline_pipeline(num_cols, cat_cols)
    pipe_c.fit(x[train_mask], y[train_mask])
    cohort_probs = pipe_c.predict_proba(x[test_mask])[:, 1]
    cohort_auc = roc_auc_score(y[test_mask], cohort_probs)
    cohort_pr = average_precision_score(y[test_mask], cohort_probs)

    return {
        "random_auc": random_auc,
        "random_pr_auc": random_pr,
        "cohort_auc": cohort_auc,
        "cohort_pr_auc": cohort_pr,
        "drop": random_auc - cohort_auc,
    }


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def _target_is_usable(df: pd.DataFrame) -> bool:
    """Return True when the target exists, has no gaps and holds only 0/1."""
    if TARGET not in df.columns:
        return False
    target = df[TARGET]
    if target.empty or target.isna().any():
        return False
    return set(target.unique()) <= {0, 1}


def _finalize_report(
    report: dict,
    errors: list[str],
    warnings: list[str],
    out_json: str | None,
) -> int:
    """Attach findings to the report, write the optional JSON, print the verdict.

    Returns:
        1 when any error was recorded, otherwise 0.
    """
    report["errors"] = errors
    report["warnings"] = warnings

    if out_json:
        Path(out_json).parent.mkdir(parents=True, exist_ok=True)
        with open(out_json, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"\nJSON report written to: {out_json}")

    print(f"\n{'=' * 60}")
    if errors:
        print(f"FAILED — {len(errors)} error(s):")
        for err in errors:
            print(f" * {err}")
        return 1
    print("ALL MANDATORY CHECKS PASSED")
    return 0


def validate(csv_path: str, out_json: str | None = None) -> int:
    """Run every check on the CSV, print a sectioned report, return an exit code.

    The basic and schema checks always run. If the target column is missing,
    has missing values, or holds anything other than 0/1, the remaining checks
    are skipped: they all compute rates or fit models on the target and would
    raise instead of reporting. The errors already collected still fail the run.

    Args:
        csv_path: Path of the CSV to validate.
        out_json: Optional path for a JSON copy of the report; parent
            directories are created as needed.

    Returns:
        0 when no check reported an error, 1 otherwise.
    """
    df = pd.read_csv(csv_path)
    all_errors: list[str] = []
    # NOTE(review): nothing populates the warnings list yet; it is kept so the
    # report layout stays stable.
    all_warnings: list[str] = []
    report: dict = {"csv_path": csv_path}

    print("=" * 60)
    print("BASIC CHECKS")
    print("=" * 60)
    errs = check_basic(df)
    print(f" Shape: {df.shape[0]} rows x {df.shape[1]} cols")
    if TARGET in df.columns:
        # Coerce so a non-numeric target prints as nan instead of raising.
        conv_rate = pd.to_numeric(df[TARGET], errors="coerce").mean()
        print(f" Conversion rate: {conv_rate:.1%}")
        report["conversion_rate"] = float(conv_rate)
    else:
        print(" Conversion rate: N/A (target column missing)")
        report["conversion_rate"] = None
    print(f" Status: {'FAIL' if errs else 'PASS'}")
    all_errors.extend(errs)
    report["shape"] = list(df.shape)

    print("\nSCHEMA CHECKS")
    print("=" * 60)
    errs = check_schema(df)
    print(f" Columns: {list(df.columns)}")
    print(f" Status: {'FAIL' if errs else 'PASS'}")
    all_errors.extend(errs)
    report["columns"] = list(df.columns)

    if not _target_is_usable(df):
        print("\nRemaining checks skipped: target column missing or not a complete 0/1 column")
        return _finalize_report(report, all_errors, all_warnings, out_json)

    print("\nMISSINGNESS")
    print("=" * 60)
    errs, miss_report = check_missingness(df)
    for col, info in miss_report.items():
        print(f" {col}: {info['count']} ({info['rate']:.1%})")
    print(f" Total missing: {df.isnull().sum().sum()}")
    print(f" Status: {'FAIL' if errs else 'PASS'}")
    all_errors.extend(errs)
    report["missingness"] = miss_report

    print("\nDETERMINISM CHECKS")
    print("=" * 60)
    errs = check_determinism(df)
    print(f" Status: {'FAIL' if errs else 'PASS'}")
    for e in errs:
        print(f" * {e}")
    all_errors.extend(errs)

    print("\nACV STATISTICS")
    print("=" * 60)
    errs, acv_stats = check_acv(df)
    if acv_stats:
        print(
            f" min=${acv_stats['min']:,.0f} mean=${acv_stats['mean']:,.0f} "
            f"median=${acv_stats['median']:,.0f} p95=${acv_stats['p95']:,.0f} "
            f"max=${acv_stats['max']:,.0f}"
        )
        print(f" At-max pile-up: {acv_stats['at_max_frac']:.1%}")
    print(f" Status: {'FAIL' if errs else 'PASS'}")
    all_errors.extend(errs)
    report["acv_stats"] = acv_stats

    print("\nBASELINE MODEL (LR, seed=42, 70/30 stratified)")
    print("=" * 60)
    errs, baseline = check_baseline(df)
    auc = baseline.get("auc", 0.0)
    pr_auc = baseline.get("pr_auc", 0.0)
    base_rate = baseline.get("base_rate", 0.0)
    print(f" ROC-AUC: {auc:.4f} PR-AUC: {pr_auc:.4f} Base rate: {base_rate:.1%}")
    for k in [25, 50, 100]:
        pk = baseline.get(f"precision@{k}")
        lk = baseline.get(f"lift@{k}")
        ck = baseline.get(f"conversions@{k}")
        rk = baseline.get(f"random_conversions@{k}")
        # Metrics for a K larger than the test set are absent from the dict.
        if pk is not None:
            print(f" P@{k}={pk:.3f} Lift@{k}={lk:.2f}x conversions={ck}/{k} random={rk:.1f}")
    print(f" Status: {'FAIL' if errs else 'PASS'}")
    all_errors.extend(errs)
    report["baseline"] = baseline

    print("\nVALUE-AWARE RANKING")
    print("=" * 60)
    errs, ev_results = check_value_aware(df)
    for r in ev_results:
        k = r["k"]
        print(
            f" K={k}: prob=${r['captured_prob']:,.0f} (conv={r['conversions_prob']}) "
            f"ev=${r['captured_ev']:,.0f} (conv={r['conversions_ev']}) "
            f"ACV uplift={r['uplift_pct']:+.1f}%"
        )
    all_errors.extend(errs)
    report["value_aware"] = ev_results

    print("\nCOHORT SPLIT (train A+B, test C)")
    print("=" * 60)
    cohort = check_cohort(df)
    if cohort:
        print(
            f" Random split: AUC={cohort['random_auc']:.4f} PR-AUC={cohort['random_pr_auc']:.4f}"
        )
        print(
            f" Cohort split: AUC={cohort['cohort_auc']:.4f} PR-AUC={cohort['cohort_pr_auc']:.4f}"
        )
        print(f" AUC drop: {cohort['drop']:+.4f}")
        report["cohort_split"] = cohort
    else:
        print(" Skipped (no acquisition_wave or insufficient cohort sizes)")

    return _finalize_report(report, all_errors, all_warnings, out_json)


def main() -> None:
    """Parse ``CSV_PATH [--out-json PATH]`` and exit with ``validate``'s status.

    Exits with status 1 and a message on stderr when ``--out-json`` has no
    value or when the remaining positional arguments are not exactly one path.
    """
    args = sys.argv[1:]
    out_json = None
    if "--out-json" in args:
        idx = args.index("--out-json")
        if idx + 1 < len(args):
            out_json = args[idx + 1]
            # Drop the flag and its value; what remains must be the CSV path.
            args = args[:idx] + args[idx + 2 :]
        else:
            print("--out-json requires a path", file=sys.stderr)
            sys.exit(1)

    if len(args) != 1:
        print(f"Usage: {sys.argv[0]} CSV_PATH [--out-json PATH]", file=sys.stderr)
        sys.exit(1)

    sys.exit(validate(args[0], out_json=out_json))


if __name__ == "__main__":
    main()