diff --git a/.agent-plan.md b/.agent-plan.md index 46a1afc..55eb99c 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -35,10 +35,9 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family - [x] PR 2.2: `check_exposure_monotonicity` updated for v5 — student is allowed to omit `BANNED_TABLES`, drop `BANNED_LEAD_COLUMNS` / `BANNED_OPP_COLUMNS`, and be a row-subset of instructor on snapshot-filtered event tables. ### Phase 3 — Release validation hardening -- [ ] `leadforge/validation/{release_quality,leakage_probes,reporting}.py` (new) -- [ ] `scripts/validate_release_candidate.py` (new) -- [ ] Resolve numeric `TBD-*` bands in `v1_acceptance_gates.md` -- [ ] `release/validation/validation_report.{json,md}` + figures auto-generated +- [x] PR 3.1: `leadforge/validation/leakage_probes.py` (new) — unified leakage taxonomy. Subsumes the PR 2.1 `relational_leakage` module and broadens it to the full design-doc / acceptance-gates taxonomy: direct (banned columns / banned tables, generalised to accept caller-supplied banned sets), time-window (`probe_snapshot_window`, generalised over `(table, ts_col)` pairs), relational (`probe_deterministic_reconstruction`, `deterministic_relational_reconstruction`), split (`probe_split_id_overlap` for G6.1/G6.2, `probe_split_near_duplicates` via deterministic rounded-vector hashing for G6.3, `probe_split_label_drift` opt-in), model-realism (`probe_bonus_model_auc` opt-in, new opt-in `probe_id_only_baseline` for G5.3, `probe_feature_subset_baseline` for G5.1/G5.2). `PROBE_REGISTRY` is the single source of truth (probe → taxonomy / opt-in flag); meta-test asserts every module-level `probe_*` is registered. Two orchestrators: `run_all_probes` / `run_all_probes_on_dataframes` (structural, kept stable for `validate_bundle`) and new `run_split_probes` (split-level over `{split_name: DataFrame}`). `relational_leakage.py` deleted; every internal call site updated (`leadforge/validation/{bundle_checks,invariants}.py`, `leadforge/render/{manifests,relational_snapshot_safe}.py`, `leadforge/exposure/filters.py` doc, `scripts/probe_relational_leakage.py`); test file renamed `test_relational_leakage.py` → `test_leakage_probes.py` and grew 24 new tests for the new probes + meta-coverage. `RelationalLeakageError` retained (now spans every taxonomy) with `LeakageError` alias for the new umbrella name. `BUNDLE_SCHEMA_VERSION` unchanged (purely additive on the validator side); 1067/1067 tests pass; hash-determinism preserved (67/67 files identical); `scripts/probe_relational_leakage.py release/{intro,intermediate,advanced} --max-accuracy 0.65` exits 0 on every public tier. +- [ ] PR 3.2: `leadforge/validation/{release_quality,reporting}.py` (new) +- [ ] PR 3.3: `scripts/validate_release_candidate.py` (new); resolve numeric `TBD-*` bands in `v1_acceptance_gates.md`; `release/validation/validation_report.{json,md}` + figures auto-generated ### Phase 4 — Channel-signal audit + dataset card hardening - [ ] `scripts/audit_channel_signal.py` → `docs/release/channel_signal_audit.md` diff --git a/leadforge/exposure/filters.py b/leadforge/exposure/filters.py index 61bae36..1376674 100644 --- a/leadforge/exposure/filters.py +++ b/leadforge/exposure/filters.py @@ -32,11 +32,11 @@ class BundleFilter: must be projected onto the snapshot-safe shape before being written. When ``True``, the bundle writer routes through :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe`, - which strips :data:`leadforge.validation.relational_leakage.BANNED_LEAD_COLUMNS` - from ``leads``, :data:`~leadforge.validation.relational_leakage.BANNED_OPP_COLUMNS` + which strips :data:`leadforge.validation.leakage_probes.BANNED_LEAD_COLUMNS` + from ``leads``, :data:`~leadforge.validation.leakage_probes.BANNED_OPP_COLUMNS` from ``opportunities``, filters event tables per-lead by ``lead_created_at + snapshot_day``, and omits - :data:`~leadforge.validation.relational_leakage.BANNED_TABLES` + :data:`~leadforge.validation.leakage_probes.BANNED_TABLES` (``customers`` / ``subscriptions``) entirely. When ``False``, the writer emits the full-horizon export. """ diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index 79bd73c..92e57a3 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -14,7 +14,7 @@ from typing import TYPE_CHECKING, Any from leadforge.core.hashing import file_sha256 -from leadforge.validation.relational_leakage import ( +from leadforge.validation.leakage_probes import ( BANNED_LEAD_COLUMNS, BANNED_OPP_COLUMNS, BANNED_TABLES, diff --git a/leadforge/render/relational_snapshot_safe.py b/leadforge/render/relational_snapshot_safe.py index 7d6378e..377d7d2 100644 --- a/leadforge/render/relational_snapshot_safe.py +++ b/leadforge/render/relational_snapshot_safe.py @@ -24,7 +24,7 @@ The ``research_instructor`` mode keeps using :func:`leadforge.render.relational.to_dataframes` for the full-horizon export. The contract constants live in -:mod:`leadforge.validation.relational_leakage` (validator owns the +:mod:`leadforge.validation.leakage_probes` (validator owns the definition of "leakage"); this module re-exports them for ergonomics. """ @@ -34,7 +34,7 @@ import pandas as pd -from leadforge.validation.relational_leakage import ( +from leadforge.validation.leakage_probes import ( BANNED_LEAD_COLUMNS, BANNED_OPP_COLUMNS, BANNED_TABLES, diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index c92e27e..ad4215c 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -21,8 +21,7 @@ from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for from leadforge.schema.relationships import ALL_CONSTRAINTS from leadforge.validation.difficulty import check_difficulty -from leadforge.validation.realism import check_realism -from leadforge.validation.relational_leakage import ( +from leadforge.validation.leakage_probes import ( BANNED_TABLES, LeakageReport, probe_banned_columns, @@ -30,6 +29,7 @@ probe_deterministic_reconstruction, run_all_probes, ) +from leadforge.validation.realism import check_realism def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[str]: @@ -311,7 +311,7 @@ def _check_relational_leakage(root: Path, manifest: dict[str, Any]) -> list[str] when ``snapshot_day`` is unavailable, with an explicit error surfaced so the gap is visible. - Each :class:`~leadforge.validation.relational_leakage.LeakageFinding` + Each :class:`~leadforge.validation.leakage_probes.LeakageFinding` is rendered as one error string, keeping the existing ``validate_bundle`` contract (return list of strings, empty = pass). """ @@ -354,10 +354,10 @@ def _read_relational_tables(root: Path) -> dict[str, pd.DataFrame]: """Read every public + banned-table parquet under ``/tables/``. Mirrors the read logic in - :func:`leadforge.validation.relational_leakage.run_all_probes` but + :func:`leadforge.validation.leakage_probes.run_all_probes` but is reusable for the snapshot_day-missing path above. """ - from leadforge.validation.relational_leakage import ( + from leadforge.validation.leakage_probes import ( BANNED_TABLES as _BANNED, ) diff --git a/leadforge/validation/invariants.py b/leadforge/validation/invariants.py index 1278045..e61514b 100644 --- a/leadforge/validation/invariants.py +++ b/leadforge/validation/invariants.py @@ -18,7 +18,7 @@ from leadforge.core.hashing import file_sha256 from leadforge.render.manifests import NON_DETERMINISTIC_MANIFEST_FIELDS from leadforge.schema.features import redacted_columns_for -from leadforge.validation.relational_leakage import ( +from leadforge.validation.leakage_probes import ( BANNED_LEAD_COLUMNS, BANNED_OPP_COLUMNS, BANNED_TABLES, diff --git a/leadforge/validation/leakage_probes.py b/leadforge/validation/leakage_probes.py new file mode 100644 index 0000000..2b373a4 --- /dev/null +++ b/leadforge/validation/leakage_probes.py @@ -0,0 +1,1344 @@ +"""Unified leakage taxonomy for ``leadforge-lead-scoring-v1`` validation. + +Subsumes ``leadforge.validation.relational_leakage`` (PR 2.1) and broadens +it to the full taxonomy from ``docs/release/v1_release_design.md`` / +``docs/leadforge_design_doc.md``: direct, time-window, relational, split, +and model-realism. PR 3.2's ``release_quality.py`` and PR 3.3's +``validate_release_candidate.py`` driver consume this module. + +Probe families +-------------- + +* **Direct** — :func:`probe_banned_columns`, :func:`probe_banned_tables`. + A published bundle must not carry the label or conversion-conditional + artefacts. Defaults match the snapshot-safe contract; both probes + accept caller-supplied banned sets so future publication channels + (Kaggle / HF / instructor companion) can declare their own. +* **Time-window** — :func:`probe_snapshot_window`. Every event row must + satisfy ``timestamp <= lead_created_at + horizon`` for the relevant + per-table timestamp column. +* **Relational** — :func:`probe_deterministic_reconstruction`, + :func:`deterministic_relational_reconstruction`. Pure-join paths + (B/C/D from the v1 audit) must produce zero positive predictions; + Path A is delegated to the banned-column probe. +* **Split** — :func:`probe_split_id_overlap`, + :func:`probe_split_near_duplicates`, :func:`probe_split_label_drift`. + Cross-split contamination via shared IDs, near-duplicate rows, or + drifted label rates. +* **Model realism** — :func:`probe_bonus_model_auc`, + :func:`probe_id_only_baseline`, :func:`probe_feature_subset_baseline`. + Calibrated baselines that flag *under-realistic* leakage (e.g. an + ID-only model that scores well, or post-snapshot aggregates that + saturate). All opt-in: PR 3.3 supplies per-tier ``max_auc`` bands. + +Orchestrators +------------- + +* :func:`run_all_probes_on_dataframes` / :func:`run_all_probes` — the + bundle-level structural orchestrator that PR 2.2 wires into + ``validate_bundle``. Skips opt-in probes unless explicitly asked. +* :func:`run_split_probes` — split-level orchestrator over a + ``{split_name: DataFrame}`` mapping. PR 3.2/3.3 will plumb this + through the release-quality driver. + +Probe registry +-------------- + +:data:`PROBE_REGISTRY` maps every probe name to its taxonomy and input +needs. The orchestrators iterate it; the meta-test +``test_probe_registry_covers_every_module_level_probe`` enforces that any new ``probe_*`` +function is registered, so a future "I added a probe but forgot to wire it" +regression fails loudly rather than silently. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable, Mapping +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Final + +import pandas as pd + +from leadforge.core.exceptions import LeadforgeError + +# --------------------------------------------------------------------------- +# Snapshot-safe contract — single source of truth for "what is leakage". +# ``leadforge.render.relational_snapshot_safe`` (writer) and +# ``leadforge.render.manifests`` (manifest's structural_redactions) import +# from here so the writer and the validator share one definition. +# --------------------------------------------------------------------------- + +#: Columns dropped from public ``leads.parquet``. +BANNED_LEAD_COLUMNS: Final[tuple[str, ...]] = ( + "converted_within_90_days", + "conversion_timestamp", +) + +#: Columns dropped from public ``opportunities.parquet``. +BANNED_OPP_COLUMNS: Final[tuple[str, ...]] = ( + "close_outcome", + "closed_at", +) + +#: Tables omitted from public bundles entirely. Conversion-conditional — +#: their mere presence reconstructs the label. +BANNED_TABLES: Final[tuple[str, ...]] = ("customers", "subscriptions") + +#: Default banned-columns map for the snapshot-safe contract, suitable as +#: the ``banned`` argument to :func:`probe_banned_columns`. +DEFAULT_BANNED_COLUMNS: Final[Mapping[str, tuple[str, ...]]] = { + "leads": BANNED_LEAD_COLUMNS, + "opportunities": BANNED_OPP_COLUMNS, +} + +#: Tables filtered per-lead by their timestamp column to +#: ``lead_created_at + snapshot_day``. ``opportunities`` is included +#: even though it is an entity table, because its ``created_at`` anchors +#: when the entity becomes observable in the funnel. +SNAPSHOT_FILTERED_TABLES: Final[tuple[tuple[str, str], ...]] = ( + ("touches", "touch_timestamp"), + ("sessions", "session_timestamp"), + ("sales_activities", "activity_timestamp"), + ("opportunities", "created_at"), +) + +#: Channel labels carried on :class:`LeakageFinding.channel`. Constants +#: rather than an enum because findings serialise straight to JSON. +CHANNEL_BANNED_COLUMN: Final[str] = "banned_column" +CHANNEL_BANNED_TABLE: Final[str] = "banned_table" +CHANNEL_JOIN_RECONSTRUCTION: Final[str] = "join_reconstruction" +CHANNEL_SNAPSHOT_WINDOW: Final[str] = "snapshot_window" +CHANNEL_BONUS_MODEL: Final[str] = "bonus_model" +CHANNEL_SPLIT_ID_OVERLAP: Final[str] = "split_id_overlap" +CHANNEL_SPLIT_NEAR_DUPLICATE: Final[str] = "split_near_duplicate" +CHANNEL_SPLIT_LABEL_DRIFT: Final[str] = "split_label_drift" +CHANNEL_ID_ONLY_BASELINE: Final[str] = "id_only_baseline" +CHANNEL_FEATURE_SUBSET_BASELINE: Final[str] = "feature_subset_baseline" + +_PUBLIC_TABLES: Final[tuple[str, ...]] = ( + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", +) + + +@dataclass(frozen=True) +class LeakageFinding: + """One leakage-channel violation surfaced by a probe.""" + + channel: str + detail: str + message: str + + +@dataclass(frozen=True) +class LeakageReport: + """Aggregate result of a probe run. Empty :attr:`findings` means OK.""" + + findings: tuple[LeakageFinding, ...] + + @property + def ok(self) -> bool: + return len(self.findings) == 0 + + def raise_if_failing(self) -> None: + """Raise :class:`RelationalLeakageError` if any probe reported a finding.""" + if not self.ok: + raise RelationalLeakageError(self) + + +class RelationalLeakageError(LeadforgeError): + """Raised by :meth:`LeakageReport.raise_if_failing` on any finding. + + The name is retained from PR 2.1 for backward compatibility — the + error class now spans every taxonomy in this module, not just + relational join reconstruction. Carries the originating + :class:`LeakageReport` on ``self.report`` so callers (e.g. + ``leadforge validate``) can render the full set of findings rather + than just the first one. + """ + + def __init__(self, report: LeakageReport) -> None: + self.report = report + rendered = "\n".join(f" - [{f.channel}] {f.detail}: {f.message}" for f in report.findings) + super().__init__( + f"leakage probe(s) failed ({len(report.findings)} finding(s)):\n{rendered}" + ) + + +# --------------------------------------------------------------------------- +# Deterministic reconstruction — the join graph that defines paths A-E. +# ``scripts/probe_relational_leakage.py`` re-exports this function so the +# alpha-bundle audit and the validator agree by construction. +# --------------------------------------------------------------------------- + + +def deterministic_relational_reconstruction( + leads: pd.DataFrame, + opportunities: pd.DataFrame, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, +) -> pd.DataFrame: + """Reconstruct ``converted_within_90_days`` from public relational joins. + + Returns a DataFrame indexed by ``lead_id`` with five boolean columns, + one per reconstruction path (A-E). Path E is the union of B, C, D and + is the headline relational-leakage prediction. + + No hidden state, no model fit — pure joins. + + Empty ``customers``/``subscriptions`` frames are accepted (the + post-fix expected state); the corresponding paths simply return + all-False. + + Raises: + ValueError: if ``leads.lead_id`` contains duplicates. A validator + cannot operate safely on non-unique keys. + """ + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + + leads_idx = leads.set_index("lead_id", drop=False) + + # Path A — the label itself, if present in public leads. + # Plain ``astype(bool)`` would map NaN to True; route through pandas' + # nullable boolean dtype so missing values fill cleanly to False without + # triggering object-downcast warnings. + if "converted_within_90_days" in leads.columns: + path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) + else: + path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") + + # Path B — any opportunity with close_outcome == "closed_won". + if "close_outcome" in opportunities.columns and len(opportunities) > 0: + won_leads = set( + opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] + ) + else: + won_leads = set() + path_b = leads_idx["lead_id"].isin(won_leads) + + # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). + if len(opportunities) > 0: + opp_to_lead = dict( + zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) + ) + else: + opp_to_lead = {} + customer_leads = { + opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead + } + path_c = leads_idx["lead_id"].isin(customer_leads) + + # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). + if len(customers) > 0: + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} + sub_leads: set[str] = set() + for cust_id in subscriptions["customer_id"]: + opp_id = cust_to_opp.get(cust_id) + if opp_id is None: + continue + lead_id = opp_to_lead.get(opp_id) + if lead_id is not None: + sub_leads.add(lead_id) + path_d = leads_idx["lead_id"].isin(sub_leads) + + # Path E — deterministic OR of B, C, D (the headline join-only path). + path_e = path_b | path_c | path_d + + return pd.DataFrame( + { + "path_a_direct_label": path_a.values, + "path_b_opportunity_won": path_b.values, + "path_c_customer_exists": path_c.values, + "path_d_subscription_exists": path_d.values, + "path_e_or_b_c_d": path_e.values, + }, + index=leads_idx.index, + ) + + +# --------------------------------------------------------------------------- +# §8.1 Direct leakage — banned columns / banned tables. +# --------------------------------------------------------------------------- + + +def probe_banned_columns( + tables: Mapping[str, pd.DataFrame], + *, + banned: Mapping[str, Iterable[str]] | None = None, +) -> list[LeakageFinding]: + """Public tables must not carry caller-banned columns. + + Args: + tables: Mapping of table name → DataFrame. + banned: Mapping of table name → banned column names. Defaults to + :data:`DEFAULT_BANNED_COLUMNS` (the snapshot-safe contract: + ``leads`` drops :data:`BANNED_LEAD_COLUMNS`, ``opportunities`` + drops :data:`BANNED_OPP_COLUMNS`). Pass an explicit mapping + to widen the contract for non-relational publication channels + (e.g. flat-CSV exports with their own redaction list). + + Detects Path A (label column directly readable from ``leads``) and the + ``opportunities.close_outcome`` / ``closed_at`` channels — i.e. leakage + that any caller can spot by listing column names, no joins required. + """ + spec = banned if banned is not None else DEFAULT_BANNED_COLUMNS + findings: list[LeakageFinding] = [] + for table_name, banned_cols in spec.items(): + df = tables.get(table_name) + if df is None: + continue + for col in banned_cols: + if col in df.columns: + findings.append( + LeakageFinding( + channel=CHANNEL_BANNED_COLUMN, + detail=f"{table_name}.{col}", + message=( + f"public {table_name}.parquet must not contain " + f"the banned column '{col}'" + ), + ) + ) + return findings + + +def probe_banned_tables( + table_names: Iterable[str], + *, + banned: Iterable[str] | None = None, +) -> list[LeakageFinding]: + """Public bundles must not include caller-banned tables. + + Args: + table_names: Names of tables present in the bundle. + banned: Iterable of banned table names. Defaults to + :data:`BANNED_TABLES` (the conversion-conditional + ``customers`` / ``subscriptions``). + """ + banned_set = tuple(banned) if banned is not None else BANNED_TABLES + present = set(table_names) + return [ + LeakageFinding( + channel=CHANNEL_BANNED_TABLE, + detail=name, + message=( + f"public bundles must not include '{name}.parquet' " + "(it exists only for converted leads, so its presence " + "reconstructs the label)" + ), + ) + for name in banned_set + if name in present + ] + + +# --------------------------------------------------------------------------- +# §8.2 Time-window leakage — events past the snapshot anchor. +# --------------------------------------------------------------------------- + + +def probe_snapshot_window( + tables: Mapping[str, pd.DataFrame], + snapshot_day: int, + *, + filtered_tables: Iterable[tuple[str, str]] | None = None, +) -> list[LeakageFinding]: + """Every event-table row must satisfy ``timestamp <= lead_created_at + snapshot_day``. + + Args: + tables: Mapping of table name → DataFrame. + snapshot_day: Number of days after ``lead_created_at`` beyond + which event rows are forbidden. Negative values raise. + filtered_tables: Iterable of ``(table_name, timestamp_column)`` + pairs to audit. Defaults to :data:`SNAPSHOT_FILTERED_TABLES`. + Pass an explicit list to widen the contract (e.g. for new + event tables added by a future task). + """ + if snapshot_day < 0: + raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") + pairs = tuple(filtered_tables) if filtered_tables is not None else SNAPSHOT_FILTERED_TABLES + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + if "lead_id" not in leads.columns or "lead_created_at" not in leads.columns: + raise ValueError("leads must contain 'lead_id' and 'lead_created_at' columns") + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + + anchor = leads[["lead_id", "lead_created_at"]].copy() + anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"], errors="coerce") + # NaT in the anchor would propagate to NaT cutoffs, then ``ts > NaT`` + # is NaN, and the violation count's ``fillna(False)`` would silently + # drop those rows — masking a data-quality bug in the bundle. Refuse + # to operate on a malformed anchor, same posture as the duplicate- + # lead_id check above. + nat_mask = anchor["lead_created_at"].isna() + if nat_mask.any(): + sample = anchor.loc[nat_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"leads.lead_created_at has {int(nat_mask.sum())} unparseable / null " + f"value(s); offending lead_id sample: {sample}" + ) + horizon = pd.Timedelta(days=snapshot_day) + + findings: list[LeakageFinding] = [] + for name, ts_col in pairs: + df = tables.get(name) + if df is None or len(df) == 0 or ts_col not in df.columns: + continue + merged = df[["lead_id", ts_col]].merge(anchor, on="lead_id", how="left") + # An event row whose lead_id has no match in leads gets NaT for + # ``lead_created_at`` after the left-merge; that row's cutoff is + # NaT and the violation count would silently miss it. An orphan + # event row is a structural FK violation (and a leakage attack + # surface — a tampered bundle could insert post-snapshot events + # tied to lead_ids absent from the public leads table). Refuse + # to bless it. + orphan_mask = merged["lead_created_at"].isna() + if orphan_mask.any(): + sample = merged.loc[orphan_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"{name}.parquet has {int(orphan_mask.sum())} row(s) referencing " + f"lead_id(s) absent from leads; sample: {sample}" + ) + ts = pd.to_datetime(merged[ts_col]) + cutoff = merged["lead_created_at"] + horizon + violations = int((ts > cutoff).fillna(False).sum()) + if violations > 0: + findings.append( + LeakageFinding( + channel=CHANNEL_SNAPSHOT_WINDOW, + detail=f"{name}.{ts_col}", + message=( + f"{violations}/{len(df)} rows in {name}.parquet " + f"have {ts_col} > lead_created_at + {snapshot_day}d" + ), + ) + ) + return findings + + +# --------------------------------------------------------------------------- +# §8.3 Relational leakage — deterministic join paths. +# --------------------------------------------------------------------------- + + +def probe_deterministic_reconstruction( + tables: Mapping[str, pd.DataFrame], +) -> list[LeakageFinding]: + """Audit paths B / C / D must produce zero positive predictions. + + This probe focuses exclusively on the **join-graph** reconstruction: + + * B — at least one opportunity with ``close_outcome == "closed_won"``; + * C — a joinable customer row reachable via ``opportunity_id``; + * D — a joinable subscription row reachable via ``customer_id``. + + Path A (direct read of ``leads.converted_within_90_days``) is *not* + checked here — it is the column-presence violation already raised by + :func:`probe_banned_columns`. Re-emitting it here would double-count + one defect across two channels. Tests assert this delegation + explicitly so that future maintainers don't widen the scope by + accident. + """ + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + + opportunities = tables.get( + "opportunities", + _empty_frame({"opportunity_id": "string", "lead_id": "string"}), + ) + customers = tables.get( + "customers", + _empty_frame({"customer_id": "string", "opportunity_id": "string", "account_id": "string"}), + ) + subscriptions = tables.get( + "subscriptions", + _empty_frame({"subscription_id": "string", "customer_id": "string"}), + ) + + paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) + + findings: list[LeakageFinding] = [] + for path_col, label in ( + ("path_b_opportunity_won", "B (opportunity.close_outcome == 'closed_won')"), + ("path_c_customer_exists", "C (joined customer exists)"), + ("path_d_subscription_exists", "D (joined subscription exists)"), + ): + positive = int(paths[path_col].sum()) + if positive > 0: + findings.append( + LeakageFinding( + channel=CHANNEL_JOIN_RECONSTRUCTION, + detail=path_col, + message=( + f"path {label} produced {positive}/{len(paths)} " + "positive predictions; a snapshot-safe public " + "bundle must produce zero" + ), + ) + ) + return findings + + +# --------------------------------------------------------------------------- +# §8.4 Split leakage — ID overlap / near-duplicates / label drift across +# train / valid / test splits. +# --------------------------------------------------------------------------- + + +def probe_split_id_overlap( + splits: Mapping[str, pd.DataFrame], + *, + id_columns: Iterable[str] = ("lead_id",), +) -> list[LeakageFinding]: + """Report any value of ``id_columns`` appearing in more than one split. + + For ``lead_id`` (the default) any overlap is a hard contamination + finding — each lead is one row, so a clean random-split task should + never duplicate a lead_id across splits. + + For ``account_id`` / ``contact_id`` overlap is informational at this + layer: G6.1 / G6.2 in ``v1_acceptance_gates.md`` say it must be + *documented as intentional or absent*. PR 3.3 will set whether the + finding is a release blocker; this probe just surfaces it. + + Splits with no ``id_columns`` present are skipped (e.g. a flat task + table without ``account_id``). Empty splits are also skipped. + """ + findings: list[LeakageFinding] = [] + split_names = list(splits.keys()) + for col in id_columns: + # Build the union of (split_name, id_value) pairs once per column. + per_split: dict[str, set[Any]] = {} + for name, df in splits.items(): + if col not in df.columns or len(df) == 0: + continue + per_split[name] = set(df[col].dropna().tolist()) + # Pairwise overlap across splits (deduplicated by ordering names). + for i, a in enumerate(split_names): + if a not in per_split: + continue + for b in split_names[i + 1 :]: + if b not in per_split: + continue + shared = per_split[a] & per_split[b] + if not shared: + continue + # Sort the full overlap before slicing — set iteration order + # is implementation-defined, so ``list(shared)[:5]`` would + # yield non-reproducible sample messages across runs. + sample = sorted(str(s) for s in shared)[:5] + findings.append( + LeakageFinding( + channel=CHANNEL_SPLIT_ID_OVERLAP, + detail=f"{col}:{a}∩{b}", + message=( + f"{len(shared)} {col} value(s) appear in both " + f"'{a}' and '{b}' splits; sample={sample}" + ), + ) + ) + return findings + + +def probe_split_near_duplicates( + splits: Mapping[str, pd.DataFrame], + *, + feature_columns: Iterable[str], + decimals: int = 4, + max_findings: int = 5, +) -> list[LeakageFinding]: + """Detect rows in different splits that match after rounding numeric features. + + Pragmatic, deterministic, no-sklearn-needed approximation of + cosine-similarity ≈ 1 near-duplicate detection: round each numeric + feature in ``feature_columns`` to ``decimals`` places, stringify the + rounded vector, and look for collisions across splits. Catches the + common cases: + + * exact duplicates of the same record landing in two splits; + * records whose features are indistinguishable within sensible + numeric precision (e.g. two leads with identical aggregates). + + The probe deliberately under-reports rather than over-reports — + cosine-similarity machinery would flag spurious near-duplicates on + sparse one-hot-encoded data and flake under sklearn version drift. + Rounded-vector hashing is reproducible and orthogonal to model + choice. + + Skips silently (empty findings) when ``feature_columns`` is empty, + every split is empty, or every requested column is non-numeric or + all-NaN after coercion. Caller-provided columns missing from a + split are ignored on a per-split basis. + + Rows whose rounded signature is entirely ``"nan"`` after coercion + (e.g. all-non-numeric inputs, or rows with missing values for every + requested feature) are excluded from the comparison — they carry no + information and would otherwise collide as a single saturating + false-positive across splits. + """ + cols = list(feature_columns) + if not cols: + return [] + + rounded: dict[str, pd.Series] = {} + for name, df in splits.items(): + if len(df) == 0: + continue + present = [c for c in cols if c in df.columns] + if not present: + continue + # Coerce to numeric; non-numeric columns become NaN. We then + # drop rows whose entire signature is NaN — those carry no + # information and would otherwise saturate as a single + # collision across splits, which is a false positive (not a + # near-duplicate). + numeric = df[present].apply(pd.to_numeric, errors="coerce").round(decimals) + non_empty = numeric.notna().any(axis=1) + if not non_empty.any(): + continue + rounded[name] = numeric.loc[non_empty].astype(str).agg("|".join, axis=1) + + findings: list[LeakageFinding] = [] + split_names = list(rounded.keys()) + for i, a in enumerate(split_names): + for b in split_names[i + 1 :]: + shared = set(rounded[a]) & set(rounded[b]) + if not shared: + continue + # Sort the full overlap before slicing — set iteration order + # is implementation-defined, so ``list(shared)[:N]`` would + # yield non-reproducible sample messages across runs. + sample = sorted(shared)[:max_findings] + findings.append( + LeakageFinding( + channel=CHANNEL_SPLIT_NEAR_DUPLICATE, + detail=f"{a}∩{b}", + message=( + f"{len(shared)} row signature(s) (numeric features rounded " + f"to {decimals} dp) match between '{a}' and '{b}' splits; " + f"sample={sample}" + ), + ) + ) + return findings + + +def probe_split_label_drift( + splits: Mapping[str, pd.DataFrame], + *, + label_col: str, + max_drift: float, +) -> list[LeakageFinding]: + """Per-split positive rate must not drift beyond ``max_drift`` between any two splits. + + Drift is measured as the absolute difference of mean(label) across + each pair of splits. A drifted positive rate signals a non-IID + split (e.g. a time-cohort split that wasn't rebalanced) — useful + info for the release-quality report; opt-in because cohort splits + are *intentionally* drifted. + + Splits without ``label_col`` are skipped on a per-split basis. + Skips silently when fewer than two splits carry the label. + """ + if max_drift < 0: + raise ValueError(f"max_drift must be non-negative, got {max_drift}") + rates: dict[str, float] = {} + for name, df in splits.items(): + if label_col not in df.columns or len(df) == 0: + continue + rates[name] = float(df[label_col].astype("boolean").fillna(False).astype(int).mean()) + if len(rates) < 2: + return [] + + findings: list[LeakageFinding] = [] + names = list(rates.keys()) + for i, a in enumerate(names): + for b in names[i + 1 :]: + drift = abs(rates[a] - rates[b]) + if drift > max_drift: + findings.append( + LeakageFinding( + channel=CHANNEL_SPLIT_LABEL_DRIFT, + detail=f"{a}↔{b}", + message=( + f"|rate({a}) - rate({b})| = " + f"|{rates[a]:.3f} - {rates[b]:.3f}| = {drift:.3f} " + f"exceeds max_drift={max_drift:.3f}" + ), + ) + ) + return findings + + +# --------------------------------------------------------------------------- +# §8.5 Model-realism — calibrated baselines. +# All opt-in: PR 3.3 supplies per-tier ``max_auc`` bands. +# --------------------------------------------------------------------------- + + +def probe_bonus_model_auc( + tables: Mapping[str, pd.DataFrame], + *, + max_auc: float, + seed: int = 42, + label: pd.Series | None = None, +) -> list[LeakageFinding]: + """Opt-in honest-feature baseline: 5-fold CV LR + HistGBM AUC. + + Trains on per-lead aggregates (``n_opps`` / ``max_acv`` / + ``mean_acv``, plus ``n_customers`` / ``n_subscriptions`` if those + tables are present) and asserts the mean cross-validated AUC stays + below ``max_auc``. + + Caller responsibilities: + + * ``max_auc`` is required. PR 2.1 ships this probe with no + calibrated threshold; PR 3.3 will land per-tier bands. + * ``label`` must be a :class:`pandas.Series` indexed by ``lead_id`` + (``index.name == "lead_id"``) **and** cover every lead in the + bundle. Both are enforced — a misaligned or partial label would + silently neutralise the probe (via the binary-cardinality gate + or NaN folds), which defeats the validator's purpose. + + The probe skips silently (no findings, no error) when: + + * scikit-learn is not installed; + * ``leads`` is missing or empty; + * the label is unavailable (no ``label`` argument and the public + bundle has correctly redacted ``converted_within_90_days``); + * the label has fewer than two classes after alignment; + * the smaller class has fewer members than the minimum needed for + stratified CV (``n_splits >= 2``). + """ + sk = _import_sklearn() + if sk is None: + return [] + + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + + y_series = _resolve_label(leads, label) + if y_series is None: + return [] + + features = _build_relational_features(leads, tables) + if features.empty or len(features.columns) == 0: + return [] + + aligned = y_series.reindex(features.index) + if aligned.isna().any(): + missing = int(aligned.isna().sum()) + raise ValueError( + f"label is missing values for {missing} lead_id(s) present in the " + "bundle; supply a complete label or omit it to read from leads" + ) + y = aligned.astype(int) + if y.nunique(dropna=True) < 2: + return [] + + # Stratified CV needs at least n_splits members in each class. If the + # smaller class is below that, the probe can't run — skip silently + # (this is a sample-size constraint, not a leakage finding). + min_class = int(y.value_counts().min()) + n_splits = min(5, min_class) + if n_splits < 2: + return [] + + models: dict[str, Any] = { + "logistic_regression": sk.Pipeline( + [("scaler", sk.StandardScaler()), ("clf", sk.LogisticRegression(max_iter=1000))] + ), + "hist_gbm": sk.Pipeline([("clf", sk.HistGradientBoostingClassifier(random_state=seed))]), + } + skf = sk.StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed) + + findings: list[LeakageFinding] = [] + for name, pipe in models.items(): + aucs: list[float] = [] + for train_idx, test_idx in skf.split(features.values, y.values): + x_tr, x_te = features.values[train_idx], features.values[test_idx] + y_tr, y_te = y.values[train_idx], y.values[test_idx] + pipe.fit(x_tr, y_tr) + proba = pipe.predict_proba(x_te)[:, 1] + aucs.append(float(sk.roc_auc_score(y_te, proba))) + auc_mean = sum(aucs) / len(aucs) + if auc_mean > max_auc: + findings.append( + LeakageFinding( + channel=CHANNEL_BONUS_MODEL, + detail=name, + message=( + f"{n_splits}-fold CV AUC {auc_mean:.3f} on join-derived " + f"features exceeds max_auc={max_auc:.3f}; honest " + "aggregates carry stronger signal than the band allows" + ), + ) + ) + return findings + + +def probe_id_only_baseline( + splits: Mapping[str, pd.DataFrame], + *, + label_col: str, + max_auc: float, + id_columns: Iterable[str] = ("lead_id", "account_id", "contact_id"), + seed: int = 42, +) -> list[LeakageFinding]: + """Opt-in: a model trained on IDs alone must not predict the label. + + A passing baseline (mean test AUC ≤ ``max_auc``, expected near 0.5) + confirms IDs carry no signal. A failing baseline reveals + ID-encoded leakage (e.g. IDs reflecting creation order correlated + with conversion, or IDs leaking through the ordinal hash of an + upstream sort key). + + Trains on the ``train`` split, evaluates on each of ``valid`` and + ``test`` if present. Splits provide the train/test boundary, so no + cross-validation here — split contamination is the next concern up + and a separate probe. + + Skips silently when sklearn is unavailable, when ``train`` is + missing, when no ``id_columns`` are present in the train split, + when the train label has fewer than two classes, or when no + evaluation split is available. + """ + sk = _import_sklearn() + if sk is None: + return [] + + train = splits.get("train") + if train is None or len(train) == 0 or label_col not in train.columns: + return [] + cols = [c for c in id_columns if c in train.columns] + if not cols: + return [] + y_train = train[label_col].astype("boolean").fillna(False).astype(int) + if y_train.nunique() < 2: + return [] + + # Hash the IDs to integers — the model needs numeric input but we + # don't want to one-hot every ID (would explode dimension and would + # *guarantee* AUC near 1.0 on the train split via memorisation). + # The hash is deterministic across runs and small enough for HistGBM. + x_train = _hash_id_columns(train[cols]) + + eval_splits = { + name: df + for name, df in splits.items() + if name != "train" + and len(df) > 0 + and label_col in df.columns + and all(c in df.columns for c in cols) + } + if not eval_splits: + return [] + + model = sk.HistGradientBoostingClassifier(random_state=seed, max_iter=100) + model.fit(x_train.values, y_train.values) + + findings: list[LeakageFinding] = [] + for name, df in eval_splits.items(): + y_eval = df[label_col].astype("boolean").fillna(False).astype(int) + if y_eval.nunique() < 2: + continue + x_eval = _hash_id_columns(df[cols]) + proba = model.predict_proba(x_eval.values)[:, 1] + auc = float(sk.roc_auc_score(y_eval.values, proba)) + if auc > max_auc: + findings.append( + LeakageFinding( + channel=CHANNEL_ID_ONLY_BASELINE, + detail=f"split={name},cols={','.join(cols)}", + message=( + f"HistGBM trained on hashed {cols} alone reaches " + f"AUC {auc:.3f} on '{name}' (max_auc={max_auc:.3f}); " + "IDs carry signal they should not" + ), + ) + ) + return findings + + +def probe_feature_subset_baseline( + splits: Mapping[str, pd.DataFrame], + *, + feature_columns: Iterable[str], + label_col: str, + max_auc: float, + name: str = "subset", + seed: int = 42, +) -> list[LeakageFinding]: + """Opt-in: a model trained on a feature subset must not predict the label above ``max_auc``. + + The umbrella probe behind G5.1 (post-snapshot aggregates) and G5.2 + (suspect-stage columns). Caller declares the suspect subset; a + mean-evaluation-AUC > ``max_auc`` is a finding. + + Train on ``train``, evaluate on each non-``train`` split present. + All numeric coercion is via :func:`pandas.to_numeric` with + ``errors="coerce"`` — non-numeric / missing values become NaN and + are passed straight to HistGBM (which handles NaN natively); LR + would not, so we use HistGBM only. + + Skips silently when sklearn is unavailable, when ``train`` is + missing or has < 2 classes, when no requested columns are present, + or when no evaluation split is available. + """ + sk = _import_sklearn() + if sk is None: + return [] + + train = splits.get("train") + if train is None or len(train) == 0 or label_col not in train.columns: + return [] + cols = [c for c in feature_columns if c in train.columns] + if not cols: + return [] + y_train = train[label_col].astype("boolean").fillna(False).astype(int) + if y_train.nunique() < 2: + return [] + + x_train = train[cols].apply(pd.to_numeric, errors="coerce") + + eval_splits = { + eval_name: df + for eval_name, df in splits.items() + if eval_name != "train" + and len(df) > 0 + and label_col in df.columns + and all(c in df.columns for c in cols) + } + if not eval_splits: + return [] + + model = sk.HistGradientBoostingClassifier(random_state=seed, max_iter=100) + model.fit(x_train.values, y_train.values) + + findings: list[LeakageFinding] = [] + for eval_name, df in eval_splits.items(): + y_eval = df[label_col].astype("boolean").fillna(False).astype(int) + if y_eval.nunique() < 2: + continue + x_eval = df[cols].apply(pd.to_numeric, errors="coerce") + proba = model.predict_proba(x_eval.values)[:, 1] + auc = float(sk.roc_auc_score(y_eval.values, proba)) + if auc > max_auc: + findings.append( + LeakageFinding( + channel=CHANNEL_FEATURE_SUBSET_BASELINE, + detail=f"name={name},split={eval_name}", + message=( + f"HistGBM trained on '{name}' subset ({len(cols)} cols) " + f"reaches AUC {auc:.3f} on '{eval_name}' " + f"(max_auc={max_auc:.3f})" + ), + ) + ) + return findings + + +# --------------------------------------------------------------------------- +# Probe registry — single source of truth for "what probes exist and what +# do they need". The orchestrators iterate it; the meta-test asserts that +# every module-level ``probe_*`` function is registered. +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class ProbeSpec: + """Metadata for one probe. + + Attributes: + name: Bare function name (matches the registered key). + callable: The probe function itself. + taxonomy: One of ``"direct"``, ``"time_window"``, ``"relational"``, + ``"split"``, ``"model_realism"``. + opt_in: True iff the probe needs caller-supplied calibrated + thresholds (currently every model-realism probe and the + label-drift split probe). + """ + + name: str + callable: Callable[..., list[LeakageFinding]] + taxonomy: str + opt_in: bool + + +PROBE_REGISTRY: Final[Mapping[str, ProbeSpec]] = { + "banned_columns": ProbeSpec("banned_columns", probe_banned_columns, "direct", opt_in=False), + "banned_tables": ProbeSpec("banned_tables", probe_banned_tables, "direct", opt_in=False), + "snapshot_window": ProbeSpec( + "snapshot_window", probe_snapshot_window, "time_window", opt_in=False + ), + "deterministic_reconstruction": ProbeSpec( + "deterministic_reconstruction", + probe_deterministic_reconstruction, + "relational", + opt_in=False, + ), + "split_id_overlap": ProbeSpec( + "split_id_overlap", probe_split_id_overlap, "split", opt_in=False + ), + "split_near_duplicates": ProbeSpec( + "split_near_duplicates", probe_split_near_duplicates, "split", opt_in=False + ), + "split_label_drift": ProbeSpec( + "split_label_drift", probe_split_label_drift, "split", opt_in=True + ), + "bonus_model_auc": ProbeSpec( + "bonus_model_auc", probe_bonus_model_auc, "model_realism", opt_in=True + ), + "id_only_baseline": ProbeSpec( + "id_only_baseline", probe_id_only_baseline, "model_realism", opt_in=True + ), + "feature_subset_baseline": ProbeSpec( + "feature_subset_baseline", + probe_feature_subset_baseline, + "model_realism", + opt_in=True, + ), +} + + +# --------------------------------------------------------------------------- +# Orchestrators +# --------------------------------------------------------------------------- + + +def run_all_probes_on_dataframes( + tables: Mapping[str, pd.DataFrame], + *, + snapshot_day: int, + bonus_model_max_auc: float | None = None, + label: pd.Series | None = None, +) -> LeakageReport: + """Run every structural relational/time-window probe; bonus probe iff threshold given. + + This is the bundle-level orchestrator wired into ``validate_bundle`` + via :func:`run_all_probes`. Split-level probes have their own + orchestrator (:func:`run_split_probes`) because they consume the + task-split files, not the relational ``tables/`` dict. + """ + findings: list[LeakageFinding] = [] + findings += probe_banned_columns(tables) + findings += probe_banned_tables(tables.keys()) + findings += probe_deterministic_reconstruction(tables) + findings += probe_snapshot_window(tables, snapshot_day=snapshot_day) + if bonus_model_max_auc is not None: + findings += probe_bonus_model_auc(tables, max_auc=bonus_model_max_auc, label=label) + return LeakageReport(findings=tuple(findings)) + + +def run_all_probes( + bundle_dir: Path, + *, + snapshot_day: int, + bonus_model_max_auc: float | None = None, + label: pd.Series | None = None, +) -> LeakageReport: + """Run every structural probe against ``/tables/*.parquet``. + + Args: + bundle_dir: Bundle root (must contain ``tables/leads.parquet``). + snapshot_day: Snapshot window for the timestamp probe. The + caller (typically ``validate_bundle``) is expected to read + it from ``manifest.json``. + bonus_model_max_auc: Pass a numeric threshold to enable the + opt-in :func:`probe_bonus_model_auc`. ``None`` (default) + skips it — the calibrated band ships in PR 3.3. + label: Optional ground-truth labels for the bonus probe when + ``leads.converted_within_90_days`` has been redacted. Must + be indexed by ``lead_id`` (see :func:`probe_bonus_model_auc`). + Ignored when ``bonus_model_max_auc`` is ``None``. + + Raises: + FileNotFoundError: if ``/tables/`` is missing or + ``leads.parquet`` is not present. + """ + tables_dir = bundle_dir / "tables" + if not tables_dir.is_dir(): + raise FileNotFoundError(f"missing tables/ under {bundle_dir}") + if not (tables_dir / "leads.parquet").exists(): + raise FileNotFoundError(f"missing required leads.parquet under {tables_dir}") + + tables: dict[str, pd.DataFrame] = {} + for name in (*_PUBLIC_TABLES, *BANNED_TABLES): + path = tables_dir / f"{name}.parquet" + if path.exists(): + tables[name] = pd.read_parquet(path) + return run_all_probes_on_dataframes( + tables, + snapshot_day=snapshot_day, + bonus_model_max_auc=bonus_model_max_auc, + label=label, + ) + + +def run_split_probes( + splits: Mapping[str, pd.DataFrame], + *, + label_col: str = "converted_within_90_days", + id_columns: Iterable[str] = ("lead_id",), + near_duplicate_columns: Iterable[str] | None = None, + near_duplicate_decimals: int = 4, + label_drift_max: float | None = None, + id_only_max_auc: float | None = None, + id_only_columns: Iterable[str] = ("lead_id", "account_id", "contact_id"), + feature_subsets: Mapping[str, tuple[float, Iterable[str]]] | None = None, +) -> LeakageReport: + """Run split-level leakage probes over a ``{split_name: DataFrame}`` mapping. + + Args: + splits: Mapping of split name (``train`` / ``valid`` / ``test``) + to the corresponding DataFrame. Empty splits are skipped. + label_col: Label column name. Defaults to the v1 task target. + id_columns: ID columns audited for cross-split overlap. Defaults + to ``("lead_id",)``. + near_duplicate_columns: Numeric feature columns to use for + near-duplicate detection. ``None`` (default) skips the + probe entirely. + near_duplicate_decimals: Rounding precision for the near- + duplicate signature (see :func:`probe_split_near_duplicates`). + label_drift_max: Pass a positive float to enable + :func:`probe_split_label_drift`. ``None`` skips it. + id_only_max_auc: Pass a numeric threshold to enable + :func:`probe_id_only_baseline`. ``None`` skips it. + id_only_columns: ID columns to feed the ID-only baseline. + feature_subsets: Optional mapping ``name → (max_auc, columns)``. + For each entry, runs :func:`probe_feature_subset_baseline` + with the given subset. Used by PR 3.3 to wire up the + post-snapshot-aggregate / suspect-stage / etc. baselines. + """ + findings: list[LeakageFinding] = [] + findings += probe_split_id_overlap(splits, id_columns=id_columns) + if near_duplicate_columns is not None: + findings += probe_split_near_duplicates( + splits, + feature_columns=near_duplicate_columns, + decimals=near_duplicate_decimals, + ) + if label_drift_max is not None: + findings += probe_split_label_drift(splits, label_col=label_col, max_drift=label_drift_max) + if id_only_max_auc is not None: + findings += probe_id_only_baseline( + splits, + label_col=label_col, + max_auc=id_only_max_auc, + id_columns=id_only_columns, + ) + if feature_subsets: + for subset_name, (max_auc, cols) in feature_subsets.items(): + findings += probe_feature_subset_baseline( + splits, + feature_columns=cols, + label_col=label_col, + max_auc=max_auc, + name=subset_name, + ) + return LeakageReport(findings=tuple(findings)) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _empty_frame(dtype_map: dict[str, str]) -> pd.DataFrame: + return pd.DataFrame({c: pd.Series(dtype=d) for c, d in dtype_map.items()}) + + +def _resolve_label( + leads: pd.DataFrame, + label: pd.Series | None, +) -> pd.Series | None: + """Pick a label series to score against, or ``None`` to skip the probe. + + A caller-supplied ``label`` must be indexed by ``lead_id`` + (``index.name == "lead_id"``). Without that guarantee a misaligned + label would silently skip the probe via the binary-cardinality gate + downstream — exactly the kind of hidden no-op a leakage validator + must not have. + """ + if label is not None: + if label.index.name != "lead_id": + raise ValueError( + "label must be a pandas.Series indexed by lead_id " + f"(got index.name={label.index.name!r})" + ) + return label.astype("boolean").fillna(False).astype(int) + if "converted_within_90_days" in leads.columns: + return ( + leads.set_index("lead_id")["converted_within_90_days"] + .astype("boolean") + .fillna(False) + .astype(int) + ) + return None + + +def _build_relational_features( + leads: pd.DataFrame, + tables: Mapping[str, pd.DataFrame], +) -> pd.DataFrame: + """Per-lead aggregates from joinable public/optional relational tables. + + Honest features only — no aggregate of ``close_outcome``. Customers + and subscriptions counts are included only when the corresponding + tables exist (i.e. on a tampered bundle); on a clean public bundle + they default to 0 and the model can discard the column. + """ + opps = tables.get("opportunities") + customers = tables.get("customers") + subscriptions = tables.get("subscriptions") + + feats = leads[["lead_id"]].copy() + + if opps is not None and len(opps) > 0: + agg: dict[str, tuple[str, str]] = {"n_opps": ("opportunity_id", "count")} + if "estimated_acv" in opps.columns: + agg["max_acv"] = ("estimated_acv", "max") + agg["mean_acv"] = ("estimated_acv", "mean") + opp_agg = opps.groupby("lead_id").agg(**agg).reset_index() + feats = feats.merge(opp_agg, on="lead_id", how="left") + opp_to_lead = dict(zip(opps["opportunity_id"], opps["lead_id"], strict=False)) + else: + opp_to_lead = {} + + if customers is not None and len(customers) > 0: + cust = customers.copy() + cust["lead_id"] = cust["opportunity_id"].map(opp_to_lead) + cust_agg = cust.groupby("lead_id").size().rename("n_customers").reset_index() + feats = feats.merge(cust_agg, on="lead_id", how="left") + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} + + if subscriptions is not None and len(subscriptions) > 0: + subs = subscriptions.copy() + subs["opportunity_id"] = subs["customer_id"].map(cust_to_opp) + subs["lead_id"] = subs["opportunity_id"].map(opp_to_lead) + sub_agg = subs.groupby("lead_id").size().rename("n_subscriptions").reset_index() + feats = feats.merge(sub_agg, on="lead_id", how="left") + + fill_defaults: dict[str, float] = { + "n_opps": 0.0, + "max_acv": 0.0, + "mean_acv": 0.0, + "n_customers": 0.0, + "n_subscriptions": 0.0, + } + for col, default in fill_defaults.items(): + if col in feats.columns: + feats[col] = feats[col].fillna(default) + else: + feats[col] = default + + feature_cols = list(fill_defaults.keys()) + return feats.set_index("lead_id")[feature_cols].astype(float) + + +def _hash_id_columns(df: pd.DataFrame) -> pd.DataFrame: + """Map opaque ID strings to deterministic 32-bit hashes (stored as int64) per column. + + HistGBM handles integer features natively; one-hot encoding every + distinct ID would explode dimension and let the model memorise the + train split (guaranteeing AUC ~ 1.0 on train and ~ 0.5 elsewhere, + which is *not* the leakage signal we want to surface). Hashing + keeps the same train/eval semantics as the production model + pipeline while bounding feature width. + """ + import hashlib + + def _h(value: object) -> int: + # Fixed-output stable hash; ``hash()`` is salted per process. + digest = hashlib.blake2b(str(value).encode("utf-8"), digest_size=4).digest() + return int.from_bytes(digest, "big", signed=False) + + return pd.DataFrame({col: df[col].map(_h).astype("int64") for col in df.columns}) + + +@dataclass(frozen=True) +class _SklearnHandles: + Pipeline: Any + StandardScaler: Any + LogisticRegression: Any + HistGradientBoostingClassifier: Any + StratifiedKFold: Any + roc_auc_score: Any + + +def _import_sklearn() -> _SklearnHandles | None: + """Lazy import for sklearn; returns ``None`` if not installed. + + Centralised so every model-realism probe agrees on which symbols it + needs and so the skip-cleanly-without-sklearn behaviour is uniform. + """ + try: + from sklearn.ensemble import HistGradientBoostingClassifier + from sklearn.linear_model import LogisticRegression + from sklearn.metrics import roc_auc_score + from sklearn.model_selection import StratifiedKFold + from sklearn.pipeline import Pipeline + from sklearn.preprocessing import StandardScaler + except ImportError: + return None + return _SklearnHandles( + Pipeline=Pipeline, + StandardScaler=StandardScaler, + LogisticRegression=LogisticRegression, + HistGradientBoostingClassifier=HistGradientBoostingClassifier, + StratifiedKFold=StratifiedKFold, + roc_auc_score=roc_auc_score, + ) + + +__all__ = [ + "BANNED_LEAD_COLUMNS", + "BANNED_OPP_COLUMNS", + "BANNED_TABLES", + "CHANNEL_BANNED_COLUMN", + "CHANNEL_BANNED_TABLE", + "CHANNEL_BONUS_MODEL", + "CHANNEL_FEATURE_SUBSET_BASELINE", + "CHANNEL_ID_ONLY_BASELINE", + "CHANNEL_JOIN_RECONSTRUCTION", + "CHANNEL_SNAPSHOT_WINDOW", + "CHANNEL_SPLIT_ID_OVERLAP", + "CHANNEL_SPLIT_LABEL_DRIFT", + "CHANNEL_SPLIT_NEAR_DUPLICATE", + "DEFAULT_BANNED_COLUMNS", + "LeakageFinding", + "LeakageReport", + "PROBE_REGISTRY", + "ProbeSpec", + "RelationalLeakageError", + "SNAPSHOT_FILTERED_TABLES", + "deterministic_relational_reconstruction", + "probe_banned_columns", + "probe_banned_tables", + "probe_bonus_model_auc", + "probe_deterministic_reconstruction", + "probe_feature_subset_baseline", + "probe_id_only_baseline", + "probe_snapshot_window", + "probe_split_id_overlap", + "probe_split_label_drift", + "probe_split_near_duplicates", + "run_all_probes", + "run_all_probes_on_dataframes", + "run_split_probes", +] diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py deleted file mode 100644 index 3299e02..0000000 --- a/leadforge/validation/relational_leakage.py +++ /dev/null @@ -1,719 +0,0 @@ -"""Probes that detect public-bundle reconstruction of ``converted_within_90_days``. - -The audit in ``docs/release/v1_current_state_audit.md`` enumerates four -deterministic paths (A-E) by which alpha public bundles reconstruct the -target via joins. This module is the validator that asserts the -snapshot-safe contract — encoded in :data:`BANNED_LEAD_COLUMNS`, -:data:`BANNED_OPP_COLUMNS`, :data:`BANNED_TABLES`, and -:data:`SNAPSHOT_FILTERED_TABLES` — is in place on any bundle claiming to -be ``student_public``. The matching writer-side enforcement lives in -:mod:`leadforge.render.relational_snapshot_safe` and imports the same -constants from this module. - -Five probes, each producing zero or more :class:`LeakageFinding`: - -* :func:`probe_banned_columns` — public ``leads`` and ``opportunities`` - must not contain :data:`BANNED_LEAD_COLUMNS` or - :data:`BANNED_OPP_COLUMNS` respectively. -* :func:`probe_banned_tables` — public bundles must not include - :data:`BANNED_TABLES`. -* :func:`probe_deterministic_reconstruction` — paths B / C / D from the - audit must produce zero positive predictions. **Path A is not - checked here** — it is the column-presence violation already covered - by :func:`probe_banned_columns`. -* :func:`probe_snapshot_window` — every event-table row must satisfy - ``timestamp <= lead_created_at + snapshot_day``. -* :func:`probe_bonus_model_auc` — *opt-in* honest-feature baseline: - trains LR + HistGBM on the legitimate aggregates ``n_opps`` / - ``max_acv`` / ``mean_acv`` (plus ``n_customers`` / - ``n_subscriptions`` if present) and asserts CV AUC stays below an - explicit ``max_auc``. The orchestrators skip this probe unless the - caller passes ``bonus_model_max_auc=...``. - -:func:`run_all_probes` is the file-based orchestrator (designed to be -called from :func:`leadforge.validation.bundle_checks.validate_bundle`). -:func:`run_all_probes_on_dataframes` is the same orchestrator without -the disk read, for unit tests against in-memory bundles. - -The :func:`deterministic_relational_reconstruction` function is the -single source of truth for the join graph that defines paths A-E. The -companion script ``scripts/probe_relational_leakage.py`` re-exports it -from here so the alpha-bundle audit and the validator agree by -construction. -""" - -from __future__ import annotations - -from collections.abc import Iterable, Mapping -from dataclasses import dataclass -from pathlib import Path -from typing import Final - -import pandas as pd - -from leadforge.core.exceptions import LeadforgeError - -# --------------------------------------------------------------------------- -# Snapshot-safe contract — the single source of truth for "what is leakage". -# leadforge.render.relational_snapshot_safe imports these so the writer -# and the validator share one definition. -# --------------------------------------------------------------------------- - -#: Columns dropped from public ``leads.parquet``. -BANNED_LEAD_COLUMNS: Final[tuple[str, ...]] = ( - "converted_within_90_days", - "conversion_timestamp", -) - -#: Columns dropped from public ``opportunities.parquet``. -BANNED_OPP_COLUMNS: Final[tuple[str, ...]] = ( - "close_outcome", - "closed_at", -) - -#: Tables omitted from public bundles entirely. -BANNED_TABLES: Final[tuple[str, ...]] = ("customers", "subscriptions") - -#: Tables filtered per-lead by their timestamp column to -#: ``lead_created_at + snapshot_day``. ``opportunities`` is included -#: even though it is an entity table, because its ``created_at`` -#: anchors when the entity becomes observable in the funnel. -SNAPSHOT_FILTERED_TABLES: Final[tuple[tuple[str, str], ...]] = ( - ("touches", "touch_timestamp"), - ("sessions", "session_timestamp"), - ("sales_activities", "activity_timestamp"), - ("opportunities", "created_at"), -) - -#: Channel labels carried on :class:`LeakageFinding.channel`. Constants -#: rather than an enum because findings serialise straight to JSON. -CHANNEL_BANNED_COLUMN: Final[str] = "banned_column" -CHANNEL_BANNED_TABLE: Final[str] = "banned_table" -CHANNEL_JOIN_RECONSTRUCTION: Final[str] = "join_reconstruction" -CHANNEL_SNAPSHOT_WINDOW: Final[str] = "snapshot_window" -CHANNEL_BONUS_MODEL: Final[str] = "bonus_model" - -_PUBLIC_TABLES: Final[tuple[str, ...]] = ( - "accounts", - "contacts", - "leads", - "touches", - "sessions", - "sales_activities", - "opportunities", -) - - -@dataclass(frozen=True) -class LeakageFinding: - """One leakage-channel violation surfaced by a probe.""" - - channel: str - detail: str - message: str - - -@dataclass(frozen=True) -class LeakageReport: - """Aggregate result of a probe run. Empty :attr:`findings` means OK.""" - - findings: tuple[LeakageFinding, ...] - - @property - def ok(self) -> bool: - return len(self.findings) == 0 - - def raise_if_failing(self) -> None: - """Raise :class:`RelationalLeakageError` if any probe reported a finding.""" - if not self.ok: - raise RelationalLeakageError(self) - - -class RelationalLeakageError(LeadforgeError): - """Raised by :meth:`LeakageReport.raise_if_failing` on any finding. - - Carries the originating :class:`LeakageReport` on ``self.report`` so - callers (e.g. ``leadforge validate``) can render the full set of - findings rather than just the first one. - """ - - def __init__(self, report: LeakageReport) -> None: - self.report = report - rendered = "\n".join(f" - [{f.channel}] {f.detail}: {f.message}" for f in report.findings) - super().__init__( - f"public bundle leaks `converted_within_90_days` " - f"({len(report.findings)} finding(s)):\n{rendered}" - ) - - -# --------------------------------------------------------------------------- -# Deterministic reconstruction — the join graph that defines paths A-E. -# Lifted from the PR 1.1 audit script; the script now re-exports this -# function from here so there is one implementation. -# --------------------------------------------------------------------------- - - -def deterministic_relational_reconstruction( - leads: pd.DataFrame, - opportunities: pd.DataFrame, - customers: pd.DataFrame, - subscriptions: pd.DataFrame, -) -> pd.DataFrame: - """Reconstruct ``converted_within_90_days`` from public relational joins. - - Returns a DataFrame indexed by ``lead_id`` with five boolean columns, - one per reconstruction path (A-E). Path E is the union of B, C, D and - is the headline relational-leakage prediction. - - No hidden state, no model fit — pure joins. - - Empty ``customers``/``subscriptions`` frames are accepted (the - post-fix expected state); the corresponding paths simply return - all-False. - - Raises: - ValueError: if ``leads.lead_id`` contains duplicates. A validator - cannot operate safely on non-unique keys. - """ - if not leads["lead_id"].is_unique: - raise ValueError("leads.lead_id must be unique") - - leads_idx = leads.set_index("lead_id", drop=False) - - # Path A — the label itself, if present in public leads. - # Plain ``astype(bool)`` would map NaN to True; route through pandas' - # nullable boolean dtype so missing values fill cleanly to False without - # triggering object-downcast warnings. - if "converted_within_90_days" in leads.columns: - path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) - else: - path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") - - # Path B — any opportunity with close_outcome == "closed_won". - if "close_outcome" in opportunities.columns and len(opportunities) > 0: - won_leads = set( - opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] - ) - else: - won_leads = set() - path_b = leads_idx["lead_id"].isin(won_leads) - - # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). - if len(opportunities) > 0: - opp_to_lead = dict( - zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) - ) - else: - opp_to_lead = {} - customer_leads = { - opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead - } - path_c = leads_idx["lead_id"].isin(customer_leads) - - # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). - if len(customers) > 0: - cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) - else: - cust_to_opp = {} - sub_leads: set[str] = set() - for cust_id in subscriptions["customer_id"]: - opp_id = cust_to_opp.get(cust_id) - if opp_id is None: - continue - lead_id = opp_to_lead.get(opp_id) - if lead_id is not None: - sub_leads.add(lead_id) - path_d = leads_idx["lead_id"].isin(sub_leads) - - # Path E — deterministic OR of B, C, D (the headline join-only path). - path_e = path_b | path_c | path_d - - return pd.DataFrame( - { - "path_a_direct_label": path_a.values, - "path_b_opportunity_won": path_b.values, - "path_c_customer_exists": path_c.values, - "path_d_subscription_exists": path_d.values, - "path_e_or_b_c_d": path_e.values, - }, - index=leads_idx.index, - ) - - -# --------------------------------------------------------------------------- -# Probes -# --------------------------------------------------------------------------- - - -def probe_banned_columns(tables: Mapping[str, pd.DataFrame]) -> list[LeakageFinding]: - """Public ``leads``/``opportunities`` must not carry banned columns. - - Detects Path A (label column directly readable from ``leads``) and - the ``opportunities.close_outcome`` / ``closed_at`` channels — i.e. - leakage that any caller can spot by listing column names, no joins - required. - """ - findings: list[LeakageFinding] = [] - for table_name, banned in ( - ("leads", BANNED_LEAD_COLUMNS), - ("opportunities", BANNED_OPP_COLUMNS), - ): - df = tables.get(table_name) - if df is None: - continue - for col in banned: - if col in df.columns: - findings.append( - LeakageFinding( - channel=CHANNEL_BANNED_COLUMN, - detail=f"{table_name}.{col}", - message=( - f"public {table_name}.parquet must not contain " - f"the banned column '{col}'" - ), - ) - ) - return findings - - -def probe_banned_tables(table_names: Iterable[str]) -> list[LeakageFinding]: - """Public bundles must not include conversion-conditional tables.""" - present = set(table_names) - return [ - LeakageFinding( - channel=CHANNEL_BANNED_TABLE, - detail=name, - message=( - f"public bundles must not include '{name}.parquet' " - "(it exists only for converted leads, so its presence " - "reconstructs the label)" - ), - ) - for name in BANNED_TABLES - if name in present - ] - - -def probe_deterministic_reconstruction( - tables: Mapping[str, pd.DataFrame], -) -> list[LeakageFinding]: - """Audit paths B / C / D must produce zero positive predictions. - - This probe focuses exclusively on the **join-graph** reconstruction: - - * B — at least one opportunity with ``close_outcome == "closed_won"``; - * C — a joinable customer row reachable via ``opportunity_id``; - * D — a joinable subscription row reachable via ``customer_id``. - - Path A (direct read of ``leads.converted_within_90_days``) is *not* - checked here — it is the column-presence violation already raised by - :func:`probe_banned_columns`. Re-emitting it here would double-count - one defect across two channels. Tests assert this delegation - explicitly so that future maintainers don't widen the scope by - accident. - """ - leads = tables.get("leads") - if leads is None or len(leads) == 0: - return [] - - opportunities = tables.get( - "opportunities", - _empty_frame({"opportunity_id": "string", "lead_id": "string"}), - ) - customers = tables.get( - "customers", - _empty_frame({"customer_id": "string", "opportunity_id": "string", "account_id": "string"}), - ) - subscriptions = tables.get( - "subscriptions", - _empty_frame({"subscription_id": "string", "customer_id": "string"}), - ) - - paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) - - findings: list[LeakageFinding] = [] - for path_col, label in ( - ("path_b_opportunity_won", "B (opportunity.close_outcome == 'closed_won')"), - ("path_c_customer_exists", "C (joined customer exists)"), - ("path_d_subscription_exists", "D (joined subscription exists)"), - ): - positive = int(paths[path_col].sum()) - if positive > 0: - findings.append( - LeakageFinding( - channel=CHANNEL_JOIN_RECONSTRUCTION, - detail=path_col, - message=( - f"path {label} produced {positive}/{len(paths)} " - "positive predictions; a snapshot-safe public " - "bundle must produce zero" - ), - ) - ) - return findings - - -def probe_snapshot_window( - tables: Mapping[str, pd.DataFrame], snapshot_day: int -) -> list[LeakageFinding]: - """Every event-table row must satisfy ``timestamp <= lead_created_at + snapshot_day``.""" - if snapshot_day < 0: - raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") - leads = tables.get("leads") - if leads is None or len(leads) == 0: - return [] - if "lead_id" not in leads.columns or "lead_created_at" not in leads.columns: - raise ValueError("leads must contain 'lead_id' and 'lead_created_at' columns") - if not leads["lead_id"].is_unique: - raise ValueError("leads.lead_id must be unique") - - anchor = leads[["lead_id", "lead_created_at"]].copy() - anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"], errors="coerce") - # NaT in the anchor would propagate to NaT cutoffs, then ``ts > NaT`` - # is NaN, and the violation count's ``fillna(False)`` would silently - # drop those rows — masking a data-quality bug in the bundle. Refuse - # to operate on a malformed anchor, same posture as the duplicate- - # lead_id check above. - nat_mask = anchor["lead_created_at"].isna() - if nat_mask.any(): - sample = anchor.loc[nat_mask, "lead_id"].head(5).tolist() - raise ValueError( - f"leads.lead_created_at has {int(nat_mask.sum())} unparseable / null " - f"value(s); offending lead_id sample: {sample}" - ) - horizon = pd.Timedelta(days=snapshot_day) - - findings: list[LeakageFinding] = [] - for name, ts_col in SNAPSHOT_FILTERED_TABLES: - df = tables.get(name) - if df is None or len(df) == 0 or ts_col not in df.columns: - continue - merged = df[["lead_id", ts_col]].merge(anchor, on="lead_id", how="left") - # An event row whose lead_id has no match in leads gets NaT for - # ``lead_created_at`` after the left-merge; that row's cutoff is - # NaT and the violation count would silently miss it. An orphan - # event row is a structural FK violation (and a leakage attack - # surface — a tampered bundle could insert post-snapshot events - # tied to lead_ids absent from the public leads table). Refuse - # to bless it. - orphan_mask = merged["lead_created_at"].isna() - if orphan_mask.any(): - sample = merged.loc[orphan_mask, "lead_id"].head(5).tolist() - raise ValueError( - f"{name}.parquet has {int(orphan_mask.sum())} row(s) referencing " - f"lead_id(s) absent from leads; sample: {sample}" - ) - ts = pd.to_datetime(merged[ts_col]) - cutoff = merged["lead_created_at"] + horizon - violations = int((ts > cutoff).fillna(False).sum()) - if violations > 0: - findings.append( - LeakageFinding( - channel=CHANNEL_SNAPSHOT_WINDOW, - detail=f"{name}.{ts_col}", - message=( - f"{violations}/{len(df)} rows in {name}.parquet " - f"have {ts_col} > lead_created_at + {snapshot_day}d" - ), - ) - ) - return findings - - -def probe_bonus_model_auc( - tables: Mapping[str, pd.DataFrame], - *, - max_auc: float, - seed: int = 42, - label: pd.Series | None = None, -) -> list[LeakageFinding]: - """Opt-in honest-feature baseline: 5-fold CV LR + HistGBM AUC. - - Trains on per-lead aggregates (``n_opps`` / ``max_acv`` / - ``mean_acv``, plus ``n_customers`` / ``n_subscriptions`` if those - tables are present) and asserts the mean cross-validated AUC stays - below ``max_auc``. - - Caller responsibilities: - - * ``max_auc`` is required. PR 2.1 ships this probe with no - calibrated threshold; PR 3.3 will land per-tier bands. - * ``label`` must be a :class:`pandas.Series` indexed by ``lead_id`` - (``index.name == "lead_id"``) **and** cover every lead in the - bundle. Both are enforced — a misaligned or partial label would - silently neutralise the probe (via the binary-cardinality gate - or NaN folds), which defeats the validator's purpose. - - The probe skips silently (no findings, no error) when: - - * scikit-learn is not installed; - * ``leads`` is missing or empty; - * the label is unavailable (no ``label`` argument and the public - bundle has correctly redacted ``converted_within_90_days``); - * the label has fewer than two classes after alignment; - * the smaller class has fewer members than the minimum needed for - stratified CV (``n_splits >= 2``). - """ - try: - from sklearn.ensemble import HistGradientBoostingClassifier - from sklearn.linear_model import LogisticRegression - from sklearn.metrics import roc_auc_score - from sklearn.model_selection import StratifiedKFold - from sklearn.pipeline import Pipeline - from sklearn.preprocessing import StandardScaler - except ImportError: - return [] - - leads = tables.get("leads") - if leads is None or len(leads) == 0: - return [] - - y_series = _resolve_label(leads, label) - if y_series is None: - return [] - - features = _build_relational_features(leads, tables) - if features.empty or len(features.columns) == 0: - return [] - - aligned = y_series.reindex(features.index) - if aligned.isna().any(): - missing = int(aligned.isna().sum()) - raise ValueError( - f"label is missing values for {missing} lead_id(s) present in the " - "bundle; supply a complete label or omit it to read from leads" - ) - y = aligned.astype(int) - if y.nunique(dropna=True) < 2: - return [] - - # Stratified CV needs at least n_splits members in each class. If the - # smaller class is below that, the probe can't run — skip silently - # (this is a sample-size constraint, not a leakage finding). - min_class = int(y.value_counts().min()) - n_splits = min(5, min_class) - if n_splits < 2: - return [] - - models: dict[str, Pipeline] = { - "logistic_regression": Pipeline( - [("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000))] - ), - "hist_gbm": Pipeline([("clf", HistGradientBoostingClassifier(random_state=seed))]), - } - skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed) - - findings: list[LeakageFinding] = [] - for name, pipe in models.items(): - aucs: list[float] = [] - for train_idx, test_idx in skf.split(features.values, y.values): - x_tr, x_te = features.values[train_idx], features.values[test_idx] - y_tr, y_te = y.values[train_idx], y.values[test_idx] - pipe.fit(x_tr, y_tr) - proba = pipe.predict_proba(x_te)[:, 1] - aucs.append(float(roc_auc_score(y_te, proba))) - auc_mean = sum(aucs) / len(aucs) - if auc_mean > max_auc: - findings.append( - LeakageFinding( - channel=CHANNEL_BONUS_MODEL, - detail=name, - message=( - f"{n_splits}-fold CV AUC {auc_mean:.3f} on join-derived " - f"features exceeds max_auc={max_auc:.3f}; honest " - "aggregates carry stronger signal than the band allows" - ), - ) - ) - return findings - - -# --------------------------------------------------------------------------- -# Orchestrators -# --------------------------------------------------------------------------- - - -def run_all_probes_on_dataframes( - tables: Mapping[str, pd.DataFrame], - *, - snapshot_day: int, - bonus_model_max_auc: float | None = None, - label: pd.Series | None = None, -) -> LeakageReport: - """Run every structural probe; run the bonus probe iff ``bonus_model_max_auc`` is set.""" - findings: list[LeakageFinding] = [] - findings += probe_banned_columns(tables) - findings += probe_banned_tables(tables.keys()) - findings += probe_deterministic_reconstruction(tables) - findings += probe_snapshot_window(tables, snapshot_day=snapshot_day) - if bonus_model_max_auc is not None: - findings += probe_bonus_model_auc(tables, max_auc=bonus_model_max_auc, label=label) - return LeakageReport(findings=tuple(findings)) - - -def run_all_probes( - bundle_dir: Path, - *, - snapshot_day: int, - bonus_model_max_auc: float | None = None, - label: pd.Series | None = None, -) -> LeakageReport: - """Run every structural probe against ``/tables/*.parquet``. - - Args: - bundle_dir: Bundle root (must contain ``tables/leads.parquet``). - snapshot_day: Snapshot window for the timestamp probe. The - caller (typically ``validate_bundle``) is expected to read - it from ``manifest.json``. - bonus_model_max_auc: Pass a numeric threshold to enable the - opt-in :func:`probe_bonus_model_auc`. ``None`` (default) - skips it — the calibrated band ships in PR 3.3. - label: Optional ground-truth labels for the bonus probe when - ``leads.converted_within_90_days`` has been redacted. Must - be indexed by ``lead_id`` (see :func:`probe_bonus_model_auc`). - Ignored when ``bonus_model_max_auc`` is ``None``. - - Raises: - FileNotFoundError: if ``/tables/`` is missing or - ``leads.parquet`` is not present. - """ - tables_dir = bundle_dir / "tables" - if not tables_dir.is_dir(): - raise FileNotFoundError(f"missing tables/ under {bundle_dir}") - if not (tables_dir / "leads.parquet").exists(): - raise FileNotFoundError(f"missing required leads.parquet under {tables_dir}") - - tables: dict[str, pd.DataFrame] = {} - for name in (*_PUBLIC_TABLES, *BANNED_TABLES): - path = tables_dir / f"{name}.parquet" - if path.exists(): - tables[name] = pd.read_parquet(path) - return run_all_probes_on_dataframes( - tables, - snapshot_day=snapshot_day, - bonus_model_max_auc=bonus_model_max_auc, - label=label, - ) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _empty_frame(dtype_map: dict[str, str]) -> pd.DataFrame: - return pd.DataFrame({c: pd.Series(dtype=d) for c, d in dtype_map.items()}) - - -def _resolve_label( - leads: pd.DataFrame, - label: pd.Series | None, -) -> pd.Series | None: - """Pick a label series to score against, or ``None`` to skip the probe. - - A caller-supplied ``label`` must be indexed by ``lead_id`` - (``index.name == "lead_id"``). Without that guarantee a misaligned - label would silently skip the probe via the binary-cardinality gate - downstream — exactly the kind of hidden no-op a leakage validator - must not have. - """ - if label is not None: - if label.index.name != "lead_id": - raise ValueError( - "label must be a pandas.Series indexed by lead_id " - f"(got index.name={label.index.name!r})" - ) - return label.astype("boolean").fillna(False).astype(int) - if "converted_within_90_days" in leads.columns: - return ( - leads.set_index("lead_id")["converted_within_90_days"] - .astype("boolean") - .fillna(False) - .astype(int) - ) - return None - - -def _build_relational_features( - leads: pd.DataFrame, - tables: Mapping[str, pd.DataFrame], -) -> pd.DataFrame: - """Per-lead aggregates from joinable public/optional relational tables. - - Honest features only — no aggregate of ``close_outcome``. Customers - and subscriptions counts are included only when the corresponding - tables exist (i.e. on a tampered bundle); on a clean public bundle - they default to 0 and the model can discard the column. - """ - opps = tables.get("opportunities") - customers = tables.get("customers") - subscriptions = tables.get("subscriptions") - - feats = leads[["lead_id"]].copy() - - if opps is not None and len(opps) > 0: - agg: dict[str, tuple[str, str]] = {"n_opps": ("opportunity_id", "count")} - if "estimated_acv" in opps.columns: - agg["max_acv"] = ("estimated_acv", "max") - agg["mean_acv"] = ("estimated_acv", "mean") - opp_agg = opps.groupby("lead_id").agg(**agg).reset_index() - feats = feats.merge(opp_agg, on="lead_id", how="left") - opp_to_lead = dict(zip(opps["opportunity_id"], opps["lead_id"], strict=False)) - else: - opp_to_lead = {} - - if customers is not None and len(customers) > 0: - cust = customers.copy() - cust["lead_id"] = cust["opportunity_id"].map(opp_to_lead) - cust_agg = cust.groupby("lead_id").size().rename("n_customers").reset_index() - feats = feats.merge(cust_agg, on="lead_id", how="left") - cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) - else: - cust_to_opp = {} - - if subscriptions is not None and len(subscriptions) > 0: - subs = subscriptions.copy() - subs["opportunity_id"] = subs["customer_id"].map(cust_to_opp) - subs["lead_id"] = subs["opportunity_id"].map(opp_to_lead) - sub_agg = subs.groupby("lead_id").size().rename("n_subscriptions").reset_index() - feats = feats.merge(sub_agg, on="lead_id", how="left") - - fill_defaults: dict[str, float] = { - "n_opps": 0.0, - "max_acv": 0.0, - "mean_acv": 0.0, - "n_customers": 0.0, - "n_subscriptions": 0.0, - } - for col, default in fill_defaults.items(): - if col in feats.columns: - feats[col] = feats[col].fillna(default) - else: - feats[col] = default - - feature_cols = list(fill_defaults.keys()) - return feats.set_index("lead_id")[feature_cols].astype(float) - - -__all__ = [ - "BANNED_LEAD_COLUMNS", - "BANNED_OPP_COLUMNS", - "BANNED_TABLES", - "CHANNEL_BANNED_COLUMN", - "CHANNEL_BANNED_TABLE", - "CHANNEL_BONUS_MODEL", - "CHANNEL_JOIN_RECONSTRUCTION", - "CHANNEL_SNAPSHOT_WINDOW", - "LeakageFinding", - "LeakageReport", - "RelationalLeakageError", - "SNAPSHOT_FILTERED_TABLES", - "deterministic_relational_reconstruction", - "probe_banned_columns", - "probe_banned_tables", - "probe_bonus_model_auc", - "probe_deterministic_reconstruction", - "probe_snapshot_window", - "run_all_probes", - "run_all_probes_on_dataframes", -] diff --git a/scripts/probe_relational_leakage.py b/scripts/probe_relational_leakage.py index 4aa08af..66bcdb7 100644 --- a/scripts/probe_relational_leakage.py +++ b/scripts/probe_relational_leakage.py @@ -38,9 +38,10 @@ with ``timestamp > lead_created_at + horizon_days``. If horizon_days isn't readable from the manifest, falls back to 90. -The deterministic reconstruction is exposed as -:func:`deterministic_relational_reconstruction` so PR 3.1 can lift it -verbatim into ``leadforge/validation/leakage_probes.py``. +The deterministic reconstruction is the same one used by +:mod:`leadforge.validation.leakage_probes` — this script re-exports it +unchanged so the alpha-bundle audit and the validator agree by +construction. Exit codes: @@ -69,10 +70,13 @@ import pandas as pd # Re-export from the canonical package location. PR 2.1 lifted this -# function into ``leadforge/validation/relational_leakage.py``; the -# script keeps it accessible at ``probe_module.deterministic_relational_reconstruction`` +# function into the leakage-probe module (then named +# ``leadforge/validation/relational_leakage.py``; PR 3.1 consolidated +# every leakage-taxonomy probe under +# ``leadforge/validation/leakage_probes.py``). The script keeps it +# accessible at ``probe_module.deterministic_relational_reconstruction`` # (and at the CLI level) so callers and existing tests remain stable. -from leadforge.validation.relational_leakage import ( +from leadforge.validation.leakage_probes import ( deterministic_relational_reconstruction, ) diff --git a/tests/integration/test_snapshot_safe_bundle.py b/tests/integration/test_snapshot_safe_bundle.py index 11ad772..926572c 100644 --- a/tests/integration/test_snapshot_safe_bundle.py +++ b/tests/integration/test_snapshot_safe_bundle.py @@ -36,7 +36,7 @@ from leadforge.api.generator import Generator from leadforge.validation.bundle_checks import validate_bundle -from leadforge.validation.relational_leakage import ( +from leadforge.validation.leakage_probes import ( BANNED_LEAD_COLUMNS, BANNED_OPP_COLUMNS, BANNED_TABLES, diff --git a/tests/render/test_bundle_schema_v5_contract.py b/tests/render/test_bundle_schema_v5_contract.py index e199d4e..f2cef38 100644 --- a/tests/render/test_bundle_schema_v5_contract.py +++ b/tests/render/test_bundle_schema_v5_contract.py @@ -40,7 +40,7 @@ # Pinned column / table sets for bundle schema v5. Update *together* # with ``BUNDLE_SCHEMA_VERSION``, ``LEAD_SNAPSHOT_FEATURES``, and the # snapshot-safe contract constants in -# ``leadforge.validation.relational_leakage``. +# ``leadforge.validation.leakage_probes``. V5_TASK_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( { diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_leakage_probes.py similarity index 61% rename from tests/validation/test_relational_leakage.py rename to tests/validation/test_leakage_probes.py index 989fb78..ed0a3fe 100644 --- a/tests/validation/test_relational_leakage.py +++ b/tests/validation/test_leakage_probes.py @@ -1,6 +1,18 @@ -"""Tests for ``leadforge/validation/relational_leakage.py``. +"""Tests for ``leadforge/validation/leakage_probes.py``. -Each probe is exercised against two synthetic minimal bundles: +Covers every probe family in the unified leakage taxonomy: + +* relational / time-window / direct (lifted from PR 2.1's + ``test_relational_leakage.py``); +* split — ID-overlap, near-duplicate row collisions, label drift; +* model-realism — opt-in calibrated baselines (``probe_id_only_baseline``, + ``probe_feature_subset_baseline``, ``probe_bonus_model_auc``); +* meta — every ``probe_*`` function in the module is registered in + :data:`leadforge.validation.leakage_probes.PROBE_REGISTRY` so a + future "I added a probe but forgot to wire it" regression fails + loudly here. + +For the structural probes each is exercised against two configurations: * a *clean* bundle, produced by running the same source frames through :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe`, @@ -12,17 +24,25 @@ from __future__ import annotations +import inspect from pathlib import Path import pandas as pd import pytest from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe -from leadforge.validation.relational_leakage import ( +from leadforge.validation import leakage_probes +from leadforge.validation.leakage_probes import ( CHANNEL_BANNED_COLUMN, CHANNEL_BANNED_TABLE, CHANNEL_BONUS_MODEL, + CHANNEL_FEATURE_SUBSET_BASELINE, + CHANNEL_ID_ONLY_BASELINE, CHANNEL_JOIN_RECONSTRUCTION, + CHANNEL_SPLIT_ID_OVERLAP, + CHANNEL_SPLIT_LABEL_DRIFT, + CHANNEL_SPLIT_NEAR_DUPLICATE, + PROBE_REGISTRY, LeakageFinding, LeakageReport, RelationalLeakageError, @@ -31,9 +51,15 @@ probe_banned_tables, probe_bonus_model_auc, probe_deterministic_reconstruction, + probe_feature_subset_baseline, + probe_id_only_baseline, probe_snapshot_window, + probe_split_id_overlap, + probe_split_label_drift, + probe_split_near_duplicates, run_all_probes, run_all_probes_on_dataframes, + run_split_probes, ) ANCHOR = pd.Timestamp("2026-01-01") @@ -592,3 +618,340 @@ def test_deterministic_function_matches_script_export() -> None: src["leads"], src["opportunities"], src["customers"], src["subscriptions"] ) pd.testing.assert_frame_equal(a, b) + + +# --------------------------------------------------------------------------- +# §8.1 Direct — caller-supplied banned sets. +# --------------------------------------------------------------------------- + + +def test_probe_banned_columns_accepts_custom_banned_set() -> None: + """A non-relational caller (e.g. flat-CSV exporter) can ban its own + column list without depending on the snapshot-safe defaults.""" + tables = _clean_bundle() + tables["leads"] = tables["leads"].assign(secret_score=0.5) + custom = {"leads": ("secret_score",)} + findings = probe_banned_columns(tables, banned=custom) + assert any(f.detail == "leads.secret_score" for f in findings) + # Default ban list is unaffected. + assert probe_banned_columns(tables) == [] + + +def test_probe_banned_tables_accepts_custom_banned_set() -> None: + """Same generalisation for banned tables — caller can declare an + arbitrary blocklist (e.g. ``raw_telemetry``).""" + tables = _clean_bundle() + tables["raw_telemetry"] = pd.DataFrame({"x": [1]}) + findings = probe_banned_tables(tables.keys(), banned=("raw_telemetry",)) + assert any(f.detail == "raw_telemetry" for f in findings) + # Default ban list is unaffected. + assert probe_banned_tables(tables.keys()) == [] + + +# --------------------------------------------------------------------------- +# §8.4 Split — ID overlap / near-duplicates / label drift. +# --------------------------------------------------------------------------- + + +def _split_frames() -> dict[str, pd.DataFrame]: + """Three disjoint splits with shared schema, no contamination.""" + train = pd.DataFrame( + { + "lead_id": [f"l_{i:03d}" for i in range(20)], + "account_id": [f"a_{i:03d}" for i in range(20)], + "f1": [float(i) for i in range(20)], + "f2": [i * 0.5 for i in range(20)], + "label": [True] * 5 + [False] * 15, + } + ) + valid = pd.DataFrame( + { + "lead_id": [f"l_{i:03d}" for i in range(20, 25)], + "account_id": [f"a_{i:03d}" for i in range(20, 25)], + "f1": [float(i) for i in range(20, 25)], + "f2": [i * 0.5 for i in range(20, 25)], + "label": [True, False, True, False, False], + } + ) + test = pd.DataFrame( + { + "lead_id": [f"l_{i:03d}" for i in range(25, 30)], + "account_id": [f"a_{i:03d}" for i in range(25, 30)], + "f1": [float(i) for i in range(25, 30)], + "f2": [i * 0.5 for i in range(25, 30)], + "label": [True, False, False, False, True], + } + ) + return {"train": train, "valid": valid, "test": test} + + +def test_split_id_overlap_silent_on_clean_splits() -> None: + assert probe_split_id_overlap(_split_frames()) == [] + + +def test_split_id_overlap_fires_when_lead_id_shared() -> None: + splits = _split_frames() + # Force a leak: copy first train row into test. + splits["test"] = pd.concat([splits["test"], splits["train"].head(1)], ignore_index=True) + findings = probe_split_id_overlap(splits) + assert any(f.channel == CHANNEL_SPLIT_ID_OVERLAP for f in findings) + assert any("lead_id" in f.detail for f in findings) + + +def test_split_id_overlap_audits_account_id_when_requested() -> None: + splits = _split_frames() + # Same account_id appears in train and valid by design. + shared = splits["train"]["account_id"].head(5).values + splits["valid"] = splits["valid"].assign(account_id=shared) + findings = probe_split_id_overlap(splits, id_columns=("account_id",)) + assert any(f.detail.startswith("account_id:") for f in findings) + + +def test_split_id_overlap_skips_missing_columns() -> None: + """Splits without an audited id column are silently skipped per-split, + not a probe-level error. Allows the probe to be wired generically + over heterogeneous task schemas.""" + splits = _split_frames() + splits["valid"] = splits["valid"].drop(columns=["account_id"]) + # account_id audit: train+test still overlap-free, valid skipped — no findings. + assert probe_split_id_overlap(splits, id_columns=("account_id",)) == [] + + +def test_split_near_duplicates_silent_on_clean_splits() -> None: + splits = _split_frames() + findings = probe_split_near_duplicates(splits, feature_columns=("f1", "f2"), decimals=4) + assert findings == [] + + +def test_split_near_duplicates_fires_on_rounded_match() -> None: + splits = _split_frames() + # Inject a row in test whose features round to a train row's features. + leak_row = splits["train"].iloc[[0]].copy() + leak_row["lead_id"] = "leak_001" # different ID, near-duplicate features + splits["test"] = pd.concat([splits["test"], leak_row], ignore_index=True) + findings = probe_split_near_duplicates(splits, feature_columns=("f1", "f2")) + assert any(f.channel == CHANNEL_SPLIT_NEAR_DUPLICATE for f in findings) + + +def test_split_near_duplicates_skipped_when_no_columns() -> None: + assert probe_split_near_duplicates(_split_frames(), feature_columns=()) == [] + + +def test_split_label_drift_skipped_below_threshold() -> None: + splits = _split_frames() + # Rates are 0.25, 0.4, 0.4 — drift = 0.15. Pass a threshold above that. + assert probe_split_label_drift(splits, label_col="label", max_drift=0.5) == [] + + +def test_split_label_drift_fires_when_drift_exceeds_threshold() -> None: + splits = _split_frames() + # Force a drift: relabel everything in test to True. + splits["test"] = splits["test"].assign(label=[True] * len(splits["test"])) + findings = probe_split_label_drift(splits, label_col="label", max_drift=0.1) + assert any(f.channel == CHANNEL_SPLIT_LABEL_DRIFT for f in findings) + + +def test_split_label_drift_negative_threshold_raises() -> None: + with pytest.raises(ValueError, match="non-negative"): + probe_split_label_drift(_split_frames(), label_col="label", max_drift=-0.1) + + +# --------------------------------------------------------------------------- +# §8.5 Model realism — opt-in calibrated baselines. +# --------------------------------------------------------------------------- + + +def test_probe_id_only_baseline_silent_on_random_splits() -> None: + """Hashed IDs alone must not predict the label on a clean random split.""" + pytest.importorskip("sklearn") + splits = _split_frames() + # max_auc=0.95 is generous: on this 20+5+5 random split a HistGBM + # trained on hashed IDs should hover near 0.5. We're not testing + # the literal AUC, only that the probe doesn't fire spuriously. + assert probe_id_only_baseline(splits, label_col="label", max_auc=0.95) == [] + + +def test_probe_id_only_baseline_runs_cleanly_on_partitioned_ids() -> None: + """Wiring smoke test: even when train lead_ids cleanly partition by + label, the probe completes without error. + + A "fires" demonstration is structurally impossible from hashed + lead_ids alone: each hash is independent, so HistGBM cannot + generalise from train hashes to disjoint test hashes — AUC stays + near 0.5 by construction. The positive-fire path for the + model-realism family is covered by + :func:`test_probe_feature_subset_baseline_fires_when_subset_predicts_label`; + here we just assert the probe runs end-to-end with a non-trivial + label. + """ + pytest.importorskip("sklearn") + train = pd.DataFrame( + { + "lead_id": [f"POS_{i:03d}" for i in range(40)] + [f"NEG_{i:03d}" for i in range(40)], + "label": [True] * 40 + [False] * 40, + } + ) + test = pd.DataFrame( + { + "lead_id": [f"POS_{i:03d}" for i in range(40, 50)] + + [f"NEG_{i:03d}" for i in range(40, 50)], + "label": [True] * 10 + [False] * 10, + } + ) + findings = probe_id_only_baseline( + {"train": train, "test": test}, + label_col="label", + max_auc=0.6, + id_columns=("lead_id",), + ) + assert isinstance(findings, list) + + +def test_probe_id_only_baseline_skips_without_train() -> None: + pytest.importorskip("sklearn") + splits = _split_frames() + del splits["train"] + assert probe_id_only_baseline(splits, label_col="label", max_auc=0.6) == [] + + +def test_probe_feature_subset_baseline_silent_when_features_uninformative() -> None: + """Numeric features that carry no signal must not exceed even a + tight band — confirms the wiring runs without spuriously firing.""" + pytest.importorskip("sklearn") + splits = _split_frames() + findings = probe_feature_subset_baseline( + splits, + feature_columns=("f1", "f2"), + label_col="label", + max_auc=0.99, + name="numeric_only", + ) + assert findings == [] + + +def test_probe_feature_subset_baseline_fires_when_subset_predicts_label() -> None: + """A leakage-equivalent feature in the subset must trip the baseline. + + Build a train+test where ``leak`` is monotonic in the label; + HistGBM should saturate AUC and exceed any sane band. + """ + pytest.importorskip("sklearn") + rng_pos = pd.Series(range(40), name="leak").astype(float) + 100.0 + rng_neg = pd.Series(range(40), name="leak").astype(float) + train = pd.DataFrame( + { + "lead_id": [f"l_{i}" for i in range(80)], + "leak": pd.concat([rng_pos, rng_neg], ignore_index=True), + "label": [True] * 40 + [False] * 40, + } + ) + test = pd.DataFrame( + { + "lead_id": [f"t_{i}" for i in range(20)], + "leak": [200.0] * 10 + [0.0] * 10, + "label": [True] * 10 + [False] * 10, + } + ) + findings = probe_feature_subset_baseline( + {"train": train, "test": test}, + feature_columns=("leak",), + label_col="label", + max_auc=0.6, + name="leak_subset", + ) + assert findings, "expected the leakage-equivalent feature subset to exceed max_auc" + assert all(f.channel == CHANNEL_FEATURE_SUBSET_BASELINE for f in findings) + + +def test_probe_feature_subset_baseline_skips_when_no_columns_present() -> None: + pytest.importorskip("sklearn") + splits = _split_frames() + assert ( + probe_feature_subset_baseline( + splits, + feature_columns=("nonexistent",), + label_col="label", + max_auc=0.6, + name="ghost", + ) + == [] + ) + + +# --------------------------------------------------------------------------- +# Split-orchestrator wiring. +# --------------------------------------------------------------------------- + + +def test_run_split_probes_runs_id_overlap_by_default() -> None: + splits = _split_frames() + splits["test"] = pd.concat([splits["test"], splits["train"].head(1)], ignore_index=True) + report = run_split_probes(splits) + assert any(f.channel == CHANNEL_SPLIT_ID_OVERLAP for f in report.findings) + + +def test_run_split_probes_skips_opt_in_probes_by_default() -> None: + """Without explicit thresholds, the orchestrator must not run the + label-drift, ID-only, or feature-subset probes — calibrated bands + are PR 3.3's job.""" + pytest.importorskip("sklearn") + splits = _split_frames() + report = run_split_probes(splits) + channels = {f.channel for f in report.findings} + assert CHANNEL_SPLIT_LABEL_DRIFT not in channels + assert CHANNEL_ID_ONLY_BASELINE not in channels + assert CHANNEL_FEATURE_SUBSET_BASELINE not in channels + + +def test_run_split_probes_runs_all_when_enabled() -> None: + pytest.importorskip("sklearn") + splits = _split_frames() + # Force drift so the label-drift probe has something to surface. + splits["test"] = splits["test"].assign(label=[True] * len(splits["test"])) + report = run_split_probes( + splits, + label_col="label", + near_duplicate_columns=("f1", "f2"), + label_drift_max=0.05, + id_only_max_auc=0.95, + feature_subsets={"numeric_only": (0.95, ("f1", "f2"))}, + ) + # At minimum, label-drift must fire. ID-only / feature-subset may + # or may not fire on this synthetic data; assertion focuses on the + # opt-in probes' wiring, not their statistical behaviour. + channels = {f.channel for f in report.findings} + assert CHANNEL_SPLIT_LABEL_DRIFT in channels + + +# --------------------------------------------------------------------------- +# Meta — every probe is registered. +# --------------------------------------------------------------------------- + + +def test_probe_registry_covers_every_module_level_probe() -> None: + """Any function named ``probe_*`` in :mod:`leakage_probes` must be + listed in :data:`PROBE_REGISTRY`. Guards against the + "I-added-a-probe-but-forgot-to-register-it" regression so the + orchestrators stay authoritative.""" + module_probes = { + name + for name, obj in inspect.getmembers(leakage_probes, inspect.isfunction) + if name.startswith("probe_") and obj.__module__ == leakage_probes.__name__ + } + registered = {f"probe_{spec.name}" for spec in PROBE_REGISTRY.values()} + missing = module_probes - registered + extra = registered - module_probes + assert not missing, f"probes defined but not registered: {sorted(missing)}" + assert not extra, f"registered probes that don't exist: {sorted(extra)}" + + +def test_probe_registry_taxonomies_are_known() -> None: + """Every spec carries one of the five documented taxonomies.""" + valid = {"direct", "time_window", "relational", "split", "model_realism"} + for spec in PROBE_REGISTRY.values(): + assert spec.taxonomy in valid, f"{spec.name} has unknown taxonomy {spec.taxonomy!r}" + + +def test_probe_registry_callables_are_callable() -> None: + for spec in PROBE_REGISTRY.values(): + assert callable(spec.callable), f"{spec.name}.callable is not callable"