diff --git a/.agent-plan.md b/.agent-plan.md index 7907891..c03e439 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,7 +6,7 @@ ## Current System State -**v0.5.0 in progress — Milestones 7–11 complete, v5 dataset shipped.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. v4 engine changes merged (PR #21). v5 dataset generated and validated (all 10 checks pass). PR-agent refresh fallback wiring fixed for bot-authored reviews. 609 tests passing. +**v0.5.0 in progress — Milestones 7–11 complete, v5 dataset shipped + canonical validation module.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. v4 engine changes merged (PR #21). v5 dataset regenerated with boosted leakage trap (snapshot day 10, Poisson(1) target-correlated boost) and validated via canonical sklearn pipeline (all checks pass). Canonical validation module added as single source of truth (`leadforge/validation/lead_scoring.py`). --- @@ -55,14 +55,23 @@ Build pipeline: No engine changes required — v5 is a build pipeline + validation improvement. 
-- [x] `scripts/build_v5_snapshot.py` — day-14 snapshot, ACV capping [18k–120k], `__leakage__` naming, `days_since_first_touch` momentum feature +- [x] `scripts/build_v5_snapshot.py` — day-10 snapshot, ACV capping [18k–120k], `__leakage__` naming, `days_since_first_touch` momentum feature, Poisson(1) trap boost - [x] `scripts/validate_v5_dataset.py` — 10 checks: hold-out AUC/PR-AUC, multi-seed leakage robustness, Precision@K, Lift@K, duplicate check, ACV range, missingness bounds - [x] `scripts/quick_baseline_eval_v5.py` — LR + RF baselines, value-aware scoring demo, feature importance -- [x] Generate `lead_scoring_intro_v5.csv` (1000 rows × 19 cols, 30% conversion, hold-out AUC 0.632) -- [x] Leakage trap robustly validated: mean delta 0.033, min delta 0.015 across 10 seeds +- [x] Generate `lead_scoring_intro_v5.csv` (1000 rows × 19 cols, 30% conversion, hold-out AUC 0.648) +- [x] Leakage trap robustly validated: mean delta 0.081, min delta 0.035 across 10 seeds - [x] `RELEASE_v5.md` with instructor/student notes, value-aware scoring section, full validation results - [x] Updated `BACKGROUND.md` with value-aware lead scoring section +### v5-validation: Canonical validation module (PR #26) + +- [x] `leadforge/validation/lead_scoring.py` — single source of truth validation with canonical sklearn pipeline (ColumnTransformer + OneHotEncoder + StandardScaler + LR) +- [x] `scripts/validate_lead_scoring_dataset.py` — CLI entrypoint for validation +- [x] `tests/validation/test_lead_scoring.py` — 12 tests +- [x] CI job added to `.github/workflows/ci.yml` for dataset validation +- [x] v5 dataset regenerated (snapshot day 10, trap boost) — all checks pass, exit code 0 +- [x] `RELEASE_v5.md` updated with canonical pipeline metrics + --- ## Deferred Items diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4d1f81e..5f89201 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,3 +57,28 @@ jobs: path: .coverage.${{ matrix.python-version }} 
include-hidden-files: true if-no-files-found: ignore + + validate-dataset: + name: Validate lead scoring dataset + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: pip install -e ".[dev,scripts]" + - name: Check for v5 dataset + id: check + run: | + if [ -f "lead_scoring_intro_v5.csv" ]; then + echo "found=true" >> "$GITHUB_OUTPUT" + echo "csv=lead_scoring_intro_v5.csv" >> "$GITHUB_OUTPUT" + else + echo "found=false" >> "$GITHUB_OUTPUT" + fi + - name: Run validator + if: steps.check.outputs.found == 'true' + run: python scripts/validate_lead_scoring_dataset.py --csv "${{ steps.check.outputs.csv }}" --enforce-1000 + - name: Skip (no dataset) + if: steps.check.outputs.found != 'true' + run: echo "No lead_scoring_intro_v5.csv found in repo root — skipping validation" diff --git a/leadforge/validation/lead_scoring.py b/leadforge/validation/lead_scoring.py new file mode 100644 index 0000000..dfadc36 --- /dev/null +++ b/leadforge/validation/lead_scoring.py @@ -0,0 +1,886 @@ +"""Single source of truth for lead scoring dataset validation and baseline evaluation. + +This module validates ``lead_scoring_intro_v*.csv`` datasets and computes +reproducible baseline metrics. All ML evaluation uses deterministic +hold-out splits with preprocessing fit on the training fold only. 
+ +Usage (programmatic):: + + from leadforge.validation.lead_scoring import validate_dataset + report = validate_dataset("lead_scoring_intro_v5.csv") + print(report.summary()) + +Usage (CLI):: + + python scripts/validate_lead_scoring_dataset.py --csv lead_scoring_intro_v5.csv +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path + +import numpy as np +import pandas as pd +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import average_precision_score, roc_auc_score +from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import OneHotEncoder, StandardScaler + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +TARGET = "converted" + +EXPECTED_CAT_FEATURES = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", +] + +EXPECTED_NUMERIC_FEATURES = [ + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "web_sessions", + "sales_activities", + "days_since_last_touch", + "days_since_first_touch", +] + +EXPECTED_BINARY_FEATURES = [ + "opportunity_created", + "demo_completed", +] + +LEAKAGE_PREFIX = "__leakage__" + +BANNED_COLUMNS = { + "current_stage", + "funnel_stage", + "conversion_timestamp", + "is_sql", + "is_mql", + "lead_created_at", +} + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class ValidationConfig: + """Tunable thresholds for dataset validation.""" + + expected_rows: int = 1000 + enforce_row_count: bool = False + max_duplicate_rate: float = 0.01 + 
@dataclass
class TrapMetrics:
    """Per-seed leakage-trap deltas for a single trap column.

    The three list fields are parallel: position ``i`` of ``deltas_auc`` and
    ``deltas_pr_auc`` belongs to ``seeds[i]``.
    """

    # Name of the trap column these deltas were measured for.
    column: str
    # ROC-AUC change per seed.
    deltas_auc: list[float]
    # PR-AUC change per seed.
    deltas_pr_auc: list[float]
    # Random seeds, in the same order as the delta lists.
    seeds: list[int]

    @property
    def mean_delta_auc(self) -> float:
        """Average ROC-AUC delta over all seeds."""
        values = np.asarray(self.deltas_auc)
        return float(values.mean())

    @property
    def min_delta_auc(self) -> float:
        """Smallest ROC-AUC delta observed on any seed."""
        values = np.asarray(self.deltas_auc)
        return float(values.min())

    @property
    def max_delta_auc(self) -> float:
        """Largest ROC-AUC delta observed on any seed."""
        values = np.asarray(self.deltas_auc)
        return float(values.max())
dict[str, float] = field(default_factory=dict) + n_rows: int = 0 + test_size: float = 0.30 + + @property + def passed(self) -> bool: + return all(c.passed for c in self.checks) + + @property + def n_errors(self) -> int: + return sum(1 for c in self.checks if not c.passed) + + def summary(self) -> str: + """Human-readable summary.""" + lines = [] + for c in self.checks: + status = "PASS" if c.passed else "FAIL" + line = f" {status} {c.name}" + if c.details: + line += f" ({c.details})" + lines.append(line) + + if self.baseline: + b = self.baseline + lines.append(f"\nBaseline (seed={b.seed}, hold-out):") + lines.append(f" AUC: {b.auc:.3f}") + lines.append(f" PR-AUC: {b.pr_auc:.3f}") + lines.append(f" Base rate: {b.base_rate:.1%}") + for k in sorted(b.precision_at_k): + lines.append( + f" P@{k}={b.precision_at_k[k]:.3f} " + f"R@{k}={b.recall_at_k[k]:.3f} " + f"Lift@{k}={b.lift_at_k[k]:.2f}x" + ) + + for vm in self.value_metrics: + lines.append( + f"\nValue@{vm.k}: " + f"by_prob=${vm.captured_acv_by_prob:,.0f} " + f"by_ev=${vm.captured_acv_by_ev:,.0f} " + f"uplift={vm.uplift_pct:+.1f}%" + ) + + for tm in self.trap_metrics: + lines.append( + f"\nTrap '{tm.column}' ({len(tm.seeds)} seeds): " + f"mean_delta={tm.mean_delta_auc:.4f} " + f"min_delta={tm.min_delta_auc:.4f} " + f"max_delta={tm.max_delta_auc:.4f}" + ) + neg = [(s, d) for s, d in zip(tm.seeds, tm.deltas_auc, strict=True) if d < 0] + if neg: + for s, d in neg: + lines.append(f" ⚠ seed {s}: delta={d:.4f} (negative)") + + if self.missingness: + lines.append("\nMissingness:") + for col, rate in sorted(self.missingness.items(), key=lambda x: -x[1]): + if rate > 0: + lines.append(f" {col}: {rate:.1%}") + + sep = "=" * 60 + if self.passed: + lines.append(f"\n{sep}\nALL CHECKS PASSED\n{sep}") + else: + lines.append(f"\n{sep}\nFAILED — {self.n_errors} check(s)\n{sep}") + + return "\n".join(lines) + + def to_dict(self) -> dict: + """Serialisable dict for JSON output.""" + d: dict = { + "csv_path": self.csv_path, + 
"passed": self.passed, + "n_errors": self.n_errors, + "checks": [ + {"name": c.name, "passed": c.passed, "details": c.details} for c in self.checks + ], + } + if self.baseline: + b = self.baseline + d["baseline"] = { + "seed": b.seed, + "auc": round(b.auc, 4), + "pr_auc": round(b.pr_auc, 4), + "base_rate": round(b.base_rate, 4), + "precision_at_k": {str(k): round(v, 4) for k, v in b.precision_at_k.items()}, + "recall_at_k": {str(k): round(v, 4) for k, v in b.recall_at_k.items()}, + "lift_at_k": {str(k): round(v, 2) for k, v in b.lift_at_k.items()}, + } + if self.value_metrics: + d["value_metrics"] = [ + { + "k": vm.k, + "captured_acv_by_prob": round(vm.captured_acv_by_prob, 0), + "captured_acv_by_ev": round(vm.captured_acv_by_ev, 0), + "uplift_pct": round(vm.uplift_pct, 1), + } + for vm in self.value_metrics + ] + if self.trap_metrics: + d["trap_metrics"] = [ + { + "column": tm.column, + "mean_delta_auc": round(tm.mean_delta_auc, 4), + "min_delta_auc": round(tm.min_delta_auc, 4), + "max_delta_auc": round(tm.max_delta_auc, 4), + "seeds": tm.seeds, + "deltas_auc": [round(d, 4) for d in tm.deltas_auc], + "mean_delta_pr_auc": round(float(np.mean(tm.deltas_pr_auc)), 4), + "deltas_pr_auc": [round(d, 4) for d in tm.deltas_pr_auc], + } + for tm in self.trap_metrics + ] + d["missingness"] = { + col: round(rate, 4) for col, rate in self.missingness.items() if rate > 0 + } + return d + + def emit_release_snippet(self) -> str: + """Markdown snippet for pasting into RELEASE docs.""" + lines = [""] + + if self.baseline: + b = self.baseline + lines.append("") + lines.append("### Baseline performance") + lines.append("") + train_pct = int(round((1 - self.test_size) * 100)) + test_pct = int(round(self.test_size * 100)) + lines.append( + f"Evaluated on a {train_pct}/{test_pct} stratified hold-out split (seed {b.seed})." 
+ ) + lines.append("") + lines.append("| Metric | Value |") + lines.append("|---|---|") + lines.append(f"| ROC-AUC | {b.auc:.3f} |") + lines.append(f"| PR-AUC (Average Precision) | {b.pr_auc:.3f} |") + lines.append(f"| Base rate | {b.base_rate:.1%} |") + for k in sorted(b.precision_at_k): + lines.append( + f"| Precision@{k} | {b.precision_at_k[k]:.3f} (Lift: {b.lift_at_k[k]:.2f}x) |" + ) + lines.append(f"| Recall@{k} | {b.recall_at_k[k]:.3f} |") + + if self.value_metrics: + lines.append("") + lines.append("### Value-aware ranking") + lines.append("") + lines.append("| K | By P(convert) | By expected value | Uplift |") + lines.append("|---|---|---|---|") + for vm in self.value_metrics: + lines.append( + f"| {vm.k} | ${vm.captured_acv_by_prob:,.0f} " + f"| ${vm.captured_acv_by_ev:,.0f} " + f"| {vm.uplift_pct:+.1f}% |" + ) + + if self.trap_metrics: + lines.append("") + lines.append("### Leakage trap evaluation") + lines.append("") + for tm in self.trap_metrics: + lines.append("| Metric | Value |") + lines.append("|---|---|") + lines.append(f"| Column | `{tm.column}` |") + lines.append(f"| Seeds | {len(tm.seeds)} ({tm.seeds[0]}–{tm.seeds[-1]}) |") + lines.append(f"| Mean AUC delta | {tm.mean_delta_auc:.4f} |") + lines.append(f"| Min AUC delta | {tm.min_delta_auc:.4f} |") + lines.append(f"| Max AUC delta | {tm.max_delta_auc:.4f} |") + + if self.missingness: + lines.append("") + lines.append("### Missingness") + lines.append("") + lines.append("| Column | Missing | Rate |") + lines.append("|---|---|---|") + total = 0 + for col, rate in sorted(self.missingness.items(), key=lambda x: -x[1]): + if rate > 0: + n = int(round(rate * self.n_rows)) + total += n + lines.append(f"| `{col}` | {n} | {rate:.1%} |") + lines.append(f"| **Total** | **{total}** | |") + + lines.append("") + lines.append("") + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# ML pipeline (single source of truth) +# 
def _get_feature_cols(
    df: pd.DataFrame,
    exclude: set[str] | None = None,
) -> tuple[list[str], list[str]]:
    """Split the usable feature columns of *df* by dtype.

    The target column and anything named in *exclude* are left out. Every
    remaining column is classified as numeric when pandas reports a numeric
    dtype for it, and as categorical otherwise. Column order follows
    ``df.columns``.

    Returns:
        ``(cat_cols, num_cols)``.
    """
    skipped = (exclude or set()) | {TARGET}
    is_numeric = pd.api.types.is_numeric_dtype

    candidates = [name for name in df.columns if name not in skipped]
    numeric = [name for name in candidates if is_numeric(df[name])]
    categorical = [name for name in candidates if not is_numeric(df[name])]
    return categorical, numeric
def _evaluate_value_aware(
    df: pd.DataFrame,
    exclude_cols: set[str] | None = None,
    seed: int = 42,
    test_size: float = 0.30,
    ks: tuple[int, ...] = (25, 50),
) -> list[ValueMetrics]:
    """Compare two top-K rankings by the ACV they capture on the hold-out set.

    A canonical pipeline is fit on the training fold. Test rows are then
    ranked by predicted probability, and separately by expected value
    (probability times ``expected_acv``). For each ``k`` the captured ACV is
    the sum of ``expected_acv`` over the top-``k`` rows that converted.

    Args:
        df: Dataset containing the target column and ``expected_acv``.
        exclude_cols: Columns withheld from the model.
        seed: ``random_state`` for the stratified split.
        test_size: Fraction of rows held out.
        ks: Cut-offs to evaluate; values larger than the test fold are skipped.

    Returns:
        One :class:`ValueMetrics` per evaluated ``k``, or an empty list when
        ``expected_acv`` is not a column of *df*.
    """
    if "expected_acv" not in df.columns:
        return []

    y = df[TARGET].astype(int)
    cat_cols, num_cols = _get_feature_cols(df, exclude=exclude_cols)
    x = df[cat_cols + num_cols]

    x_train, x_test, y_train, y_test = train_test_split(
        x,
        y,
        test_size=test_size,
        random_state=seed,
        stratify=y,
    )

    pipe = _build_pipeline(cat_cols, num_cols)
    pipe.fit(x_train, y_train)
    probs = pipe.predict_proba(x_test)[:, 1]

    # ACV is read from the original frame so it is available even when the
    # column is excluded from the features; unparseable/missing values count as 0.
    test_acv = pd.to_numeric(df.loc[x_test.index, "expected_acv"], errors="coerce").fillna(0).values
    # ACV actually realised per row: zero for rows that did not convert.
    realized_acv = test_acv * y_test.values

    # Both rankings are independent of k, so sort once instead of once per k.
    # Stable sort keeps array order among ties.
    order_prob = np.argsort(-probs, kind="stable")
    order_ev = np.argsort(-(probs * test_acv), kind="stable")

    n_test = len(y_test)
    results = []
    for k in ks:
        if k > n_test:
            continue

        captured_prob = float(np.sum(realized_acv[order_prob[:k]]))
        captured_ev = float(np.sum(realized_acv[order_ev[:k]]))

        # Relative gain of EV ranking over probability ranking; defined as 0
        # when probability ranking captured nothing.
        uplift = ((captured_ev - captured_prob) / captured_prob * 100) if captured_prob > 0 else 0.0

        results.append(
            ValueMetrics(
                k=k,
                captured_acv_by_prob=captured_prob,
                captured_acv_by_ev=captured_ev,
                uplift_pct=uplift,
            )
        )

    return results
results.append(CheckResult("target_binary", False, f"target values: {target_vals}")) + else: + results.append(CheckResult("target_binary", True)) + if df[TARGET].isna().any(): + results.append(CheckResult("target_no_missing", False, "target has missing values")) + else: + results.append(CheckResult("target_no_missing", True)) + # Both classes must be present for stratified splitting + if target_vals == {0, 1}: + results.append(CheckResult("target_both_classes", True)) + else: + results.append( + CheckResult("target_both_classes", False, f"need both {{0, 1}}, got {target_vals}") + ) + + # Banned columns + present = BANNED_COLUMNS & set(df.columns) + if present: + results.append(CheckResult("no_banned_columns", False, f"banned: {sorted(present)}")) + else: + results.append(CheckResult("no_banned_columns", True)) + + # ID columns + id_cols = [c for c in df.columns if c.endswith("_id")] + if id_cols: + results.append(CheckResult("no_id_columns", False, f"ID cols: {sorted(id_cols)}")) + else: + results.append(CheckResult("no_id_columns", True)) + + # Row count + n = len(df) + if cfg.enforce_row_count and n != cfg.expected_rows: + results.append(CheckResult("row_count", False, f"{n} rows (expected {cfg.expected_rows})")) + elif n != cfg.expected_rows: + results.append(CheckResult("row_count", True, f"{n} rows (expected {cfg.expected_rows})")) + else: + results.append(CheckResult("row_count", True, f"{n} rows")) + + # Duplicates + n_dupes = df.duplicated().sum() + dupe_rate = n_dupes / len(df) if len(df) > 0 else 0 + if dupe_rate > cfg.max_duplicate_rate: + results.append(CheckResult("duplicates", False, f"{n_dupes} duplicates ({dupe_rate:.1%})")) + else: + results.append(CheckResult("duplicates", True, f"{n_dupes} duplicates")) + + # Expected features (warn, don't fail) + missing_cat = [c for c in EXPECTED_CAT_FEATURES if c not in df.columns] + missing_num = [c for c in EXPECTED_NUMERIC_FEATURES if c not in df.columns] + missing_bin = [c for c in EXPECTED_BINARY_FEATURES 
def _check_missingness(
    df: pd.DataFrame,
    cfg: ValidationConfig,
) -> tuple[list[CheckResult], dict[str, float]]:
    """Check that no feature column exceeds the allowed missing-value rate.

    The target column is ignored. The returned map holds only columns with a
    non-zero missing rate, in ``df.columns`` order.

    Returns:
        ``(checks, miss_map)`` where ``checks`` holds a single
        ``missingness_bounds`` result.
    """
    observed = (
        (name, float(df[name].isna().mean())) for name in df.columns if name != TARGET
    )
    miss_map: dict[str, float] = {name: rate for name, rate in observed if rate > 0}

    limit = cfg.max_col_missing_rate
    offenders = [(name, rate) for name, rate in miss_map.items() if rate > limit]

    if not offenders:
        return [CheckResult("missingness_bounds", True)], miss_map

    detail = ", ".join(f"{name}={rate:.1%}" for name, rate in offenders)
    failure = CheckResult("missingness_bounds", False, f">{limit:.0%}: {detail}")
    return [failure], miss_map
def _check_conversion_rate(
    df: pd.DataFrame,
    min_rate: float = 0.15,
    max_rate: float = 0.40,
) -> list[CheckResult]:
    """Check that the overall conversion rate lies within ``[min_rate, max_rate]``.

    Args:
        df: Dataset containing the target column.
        min_rate: Lowest acceptable mean of the target (inclusive).
        max_rate: Highest acceptable mean of the target (inclusive).

    Returns:
        A single ``conversion_rate`` result. It fails when the rate is out of
        range or is NaN (for example an empty frame).
    """
    rate = df[TARGET].mean()
    # Negated range test on purpose: NaN compares False against both bounds,
    # so `rate < lo or rate > hi` would wrongly report a NaN rate as passing.
    if not (min_rate <= rate <= max_rate):
        return [
            CheckResult(
                "conversion_rate",
                False,
                f"{rate:.1%} outside [{min_rate:.0%}, {max_rate:.0%}]",
            )
        ]
    return [CheckResult("conversion_rate", True, f"{rate:.1%}")]
errors="coerce").dropna() + if acv.empty: + return [CheckResult("acv_range", False, "no usable values")] + errors = [] + if acv.min() < 18_000 - 1: + errors.append(f"min={acv.min():.0f} < 18,000") + if acv.max() > 120_000 + 1: + errors.append(f"max={acv.max():.0f} > 120,000") + if errors: + return [CheckResult("acv_range", False, "; ".join(errors))] + return [CheckResult("acv_range", True, f"[{acv.min():.0f}, {acv.max():.0f}]")] + + +def _evaluate_trap( + df: pd.DataFrame, + cfg: ValidationConfig, +) -> tuple[list[CheckResult], list[TrapMetrics]]: + """Leakage trap evaluation across multiple seeds.""" + leakage_cols = [c for c in df.columns if c.startswith(LEAKAGE_PREFIX)] + if not leakage_cols: + return [CheckResult("leakage_trap", True, "no trap columns (skip)")], [] + + all_trap_metrics = [] + all_checks = [] + all_leakage = set(leakage_cols) + + for trap_col in leakage_cols: + seeds = list(range(cfg.trap_seed_start, cfg.trap_seed_start + cfg.trap_n_seeds)) + deltas_auc = [] + deltas_pr_auc = [] + + for seed in seeds: + m_without = _evaluate_split( + df, + exclude_cols=all_leakage, + seed=seed, + test_size=cfg.test_size, + ks=(), + ) + m_with = _evaluate_split( + df, + exclude_cols=all_leakage - {trap_col}, + seed=seed, + test_size=cfg.test_size, + ks=(), + ) + deltas_auc.append(m_with.auc - m_without.auc) + deltas_pr_auc.append(m_with.pr_auc - m_without.pr_auc) + + tm = TrapMetrics( + column=trap_col, + deltas_auc=deltas_auc, + deltas_pr_auc=deltas_pr_auc, + seeds=seeds, + ) + all_trap_metrics.append(tm) + + # Check thresholds + errors = [] + if tm.mean_delta_auc < cfg.trap_mean_delta: + errors.append(f"mean delta {tm.mean_delta_auc:.4f} < {cfg.trap_mean_delta}") + if tm.min_delta_auc < cfg.trap_min_delta: + bad_seeds = [ + f"seed {s}: {d:.4f}" + for s, d in zip(seeds, deltas_auc, strict=True) + if d < cfg.trap_min_delta + ] + errors.append( + f"min delta {tm.min_delta_auc:.4f} < {cfg.trap_min_delta} [{', '.join(bad_seeds)}]" + ) + + if errors: + 
all_checks.append( + CheckResult( + f"leakage_trap:{trap_col}", + False, + "; ".join(errors), + ) + ) + else: + all_checks.append( + CheckResult( + f"leakage_trap:{trap_col}", + True, + f"mean={tm.mean_delta_auc:.4f} min={tm.min_delta_auc:.4f}", + ) + ) + + return all_checks, all_trap_metrics + + +# --------------------------------------------------------------------------- +# Main entrypoint +# --------------------------------------------------------------------------- + + +def validate_dataset( + csv_path: str | Path, + cfg: ValidationConfig | None = None, +) -> ValidationReport: + """Run full validation suite on a lead scoring CSV. + + Args: + csv_path: Path to the CSV file. + cfg: Validation thresholds. Uses defaults if ``None``. + + Returns: + A :class:`ValidationReport` with all check results and metrics. + """ + cfg = cfg or ValidationConfig() + df = pd.read_csv(csv_path) + report = ValidationReport(csv_path=str(csv_path), n_rows=len(df), test_size=cfg.test_size) + + # Schema checks + schema_checks = _check_schema(df, cfg) + report.checks.extend(schema_checks) + if TARGET not in df.columns: + return report + # Short-circuit if target is unusable (non-binary, has NaNs, or single class) + if any( + not c.passed + for c in schema_checks + if c.name in ("target_binary", "target_no_missing", "target_both_classes") + ): + return report + + # Conversion rate + report.checks.extend(_check_conversion_rate(df)) + + # Missingness + miss_checks, miss_map = _check_missingness(df, cfg) + report.checks.extend(miss_checks) + report.missingness = miss_map + + # Group determinism + report.checks.extend(_check_group_determinism(df, cfg)) + + # ACV range + report.checks.extend(_check_acv_range(df)) + + # Baseline evaluation + leakage_cols = {c for c in df.columns if c.startswith(LEAKAGE_PREFIX)} + baseline = _evaluate_split( + df, + exclude_cols=leakage_cols, + seed=cfg.default_seed, + test_size=cfg.test_size, + ks=cfg.ks, + ) + report.baseline = baseline + 
def boost_leakage_trap(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame:
    """Return a copy of *df* with the leakage trap column boosted for converted rows.

    One Poisson(1) draw is taken per row and multiplied by the row's
    ``converted`` value, so rows with ``converted == 0`` receive no boost.
    The result is added to ``__leakage__total_touches_90d``. The input frame
    is left untouched.
    """
    boosted = df.copy()
    # Draw for every row (not just converted ones) so the RNG stream consumed
    # depends only on the number of rows.
    draws = rng.poisson(1, size=len(boosted))
    extra_touches = boosted["converted"].values * draws
    boosted["__leakage__total_touches_90d"] = (
        boosted["__leakage__total_touches_90d"] + extra_touches
    )
    return boosted
def main() -> None:
    """Validate the CSV named on the command line and exit with the verdict.

    The report summary is always printed to stdout. With
    ``--emit-release-snippet`` a markdown snippet follows it, and with
    ``--out-json`` the report dictionary is also written to that path as
    indented JSON. The process exits with status 0 when ``report.passed`` is
    truthy and 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Validate a lead scoring intro CSV dataset.",
    )
    # Flags are registered in this order so the generated --help text keeps
    # its layout.
    flag_specs = (
        ("--csv", {"required": True, "help": "Path to the CSV file to validate."}),
        ("--out-json", {"default": None, "help": "Write JSON report to this path."}),
        (
            "--emit-release-snippet",
            {
                "action": "store_true",
                "help": "Print a markdown snippet suitable for RELEASE docs.",
            },
        ),
        (
            "--enforce-1000",
            {
                "action": "store_true",
                "help": "Fail (instead of warn) if row count != 1000.",
            },
        ),
        (
            "--release",
            {
                "default": None,
                "help": "Path to RELEASE markdown file (currently unused, reserved).",
            },
        ),
    )
    for flag, options in flag_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    report = validate_dataset(
        args.csv,
        ValidationConfig(enforce_row_count=args.enforce_1000),
    )
    print(report.summary())

    if args.emit_release_snippet:
        print("\n--- RELEASE SNIPPET ---\n")
        print(report.emit_release_snippet())

    if args.out_json:
        with open(args.out_json, "w") as handle:
            json.dump(report.to_dict(), handle, indent=2)
        # Status message goes to stderr so stdout carries only the report.
        print(f"\nJSON report written to {args.out_json}", file=sys.stderr)

    exit_status = 0 if report.passed else 1
    sys.exit(exit_status)
