diff --git a/.agent-plan.md b/.agent-plan.md index c568ed1..46a1afc 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -24,13 +24,15 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family - [x] Reproduce relational-leakage finding on alpha bundles → `docs/release/v1_current_state_audit.md` — all three tiers reconstruct `converted_within_90_days` at 100% via paths A–E; LR/HistGBM AUC = 1.000 on join-derived features. Probe script: `scripts/probe_relational_leakage.py` (function `deterministic_relational_reconstruction` designed to lift into PR 3.1's `leadforge/validation/leakage_probes.py`). - [x] Lock dataset release name `leadforge-lead-scoring-v1` (already locked via PR #61's milestone rename + roadmap edits; G1.1 reaffirmed) -### Phase 2 — Snapshot-safe relational export +### Phase 2 — Snapshot-safe relational export ✓ - [x] `leadforge/render/relational_snapshot_safe.py` (new) — PR 2.1: `to_dataframes_snapshot_safe(dfs, *, snapshot_day)` projects the full-horizon dict from `to_dataframes` onto the public-bundle shape (drops `BANNED_LEAD_COLUMNS` from leads, `BANNED_OPP_COLUMNS` from opportunities, filters event tables per-lead by `lead_created_at + snapshot_day`, omits `customers`/`subscriptions`, passes accounts/contacts unchanged). - [x] `leadforge/validation/relational_leakage.py` (new) — PR 2.1: owns the snapshot-safe contract constants (`BANNED_LEAD_COLUMNS`, `BANNED_OPP_COLUMNS`, `BANNED_TABLES`, `SNAPSHOT_FILTERED_TABLES`); ships `LeakageFinding`/`LeakageReport`/`RelationalLeakageError(LeadforgeError)` plus five probes (`probe_banned_columns`, `probe_banned_tables`, `probe_deterministic_reconstruction`, `probe_snapshot_window`, opt-in `probe_bonus_model_auc`) and two orchestrators (`run_all_probes`, `run_all_probes_on_dataframes`). The bonus-model probe is opt-in: orchestrators skip it unless the caller passes `bonus_model_max_auc=...` (PR 3.3 will calibrate per-tier bands). `deterministic_relational_reconstruction` lifted from `scripts/probe_relational_leakage.py`; the script now re-exports it from the package. -- [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` — PR 2.2. -- [ ] Wire `relational_snapshot_safe` through `leadforge/exposure/filters.py` and `leadforge/api/bundle.py`; plumb the leakage validator into `leadforge/validation/bundle_checks.py` — PR 2.2. -- [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles — PR 2.2 (the structural rules are already enforced module-side; PR 2.2 turns them on for actual writes). -- [ ] Hash-determinism preserved on regenerated bundles — PR 2.2. +- [x] PR 2.2: `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe: bool` (self-describes whether `tables/` is the snapshot-safe public shape or full-horizon instructor shape). +- [x] PR 2.2: `BundleFilter.relational_snapshot_safe` flag (student_public True; research_instructor False); `leadforge/api/bundle.py` calls `to_dataframes_snapshot_safe(dfs, snapshot_day=...)` for student_public after `to_dataframes(...)`. `leadforge/validation/bundle_checks.py` runs `run_all_probes` on public bundles (bonus probe stays off — PR 3.3 calibrates); `_check_fk_integrity` silently skips `BANNED_TABLES` for snapshot-safe bundles. +- [x] PR 2.2: public `leads.parquet` drops `converted_within_90_days` / `conversion_timestamp`; public `opportunities.parquet` drops `close_outcome` / `closed_at`; public bundles omit `customers` / `subscriptions`. Confirmed by regenerated `release/{intro,intermediate,advanced}/manifest.json` (`tables` key: 7 entries, no `customers` / `subscriptions`); `release/intermediate_instructor/` retains full-horizon (9 tables). +- [x] PR 2.2: `scripts/probe_relational_leakage.py release/{intro,intermediate,advanced} --max-accuracy 0.65` exits 0 on all three public tiers (was exiting 2 on alpha bundles); all path prediction rates A-E = 0.000. +- [x] PR 2.2: hash-determinism preserved on regenerated bundles (`scripts/verify_hash_determinism.py`: 67/67 files identical across pinned-timestamp runs; was 73/73 before — drop reflects the 2 omitted public tables × 3 public tiers). +- [x] PR 2.2: `check_exposure_monotonicity` updated for v5 — student is allowed to omit `BANNED_TABLES`, drop `BANNED_LEAD_COLUMNS` / `BANNED_OPP_COLUMNS`, and be a row-subset of instructor on snapshot-filtered event tables. ### Phase 3 — Release validation hardening - [ ] `leadforge/validation/{release_quality,leakage_probes,reporting}.py` (new) diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index 4065bc9..ec1fb36 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -3,7 +3,8 @@ :func:`write_bundle` is called by :meth:`WorldBundle.save` and orchestrates all rendering steps: -1. Write relational Parquet tables (``tables/``). +1. Project the relational dict (snapshot-safe for ``student_public``, + full-horizon for ``research_instructor``) and write ``tables/``. 2. Build the lead snapshot and write task splits (``tasks/``). 3. Write ``dataset_card.md`` and ``feature_dictionary.csv``. 4. Apply exposure filtering — write ``metadata/`` for ``research_instructor`` @@ -16,10 +17,12 @@ from pathlib import Path from typing import TYPE_CHECKING +from leadforge.exposure.filters import get_filter from leadforge.exposure.modes import apply_exposure from leadforge.narrative.dataset_card import render_dataset_card from leadforge.render.manifests import build_manifest, write_manifest from leadforge.render.relational import to_dataframes +from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe from leadforge.render.snapshots import build_snapshot from leadforge.render.tasks import write_task_splits from leadforge.schema.dictionaries import write_feature_dictionary @@ -66,14 +69,36 @@ def write_bundle( # README's "Option 3") cannot trivially reintroduce a redacted # column by joining ``tables/leads.parquet`` to their feature set. redacted = redacted_columns_for(config.exposure_mode) + bundle_filter = get_filter(config.exposure_mode) # ------------------------------------------------------------------ # 1. Relational tables → tables/ + # + # For ``student_public`` (``relational_snapshot_safe = True``) we + # project the full-horizon dict onto the snapshot-safe shape: + # ``BANNED_LEAD_COLUMNS`` / ``BANNED_OPP_COLUMNS`` are dropped, event + # tables are filtered per-lead to ``lead_created_at + snapshot_day``, + # and ``BANNED_TABLES`` (``customers`` / ``subscriptions``) are + # omitted entirely. The feature-level redaction below still applies + # on top — the two policies operate on disjoint columns + # (snapshot-safe owns the structural reconstruction surface; + # ``redacted_columns_for`` owns near-deterministic snapshot + # features), so they neither double-emit nor overlap. # ------------------------------------------------------------------ tables_dir = root / "tables" tables_dir.mkdir(exist_ok=True) dfs = to_dataframes(result, population) + if bundle_filter.relational_snapshot_safe: + if config.snapshot_day is None: + raise ValueError( + f"exposure_mode={config.exposure_mode.value!r} requires " + "config.snapshot_day to be set (the snapshot-safe relational " + "export filters event tables to lead_created_at + snapshot_day); " + "got snapshot_day=None. Pin a snapshot_day on the recipe or " + "pass it explicitly." + ) + dfs = to_dataframes_snapshot_safe(dfs, snapshot_day=config.snapshot_day) table_row_counts: dict[str, int] = {} for table_name, df in dfs.items(): if redacted: @@ -136,5 +161,6 @@ def write_bundle( bundle_root=root, generation_timestamp=generation_timestamp, redacted_columns=sorted(redacted), + relational_snapshot_safe=bundle_filter.relational_snapshot_safe, ) write_manifest(manifest, root) diff --git a/leadforge/exposure/filters.py b/leadforge/exposure/filters.py index af26089..61bae36 100644 --- a/leadforge/exposure/filters.py +++ b/leadforge/exposure/filters.py @@ -28,15 +28,33 @@ class BundleFilter: write_metadata: Whether to create ``metadata/`` with hidden-truth files (``graph.json``, ``graph.graphml``, ``world_spec.json``, ``latent_registry.json``, ``mechanism_summary.json``). + relational_snapshot_safe: Whether the relational ``tables/`` dict + must be projected onto the snapshot-safe shape before being + written. When ``True``, the bundle writer routes through + :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe`, + which strips :data:`leadforge.validation.relational_leakage.BANNED_LEAD_COLUMNS` + from ``leads``, :data:`~leadforge.validation.relational_leakage.BANNED_OPP_COLUMNS` + from ``opportunities``, filters event tables per-lead by + ``lead_created_at + snapshot_day``, and omits + :data:`~leadforge.validation.relational_leakage.BANNED_TABLES` + (``customers`` / ``subscriptions``) entirely. When ``False``, + the writer emits the full-horizon export. """ write_metadata: bool + relational_snapshot_safe: bool #: Canonical filter rules for every supported exposure mode. FILTERS: dict[ExposureMode, BundleFilter] = { - ExposureMode.student_public: BundleFilter(write_metadata=False), - ExposureMode.research_instructor: BundleFilter(write_metadata=True), + ExposureMode.student_public: BundleFilter( + write_metadata=False, + relational_snapshot_safe=True, + ), + ExposureMode.research_instructor: BundleFilter( + write_metadata=True, + relational_snapshot_safe=False, + ), } diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index 27023c5..79bd73c 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -14,6 +14,11 @@ from typing import TYPE_CHECKING, Any from leadforge.core.hashing import file_sha256 +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, +) if TYPE_CHECKING: from leadforge.core.models import GenerationConfig @@ -36,7 +41,19 @@ # assuming "features computed over full horizon" must update. # ``manifest.snapshot_day`` recorded so the contract is # self-describing (``null`` means full-horizon, legacy behaviour). -BUNDLE_SCHEMA_VERSION = "4" +# "5" — PR 2.2: ``student_public`` bundles route through the +# snapshot-safe relational export ( +# :mod:`leadforge.render.relational_snapshot_safe`). Public +# ``leads`` drops ``converted_within_90_days`` / +# ``conversion_timestamp``; public ``opportunities`` drops +# ``close_outcome`` / ``closed_at``; public bundles omit +# ``customers`` / ``subscriptions``; event tables filtered +# per-lead to ``lead_created_at + snapshot_day``. +# ``manifest.relational_snapshot_safe`` records the contract so +# consumers / validators can tell from the bundle alone whether +# the tables are snapshot-safe. ``research_instructor`` bundles +# keep the full-horizon export (``relational_snapshot_safe = false``). +BUNDLE_SCHEMA_VERSION = "5" # Manifest fields whose value is non-deterministic by design (wall-clock, # host metadata, etc.). Determinism checks must ignore these fields when @@ -52,6 +69,7 @@ def build_manifest( bundle_root: Path, generation_timestamp: str | None = None, redacted_columns: list[str] | None = None, + relational_snapshot_safe: bool = False, ) -> dict[str, Any]: """Build the bundle manifest dict. @@ -71,6 +89,13 @@ def build_manifest( this exposure mode. Recorded in the manifest so consumers (and the validator) can audit redaction without inspecting package internals. Defaults to ``[]`` (nothing redacted). + relational_snapshot_safe: ``True`` if the relational ``tables/`` + were projected through + :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe` + before being written. Recorded in the manifest so a tool + reading a v5+ bundle can tell from the manifest alone whether + ``tables/`` is the snapshot-safe (public) shape or the + full-horizon (instructor) shape. Defaults to ``False``. Returns: A JSON-serialisable dict ready to be written as ``manifest.json``. @@ -117,11 +142,35 @@ def build_manifest( "snapshot_day": config.snapshot_day, "motif_family": world_graph.motif_family, "redacted_columns": redacted_columns_list, + "relational_snapshot_safe": bool(relational_snapshot_safe), + "structural_redactions": _build_structural_redactions(bool(relational_snapshot_safe)), "tables": tables, "tasks": tasks, } +def _build_structural_redactions(relational_snapshot_safe: bool) -> dict[str, Any]: + """Self-describing record of the table-level redactions applied at write. + + For snapshot-safe (public) bundles this enumerates every column the + snapshot-safe export drops from ``leads`` / ``opportunities`` and the + tables it omits entirely. For full-horizon (instructor) bundles + every list is empty. Together with ``manifest.redacted_columns`` + (the snapshot-feature redactions) and ``manifest.relational_snapshot_safe`` + (the contract flag) the manifest fully describes what the writer + dropped without the consumer needing to consult package internals. + """ + if not relational_snapshot_safe: + return {"columns": {}, "omitted_tables": []} + return { + "columns": { + "leads": sorted(BANNED_LEAD_COLUMNS), + "opportunities": sorted(BANNED_OPP_COLUMNS), + }, + "omitted_tables": sorted(BANNED_TABLES), + } + + def write_manifest(manifest: dict[str, Any], bundle_root: Path) -> Path: """Serialise *manifest* to ``bundle_root/manifest.json`` and return the path.""" path = bundle_root / "manifest.json" diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index 715336e..c92e27e 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -15,12 +15,21 @@ import pyarrow.parquet as pq from leadforge.core.enums import ExposureMode +from leadforge.core.exceptions import LeadforgeError from leadforge.core.hashing import file_sha256 from leadforge.core.serialization import load_json from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for from leadforge.schema.relationships import ALL_CONSTRAINTS from leadforge.validation.difficulty import check_difficulty from leadforge.validation.realism import check_realism +from leadforge.validation.relational_leakage import ( + BANNED_TABLES, + LeakageReport, + probe_banned_columns, + probe_banned_tables, + probe_deterministic_reconstruction, + run_all_probes, +) def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[str]: @@ -44,9 +53,10 @@ def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[ tables, table_errors = _check_tables(bundle_root, manifest) errors.extend(table_errors) errors.extend(_check_task_splits(bundle_root, manifest)) - errors.extend(_check_fk_integrity(tables)) + errors.extend(_check_fk_integrity(tables, manifest)) errors.extend(_check_leakage(bundle_root, manifest)) errors.extend(_check_exposure_redaction(bundle_root, manifest)) + errors.extend(_check_relational_leakage(bundle_root, manifest)) if include_realism: errors.extend(check_realism(bundle_root, manifest)) @@ -133,13 +143,35 @@ def _check_task_splits(root: Path, manifest: dict[str, Any]) -> list[str]: return errors -def _check_fk_integrity(tables: dict[str, pd.DataFrame]) -> list[str]: +def _check_fk_integrity(tables: dict[str, pd.DataFrame], manifest: dict[str, Any]) -> list[str]: + # In snapshot-safe (public) bundles ``customers`` / ``subscriptions`` + # are intentionally absent — emitting "FK check skipped" for them + # would be a false positive. The expected-absent set is the same + # ``BANNED_TABLES`` constant the writer omits. + # + # Strict bool check: ``bool(...)`` would coerce malformed manifest + # values (the JSON string ``"false"`` is truthy; the int ``1`` would + # masquerade as snapshot-safe) and silently suppress real FK errors. + # Surface the bad value instead. errors: list[str] = [] + raw_flag = manifest.get("relational_snapshot_safe", False) + if not isinstance(raw_flag, bool): + errors.append( + f"manifest.relational_snapshot_safe must be a JSON boolean, got " + f"{type(raw_flag).__name__}={raw_flag!r}" + ) + snapshot_safe = False + else: + snapshot_safe = raw_flag + expected_absent = set(BANNED_TABLES) if snapshot_safe else set() + for fk in ALL_CONSTRAINTS: child_df = tables.get(fk.child_table) parent_df = tables.get(fk.parent_table) if child_df is None or parent_df is None: missing = fk.child_table if child_df is None else fk.parent_table + if missing in expected_absent: + continue errors.append( f"FK check skipped: {fk.child_table}.{fk.child_column} → " f"{fk.parent_table}.{fk.parent_column} " @@ -260,3 +292,91 @@ def _check_exposure_redaction(root: Path, manifest: dict[str, Any]) -> list[str] ) return errors + + +def _check_relational_leakage(root: Path, manifest: dict[str, Any]) -> list[str]: + """Run the relational-leakage probes on snapshot-safe (public) bundles. + + Skips ``research_instructor`` bundles entirely — they retain the + full hidden truth (label column, customers, subscriptions, + ``close_outcome``) by design, so the probes would be a false + positive there. The bonus-model probe stays off (PR 3.3 will + calibrate per-tier bands). + + The structural probes (banned columns, banned tables, deterministic + join reconstruction) do not depend on ``snapshot_day``. They run + even if the manifest is missing or has a malformed ``snapshot_day`` + — those are exactly the cases where users *most* need leakage + detection, not least. Only the snapshot-window probe is skipped + when ``snapshot_day`` is unavailable, with an explicit error + surfaced so the gap is visible. + + Each :class:`~leadforge.validation.relational_leakage.LeakageFinding` + is rendered as one error string, keeping the existing + ``validate_bundle`` contract (return list of strings, empty = pass). + """ + mode_str = manifest.get("exposure_mode") + if mode_str != ExposureMode.student_public.value: + return [] + + snapshot_day = manifest.get("snapshot_day") + snapshot_day_usable = ( + isinstance(snapshot_day, int) and not isinstance(snapshot_day, bool) and snapshot_day >= 0 + ) + + errors: list[str] = [] + try: + if snapshot_day_usable and isinstance(snapshot_day, int): + report: LeakageReport = run_all_probes(root, snapshot_day=snapshot_day) + else: + # Run the structural subset that does not need ``snapshot_day``. + tables = _read_relational_tables(root) + findings = list(probe_banned_columns(tables)) + findings += list(probe_banned_tables(tables.keys())) + findings += list(probe_deterministic_reconstruction(tables)) + report = LeakageReport(findings=tuple(findings)) + errors.append( + "Relational leakage [snapshot_window] manifest.snapshot_day is " + f"missing or malformed ({snapshot_day!r}); skipping the snapshot-" + "window probe. Structural probes (banned columns / tables / " + "join reconstruction) ran normally." + ) + except (FileNotFoundError, ValueError, LeadforgeError) as exc: + return errors + [f"Relational-leakage probe failed: {type(exc).__name__}: {exc}"] + + errors.extend( + f"Relational leakage [{f.channel}] {f.detail}: {f.message}" for f in report.findings + ) + return errors + + +def _read_relational_tables(root: Path) -> dict[str, pd.DataFrame]: + """Read every public + banned-table parquet under ``/tables/``. + + Mirrors the read logic in + :func:`leadforge.validation.relational_leakage.run_all_probes` but + is reusable for the snapshot_day-missing path above. + """ + from leadforge.validation.relational_leakage import ( + BANNED_TABLES as _BANNED, + ) + + tables_dir = root / "tables" + if not tables_dir.is_dir() or not (tables_dir / "leads.parquet").exists(): + raise FileNotFoundError(f"missing tables/leads.parquet under {root}") + + public_tables = ( + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + ) + out: dict[str, pd.DataFrame] = {} + for name in (*public_tables, *_BANNED): + path = tables_dir / f"{name}.parquet" + if path.exists(): + out[name] = pd.read_parquet(path) + return out diff --git a/leadforge/validation/invariants.py b/leadforge/validation/invariants.py index 58799e7..1278045 100644 --- a/leadforge/validation/invariants.py +++ b/leadforge/validation/invariants.py @@ -18,6 +18,12 @@ from leadforge.core.hashing import file_sha256 from leadforge.render.manifests import NON_DETERMINISTIC_MANIFEST_FIELDS from leadforge.schema.features import redacted_columns_for +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + SNAPSHOT_FILTERED_TABLES, +) def check_determinism(bundle_a: Path, bundle_b: Path) -> list[str]: @@ -185,7 +191,39 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - "for at least one shared feature" ) - # Both must have the same tables with identical content + # Both must have the same tables with identical content, modulo the + # snapshot-safe export contract (PR 2.2 / bundle schema v5): + # + # * Student is allowed to omit ``BANNED_TABLES`` (``customers`` / + # ``subscriptions``) — these are conversion-conditional and thus + # leak the label. + # * Student's ``leads`` is allowed to drop ``BANNED_LEAD_COLUMNS``; + # ``opportunities`` is allowed to drop ``BANNED_OPP_COLUMNS``. + # * Student's event tables in ``SNAPSHOT_FILTERED_TABLES`` are + # allowed to be a *row-subset* of instructor (filtered to + # ``lead_created_at + snapshot_day``). + # + # In all cases, student is still a subset of instructor on every + # column / row that survives the contract. + expected_redacted = redacted_columns_for(ExposureMode.student_public) + snapshot_filtered_table_names = {name for name, _ in SNAPSHOT_FILTERED_TABLES} + extra_columns_allowed_per_table: dict[str, set[str]] = { + "leads": set(BANNED_LEAD_COLUMNS), + "opportunities": set(BANNED_OPP_COLUMNS), + } + # Per-snapshot-filtered-table primary key. Used to verify the + # row-subset relationship cheaply and correctly under NaN: a left + # merge over all shared columns is fragile (NaN doesn't equal NaN + # in pandas, even on the same float), and the natural PK is enough + # to assert ``student.rows ⊆ instructor.rows`` since student rows + # were derived directly from instructor by row-filtering. + snapshot_filtered_pks: dict[str, str] = { + "touches": "touch_id", + "sessions": "session_id", + "sales_activities": "activity_id", + "opportunities": "opportunity_id", + } + student_tables = ( {p.name for p in (student_bundle / "tables").glob("*.parquet")} if (student_bundle / "tables").exists() @@ -199,25 +237,35 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - missing_from_instructor = student_tables - instructor_tables if missing_from_instructor: errors.append(f"Tables in student but not instructor: {sorted(missing_from_instructor)}") - extra_in_instructor = instructor_tables - student_tables + expected_only_in_instructor = {f"{name}.parquet" for name in BANNED_TABLES} + extra_in_instructor = instructor_tables - student_tables - expected_only_in_instructor if extra_in_instructor: errors.append(f"Tables in instructor but not student: {sorted(extra_in_instructor)}") - expected_redacted = redacted_columns_for(ExposureMode.student_public) for table in sorted(student_tables & instructor_tables): s_path = student_bundle / "tables" / table i_path = instructor_bundle / "tables" / table if file_sha256(s_path) == file_sha256(i_path): continue - # Mismatch is acceptable iff the only difference is redacted - # columns (same logic as for task splits below). s_df = pd.read_parquet(s_path) i_df = pd.read_parquet(i_path) - if len(s_df) != len(i_df): + + table_name = table[: -len(".parquet")] + # Row counts can differ for snapshot-filtered event tables; the + # contract is "student rows ⊆ instructor rows", checked below by + # comparing shared columns on the inner join. + is_snapshot_filtered = table_name in snapshot_filtered_table_names + if not is_snapshot_filtered and len(s_df) != len(i_df): errors.append( f"Table {table}: row count mismatch student={len(s_df)} instructor={len(i_df)}" ) continue + if is_snapshot_filtered and len(s_df) > len(i_df): + errors.append( + f"Table {table}: student has more rows than instructor " + f"({len(s_df)} > {len(i_df)}); snapshot-safe export must be a row-subset" + ) + continue s_cols = set(s_df.columns) i_cols = set(i_df.columns) extra_in_student = s_cols - i_cols @@ -228,17 +276,41 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - ) continue diff = i_cols - s_cols - if not diff.issubset(expected_redacted): + allowed = expected_redacted | extra_columns_allowed_per_table.get(table_name, set()) + if not diff.issubset(allowed): errors.append( f"Table {table}: instructor−student column diff {sorted(diff)} contains " - f"non-redacted columns (expected subset of {sorted(expected_redacted)})" + f"non-redacted columns (expected subset of {sorted(allowed)})" ) continue shared = [c for c in s_df.columns if c in i_df.columns] - s_shared = s_df[shared].reset_index(drop=True) - i_shared = i_df[shared].reset_index(drop=True) - if not s_shared.equals(i_shared): - errors.append(f"Table {table}: shared-column values differ between modes") + if is_snapshot_filtered and len(s_df) != len(i_df): + # Row-subset by primary key. Each student row was derived + # from instructor by row-filtering, so the PK relationship + # is the strongest invariant we can assert without + # depending on column-by-column equality (which is fragile + # under NaN). + pk = snapshot_filtered_pks.get(table_name) + if pk is None or pk not in s_df.columns or pk not in i_df.columns: + errors.append( + f"Table {table}: snapshot-filtered table missing expected " + f"primary key {pk!r}; cannot verify row-subset" + ) + continue + student_pks = set(s_df[pk].tolist()) + instructor_pks = set(i_df[pk].tolist()) + orphans = student_pks - instructor_pks + if orphans: + sample = sorted(orphans)[:3] + errors.append( + f"Table {table}: {len(orphans)} student {pk}(s) absent from instructor, " + f"e.g. {sample} (snapshot-safe export must be a row-subset)" + ) + else: + s_shared = s_df[shared].reset_index(drop=True) + i_shared = i_df[shared].reset_index(drop=True) + if not s_shared.equals(i_shared): + errors.append(f"Table {table}: shared-column values differ between modes") # Both must have the same task splits with identical content student_tasks = ( diff --git a/tests/integration/test_snapshot_safe_bundle.py b/tests/integration/test_snapshot_safe_bundle.py new file mode 100644 index 0000000..11ad772 --- /dev/null +++ b/tests/integration/test_snapshot_safe_bundle.py @@ -0,0 +1,214 @@ +"""Integration tests for the snapshot-safe bundle write path (bundle schema v5). + +Covers the contract turned on in PR 2.2: ``student_public`` bundles +route ``tables/`` through +:func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe` +(the structural fix against the alpha-bundle reconstruction paths +A-E), ``research_instructor`` bundles keep the full-horizon export, +and the manifest is self-describing via ``relational_snapshot_safe``, +``structural_redactions``, and ``bundle_schema_version == "5"``. + +Tests fall into three groups: + +* **Round-trip** — both modes write, both validate clean, on-disk + shape matches the contract (banned columns / banned tables drop in + public; both retained in instructor). +* **Manifest contract** — row counts match disk; banned tables are + not listed under ``manifest.tables``. +* **Negative** — a tampered "public" bundle (instructor copy moved + into the public slot) trips the relational-leakage probes through + ``validate_bundle``, with specific banned columns / tables / join + paths named in the findings (not just channel-set membership). + +Hash determinism is covered by +:class:`tests.validation.test_invariants.TestDeterminism` for both +modes — no need to duplicate it here. +""" + +from __future__ import annotations + +import json +import shutil +from pathlib import Path + +import pyarrow.parquet as pq +import pytest + +from leadforge.api.generator import Generator +from leadforge.validation.bundle_checks import validate_bundle +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + CHANNEL_BANNED_COLUMN, + CHANNEL_BANNED_TABLE, + CHANNEL_JOIN_RECONSTRUCTION, + run_all_probes, +) + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +def _build(mode: str, out: Path, seed: int = 42) -> None: + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) + gen.generate(**_SMALL).save(str(out)) + + +@pytest.fixture(scope="module") +def public_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("snapshot_safe_public") + _build("student_public", out) + return out + + +@pytest.fixture(scope="module") +def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("snapshot_safe_instructor") + _build("research_instructor", out) + return out + + +# --------------------------------------------------------------------------- +# Round-trip +# --------------------------------------------------------------------------- + + +class TestRoundTripValidates: + def test_public_bundle_validates(self, public_bundle: Path) -> None: + errors = validate_bundle(public_bundle) + assert errors == [], f"public bundle should validate clean, got: {errors}" + + def test_instructor_bundle_validates(self, instructor_bundle: Path) -> None: + errors = validate_bundle(instructor_bundle) + assert errors == [], f"instructor bundle should validate clean, got: {errors}" + + def test_public_bundle_omits_banned_tables_on_disk(self, public_bundle: Path) -> None: + for name in BANNED_TABLES: + assert not (public_bundle / "tables" / f"{name}.parquet").exists(), ( + f"public bundle must not write tables/{name}.parquet" + ) + + def test_instructor_bundle_keeps_banned_tables_on_disk(self, instructor_bundle: Path) -> None: + for name in BANNED_TABLES: + assert (instructor_bundle / "tables" / f"{name}.parquet").exists(), ( + f"instructor bundle must retain tables/{name}.parquet" + ) + + def test_public_leads_drops_banned_columns(self, public_bundle: Path) -> None: + cols = set(pq.read_schema(public_bundle / "tables/leads.parquet").names) + for c in BANNED_LEAD_COLUMNS: + assert c not in cols, f"leads.{c} must be absent from public bundle" + + def test_public_opportunities_drops_banned_columns(self, public_bundle: Path) -> None: + cols = set(pq.read_schema(public_bundle / "tables/opportunities.parquet").names) + for c in BANNED_OPP_COLUMNS: + assert c not in cols, f"opportunities.{c} must be absent from public bundle" + + def test_instructor_leads_keeps_banned_columns(self, instructor_bundle: Path) -> None: + cols = set(pq.read_schema(instructor_bundle / "tables/leads.parquet").names) + for c in BANNED_LEAD_COLUMNS: + assert c in cols, ( + f"leads.{c} must be retained in research_instructor bundle (full-horizon export)" + ) + + +# --------------------------------------------------------------------------- +# Manifest contract +# --------------------------------------------------------------------------- + + +class TestManifestContract: + def test_public_manifest_table_row_counts_match_disk(self, public_bundle: Path) -> None: + """Manifest row counts must come from the *post-redaction* dict so + consumers reading the manifest see the truth on disk, not the + pre-redaction full-horizon shape.""" + manifest = json.loads((public_bundle / "manifest.json").read_text()) + for name, info in manifest["tables"].items(): + actual = pq.read_metadata(public_bundle / f"tables/{name}.parquet").num_rows + assert info["row_count"] == actual, ( + f"manifest row_count for {name} ({info['row_count']}) " + f"disagrees with parquet ({actual})" + ) + + def test_public_manifest_does_not_list_banned_tables(self, public_bundle: Path) -> None: + manifest = json.loads((public_bundle / "manifest.json").read_text()) + for name in BANNED_TABLES: + assert name not in manifest["tables"], ( + f"manifest.tables must not list banned table {name!r}" + ) + + +# --------------------------------------------------------------------------- +# Negative — tampered public bundle +# --------------------------------------------------------------------------- + + +class TestTamperedPublicBundle: + """Hand-craft a tampered public bundle and verify ``validate_bundle`` + surfaces the leakage findings. Tampering = take an instructor bundle + (full-horizon shape) and rewrite its manifest to claim + ``student_public``. This is the structural attack the contract + defends against.""" + + def _make_tampered(self, instructor_bundle: Path, dest: Path) -> Path: + shutil.copytree(instructor_bundle, dest) + manifest = json.loads((dest / "manifest.json").read_text()) + manifest["exposure_mode"] = "student_public" + # ``redacted_columns`` would mismatch the public expectation + # (instructor has []); align it so we test the relational + # leakage path specifically and not the redaction-set check. + manifest["redacted_columns"] = ["current_stage", "is_sql"] + # Leave ``relational_snapshot_safe`` at False — that IS the + # lie this test exercises (manifest claims public, tables are + # full-horizon). + (dest / "manifest.json").write_text(json.dumps(manifest, indent=2)) + return dest + + def test_validate_surfaces_leakage_findings( + self, tmp_path: Path, instructor_bundle: Path + ) -> None: + tampered = self._make_tampered(instructor_bundle, tmp_path / "tampered") + errors = validate_bundle(tampered, include_realism=False) + leak_errors = [e for e in errors if e.startswith("Relational leakage")] + assert leak_errors, ( + f"tampered public bundle must surface relational-leakage errors; got {errors}" + ) + + def test_findings_name_specific_banned_columns( + self, tmp_path: Path, instructor_bundle: Path + ) -> None: + """Channel membership isn't enough — assert the *details* point at + the actual banned columns / tables, so a future regression that + leaves one finding behind doesn't slip through.""" + tampered = self._make_tampered(instructor_bundle, tmp_path / "tampered_details") + manifest = json.loads((tampered / "manifest.json").read_text()) + report = run_all_probes(tampered, snapshot_day=manifest["snapshot_day"]) + + details_by_channel: dict[str, set[str]] = {} + for f in report.findings: + details_by_channel.setdefault(f.channel, set()).add(f.detail) + + # Every banned column must be named in a banned-column finding. + banned_col_details = details_by_channel.get(CHANNEL_BANNED_COLUMN, set()) + expected_banned_cols = {f"leads.{c}" for c in BANNED_LEAD_COLUMNS} | { + f"opportunities.{c}" for c in BANNED_OPP_COLUMNS + } + assert expected_banned_cols <= banned_col_details, ( + f"banned-column findings missing entries: " + f"{sorted(expected_banned_cols - banned_col_details)}" + ) + + # Every banned table must be named in a banned-table finding. + banned_tbl_details = details_by_channel.get(CHANNEL_BANNED_TABLE, set()) + assert set(BANNED_TABLES) <= banned_tbl_details, ( + f"banned-table findings missing entries: " + f"{sorted(set(BANNED_TABLES) - banned_tbl_details)}" + ) + + # Join-reconstruction must name at least path B (closed_won + # opportunities are in the tampered bundle). Paths C / D may + # also fire when customers / subscriptions are present. + join_details = details_by_channel.get(CHANNEL_JOIN_RECONSTRUCTION, set()) + assert "path_b_opportunity_won" in join_details, ( + f"join-reconstruction finding for path B missing; got {sorted(join_details)}" + ) diff --git a/tests/render/test_bundle_schema_v4_contract.py b/tests/render/test_bundle_schema_v4_contract.py deleted file mode 100644 index b839471..0000000 --- a/tests/render/test_bundle_schema_v4_contract.py +++ /dev/null @@ -1,202 +0,0 @@ -"""Schema contract test for ``bundle_schema_version == "4"``. - -The constants below are an *intentional* duplication of the column sets -the bundle writer produces. The duplication is the point: any change to -``LEAD_SNAPSHOT_FEATURES``, ``LeadRow``, or the redaction policy that -also changes the published column set must update this contract. A bare -"add a new feature" PR that touches the spec but not this file will fail -here, forcing the author to either update the contract (and bump -``BUNDLE_SCHEMA_VERSION``) or revisit the change. - -v4 vs v3: the column SET is unchanged. The bump records the semantic -shift introduced by the windowed snapshot — event-aggregate column -VALUES are now anchored at ``snapshot_day`` instead of the full horizon. -``manifest.snapshot_day`` makes the contract self-describing. - -If you find yourself wondering "do I have to update this?": yes. That -is the failure mode this test is designed to catch. -""" - -from __future__ import annotations - -import json -from pathlib import Path - -import pyarrow.parquet as pq -import pytest - -from leadforge.api.generator import Generator - -# Pinned column lists for bundle schema v4. Update *together* with -# ``BUNDLE_SCHEMA_VERSION`` and ``LEAD_SNAPSHOT_FEATURES``. - -V4_TASK_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( - { - "account_id", - "industry", - "region", - "employee_band", - "estimated_revenue_band", - "process_maturity_band", - "contact_id", - "role_function", - "seniority", - "buyer_role", - "lead_id", - "lead_created_at", - "lead_source", - "first_touch_channel", - "touch_count", - "inbound_touch_count", - "outbound_touch_count", - "session_count", - "pricing_page_views", - "demo_page_views", - "total_session_duration_seconds", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "activity_count", - "days_since_last_touch", - "opportunity_created", - "has_open_opportunity", - "opportunity_estimated_acv", - "expected_acv", - "total_touches_all", - "converted_within_90_days", - } -) - -V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = V4_TASK_COLUMNS_STUDENT_PUBLIC | { - "current_stage", - "is_sql", -} - -V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( - { - "lead_id", - "contact_id", - "account_id", - "lead_created_at", - "lead_source", - "first_touch_channel", - # ``current_stage``, ``is_sql`` redacted in student_public - "owner_rep_id", - "converted_within_90_days", - "conversion_timestamp", - } -) - -V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC | { - "current_stage", - "is_sql", -} - -_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} - - -def _build(mode: str, out: Path, seed: int = 42) -> None: - gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) - gen.generate(**_SMALL).save(str(out)) - - -@pytest.fixture(scope="module") -def student_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: - out = tmp_path_factory.mktemp("v4_student") - _build("student_public", out) - return out - - -@pytest.fixture(scope="module") -def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: - out = tmp_path_factory.mktemp("v4_instructor") - _build("research_instructor", out) - return out - - -def _task_cols(bundle: Path) -> frozenset[str]: - return frozenset(pq.read_schema(bundle / "tasks/converted_within_90_days/train.parquet").names) - - -def _leads_cols(bundle: Path) -> frozenset[str]: - return frozenset(pq.read_schema(bundle / "tables/leads.parquet").names) - - -def test_manifest_declares_v4(student_bundle: Path, instructor_bundle: Path) -> None: - for b in (student_bundle, instructor_bundle): - manifest = json.loads((b / "manifest.json").read_text()) - assert manifest["bundle_schema_version"] == "4", ( - f"{b.name}: bundle_schema_version is {manifest['bundle_schema_version']!r}, " - "expected '4'" - ) - - -def test_manifest_records_snapshot_day(student_bundle: Path, instructor_bundle: Path) -> None: - """v4 contract: manifest must surface ``snapshot_day`` so consumers can - distinguish full-horizon (legacy) bundles from windowed bundles.""" - for b in (student_bundle, instructor_bundle): - manifest = json.loads((b / "manifest.json").read_text()) - assert "snapshot_day" in manifest, f"{b.name}: manifest is missing 'snapshot_day' field" - # The b2b_saas_procurement_v1 recipe pins snapshot_day=30. - assert manifest["snapshot_day"] == 30, ( - f"{b.name}: snapshot_day is {manifest['snapshot_day']!r}, expected 30" - ) - - -def test_student_public_task_columns_match_v4_contract(student_bundle: Path) -> None: - actual = _task_cols(student_bundle) - assert actual == V4_TASK_COLUMNS_STUDENT_PUBLIC, ( - f"student_public task split columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_TASK_COLUMNS_STUDENT_PUBLIC)}\n" - f" missing: {sorted(V4_TASK_COLUMNS_STUDENT_PUBLIC - actual)}\n" - " → either update tests/render/test_bundle_schema_v4_contract.py and " - "bump BUNDLE_SCHEMA_VERSION, or revert the schema change." - ) - - -def test_research_instructor_task_columns_match_v4_contract(instructor_bundle: Path) -> None: - actual = _task_cols(instructor_bundle) - assert actual == V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR, ( - f"research_instructor task split columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR)}\n" - f" missing: {sorted(V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" - ) - - -def test_student_public_leads_table_columns_match_v4_contract(student_bundle: Path) -> None: - actual = _leads_cols(student_bundle) - assert actual == V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC, ( - f"student_public tables/leads.parquet columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC)}\n" - f" missing: {sorted(V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC - actual)}" - ) - - -def test_research_instructor_leads_table_columns_match_v4_contract( - instructor_bundle: Path, -) -> None: - actual = _leads_cols(instructor_bundle) - assert actual == V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR, ( - f"research_instructor tables/leads.parquet columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR)}\n" - f" missing: {sorted(V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" - ) - - -def test_student_public_redacted_set_in_manifest(student_bundle: Path) -> None: - manifest = json.loads((student_bundle / "manifest.json").read_text()) - assert set(manifest["redacted_columns"]) == {"current_stage", "is_sql"} - - -def test_research_instructor_redacted_set_in_manifest(instructor_bundle: Path) -> None: - manifest = json.loads((instructor_bundle / "manifest.json").read_text()) - assert manifest["redacted_columns"] == [] - - -def test_is_mql_absent_from_all_v4_artifacts(student_bundle: Path, instructor_bundle: Path) -> None: - """``is_mql`` was removed entirely in v3 (issue #57) and remains absent in v4: - not in the snapshot, not in the relational table, not in the feature - dictionary, not anywhere.""" - for b in (student_bundle, instructor_bundle): - assert "is_mql" not in _task_cols(b) - assert "is_mql" not in _leads_cols(b) diff --git a/tests/render/test_bundle_schema_v5_contract.py b/tests/render/test_bundle_schema_v5_contract.py new file mode 100644 index 0000000..e199d4e --- /dev/null +++ b/tests/render/test_bundle_schema_v5_contract.py @@ -0,0 +1,336 @@ +"""Schema contract test for ``bundle_schema_version == "5"``. + +The constants below are an *intentional* duplication of the column / +table sets the bundle writer produces. The duplication is the point: +any change to ``LEAD_SNAPSHOT_FEATURES``, ``LeadRow``, the redaction +policy, or the snapshot-safe contract (``BANNED_LEAD_COLUMNS`` / +``BANNED_OPP_COLUMNS`` / ``BANNED_TABLES``) that also changes the +published shape must update this contract. A bare "add a new feature" +PR that touches the spec but not this file will fail here, forcing the +author to either update the contract (and bump +``BUNDLE_SCHEMA_VERSION``) or revisit the change. + +v5 vs v4: ``student_public`` bundles now route through the +snapshot-safe relational export. Public ``leads.parquet`` drops +``converted_within_90_days`` and ``conversion_timestamp``; public +``opportunities.parquet`` drops ``close_outcome`` and ``closed_at``; +public bundles omit ``customers`` and ``subscriptions`` entirely; event +tables are filtered per-lead to ``lead_created_at + snapshot_day``. +``manifest.relational_snapshot_safe`` records the contract so the +bundle is self-describing. ``research_instructor`` bundles keep the +full-horizon export. + +Task split column SET is unchanged from v4 — the structural fix lives +in ``tables/``, not the snapshot. + +If you find yourself wondering "do I have to update this?": yes. That +is the failure mode this test is designed to catch. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pyarrow.parquet as pq +import pytest + +from leadforge.api.generator import Generator + +# Pinned column / table sets for bundle schema v5. Update *together* +# with ``BUNDLE_SCHEMA_VERSION``, ``LEAD_SNAPSHOT_FEATURES``, and the +# snapshot-safe contract constants in +# ``leadforge.validation.relational_leakage``. + +V5_TASK_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( + { + "account_id", + "industry", + "region", + "employee_band", + "estimated_revenue_band", + "process_maturity_band", + "contact_id", + "role_function", + "seniority", + "buyer_role", + "lead_id", + "lead_created_at", + "lead_source", + "first_touch_channel", + "touch_count", + "inbound_touch_count", + "outbound_touch_count", + "session_count", + "pricing_page_views", + "demo_page_views", + "total_session_duration_seconds", + "touches_week_1", + "touches_last_7_days", + "days_since_first_touch", + "activity_count", + "days_since_last_touch", + "opportunity_created", + "has_open_opportunity", + "opportunity_estimated_acv", + "expected_acv", + "total_touches_all", + "converted_within_90_days", + } +) + +V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = V5_TASK_COLUMNS_STUDENT_PUBLIC | { + "current_stage", + "is_sql", +} + +# v5: public ``leads.parquet`` drops ``converted_within_90_days`` and +# ``conversion_timestamp`` (BANNED_LEAD_COLUMNS). +V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( + { + "lead_id", + "contact_id", + "account_id", + "lead_created_at", + "lead_source", + "first_touch_channel", + # ``current_stage``, ``is_sql`` redacted in student_public + "owner_rep_id", + # ``converted_within_90_days`` / ``conversion_timestamp`` dropped + # by the snapshot-safe export (PR 2.2) + } +) + +V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = frozenset( + { + "lead_id", + "contact_id", + "account_id", + "lead_created_at", + "lead_source", + "first_touch_channel", + "current_stage", + "is_sql", + "owner_rep_id", + "converted_within_90_days", + "conversion_timestamp", + } +) + +# v5: public ``opportunities.parquet`` drops ``close_outcome`` and +# ``closed_at`` (BANNED_OPP_COLUMNS). +V5_OPP_TABLE_BANNED_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( + {"close_outcome", "closed_at"} +) + +# v5: public bundles must NOT include these tables. +V5_PUBLIC_BANNED_TABLES: frozenset[str] = frozenset({"customers", "subscriptions"}) + +# v5: tables that ARE expected in public bundles. +V5_PUBLIC_EXPECTED_TABLES: frozenset[str] = frozenset( + { + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + } +) + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +def _build(mode: str, out: Path, seed: int = 42) -> None: + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) + gen.generate(**_SMALL).save(str(out)) + + +@pytest.fixture(scope="module") +def student_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("v5_student") + _build("student_public", out) + return out + + +@pytest.fixture(scope="module") +def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("v5_instructor") + _build("research_instructor", out) + return out + + +def _task_cols(bundle: Path) -> frozenset[str]: + return frozenset(pq.read_schema(bundle / "tasks/converted_within_90_days/train.parquet").names) + + +def _table_cols(bundle: Path, table_name: str) -> frozenset[str]: + return frozenset(pq.read_schema(bundle / f"tables/{table_name}.parquet").names) + + +def _leads_cols(bundle: Path) -> frozenset[str]: + return _table_cols(bundle, "leads") + + +def test_manifest_declares_v5(student_bundle: Path, instructor_bundle: Path) -> None: + for b in (student_bundle, instructor_bundle): + manifest = json.loads((b / "manifest.json").read_text()) + assert manifest["bundle_schema_version"] == "5", ( + f"{b.name}: bundle_schema_version is {manifest['bundle_schema_version']!r}, " + "expected '5'" + ) + + +def test_manifest_records_snapshot_day(student_bundle: Path, instructor_bundle: Path) -> None: + for b in (student_bundle, instructor_bundle): + manifest = json.loads((b / "manifest.json").read_text()) + assert "snapshot_day" in manifest, f"{b.name}: manifest is missing 'snapshot_day' field" + assert manifest["snapshot_day"] == 30, ( + f"{b.name}: snapshot_day is {manifest['snapshot_day']!r}, expected 30" + ) + + +def test_manifest_records_relational_snapshot_safe( + student_bundle: Path, instructor_bundle: Path +) -> None: + """v5 contract: manifest must surface ``relational_snapshot_safe`` so a tool + reading a bundle can tell from the manifest alone whether ``tables/`` is the + snapshot-safe (public) shape or the full-horizon (instructor) shape.""" + student_manifest = json.loads((student_bundle / "manifest.json").read_text()) + assert student_manifest["relational_snapshot_safe"] is True, ( + "student_public bundle must declare relational_snapshot_safe = true" + ) + + instructor_manifest = json.loads((instructor_bundle / "manifest.json").read_text()) + assert instructor_manifest["relational_snapshot_safe"] is False, ( + "research_instructor bundle must declare relational_snapshot_safe = false" + ) + + +def test_manifest_records_structural_redactions( + student_bundle: Path, instructor_bundle: Path +) -> None: + """v5 contract: ``manifest.structural_redactions`` enumerates the table-level + drops (columns + omitted tables) so the bundle is self-describing.""" + student = json.loads((student_bundle / "manifest.json").read_text()) + assert student["structural_redactions"] == { + "columns": { + "leads": ["conversion_timestamp", "converted_within_90_days"], + "opportunities": ["close_outcome", "closed_at"], + }, + "omitted_tables": ["customers", "subscriptions"], + }, "student_public must record the snapshot-safe drops in manifest.structural_redactions" + + instructor = json.loads((instructor_bundle / "manifest.json").read_text()) + assert instructor["structural_redactions"] == { + "columns": {}, + "omitted_tables": [], + }, "research_instructor must record an empty structural_redactions" + + +def test_student_public_task_columns_match_v5_contract(student_bundle: Path) -> None: + actual = _task_cols(student_bundle) + assert actual == V5_TASK_COLUMNS_STUDENT_PUBLIC, ( + f"student_public task split columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_TASK_COLUMNS_STUDENT_PUBLIC)}\n" + f" missing: {sorted(V5_TASK_COLUMNS_STUDENT_PUBLIC - actual)}\n" + " → either update tests/render/test_bundle_schema_v5_contract.py and " + "bump BUNDLE_SCHEMA_VERSION, or revert the schema change." + ) + + +def test_research_instructor_task_columns_match_v5_contract(instructor_bundle: Path) -> None: + actual = _task_cols(instructor_bundle) + assert actual == V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR, ( + f"research_instructor task split columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR)}\n" + f" missing: {sorted(V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" + ) + + +def test_student_public_leads_table_columns_match_v5_contract(student_bundle: Path) -> None: + actual = _leads_cols(student_bundle) + assert actual == V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC, ( + f"student_public tables/leads.parquet columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC)}\n" + f" missing: {sorted(V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC - actual)}" + ) + + +def test_research_instructor_leads_table_columns_match_v5_contract( + instructor_bundle: Path, +) -> None: + actual = _leads_cols(instructor_bundle) + assert actual == V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR, ( + f"research_instructor tables/leads.parquet columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR)}\n" + f" missing: {sorted(V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" + ) + + +def test_student_public_opportunities_drop_banned_columns(student_bundle: Path) -> None: + actual = _table_cols(student_bundle, "opportunities") + leak = actual & V5_OPP_TABLE_BANNED_COLUMNS_STUDENT_PUBLIC + assert not leak, ( + f"student_public tables/opportunities.parquet retains banned columns: {sorted(leak)}.\n" + " → snapshot-safe export must drop close_outcome / closed_at." + ) + + +def test_research_instructor_opportunities_keep_banned_columns(instructor_bundle: Path) -> None: + """Instructor bundles retain the full-horizon shape — those columns are + *not* banned for them, they're load-bearing for hidden-truth analysis.""" + actual = _table_cols(instructor_bundle, "opportunities") + assert "close_outcome" in actual, "research_instructor must retain close_outcome" + assert "closed_at" in actual, "research_instructor must retain closed_at" + + +def test_student_public_omits_banned_tables(student_bundle: Path) -> None: + tables_dir = student_bundle / "tables" + for name in V5_PUBLIC_BANNED_TABLES: + assert not (tables_dir / f"{name}.parquet").exists(), ( + f"student_public bundle must not include tables/{name}.parquet" + ) + + manifest = json.loads((student_bundle / "manifest.json").read_text()) + declared = set(manifest["tables"].keys()) + assert not (declared & V5_PUBLIC_BANNED_TABLES), ( + f"manifest.tables must not list banned tables: {sorted(declared & V5_PUBLIC_BANNED_TABLES)}" + ) + + +def test_student_public_includes_expected_tables(student_bundle: Path) -> None: + tables_dir = student_bundle / "tables" + for name in V5_PUBLIC_EXPECTED_TABLES: + assert (tables_dir / f"{name}.parquet").exists(), ( + f"student_public bundle is missing required tables/{name}.parquet" + ) + + +def test_research_instructor_keeps_banned_tables(instructor_bundle: Path) -> None: + """Instructor bundles retain ``customers`` / ``subscriptions``.""" + tables_dir = instructor_bundle / "tables" + for name in V5_PUBLIC_BANNED_TABLES: + assert (tables_dir / f"{name}.parquet").exists(), ( + f"research_instructor bundle must include tables/{name}.parquet" + ) + + +def test_student_public_redacted_set_in_manifest(student_bundle: Path) -> None: + manifest = json.loads((student_bundle / "manifest.json").read_text()) + assert set(manifest["redacted_columns"]) == {"current_stage", "is_sql"} + + +def test_research_instructor_redacted_set_in_manifest(instructor_bundle: Path) -> None: + manifest = json.loads((instructor_bundle / "manifest.json").read_text()) + assert manifest["redacted_columns"] == [] + + +def test_is_mql_absent_from_all_v5_artifacts(student_bundle: Path, instructor_bundle: Path) -> None: + """``is_mql`` was removed entirely in v3 (issue #57) and remains absent in v5: + not in the snapshot, not in the relational table, not in the feature + dictionary, not anywhere.""" + for b in (student_bundle, instructor_bundle): + assert "is_mql" not in _task_cols(b) + assert "is_mql" not in _leads_cols(b) diff --git a/tests/render/test_render.py b/tests/render/test_render.py index aad4db1..ddeadcf 100644 --- a/tests/render/test_render.py +++ b/tests/render/test_render.py @@ -23,7 +23,12 @@ def _make_config(seed: int = 42, n_leads: int = 80) -> GenerationConfig: - return GenerationConfig(seed=seed, n_accounts=30, n_contacts=90, n_leads=n_leads) + # ``snapshot_day=30`` matches the recipe pin and satisfies the + # student_public writer contract introduced in PR 2.2 (the + # snapshot-safe export needs a finite window). + return GenerationConfig( + seed=seed, n_accounts=30, n_contacts=90, n_leads=n_leads, snapshot_day=30 + ) def _make_narrative(seed: int = 42): diff --git a/tests/scripts/test_probe_relational_leakage.py b/tests/scripts/test_probe_relational_leakage.py index 57a3fd3..e063be6 100644 --- a/tests/scripts/test_probe_relational_leakage.py +++ b/tests/scripts/test_probe_relational_leakage.py @@ -233,11 +233,14 @@ def test_cli_smoke_reports_numeric_accuracy() -> None: @pytest.mark.skipif( not (_REPO_ROOT / "release" / "intermediate" / "tables" / "leads.parquet").exists(), - reason="alpha bundle not present (clean checkout)", + reason="release bundle not present (clean checkout)", ) -def test_cli_max_accuracy_gate_fails_on_alpha() -> None: - """Phase-2 CI gate flag: alpha bundles trivially exceed any sane threshold, - so ``--max-accuracy 0.65`` must exit 2.""" +def test_cli_max_accuracy_gate_passes_on_v5_bundle() -> None: + """Phase-2 CI gate: post-PR 2.2 (snapshot-safe v5) public bundles must + pass ``--max-accuracy 0.65``. This test was originally pointed at + alpha bundles and asserted exit 2 (gate failure) as evidence of the + leak; PR 2.2 fixed the leak, so the regenerated bundles must now exit + 0 instead.""" result = subprocess.run( # noqa: S603 — controlled args, sys.executable [ sys.executable, @@ -250,5 +253,5 @@ def test_cli_max_accuracy_gate_fails_on_alpha() -> None: text=True, check=False, ) - assert result.returncode == 2, result.stderr - assert "GATE FAILURE" in result.stderr + assert result.returncode == 0, result.stderr + assert "GATE FAILURE" not in result.stderr diff --git a/tests/validation/test_bundle_checks.py b/tests/validation/test_bundle_checks.py index 31d3e51..4f1679a 100644 --- a/tests/validation/test_bundle_checks.py +++ b/tests/validation/test_bundle_checks.py @@ -131,3 +131,23 @@ def test_missing_required_file(self, tmp_path: Path, valid_bundle: Path) -> None errors = validate_bundle(corrupt) assert any("dataset_card.md" in e for e in errors) + + def test_relational_snapshot_safe_must_be_a_bool( + self, tmp_path: Path, valid_bundle: Path + ) -> None: + """``manifest.relational_snapshot_safe`` is consulted to decide + whether ``customers`` / ``subscriptions`` are *expected* absent. + A bare ``bool(...)`` coercion would make the JSON string + ``"false"`` truthy, silently suppressing FK errors on a + corrupted manifest. Surface the type problem instead.""" + corrupt = tmp_path / "bad_flag" + shutil.copytree(valid_bundle, corrupt) + manifest = json.loads((corrupt / "manifest.json").read_text()) + # JSON string "false" — truthy under bool() but invalid. + manifest["relational_snapshot_safe"] = "false" + (corrupt / "manifest.json").write_text(json.dumps(manifest, indent=2)) + + errors = validate_bundle(corrupt, include_realism=False) + assert any("relational_snapshot_safe" in e and "boolean" in e for e in errors), ( + f"expected strict-bool error; got: {errors}" + ) diff --git a/tests/validation/test_invariants.py b/tests/validation/test_invariants.py index 078548c..f125b3e 100644 --- a/tests/validation/test_invariants.py +++ b/tests/validation/test_invariants.py @@ -22,7 +22,7 @@ @pytest.fixture(scope="module") def determinism_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: - """Generate two bundles with the same seed and pinned timestamp.""" + """Generate two student_public bundles with the same seed and pinned timestamp.""" a = tmp_path_factory.mktemp("det_a") b = tmp_path_factory.mktemp("det_b") for out in (a, b): @@ -33,6 +33,21 @@ def determinism_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, return a, b +@pytest.fixture(scope="module") +def determinism_instructor_bundles( + tmp_path_factory: pytest.TempPathFactory, +) -> tuple[Path, Path]: + """Generate two research_instructor bundles with the same seed and pinned timestamp.""" + a = tmp_path_factory.mktemp("det_instructor_a") + b = tmp_path_factory.mktemp("det_instructor_b") + for out in (a, b): + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", seed=77, exposure_mode="research_instructor" + ) + gen.generate(**_SMALL).save(str(out), generation_timestamp=_PINNED_TIMESTAMP) + return a, b + + @pytest.fixture(scope="module") def exposure_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: """Generate student_public and research_instructor bundles.""" @@ -55,6 +70,16 @@ def test_same_seed_produces_identical_bundles( errors = check_determinism(a, b) assert errors == [] + def test_same_seed_produces_identical_instructor_bundles( + self, determinism_instructor_bundles: tuple[Path, Path] + ) -> None: + """Determinism must hold for ``research_instructor`` too — the + full-horizon export and the metadata/ artefacts (graph, latents, + mechanism summary) are part of the deterministic contract.""" + a, b = determinism_instructor_bundles + errors = check_determinism(a, b) + assert errors == [] + def test_different_seeds_differ(self, tmp_path: Path) -> None: a = tmp_path / "seed1" b = tmp_path / "seed2" @@ -87,6 +112,218 @@ def test_instructor_without_metadata_fails(self, exposure_bundles: tuple[Path, P assert any("missing metadata" in e for e in errors) +# --------------------------------------------------------------------------- +# check_exposure_monotonicity — focused unit tests for the v5 (PR 2.2) +# snapshot-safe branches. These build minimal synthetic bundles so the +# table-comparison logic is exercised independently of generation. +# --------------------------------------------------------------------------- + + +class TestExposureMonotonicitySnapshotSafe: + """Exercise the table-comparison branches added in PR 2.2: + omitted ``BANNED_TABLES``, dropped ``BANNED_LEAD_COLUMNS`` / + ``BANNED_OPP_COLUMNS``, and PK-based row-subset on + ``SNAPSHOT_FILTERED_TABLES``.""" + + @staticmethod + def _shell(root: Path, *, has_metadata: bool) -> Path: + """Write the minimum scaffolding ``check_exposure_monotonicity`` requires + before it reaches the table-comparison logic. Feature dictionary is + written empty so the subset check trivially passes.""" + import pandas as pd + + root.mkdir(parents=True, exist_ok=True) + (root / "manifest.json").write_text("{}") + (root / "dataset_card.md").write_text("") + # Identical empty FD on both sides → subset check trivially passes. + pd.DataFrame({"name": [], "dtype": []}).to_csv(root / "feature_dictionary.csv", index=False) + if has_metadata: + (root / "metadata").mkdir() + (root / "tables").mkdir() + return root + + @staticmethod + def _write_parquet(root: Path, name: str, df) -> None: + df.to_parquet(root / "tables" / f"{name}.parquet", index=False) + + @pytest.fixture + def student_root(self, tmp_path: Path) -> Path: + return self._shell(tmp_path / "student", has_metadata=False) + + @pytest.fixture + def instructor_root(self, tmp_path: Path) -> Path: + return self._shell(tmp_path / "instructor", has_metadata=True) + + def test_omitted_banned_tables_pass(self, student_root: Path, instructor_root: Path) -> None: + """``customers`` / ``subscriptions`` in instructor only is the + expected snapshot-safe contract — must NOT raise an error.""" + import pandas as pd + + for root in (student_root, instructor_root): + self._write_parquet(root, "accounts", pd.DataFrame({"account_id": ["a1"]})) + self._write_parquet( + instructor_root, + "customers", + pd.DataFrame({"customer_id": ["c1"], "opportunity_id": ["o1"]}), + ) + self._write_parquet( + instructor_root, + "subscriptions", + pd.DataFrame({"subscription_id": ["s1"], "customer_id": ["c1"]}), + ) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_unexpected_extra_instructor_table_fails( + self, student_root: Path, instructor_root: Path + ) -> None: + """A NON-banned extra table on instructor must still be flagged — + snapshot-safe doesn't license arbitrary instructor-only tables.""" + import pandas as pd + + for root in (student_root, instructor_root): + self._write_parquet(root, "accounts", pd.DataFrame({"account_id": ["a1"]})) + self._write_parquet(instructor_root, "rogue", pd.DataFrame({"x": [1]})) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("rogue.parquet" in e and "instructor but not student" in e for e in errors) + + def test_dropped_banned_lead_columns_pass( + self, student_root: Path, instructor_root: Path + ) -> None: + """Public ``leads`` drops ``converted_within_90_days`` / + ``conversion_timestamp`` — must NOT raise.""" + import pandas as pd + + student_leads = pd.DataFrame({"lead_id": ["l1"], "lead_created_at": ["2024-01-01"]}) + instructor_leads = pd.DataFrame( + { + "lead_id": ["l1"], + "lead_created_at": ["2024-01-01"], + "converted_within_90_days": [True], + "conversion_timestamp": ["2024-02-01"], + } + ) + self._write_parquet(student_root, "leads", student_leads) + self._write_parquet(instructor_root, "leads", instructor_leads) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_dropped_banned_opp_columns_pass( + self, student_root: Path, instructor_root: Path + ) -> None: + """Public ``opportunities`` drops ``close_outcome`` / ``closed_at`` — + must NOT raise (and must still respect row-equality on shared + columns since the table size is below the snapshot-filter threshold + in this synthetic case).""" + import pandas as pd + + student_opps = pd.DataFrame( + { + "opportunity_id": ["o1"], + "lead_id": ["l1"], + "created_at": ["2024-01-01"], + } + ) + instructor_opps = pd.DataFrame( + { + "opportunity_id": ["o1"], + "lead_id": ["l1"], + "created_at": ["2024-01-01"], + "close_outcome": ["closed_won"], + "closed_at": ["2024-02-01"], + } + ) + self._write_parquet(student_root, "opportunities", student_opps) + self._write_parquet(instructor_root, "opportunities", instructor_opps) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_unexpected_extra_instructor_lead_column_fails( + self, student_root: Path, instructor_root: Path + ) -> None: + """A non-banned, non-redacted extra column on instructor's leads + must still trip the column-diff check.""" + import pandas as pd + + student_leads = pd.DataFrame({"lead_id": ["l1"]}) + instructor_leads = pd.DataFrame({"lead_id": ["l1"], "rogue_col": [42]}) + self._write_parquet(student_root, "leads", student_leads) + self._write_parquet(instructor_root, "leads", instructor_leads) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("leads.parquet" in e and "rogue_col" in e for e in errors) + + def test_pk_row_subset_pass(self, student_root: Path, instructor_root: Path) -> None: + """Student touches PK ⊂ instructor touches PK — must NOT raise.""" + import pandas as pd + + instructor_touches = pd.DataFrame( + { + "touch_id": ["t1", "t2", "t3", "t4"], + "lead_id": ["l1", "l1", "l1", "l1"], + } + ) + # Student has a snapshot-window subset (rows for t1, t2 only). + student_touches = instructor_touches.iloc[:2].reset_index(drop=True) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_pk_orphan_row_fails(self, student_root: Path, instructor_root: Path) -> None: + """Student touch_id not present in instructor — must raise. The PK + branch fires when row counts differ; instructor strictly larger + with student carrying an orphan PK is the regression case.""" + import pandas as pd + + instructor_touches = pd.DataFrame( + {"touch_id": ["t1", "t2", "t3"], "lead_id": ["l1", "l1", "l1"]} + ) + student_touches = pd.DataFrame( + {"touch_id": ["t1", "t99"], "lead_id": ["l1", "l1"]} # t99 is the orphan + ) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("touches.parquet" in e and "absent from instructor" in e for e in errors), ( + f"expected PK-orphan error; got: {errors}" + ) + + def test_student_has_more_rows_than_instructor_fails( + self, student_root: Path, instructor_root: Path + ) -> None: + """Snapshot-safe must be a row-subset; student strictly larger means + the writer leaked extra rows somewhere — must raise.""" + import pandas as pd + + instructor_touches = pd.DataFrame({"touch_id": ["t1"], "lead_id": ["l1"]}) + student_touches = pd.DataFrame( + {"touch_id": ["t1", "t2", "t3"], "lead_id": ["l1", "l1", "l1"]} + ) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("touches.parquet" in e and "more rows than instructor" in e for e in errors) + + def test_missing_pk_column_surfaces_an_error( + self, student_root: Path, instructor_root: Path + ) -> None: + """Student dropping ``touch_id`` is structurally invalid — the + column-diff check catches it (``touch_id`` is not in the + snapshot-safe allowlist for ``touches``), surfacing it as a + non-redacted-column error before the PK-set comparison runs.""" + import pandas as pd + + instructor_touches = pd.DataFrame({"touch_id": ["t1", "t2"], "lead_id": ["l1", "l1"]}) + student_touches = pd.DataFrame({"lead_id": ["l1"]}) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any( + "touches.parquet" in e and "touch_id" in e and "non-redacted columns" in e + for e in errors + ), f"expected column-diff error naming touch_id; got: {errors}" + + def _make_synthetic_bundle( root: Path, files: dict[str, str | bytes],