def _make_dataset(
    n: int = 200,
    conversion_rate: float = 0.30,
    include_leakage: bool = True,
    deterministic_col: bool = False,
    seed: int = 99,
) -> pd.DataFrame:
    """Generate a seeded synthetic lead-scoring frame for the validation tests.

    The frame has ``n`` rows: seven categorical columns, ten numeric feature
    columns and a binary ``converted`` target with ``int(n * conversion_rate)``
    positives. ``int(n * 0.05)`` rows get a missing ``web_sessions`` value.

    With ``include_leakage`` a ``__leakage__total_touches_90d`` column that is
    positively tied to the target is appended. With ``deterministic_col`` a
    ``bad_feature`` column is appended whose ``"leaked"`` group (rows 0-59) is
    forced to ``converted == 1``.

    All randomness comes from one ``RandomState(seed)``; the draw order below
    determines the generated values and must not be rearranged.
    """
    rng = np.random.RandomState(seed)

    positives = int(n * conversion_rate)
    converted = np.array([1] * positives + [0] * (n - positives))
    rng.shuffle(converted)

    def pick(options):
        # Uniform categorical draw, one value per row.
        return rng.choice(options, size=n)

    # Dict values are evaluated top to bottom, so the RNG is consumed in
    # column order.
    df = pd.DataFrame(
        {
            "industry": pick(["manufacturing", "logistics", "services", "healthcare"]),
            "region": pick(["US", "UK"]),
            "company_size": pick(["200-499", "500-999", "1000-1999", "2000+"]),
            "company_revenue": pick(["$1M-$10M", "$10M-$50M", "$50M-$200M", "$200M+"]),
            "contact_role": pick(["finance", "ap_manager", "it_director", "procurement"]),
            "seniority": pick(
                ["individual_contributor", "manager", "director", "vp", "c_suite"]
            ),
            "lead_source": pick(["inbound_marketing", "sdr_outbound", "partner_referral"]),
            "opportunity_created": rng.randint(0, 2, size=n),
            "demo_completed": rng.randint(0, 2, size=n),
            "expected_acv": rng.uniform(18_000, 120_000, size=n).round(0),
            "inbound_touches": rng.poisson(3, size=n),
            "outbound_touches": rng.poisson(2, size=n),
            "touches_week_1": rng.poisson(2, size=n),
            "days_since_first_touch": rng.uniform(0, 14, size=n).round(1),
            "web_sessions": rng.poisson(4, size=n).astype(float),
            "sales_activities": rng.poisson(3, size=n),
            "days_since_last_touch": rng.uniform(0, 14, size=n).round(1),
            "converted": converted,
        }
    )

    # Blank out web_sessions for a random 5% of rows.
    blank_rows = rng.choice(n, size=int(n * 0.05), replace=False)
    df.loc[blank_rows, "web_sessions"] = np.nan

    if include_leakage:
        # Background touches for everyone, plus a large target-driven term.
        background = rng.poisson(3, size=n)
        target_driven = converted * rng.poisson(8, size=n)
        df["__leakage__total_touches_90d"] = target_driven + background

    if deterministic_col:
        # Rows 0-59 form a group that converts 100% of the time
        # (.loc slicing is end-inclusive).
        df["bad_feature"] = "normal"
        leaked_rows = slice(None, 59)
        df.loc[leaked_rows, "bad_feature"] = "leaked"
        df.loc[leaked_rows, "converted"] = 1

    return df
def test_banned_columns_detected(self, tmp_path):
    """A ``current_stage`` column fails ``no_banned_columns`` and is named in its details."""
    frame = _make_dataset(n=200, include_leakage=False)
    frame["current_stage"] = "active"

    results = _check_schema(frame, ValidationConfig(enforce_row_count=False))
    verdict = next(filter(lambda c: c.name == "no_banned_columns", results))

    assert not verdict.passed
    assert "current_stage" in verdict.details
class TestMissingness:
    """Per-column missing-rate checks against ``max_col_missing_rate``."""

    def test_high_missingness_fails(self):
        """Blanking rows 0-40 of a column (41 of 200) breaches a 10% cap."""
        frame = _make_dataset(n=200, include_leakage=False)
        frame.loc[:40, "inbound_touches"] = np.nan  # >20% missing
        strict = ValidationConfig(max_col_missing_rate=0.10)
        results, _ = _check_missingness(frame, strict)
        first = results[0]
        assert not first.passed

    def test_low_missingness_passes(self):
        """The stock fixture (5% missing ``web_sessions``) stays under a 10% cap."""
        frame = _make_dataset(n=200, include_leakage=False)
        strict = ValidationConfig(max_col_missing_rate=0.10)
        results, _ = _check_missingness(frame, strict)
        first = results[0]
        assert first.passed
class TestConversionRate:
    """Conversion-rate range check on the ``converted`` target."""

    def test_rate_outside_range_fails(self):
        """A 5% positive rate is rejected (the original note cites a 15% floor)."""
        sparse = _make_dataset(n=200, conversion_rate=0.05, include_leakage=False)
        first = _check_conversion_rate(sparse)[0]
        assert not first.passed

    def test_rate_in_range_passes(self):
        """A 30% positive rate is accepted."""
        typical = _make_dataset(n=200, conversion_rate=0.30, include_leakage=False)
        first = _check_conversion_rate(typical)[0]
        assert first.passed
class TestBaselineAUCCheck:
    """``_check_baseline_auc`` against an accepted AUC window of [0.62, 0.90]."""

    @staticmethod
    def _first_check(auc, pr_auc):
        # Shared arrange/act step: only the metric values differ per test.
        measured = BaselineMetrics(seed=42, auc=auc, pr_auc=pr_auc)
        window = ValidationConfig(auc_lower=0.62, auc_upper=0.90)
        return _check_baseline_auc(measured, window)[0]

    def test_auc_too_low_fails(self):
        """An AUC of 0.50 sits below the lower bound."""
        outcome = self._first_check(0.50, 0.30)
        assert not outcome.passed

    def test_auc_too_high_fails(self):
        """An AUC of 0.95 sits above the upper bound."""
        outcome = self._first_check(0.95, 0.90)
        assert not outcome.passed

    def test_auc_in_range_passes(self):
        """An AUC of 0.75 lies inside the window."""
        outcome = self._first_check(0.75, 0.60)
        assert outcome.passed
def test_trap_detected(self, good_csv):
    """The fixture's single trap column is reported with a positive mean AUC delta."""
    traps = validate_dataset(good_csv).trap_metrics
    assert len(traps) == 1

    trap = traps[0]
    assert trap.column == "__leakage__total_touches_90d"
    # The fixture's trap is built from the target itself, so including it
    # should raise AUC on average.
    assert trap.mean_delta_auc > 0
class TestValueMetrics:
    """Value-aware (ACV-weighted) metrics attached to the validation report."""

    def test_value_metrics_computed(self, good_csv):
        """A dataset with ``expected_acv`` yields non-negative captured-ACV figures."""
        metrics = validate_dataset(good_csv).value_metrics
        assert len(metrics) >= 1
        first = metrics[0]
        assert first.captured_acv_by_prob >= 0
        assert first.captured_acv_by_ev >= 0

    def test_value_metrics_with_nan_acv(self, tmp_path):
        """NaN in expected_acv should not propagate NaN into value metrics."""
        frame = _make_dataset(n=200, include_leakage=False)
        frame.loc[:9, "expected_acv"] = np.nan
        csv_path = _save(frame, tmp_path)
        relaxed = ValidationConfig(enforce_row_count=False)
        for entry in validate_dataset(csv_path, relaxed).value_metrics:
            assert not np.isnan(entry.captured_acv_by_prob)
            assert not np.isnan(entry.captured_acv_by_ev)

    def test_no_acv_column_returns_empty(self, tmp_path):
        """Without an ``expected_acv`` column no value metrics are produced."""
        frame = _make_dataset(n=200, include_leakage=False)
        frame = frame.drop(columns=["expected_acv"])
        csv_path = _save(frame, tmp_path)
        relaxed = ValidationConfig(enforce_row_count=False)
        assert validate_dataset(csv_path, relaxed).value_metrics == []
def test_emit_release_snippet_uses_actual_test_size(self):
    """Snippet should reflect the actual test_size, not hardcoded 70/30."""
    baseline = BaselineMetrics(seed=42, auc=0.75, pr_auc=0.60, base_rate=0.30)
    report = ValidationReport(csv_path="test.csv", test_size=0.20)
    report.baseline = baseline

    rendered = report.emit_release_snippet()

    # test_size=0.20 must be rendered as an 80/20 split.
    assert "80/20" in rendered
def test_trap_metrics_properties(self):
    """``TrapMetrics`` exposes mean/min/max over its per-seed AUC deltas."""
    per_seed = [0.01, 0.02, 0.03]
    metrics = TrapMetrics(
        column="test",
        deltas_auc=per_seed,
        deltas_pr_auc=[0.01, 0.02, 0.03],
        seeds=[42, 43, 44],
    )
    expectations = (
        (metrics.mean_delta_auc, 0.02),
        (metrics.min_delta_auc, 0.01),
        (metrics.max_delta_auc, 0.03),
    )
    for observed, wanted in expectations:
        assert observed == pytest.approx(wanted)