From 57a80824b536fe9401ab9374e022d4c70684a690 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 22:25:34 +0300 Subject: [PATCH 1/4] feat(exposure,render,validation): route student_public through snapshot-safe export; bundle schema v5 (PR 2.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the structural fix produced by PR 2.1 into the writer and validator so alpha public bundles stop leaking ``converted_within_90_days`` via joins. Bumps ``BUNDLE_SCHEMA_VERSION`` to ``"5"`` and adds the new self-describing ``relational_snapshot_safe`` flag on the manifest. Writer (``leadforge/api/bundle.py``) - Calls ``to_dataframes_snapshot_safe(dfs, snapshot_day=...)`` for ``student_public`` after ``to_dataframes(...)``; ``research_instructor`` keeps the full-horizon export. ``BANNED_LEAD_COLUMNS`` / ``BANNED_OPP_COLUMNS`` and feature-level redaction operate on disjoint columns, so they neither double-emit nor overlap. Manifest row counts are computed on the post-redaction dict. Raises ``ValueError`` if ``snapshot_day=None`` for a snapshot-safe mode. Filter / manifest - ``BundleFilter`` gains ``relational_snapshot_safe`` (student_public True; research_instructor False); the writer reads this from ``get_filter(mode)``. - ``manifest.relational_snapshot_safe`` recorded so a tool reading a v5 bundle can tell from the manifest alone whether ``tables/`` is the snapshot-safe (public) or full-horizon (instructor) shape. Validator - ``validate_bundle`` now runs ``run_all_probes`` on student_public bundles only (instructor retains hidden truth by design); bonus probe stays off (PR 3.3 will calibrate per-tier bands). Findings are rendered as one error string per finding through the existing list-of- strings contract. - ``_check_fk_integrity`` silently skips FK constraints involving the expected-absent ``BANNED_TABLES`` for snapshot-safe bundles. - ``check_exposure_monotonicity`` now allows: customers/subscriptions in instructor only; ``BANNED_LEAD_COLUMNS`` / ``BANNED_OPP_COLUMNS`` on instructor's leads/opportunities; event-table row-subsets in the snapshot-filtered tables (``touches`` / ``sessions`` / ``sales_activities`` / ``opportunities``). Tests - Renamed v4 contract test → ``test_bundle_schema_v5_contract.py`` and refreshed the pinned column / table sets for the new shape. - New ``tests/test_pr_2_2_integration.py``: round-trip both modes validate clean, manifest contract (v5 + flag + post-redaction row counts), tampered-public negative case (instructor copy moved into the public slot must trip banned-column / banned-table / join-reconstruction channels), and pinned-timestamp hash determinism for both modes. - ``tests/render/test_render.py``: ``_make_config`` pins ``snapshot_day=30`` to match the writer contract. All 1037 tests pass; ruff + mypy clean. --- leadforge/api/bundle.py | 28 +- leadforge/exposure/filters.py | 22 +- leadforge/render/manifests.py | 23 +- leadforge/validation/bundle_checks.py | 55 ++- leadforge/validation/invariants.py | 76 ++++- .../render/test_bundle_schema_v4_contract.py | 202 ----------- .../render/test_bundle_schema_v5_contract.py | 315 ++++++++++++++++++ tests/render/test_render.py | 7 +- tests/test_pr_2_2_integration.py | 244 ++++++++++++++ 9 files changed, 751 insertions(+), 221 deletions(-) delete mode 100644 tests/render/test_bundle_schema_v4_contract.py create mode 100644 tests/render/test_bundle_schema_v5_contract.py create mode 100644 tests/test_pr_2_2_integration.py diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index 4065bc9..ec1fb36 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -3,7 +3,8 @@ :func:`write_bundle` is called by :meth:`WorldBundle.save` and orchestrates all rendering steps: -1. Write relational Parquet tables (``tables/``). +1. Project the relational dict (snapshot-safe for ``student_public``, + full-horizon for ``research_instructor``) and write ``tables/``. 2. Build the lead snapshot and write task splits (``tasks/``). 3. Write ``dataset_card.md`` and ``feature_dictionary.csv``. 4. Apply exposure filtering — write ``metadata/`` for ``research_instructor`` @@ -16,10 +17,12 @@ from pathlib import Path from typing import TYPE_CHECKING +from leadforge.exposure.filters import get_filter from leadforge.exposure.modes import apply_exposure from leadforge.narrative.dataset_card import render_dataset_card from leadforge.render.manifests import build_manifest, write_manifest from leadforge.render.relational import to_dataframes +from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe from leadforge.render.snapshots import build_snapshot from leadforge.render.tasks import write_task_splits from leadforge.schema.dictionaries import write_feature_dictionary @@ -66,14 +69,36 @@ def write_bundle( # README's "Option 3") cannot trivially reintroduce a redacted # column by joining ``tables/leads.parquet`` to their feature set. redacted = redacted_columns_for(config.exposure_mode) + bundle_filter = get_filter(config.exposure_mode) # ------------------------------------------------------------------ # 1. Relational tables → tables/ + # + # For ``student_public`` (``relational_snapshot_safe = True``) we + # project the full-horizon dict onto the snapshot-safe shape: + # ``BANNED_LEAD_COLUMNS`` / ``BANNED_OPP_COLUMNS`` are dropped, event + # tables are filtered per-lead to ``lead_created_at + snapshot_day``, + # and ``BANNED_TABLES`` (``customers`` / ``subscriptions``) are + # omitted entirely. The feature-level redaction below still applies + # on top — the two policies operate on disjoint columns + # (snapshot-safe owns the structural reconstruction surface; + # ``redacted_columns_for`` owns near-deterministic snapshot + # features), so they neither double-emit nor overlap. # ------------------------------------------------------------------ tables_dir = root / "tables" tables_dir.mkdir(exist_ok=True) dfs = to_dataframes(result, population) + if bundle_filter.relational_snapshot_safe: + if config.snapshot_day is None: + raise ValueError( + f"exposure_mode={config.exposure_mode.value!r} requires " + "config.snapshot_day to be set (the snapshot-safe relational " + "export filters event tables to lead_created_at + snapshot_day); " + "got snapshot_day=None. Pin a snapshot_day on the recipe or " + "pass it explicitly." + ) + dfs = to_dataframes_snapshot_safe(dfs, snapshot_day=config.snapshot_day) table_row_counts: dict[str, int] = {} for table_name, df in dfs.items(): if redacted: @@ -136,5 +161,6 @@ def write_bundle( bundle_root=root, generation_timestamp=generation_timestamp, redacted_columns=sorted(redacted), + relational_snapshot_safe=bundle_filter.relational_snapshot_safe, ) write_manifest(manifest, root) diff --git a/leadforge/exposure/filters.py b/leadforge/exposure/filters.py index af26089..61bae36 100644 --- a/leadforge/exposure/filters.py +++ b/leadforge/exposure/filters.py @@ -28,15 +28,33 @@ class BundleFilter: write_metadata: Whether to create ``metadata/`` with hidden-truth files (``graph.json``, ``graph.graphml``, ``world_spec.json``, ``latent_registry.json``, ``mechanism_summary.json``). + relational_snapshot_safe: Whether the relational ``tables/`` dict + must be projected onto the snapshot-safe shape before being + written. When ``True``, the bundle writer routes through + :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe`, + which strips :data:`leadforge.validation.relational_leakage.BANNED_LEAD_COLUMNS` + from ``leads``, :data:`~leadforge.validation.relational_leakage.BANNED_OPP_COLUMNS` + from ``opportunities``, filters event tables per-lead by + ``lead_created_at + snapshot_day``, and omits + :data:`~leadforge.validation.relational_leakage.BANNED_TABLES` + (``customers`` / ``subscriptions``) entirely. When ``False``, + the writer emits the full-horizon export. """ write_metadata: bool + relational_snapshot_safe: bool #: Canonical filter rules for every supported exposure mode. FILTERS: dict[ExposureMode, BundleFilter] = { - ExposureMode.student_public: BundleFilter(write_metadata=False), - ExposureMode.research_instructor: BundleFilter(write_metadata=True), + ExposureMode.student_public: BundleFilter( + write_metadata=False, + relational_snapshot_safe=True, + ), + ExposureMode.research_instructor: BundleFilter( + write_metadata=True, + relational_snapshot_safe=False, + ), } diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index 27023c5..4f03bfd 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -36,7 +36,19 @@ # assuming "features computed over full horizon" must update. # ``manifest.snapshot_day`` recorded so the contract is # self-describing (``null`` means full-horizon, legacy behaviour). -BUNDLE_SCHEMA_VERSION = "4" +# "5" — PR 2.2: ``student_public`` bundles route through the +# snapshot-safe relational export ( +# :mod:`leadforge.render.relational_snapshot_safe`). Public +# ``leads`` drops ``converted_within_90_days`` / +# ``conversion_timestamp``; public ``opportunities`` drops +# ``close_outcome`` / ``closed_at``; public bundles omit +# ``customers`` / ``subscriptions``; event tables filtered +# per-lead to ``lead_created_at + snapshot_day``. +# ``manifest.relational_snapshot_safe`` records the contract so +# consumers / validators can tell from the bundle alone whether +# the tables are snapshot-safe. ``research_instructor`` bundles +# keep the full-horizon export (``relational_snapshot_safe = false``). +BUNDLE_SCHEMA_VERSION = "5" # Manifest fields whose value is non-deterministic by design (wall-clock, # host metadata, etc.). Determinism checks must ignore these fields when @@ -52,6 +64,7 @@ def build_manifest( bundle_root: Path, generation_timestamp: str | None = None, redacted_columns: list[str] | None = None, + relational_snapshot_safe: bool = False, ) -> dict[str, Any]: """Build the bundle manifest dict. @@ -71,6 +84,13 @@ def build_manifest( this exposure mode. Recorded in the manifest so consumers (and the validator) can audit redaction without inspecting package internals. Defaults to ``[]`` (nothing redacted). + relational_snapshot_safe: ``True`` if the relational ``tables/`` + were projected through + :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe` + before being written. Recorded in the manifest so a tool + reading a v5+ bundle can tell from the manifest alone whether + ``tables/`` is the snapshot-safe (public) shape or the + full-horizon (instructor) shape. Defaults to ``False``. Returns: A JSON-serialisable dict ready to be written as ``manifest.json``. @@ -117,6 +137,7 @@ def build_manifest( "snapshot_day": config.snapshot_day, "motif_family": world_graph.motif_family, "redacted_columns": redacted_columns_list, + "relational_snapshot_safe": bool(relational_snapshot_safe), "tables": tables, "tasks": tasks, } diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index 715336e..86bff36 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -15,12 +15,18 @@ import pyarrow.parquet as pq from leadforge.core.enums import ExposureMode +from leadforge.core.exceptions import LeadforgeError from leadforge.core.hashing import file_sha256 from leadforge.core.serialization import load_json from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for from leadforge.schema.relationships import ALL_CONSTRAINTS from leadforge.validation.difficulty import check_difficulty from leadforge.validation.realism import check_realism +from leadforge.validation.relational_leakage import ( + BANNED_TABLES, + LeakageReport, + run_all_probes, +) def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[str]: @@ -44,9 +50,10 @@ def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[ tables, table_errors = _check_tables(bundle_root, manifest) errors.extend(table_errors) errors.extend(_check_task_splits(bundle_root, manifest)) - errors.extend(_check_fk_integrity(tables)) + errors.extend(_check_fk_integrity(tables, manifest)) errors.extend(_check_leakage(bundle_root, manifest)) errors.extend(_check_exposure_redaction(bundle_root, manifest)) + errors.extend(_check_relational_leakage(bundle_root, manifest)) if include_realism: errors.extend(check_realism(bundle_root, manifest)) @@ -133,13 +140,22 @@ def _check_task_splits(root: Path, manifest: dict[str, Any]) -> list[str]: return errors -def _check_fk_integrity(tables: dict[str, pd.DataFrame]) -> list[str]: +def _check_fk_integrity(tables: dict[str, pd.DataFrame], manifest: dict[str, Any]) -> list[str]: + # In snapshot-safe (public) bundles ``customers`` / ``subscriptions`` + # are intentionally absent — emitting "FK check skipped" for them + # would be a false positive. The expected-absent set is the same + # ``BANNED_TABLES`` constant the writer omits. + snapshot_safe = bool(manifest.get("relational_snapshot_safe", False)) + expected_absent = set(BANNED_TABLES) if snapshot_safe else set() + errors: list[str] = [] for fk in ALL_CONSTRAINTS: child_df = tables.get(fk.child_table) parent_df = tables.get(fk.parent_table) if child_df is None or parent_df is None: missing = fk.child_table if child_df is None else fk.parent_table + if missing in expected_absent: + continue errors.append( f"FK check skipped: {fk.child_table}.{fk.child_column} → " f"{fk.parent_table}.{fk.parent_column} " @@ -260,3 +276,38 @@ def _check_exposure_redaction(root: Path, manifest: dict[str, Any]) -> list[str] ) return errors + + +def _check_relational_leakage(root: Path, manifest: dict[str, Any]) -> list[str]: + """Run :func:`run_all_probes` on snapshot-safe (public) bundles. + + Skips ``research_instructor`` bundles entirely — they retain the + full hidden truth (label column, customers, subscriptions, + ``close_outcome``) by design, so the leakage probes would be a + false positive there. The bonus-model probe stays off (PR 3.3 + will calibrate per-tier bands). + + Each :class:`~leadforge.validation.relational_leakage.LeakageFinding` + is rendered as one error string keeping the existing + ``validate_bundle`` contract (return list of strings, empty = pass). + """ + mode_str = manifest.get("exposure_mode") + if mode_str != ExposureMode.student_public.value: + return [] + + snapshot_day = manifest.get("snapshot_day") + if snapshot_day is None: + return [ + "Cannot run relational-leakage probes: manifest.snapshot_day is null " + "but exposure_mode is student_public (snapshot-safe contract requires " + "a windowed snapshot)." + ] + if not isinstance(snapshot_day, int) or isinstance(snapshot_day, bool): + return [f"Manifest snapshot_day must be an int, got {type(snapshot_day).__name__}"] + + try: + report: LeakageReport = run_all_probes(root, snapshot_day=snapshot_day) + except (FileNotFoundError, ValueError, LeadforgeError) as exc: + return [f"Relational-leakage probe failed: {type(exc).__name__}: {exc}"] + + return [f"Relational leakage [{f.channel}] {f.detail}: {f.message}" for f in report.findings] diff --git a/leadforge/validation/invariants.py b/leadforge/validation/invariants.py index 58799e7..a547c14 100644 --- a/leadforge/validation/invariants.py +++ b/leadforge/validation/invariants.py @@ -18,6 +18,12 @@ from leadforge.core.hashing import file_sha256 from leadforge.render.manifests import NON_DETERMINISTIC_MANIFEST_FIELDS from leadforge.schema.features import redacted_columns_for +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + SNAPSHOT_FILTERED_TABLES, +) def check_determinism(bundle_a: Path, bundle_b: Path) -> list[str]: @@ -185,7 +191,27 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - "for at least one shared feature" ) - # Both must have the same tables with identical content + # Both must have the same tables with identical content, modulo the + # snapshot-safe export contract (PR 2.2 / bundle schema v5): + # + # * Student is allowed to omit ``BANNED_TABLES`` (``customers`` / + # ``subscriptions``) — these are conversion-conditional and thus + # leak the label. + # * Student's ``leads`` is allowed to drop ``BANNED_LEAD_COLUMNS``; + # ``opportunities`` is allowed to drop ``BANNED_OPP_COLUMNS``. + # * Student's event tables in ``SNAPSHOT_FILTERED_TABLES`` are + # allowed to be a *row-subset* of instructor (filtered to + # ``lead_created_at + snapshot_day``). + # + # In all cases, student is still a subset of instructor on every + # column / row that survives the contract. + expected_redacted = redacted_columns_for(ExposureMode.student_public) + snapshot_filtered_table_names = {name for name, _ in SNAPSHOT_FILTERED_TABLES} + extra_columns_allowed_per_table: dict[str, set[str]] = { + "leads": set(BANNED_LEAD_COLUMNS), + "opportunities": set(BANNED_OPP_COLUMNS), + } + student_tables = ( {p.name for p in (student_bundle / "tables").glob("*.parquet")} if (student_bundle / "tables").exists() @@ -199,25 +225,35 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - missing_from_instructor = student_tables - instructor_tables if missing_from_instructor: errors.append(f"Tables in student but not instructor: {sorted(missing_from_instructor)}") - extra_in_instructor = instructor_tables - student_tables + expected_only_in_instructor = {f"{name}.parquet" for name in BANNED_TABLES} + extra_in_instructor = instructor_tables - student_tables - expected_only_in_instructor if extra_in_instructor: errors.append(f"Tables in instructor but not student: {sorted(extra_in_instructor)}") - expected_redacted = redacted_columns_for(ExposureMode.student_public) for table in sorted(student_tables & instructor_tables): s_path = student_bundle / "tables" / table i_path = instructor_bundle / "tables" / table if file_sha256(s_path) == file_sha256(i_path): continue - # Mismatch is acceptable iff the only difference is redacted - # columns (same logic as for task splits below). s_df = pd.read_parquet(s_path) i_df = pd.read_parquet(i_path) - if len(s_df) != len(i_df): + + table_name = table[: -len(".parquet")] + # Row counts can differ for snapshot-filtered event tables; the + # contract is "student rows ⊆ instructor rows", checked below by + # comparing shared columns on the inner join. + is_snapshot_filtered = table_name in snapshot_filtered_table_names + if not is_snapshot_filtered and len(s_df) != len(i_df): errors.append( f"Table {table}: row count mismatch student={len(s_df)} instructor={len(i_df)}" ) continue + if is_snapshot_filtered and len(s_df) > len(i_df): + errors.append( + f"Table {table}: student has more rows than instructor " + f"({len(s_df)} > {len(i_df)}); snapshot-safe export must be a row-subset" + ) + continue s_cols = set(s_df.columns) i_cols = set(i_df.columns) extra_in_student = s_cols - i_cols @@ -228,17 +264,33 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - ) continue diff = i_cols - s_cols - if not diff.issubset(expected_redacted): + allowed = expected_redacted | extra_columns_allowed_per_table.get(table_name, set()) + if not diff.issubset(allowed): errors.append( f"Table {table}: instructor−student column diff {sorted(diff)} contains " - f"non-redacted columns (expected subset of {sorted(expected_redacted)})" + f"non-redacted columns (expected subset of {sorted(allowed)})" ) continue shared = [c for c in s_df.columns if c in i_df.columns] - s_shared = s_df[shared].reset_index(drop=True) - i_shared = i_df[shared].reset_index(drop=True) - if not s_shared.equals(i_shared): - errors.append(f"Table {table}: shared-column values differ between modes") + if is_snapshot_filtered and len(s_df) != len(i_df): + # Student is a row-subset; verify each student row appears + # verbatim in instructor (joined by the natural primary key + # of the table when one is available, else by full-row + # equality on the shared columns). + s_view = s_df[shared].reset_index(drop=True) + i_view = i_df[shared].reset_index(drop=True) + merged = s_view.merge(i_view, how="left", indicator=True) + unmatched = int((merged["_merge"] != "both").sum()) + if unmatched: + errors.append( + f"Table {table}: {unmatched} student row(s) not present in instructor " + "(snapshot-safe export must keep instructor's row content verbatim)" + ) + else: + s_shared = s_df[shared].reset_index(drop=True) + i_shared = i_df[shared].reset_index(drop=True) + if not s_shared.equals(i_shared): + errors.append(f"Table {table}: shared-column values differ between modes") # Both must have the same task splits with identical content student_tasks = ( diff --git a/tests/render/test_bundle_schema_v4_contract.py b/tests/render/test_bundle_schema_v4_contract.py deleted file mode 100644 index b839471..0000000 --- a/tests/render/test_bundle_schema_v4_contract.py +++ /dev/null @@ -1,202 +0,0 @@ -"""Schema contract test for ``bundle_schema_version == "4"``. - -The constants below are an *intentional* duplication of the column sets -the bundle writer produces. The duplication is the point: any change to -``LEAD_SNAPSHOT_FEATURES``, ``LeadRow``, or the redaction policy that -also changes the published column set must update this contract. A bare -"add a new feature" PR that touches the spec but not this file will fail -here, forcing the author to either update the contract (and bump -``BUNDLE_SCHEMA_VERSION``) or revisit the change. - -v4 vs v3: the column SET is unchanged. The bump records the semantic -shift introduced by the windowed snapshot — event-aggregate column -VALUES are now anchored at ``snapshot_day`` instead of the full horizon. -``manifest.snapshot_day`` makes the contract self-describing. - -If you find yourself wondering "do I have to update this?": yes. That -is the failure mode this test is designed to catch. -""" - -from __future__ import annotations - -import json -from pathlib import Path - -import pyarrow.parquet as pq -import pytest - -from leadforge.api.generator import Generator - -# Pinned column lists for bundle schema v4. Update *together* with -# ``BUNDLE_SCHEMA_VERSION`` and ``LEAD_SNAPSHOT_FEATURES``. - -V4_TASK_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( - { - "account_id", - "industry", - "region", - "employee_band", - "estimated_revenue_band", - "process_maturity_band", - "contact_id", - "role_function", - "seniority", - "buyer_role", - "lead_id", - "lead_created_at", - "lead_source", - "first_touch_channel", - "touch_count", - "inbound_touch_count", - "outbound_touch_count", - "session_count", - "pricing_page_views", - "demo_page_views", - "total_session_duration_seconds", - "touches_week_1", - "touches_last_7_days", - "days_since_first_touch", - "activity_count", - "days_since_last_touch", - "opportunity_created", - "has_open_opportunity", - "opportunity_estimated_acv", - "expected_acv", - "total_touches_all", - "converted_within_90_days", - } -) - -V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = V4_TASK_COLUMNS_STUDENT_PUBLIC | { - "current_stage", - "is_sql", -} - -V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( - { - "lead_id", - "contact_id", - "account_id", - "lead_created_at", - "lead_source", - "first_touch_channel", - # ``current_stage``, ``is_sql`` redacted in student_public - "owner_rep_id", - "converted_within_90_days", - "conversion_timestamp", - } -) - -V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC | { - "current_stage", - "is_sql", -} - -_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} - - -def _build(mode: str, out: Path, seed: int = 42) -> None: - gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) - gen.generate(**_SMALL).save(str(out)) - - -@pytest.fixture(scope="module") -def student_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: - out = tmp_path_factory.mktemp("v4_student") - _build("student_public", out) - return out - - -@pytest.fixture(scope="module") -def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: - out = tmp_path_factory.mktemp("v4_instructor") - _build("research_instructor", out) - return out - - -def _task_cols(bundle: Path) -> frozenset[str]: - return frozenset(pq.read_schema(bundle / "tasks/converted_within_90_days/train.parquet").names) - - -def _leads_cols(bundle: Path) -> frozenset[str]: - return frozenset(pq.read_schema(bundle / "tables/leads.parquet").names) - - -def test_manifest_declares_v4(student_bundle: Path, instructor_bundle: Path) -> None: - for b in (student_bundle, instructor_bundle): - manifest = json.loads((b / "manifest.json").read_text()) - assert manifest["bundle_schema_version"] == "4", ( - f"{b.name}: bundle_schema_version is {manifest['bundle_schema_version']!r}, " - "expected '4'" - ) - - -def test_manifest_records_snapshot_day(student_bundle: Path, instructor_bundle: Path) -> None: - """v4 contract: manifest must surface ``snapshot_day`` so consumers can - distinguish full-horizon (legacy) bundles from windowed bundles.""" - for b in (student_bundle, instructor_bundle): - manifest = json.loads((b / "manifest.json").read_text()) - assert "snapshot_day" in manifest, f"{b.name}: manifest is missing 'snapshot_day' field" - # The b2b_saas_procurement_v1 recipe pins snapshot_day=30. - assert manifest["snapshot_day"] == 30, ( - f"{b.name}: snapshot_day is {manifest['snapshot_day']!r}, expected 30" - ) - - -def test_student_public_task_columns_match_v4_contract(student_bundle: Path) -> None: - actual = _task_cols(student_bundle) - assert actual == V4_TASK_COLUMNS_STUDENT_PUBLIC, ( - f"student_public task split columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_TASK_COLUMNS_STUDENT_PUBLIC)}\n" - f" missing: {sorted(V4_TASK_COLUMNS_STUDENT_PUBLIC - actual)}\n" - " → either update tests/render/test_bundle_schema_v4_contract.py and " - "bump BUNDLE_SCHEMA_VERSION, or revert the schema change." - ) - - -def test_research_instructor_task_columns_match_v4_contract(instructor_bundle: Path) -> None: - actual = _task_cols(instructor_bundle) - assert actual == V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR, ( - f"research_instructor task split columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR)}\n" - f" missing: {sorted(V4_TASK_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" - ) - - -def test_student_public_leads_table_columns_match_v4_contract(student_bundle: Path) -> None: - actual = _leads_cols(student_bundle) - assert actual == V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC, ( - f"student_public tables/leads.parquet columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC)}\n" - f" missing: {sorted(V4_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC - actual)}" - ) - - -def test_research_instructor_leads_table_columns_match_v4_contract( - instructor_bundle: Path, -) -> None: - actual = _leads_cols(instructor_bundle) - assert actual == V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR, ( - f"research_instructor tables/leads.parquet columns drifted from v4 contract.\n" - f" unexpected: {sorted(actual - V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR)}\n" - f" missing: {sorted(V4_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" - ) - - -def test_student_public_redacted_set_in_manifest(student_bundle: Path) -> None: - manifest = json.loads((student_bundle / "manifest.json").read_text()) - assert set(manifest["redacted_columns"]) == {"current_stage", "is_sql"} - - -def test_research_instructor_redacted_set_in_manifest(instructor_bundle: Path) -> None: - manifest = json.loads((instructor_bundle / "manifest.json").read_text()) - assert manifest["redacted_columns"] == [] - - -def test_is_mql_absent_from_all_v4_artifacts(student_bundle: Path, instructor_bundle: Path) -> None: - """``is_mql`` was removed entirely in v3 (issue #57) and remains absent in v4: - not in the snapshot, not in the relational table, not in the feature - dictionary, not anywhere.""" - for b in (student_bundle, instructor_bundle): - assert "is_mql" not in _task_cols(b) - assert "is_mql" not in _leads_cols(b) diff --git a/tests/render/test_bundle_schema_v5_contract.py b/tests/render/test_bundle_schema_v5_contract.py new file mode 100644 index 0000000..ac50774 --- /dev/null +++ b/tests/render/test_bundle_schema_v5_contract.py @@ -0,0 +1,315 @@ +"""Schema contract test for ``bundle_schema_version == "5"``. + +The constants below are an *intentional* duplication of the column / +table sets the bundle writer produces. The duplication is the point: +any change to ``LEAD_SNAPSHOT_FEATURES``, ``LeadRow``, the redaction +policy, or the snapshot-safe contract (``BANNED_LEAD_COLUMNS`` / +``BANNED_OPP_COLUMNS`` / ``BANNED_TABLES``) that also changes the +published shape must update this contract. A bare "add a new feature" +PR that touches the spec but not this file will fail here, forcing the +author to either update the contract (and bump +``BUNDLE_SCHEMA_VERSION``) or revisit the change. + +v5 vs v4: ``student_public`` bundles now route through the +snapshot-safe relational export. Public ``leads.parquet`` drops +``converted_within_90_days`` and ``conversion_timestamp``; public +``opportunities.parquet`` drops ``close_outcome`` and ``closed_at``; +public bundles omit ``customers`` and ``subscriptions`` entirely; event +tables are filtered per-lead to ``lead_created_at + snapshot_day``. +``manifest.relational_snapshot_safe`` records the contract so the +bundle is self-describing. ``research_instructor`` bundles keep the +full-horizon export. + +Task split column SET is unchanged from v4 — the structural fix lives +in ``tables/``, not the snapshot. + +If you find yourself wondering "do I have to update this?": yes. That +is the failure mode this test is designed to catch. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pyarrow.parquet as pq +import pytest + +from leadforge.api.generator import Generator + +# Pinned column / table sets for bundle schema v5. Update *together* +# with ``BUNDLE_SCHEMA_VERSION``, ``LEAD_SNAPSHOT_FEATURES``, and the +# snapshot-safe contract constants in +# ``leadforge.validation.relational_leakage``. + +V5_TASK_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( + { + "account_id", + "industry", + "region", + "employee_band", + "estimated_revenue_band", + "process_maturity_band", + "contact_id", + "role_function", + "seniority", + "buyer_role", + "lead_id", + "lead_created_at", + "lead_source", + "first_touch_channel", + "touch_count", + "inbound_touch_count", + "outbound_touch_count", + "session_count", + "pricing_page_views", + "demo_page_views", + "total_session_duration_seconds", + "touches_week_1", + "touches_last_7_days", + "days_since_first_touch", + "activity_count", + "days_since_last_touch", + "opportunity_created", + "has_open_opportunity", + "opportunity_estimated_acv", + "expected_acv", + "total_touches_all", + "converted_within_90_days", + } +) + +V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = V5_TASK_COLUMNS_STUDENT_PUBLIC | { + "current_stage", + "is_sql", +} + +# v5: public ``leads.parquet`` drops ``converted_within_90_days`` and +# ``conversion_timestamp`` (BANNED_LEAD_COLUMNS). +V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( + { + "lead_id", + "contact_id", + "account_id", + "lead_created_at", + "lead_source", + "first_touch_channel", + # ``current_stage``, ``is_sql`` redacted in student_public + "owner_rep_id", + # ``converted_within_90_days`` / ``conversion_timestamp`` dropped + # by the snapshot-safe export (PR 2.2) + } +) + +V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR: frozenset[str] = frozenset( + { + "lead_id", + "contact_id", + "account_id", + "lead_created_at", + "lead_source", + "first_touch_channel", + "current_stage", + "is_sql", + "owner_rep_id", + "converted_within_90_days", + "conversion_timestamp", + } +) + +# v5: public ``opportunities.parquet`` drops ``close_outcome`` and +# ``closed_at`` (BANNED_OPP_COLUMNS). +V5_OPP_TABLE_BANNED_COLUMNS_STUDENT_PUBLIC: frozenset[str] = frozenset( + {"close_outcome", "closed_at"} +) + +# v5: public bundles must NOT include these tables. +V5_PUBLIC_BANNED_TABLES: frozenset[str] = frozenset({"customers", "subscriptions"}) + +# v5: tables that ARE expected in public bundles. +V5_PUBLIC_EXPECTED_TABLES: frozenset[str] = frozenset( + { + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + } +) + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +def _build(mode: str, out: Path, seed: int = 42) -> None: + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) + gen.generate(**_SMALL).save(str(out)) + + +@pytest.fixture(scope="module") +def student_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("v5_student") + _build("student_public", out) + return out + + +@pytest.fixture(scope="module") +def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("v5_instructor") + _build("research_instructor", out) + return out + + +def _task_cols(bundle: Path) -> frozenset[str]: + return frozenset(pq.read_schema(bundle / "tasks/converted_within_90_days/train.parquet").names) + + +def _table_cols(bundle: Path, table_name: str) -> frozenset[str]: + return frozenset(pq.read_schema(bundle / f"tables/{table_name}.parquet").names) + + +def _leads_cols(bundle: Path) -> frozenset[str]: + return _table_cols(bundle, "leads") + + +def test_manifest_declares_v5(student_bundle: Path, instructor_bundle: Path) -> None: + for b in (student_bundle, instructor_bundle): + manifest = json.loads((b / "manifest.json").read_text()) + assert manifest["bundle_schema_version"] == "5", ( + f"{b.name}: bundle_schema_version is {manifest['bundle_schema_version']!r}, " + "expected '5'" + ) + + +def test_manifest_records_snapshot_day(student_bundle: Path, instructor_bundle: Path) -> None: + for b in (student_bundle, instructor_bundle): + manifest = json.loads((b / "manifest.json").read_text()) + assert "snapshot_day" in manifest, f"{b.name}: manifest is missing 'snapshot_day' field" + assert manifest["snapshot_day"] == 30, ( + f"{b.name}: snapshot_day is {manifest['snapshot_day']!r}, expected 30" + ) + + +def test_manifest_records_relational_snapshot_safe( + student_bundle: Path, instructor_bundle: Path +) -> None: + """v5 contract: manifest must surface ``relational_snapshot_safe`` so a tool + reading a bundle can tell from the manifest alone whether ``tables/`` is the + snapshot-safe (public) shape or the full-horizon (instructor) shape.""" + student_manifest = json.loads((student_bundle / "manifest.json").read_text()) + assert student_manifest["relational_snapshot_safe"] is True, ( + "student_public bundle must declare relational_snapshot_safe = true" + ) + + instructor_manifest = json.loads((instructor_bundle / "manifest.json").read_text()) + assert instructor_manifest["relational_snapshot_safe"] is False, ( + "research_instructor bundle must declare relational_snapshot_safe = false" + ) + + +def test_student_public_task_columns_match_v5_contract(student_bundle: Path) -> None: + actual = _task_cols(student_bundle) + assert actual == V5_TASK_COLUMNS_STUDENT_PUBLIC, ( + f"student_public task split columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_TASK_COLUMNS_STUDENT_PUBLIC)}\n" + f" missing: {sorted(V5_TASK_COLUMNS_STUDENT_PUBLIC - actual)}\n" + " → either update tests/render/test_bundle_schema_v5_contract.py and " + "bump BUNDLE_SCHEMA_VERSION, or revert the schema change." + ) + + +def test_research_instructor_task_columns_match_v5_contract(instructor_bundle: Path) -> None: + actual = _task_cols(instructor_bundle) + assert actual == V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR, ( + f"research_instructor task split columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR)}\n" + f" missing: {sorted(V5_TASK_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" + ) + + +def test_student_public_leads_table_columns_match_v5_contract(student_bundle: Path) -> None: + actual = _leads_cols(student_bundle) + assert actual == V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC, ( + f"student_public tables/leads.parquet columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC)}\n" + f" missing: {sorted(V5_LEAD_TABLE_COLUMNS_STUDENT_PUBLIC - actual)}" + ) + + +def test_research_instructor_leads_table_columns_match_v5_contract( + instructor_bundle: Path, +) -> None: + actual = _leads_cols(instructor_bundle) + assert actual == V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR, ( + f"research_instructor tables/leads.parquet columns drifted from v5 contract.\n" + f" unexpected: {sorted(actual - V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR)}\n" + f" missing: {sorted(V5_LEAD_TABLE_COLUMNS_RESEARCH_INSTRUCTOR - actual)}" + ) + + +def test_student_public_opportunities_drop_banned_columns(student_bundle: Path) -> None: + actual = _table_cols(student_bundle, "opportunities") + leak = actual & V5_OPP_TABLE_BANNED_COLUMNS_STUDENT_PUBLIC + assert not leak, ( + f"student_public tables/opportunities.parquet retains banned columns: {sorted(leak)}.\n" + " → snapshot-safe export must drop close_outcome / closed_at." + ) + + +def test_research_instructor_opportunities_keep_banned_columns(instructor_bundle: Path) -> None: + """Instructor bundles retain the full-horizon shape — those columns are + *not* banned for them, they're load-bearing for hidden-truth analysis.""" + actual = _table_cols(instructor_bundle, "opportunities") + assert "close_outcome" in actual, "research_instructor must retain close_outcome" + assert "closed_at" in actual, "research_instructor must retain closed_at" + + +def test_student_public_omits_banned_tables(student_bundle: Path) -> None: + tables_dir = student_bundle / "tables" + for name in V5_PUBLIC_BANNED_TABLES: + assert not (tables_dir / f"{name}.parquet").exists(), ( + f"student_public bundle must not include tables/{name}.parquet" + ) + + manifest = json.loads((student_bundle / "manifest.json").read_text()) + declared = set(manifest["tables"].keys()) + assert not (declared & V5_PUBLIC_BANNED_TABLES), ( + f"manifest.tables must not list banned tables: {sorted(declared & V5_PUBLIC_BANNED_TABLES)}" + ) + + +def test_student_public_includes_expected_tables(student_bundle: Path) -> None: + tables_dir = student_bundle / "tables" + for name in V5_PUBLIC_EXPECTED_TABLES: + assert (tables_dir / f"{name}.parquet").exists(), ( + f"student_public bundle is missing required tables/{name}.parquet" + ) + + +def test_research_instructor_keeps_banned_tables(instructor_bundle: Path) -> None: + """Instructor bundles retain ``customers`` / ``subscriptions``.""" + tables_dir = instructor_bundle / "tables" + for name in V5_PUBLIC_BANNED_TABLES: + assert (tables_dir / f"{name}.parquet").exists(), ( + f"research_instructor bundle must include tables/{name}.parquet" + ) + + +def test_student_public_redacted_set_in_manifest(student_bundle: Path) -> None: + manifest = json.loads((student_bundle / "manifest.json").read_text()) + assert set(manifest["redacted_columns"]) == {"current_stage", "is_sql"} + + +def test_research_instructor_redacted_set_in_manifest(instructor_bundle: Path) -> None: + manifest = json.loads((instructor_bundle / "manifest.json").read_text()) + assert manifest["redacted_columns"] == [] + + +def test_is_mql_absent_from_all_v5_artifacts(student_bundle: Path, instructor_bundle: Path) -> None: + """``is_mql`` was removed entirely in v3 (issue #57) and remains absent in v5: + not in the snapshot, not in the relational table, not in the feature + dictionary, not anywhere.""" + for b in (student_bundle, instructor_bundle): + assert "is_mql" not in _task_cols(b) + assert "is_mql" not in _leads_cols(b) diff --git a/tests/render/test_render.py b/tests/render/test_render.py index aad4db1..ddeadcf 100644 --- a/tests/render/test_render.py +++ b/tests/render/test_render.py @@ -23,7 +23,12 @@ def _make_config(seed: int = 42, n_leads: int = 80) -> GenerationConfig: - return GenerationConfig(seed=seed, n_accounts=30, n_contacts=90, n_leads=n_leads) + # ``snapshot_day=30`` matches the recipe pin and satisfies the + # student_public writer contract introduced in PR 2.2 (the + # snapshot-safe export needs a finite window). + return GenerationConfig( + seed=seed, n_accounts=30, n_contacts=90, n_leads=n_leads, snapshot_day=30 + ) def _make_narrative(seed: int = 42): diff --git a/tests/test_pr_2_2_integration.py b/tests/test_pr_2_2_integration.py new file mode 100644 index 0000000..f84877f --- /dev/null +++ b/tests/test_pr_2_2_integration.py @@ -0,0 +1,244 @@ +"""PR 2.2 integration tests — snapshot-safe routing through the writer. + +Covers the contract turned on in PR 2.2: ``student_public`` bundles +route ``tables/`` through +:func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe` +(structural fix for the alpha-bundle reconstruction paths A-E), +``research_instructor`` bundles keep the full-horizon export, and the +manifest is self-describing via ``relational_snapshot_safe`` and +``bundle_schema_version == "5"``. + +Tests fall into four groups: + +* Round-trip on both modes — both bundles must validate cleanly. +* Manifest contract — version + ``relational_snapshot_safe`` flag. +* Negative — a tampered "public" bundle (instructor copy moved into + the public slot) must trip the relational-leakage probes through + ``validate_bundle``, with the expected channels populated. +* Hash determinism — two pinned-timestamp builds produce + byte-identical bundle content (same in-process check the slower + ``scripts/verify_hash_determinism.py`` runs). +""" + +from __future__ import annotations + +import json +import shutil +from pathlib import Path + +import pyarrow.parquet as pq +import pytest + +from leadforge.api.generator import Generator +from leadforge.core.hashing import file_sha256 +from leadforge.validation.bundle_checks import validate_bundle +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + CHANNEL_BANNED_COLUMN, + CHANNEL_BANNED_TABLE, + CHANNEL_JOIN_RECONSTRUCTION, + run_all_probes, +) + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} +_PINNED_TS = "1970-01-01T00:00:00+00:00" + + +def _build( + mode: str, out: Path, seed: int = 42, *, generation_timestamp: str | None = None +) -> None: + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) + gen.generate(**_SMALL).save(str(out), generation_timestamp=generation_timestamp) + + +@pytest.fixture(scope="module") +def public_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("pr22_public") + _build("student_public", out) + return out + + +@pytest.fixture(scope="module") +def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("pr22_instructor") + _build("research_instructor", out) + return out + + +# --------------------------------------------------------------------------- +# Round-trip +# --------------------------------------------------------------------------- + + +class TestRoundTripValidates: + def test_public_bundle_validates(self, public_bundle: Path) -> None: + errors = validate_bundle(public_bundle) + assert errors == [], f"public bundle should validate clean, got: {errors}" + + def test_instructor_bundle_validates(self, instructor_bundle: Path) -> None: + errors = validate_bundle(instructor_bundle) + assert errors == [], f"instructor bundle should validate clean, got: {errors}" + + def test_public_bundle_omits_banned_tables_on_disk(self, public_bundle: Path) -> None: + for name in BANNED_TABLES: + assert not (public_bundle / "tables" / f"{name}.parquet").exists(), ( + f"public bundle must not write tables/{name}.parquet" + ) + + def test_instructor_bundle_keeps_banned_tables_on_disk(self, instructor_bundle: Path) -> None: + for name in BANNED_TABLES: + assert (instructor_bundle / "tables" / f"{name}.parquet").exists(), ( + f"instructor bundle must retain tables/{name}.parquet" + ) + + def test_public_leads_drops_banned_columns(self, public_bundle: Path) -> None: + cols = set(pq.read_schema(public_bundle / "tables/leads.parquet").names) + for c in BANNED_LEAD_COLUMNS: + assert c not in cols, f"leads.{c} must be absent from public bundle" + + def test_public_opportunities_drops_banned_columns(self, public_bundle: Path) -> None: + cols = set(pq.read_schema(public_bundle / "tables/opportunities.parquet").names) + for c in BANNED_OPP_COLUMNS: + assert c not in cols, f"opportunities.{c} must be absent from public bundle" + + def test_instructor_leads_keeps_banned_columns(self, instructor_bundle: Path) -> None: + cols = set(pq.read_schema(instructor_bundle / "tables/leads.parquet").names) + for c in BANNED_LEAD_COLUMNS: + assert c in cols, ( + f"leads.{c} must be retained in research_instructor bundle (full-horizon export)" + ) + + +# --------------------------------------------------------------------------- +# Manifest contract +# --------------------------------------------------------------------------- + + +class TestManifestContract: + def test_public_manifest_declares_v5(self, public_bundle: Path) -> None: + manifest = json.loads((public_bundle / "manifest.json").read_text()) + assert manifest["bundle_schema_version"] == "5" + + def test_instructor_manifest_declares_v5(self, instructor_bundle: Path) -> None: + manifest = json.loads((instructor_bundle / "manifest.json").read_text()) + assert manifest["bundle_schema_version"] == "5" + + def test_public_manifest_relational_snapshot_safe_true(self, public_bundle: Path) -> None: + manifest = json.loads((public_bundle / "manifest.json").read_text()) + assert manifest["relational_snapshot_safe"] is True + + def test_instructor_manifest_relational_snapshot_safe_false( + self, instructor_bundle: Path + ) -> None: + manifest = json.loads((instructor_bundle / "manifest.json").read_text()) + assert manifest["relational_snapshot_safe"] is False + + def test_public_manifest_table_row_counts_match_disk(self, public_bundle: Path) -> None: + """Manifest row counts must come from the *post-redaction* dict so + consumers reading the manifest see the truth on disk, not the + pre-redaction full-horizon shape.""" + manifest = json.loads((public_bundle / "manifest.json").read_text()) + for name, info in manifest["tables"].items(): + actual = pq.read_metadata(public_bundle / f"tables/{name}.parquet").num_rows + assert info["row_count"] == actual, ( + f"manifest row_count for {name} ({info['row_count']}) " + f"disagrees with parquet ({actual})" + ) + + def test_public_manifest_does_not_list_banned_tables(self, public_bundle: Path) -> None: + manifest = json.loads((public_bundle / "manifest.json").read_text()) + for name in BANNED_TABLES: + assert name not in manifest["tables"], ( + f"manifest.tables must not list banned table {name!r}" + ) + + +# --------------------------------------------------------------------------- +# Negative — tampered public bundle +# --------------------------------------------------------------------------- + + +class TestTamperedPublicBundle: + """Hand-craft a tampered public bundle and verify ``validate_bundle`` + surfaces the leakage findings. Tampering = take an instructor bundle + (full-horizon shape) and rewrite its manifest to claim + ``student_public``. This is the structural attack the contract + defends against.""" + + def _make_tampered(self, instructor_bundle: Path, dest: Path) -> Path: + shutil.copytree(instructor_bundle, dest) + manifest = json.loads((dest / "manifest.json").read_text()) + manifest["exposure_mode"] = "student_public" + # ``redacted_columns`` would mismatch the public expectation + # (instructor has []); align it so we test the relational leakage + # path specifically and not the redaction-set check. + manifest["redacted_columns"] = ["current_stage", "is_sql"] + # Same reason: leave ``relational_snapshot_safe`` at False — that + # IS the lie this test exercises (manifest claims public, tables + # are full-horizon). + (dest / "manifest.json").write_text(json.dumps(manifest, indent=2)) + return dest + + def test_validate_surfaces_leakage_findings( + self, tmp_path: Path, instructor_bundle: Path + ) -> None: + tampered = self._make_tampered(instructor_bundle, tmp_path / "tampered") + errors = validate_bundle(tampered, include_realism=False) + leak_errors = [e for e in errors if e.startswith("Relational leakage")] + assert leak_errors, ( + f"tampered public bundle must surface relational-leakage errors; got {errors}" + ) + + def test_findings_cover_expected_channels( + self, tmp_path: Path, instructor_bundle: Path + ) -> None: + tampered = self._make_tampered(instructor_bundle, tmp_path / "tampered2") + manifest = json.loads((tampered / "manifest.json").read_text()) + report = run_all_probes(tampered, snapshot_day=manifest["snapshot_day"]) + channels = {f.channel for f in report.findings} + # An instructor bundle masquerading as public will trip: + # - banned columns (leads.converted_within_90_days, + # leads.conversion_timestamp, opportunities.close_outcome, + # opportunities.closed_at); + # - banned tables (customers, subscriptions); + # - join reconstruction (paths B/C/D fire because the + # conversion-conditional tables are present). + assert CHANNEL_BANNED_COLUMN in channels + assert CHANNEL_BANNED_TABLE in channels + assert CHANNEL_JOIN_RECONSTRUCTION in channels + + +# --------------------------------------------------------------------------- +# Hash determinism +# --------------------------------------------------------------------------- + + +class TestHashDeterminism: + """Pinning ``generation_timestamp`` should produce byte-identical bundle + content across runs. Mirrors the contract of + ``scripts/verify_hash_determinism.py`` for a single bundle.""" + + def _hash_tree(self, root: Path) -> dict[str, str]: + return { + str(p.relative_to(root)): file_sha256(p) for p in sorted(root.rglob("*")) if p.is_file() + } + + @pytest.mark.parametrize("mode", ["student_public", "research_instructor"]) + def test_pinned_timestamp_byte_identical(self, tmp_path: Path, mode: str) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _build(mode, a, generation_timestamp=_PINNED_TS) + _build(mode, b, generation_timestamp=_PINNED_TS) + + ha = self._hash_tree(a) + hb = self._hash_tree(b) + + assert set(ha.keys()) == set(hb.keys()), ( + f"file set differs across runs ({mode}): " + f"only_in_a={sorted(set(ha) - set(hb))} " + f"only_in_b={sorted(set(hb) - set(ha))}" + ) + diffs = [k for k in ha if ha[k] != hb[k]] + assert not diffs, f"hash mismatches across runs ({mode}): {diffs}" From d042658e75fa92ca79b54e58ebdaebbec2632163 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 22:43:51 +0300 Subject: [PATCH 2/4] docs(.agent-plan): mark Phase 2 PR 2.2 complete Records v5 schema bump, snapshot-safe routing through the writer + validator, regenerated public bundles (3 tiers), instructor bundle keeps full-horizon, regression gate (``probe_relational_leakage --max-accuracy 0.65``) flips to exit 0 on all public tiers, and 67/67 hash-determinism preserved. --- .agent-plan.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/.agent-plan.md b/.agent-plan.md index c568ed1..46a1afc 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -24,13 +24,15 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family - [x] Reproduce relational-leakage finding on alpha bundles → `docs/release/v1_current_state_audit.md` — all three tiers reconstruct `converted_within_90_days` at 100% via paths A–E; LR/HistGBM AUC = 1.000 on join-derived features. Probe script: `scripts/probe_relational_leakage.py` (function `deterministic_relational_reconstruction` designed to lift into PR 3.1's `leadforge/validation/leakage_probes.py`). - [x] Lock dataset release name `leadforge-lead-scoring-v1` (already locked via PR #61's milestone rename + roadmap edits; G1.1 reaffirmed) -### Phase 2 — Snapshot-safe relational export +### Phase 2 — Snapshot-safe relational export ✓ - [x] `leadforge/render/relational_snapshot_safe.py` (new) — PR 2.1: `to_dataframes_snapshot_safe(dfs, *, snapshot_day)` projects the full-horizon dict from `to_dataframes` onto the public-bundle shape (drops `BANNED_LEAD_COLUMNS` from leads, `BANNED_OPP_COLUMNS` from opportunities, filters event tables per-lead by `lead_created_at + snapshot_day`, omits `customers`/`subscriptions`, passes accounts/contacts unchanged). - [x] `leadforge/validation/relational_leakage.py` (new) — PR 2.1: owns the snapshot-safe contract constants (`BANNED_LEAD_COLUMNS`, `BANNED_OPP_COLUMNS`, `BANNED_TABLES`, `SNAPSHOT_FILTERED_TABLES`); ships `LeakageFinding`/`LeakageReport`/`RelationalLeakageError(LeadforgeError)` plus five probes (`probe_banned_columns`, `probe_banned_tables`, `probe_deterministic_reconstruction`, `probe_snapshot_window`, opt-in `probe_bonus_model_auc`) and two orchestrators (`run_all_probes`, `run_all_probes_on_dataframes`). The bonus-model probe is opt-in: orchestrators skip it unless the caller passes `bonus_model_max_auc=...` (PR 3.3 will calibrate per-tier bands). `deterministic_relational_reconstruction` lifted from `scripts/probe_relational_leakage.py`; the script now re-exports it from the package. -- [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` — PR 2.2. -- [ ] Wire `relational_snapshot_safe` through `leadforge/exposure/filters.py` and `leadforge/api/bundle.py`; plumb the leakage validator into `leadforge/validation/bundle_checks.py` — PR 2.2. -- [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles — PR 2.2 (the structural rules are already enforced module-side; PR 2.2 turns them on for actual writes). -- [ ] Hash-determinism preserved on regenerated bundles — PR 2.2. +- [x] PR 2.2: `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe: bool` (self-describes whether `tables/` is the snapshot-safe public shape or full-horizon instructor shape). +- [x] PR 2.2: `BundleFilter.relational_snapshot_safe` flag (student_public True; research_instructor False); `leadforge/api/bundle.py` calls `to_dataframes_snapshot_safe(dfs, snapshot_day=...)` for student_public after `to_dataframes(...)`. `leadforge/validation/bundle_checks.py` runs `run_all_probes` on public bundles (bonus probe stays off — PR 3.3 calibrates); `_check_fk_integrity` silently skips `BANNED_TABLES` for snapshot-safe bundles. +- [x] PR 2.2: public `leads.parquet` drops `converted_within_90_days` / `conversion_timestamp`; public `opportunities.parquet` drops `close_outcome` / `closed_at`; public bundles omit `customers` / `subscriptions`. Confirmed by regenerated `release/{intro,intermediate,advanced}/manifest.json` (`tables` key: 7 entries, no `customers` / `subscriptions`); `release/intermediate_instructor/` retains full-horizon (9 tables). +- [x] PR 2.2: `scripts/probe_relational_leakage.py release/{intro,intermediate,advanced} --max-accuracy 0.65` exits 0 on all three public tiers (was exiting 2 on alpha bundles); all path prediction rates A-E = 0.000. +- [x] PR 2.2: hash-determinism preserved on regenerated bundles (`scripts/verify_hash_determinism.py`: 67/67 files identical across pinned-timestamp runs; was 73/73 before — drop reflects the 2 omitted public tables × 3 public tiers). +- [x] PR 2.2: `check_exposure_monotonicity` updated for v5 — student is allowed to omit `BANNED_TABLES`, drop `BANNED_LEAD_COLUMNS` / `BANNED_OPP_COLUMNS`, and be a row-subset of instructor on snapshot-filtered event tables. ### Phase 3 — Release validation hardening - [ ] `leadforge/validation/{release_quality,leakage_probes,reporting}.py` (new) From e352453481b80ecc9d9b8cc01343f0ee2b79820a Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 22:57:53 +0300 Subject: [PATCH 3/4] review(pr-2.2): manifest self-description, leakier-probe, robust monotonicity, test cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review pass on PR 2.2 surfaced several rough edges; this commit addresses them. Manifest self-description (Fix B) - ``manifest.structural_redactions`` enumerates the table-level drops applied by the snapshot-safe export (``columns: {leads, opportunities}`` + ``omitted_tables``). Together with ``relational_snapshot_safe`` and ``redacted_columns`` (snapshot-feature redactions), the manifest now fully describes what the writer dropped without consumers needing to read package internals. v5 contract test extended. Validator coverage when snapshot_day missing (Fix C) - ``_check_relational_leakage`` no longer skips every probe when ``manifest.snapshot_day`` is missing or malformed. The structural probes (banned columns / banned tables / deterministic-reconstruction) do not depend on snapshot_day and now run in that case; only the snapshot-window probe is skipped, with an explicit ``snapshot_window``- channel finding so the gap is visible. Robust monotonicity row-subset (Fix D) - ``check_exposure_monotonicity`` now verifies the snapshot-filtered row-subset relationship via primary keys (``touch_id`` / ``session_id`` / ``activity_id`` / ``opportunity_id``) instead of an all-column left-merge. PK-set inclusion is cheaper, correct under NaN, and reads better. Test cleanup (Fix E + F + G) - ``tests/test_pr_2_2_integration.py`` → ``tests/integration/test_snapshot_safe_bundle.py`` (no PR-named files; future-readable location). - Negative-case test now asserts the *details* of the findings (every banned column / table is named in the report; path B fires) rather than just channel-set membership. - Dropped the parametrized hash-determinism test from the integration file; instead extended ``test_invariants.py::TestDeterminism`` with a ``research_instructor`` case so both modes are covered in the canonical home for determinism tests. CI gate test - ``test_cli_max_accuracy_gate_fails_on_alpha`` → ``test_cli_max_accuracy_gate_passes_on_v5_bundle``: PR 2.2 fixed the leak the test was originally pointed at, so the regenerated bundle must now exit 0 instead of 2. Fix A (post-init check) considered and reverted - A ``GenerationConfig.__post_init__`` check for ``student_public + snapshot_day=None`` would surface the misconfiguration earlier, but its blast radius is too wide: ``ExposureMode`` defaults to ``student_public``, so any test that builds a bare ``GenerationConfig()`` for purposes unrelated to bundle writing (population / snapshot / threading tests) would break. The writer-level ``ValueError`` already raises with a clear message before any disk write — kept that, dropped the post-init duplicate. Verification - 1033 tests pass; ruff + mypy clean. - Bundles regenerated; hash determinism re-verified (67/67 files). - Regression gate ``probe_relational_leakage.py --max-accuracy 0.65`` exits 0 on intro / intermediate / advanced. --- leadforge/render/manifests.py | 28 ++++ leadforge/validation/bundle_checks.py | 88 +++++++++-- leadforge/validation/invariants.py | 42 +++-- .../test_snapshot_safe_bundle.py} | 146 +++++++----------- .../render/test_bundle_schema_v5_contract.py | 21 +++ .../scripts/test_probe_relational_leakage.py | 15 +- tests/validation/test_invariants.py | 27 +++- 7 files changed, 245 insertions(+), 122 deletions(-) rename tests/{test_pr_2_2_integration.py => integration/test_snapshot_safe_bundle.py} (59%) diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index 4f03bfd..79bd73c 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -14,6 +14,11 @@ from typing import TYPE_CHECKING, Any from leadforge.core.hashing import file_sha256 +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, +) if TYPE_CHECKING: from leadforge.core.models import GenerationConfig @@ -138,11 +143,34 @@ def build_manifest( "motif_family": world_graph.motif_family, "redacted_columns": redacted_columns_list, "relational_snapshot_safe": bool(relational_snapshot_safe), + "structural_redactions": _build_structural_redactions(bool(relational_snapshot_safe)), "tables": tables, "tasks": tasks, } +def _build_structural_redactions(relational_snapshot_safe: bool) -> dict[str, Any]: + """Self-describing record of the table-level redactions applied at write. + + For snapshot-safe (public) bundles this enumerates every column the + snapshot-safe export drops from ``leads`` / ``opportunities`` and the + tables it omits entirely. For full-horizon (instructor) bundles + every list is empty. Together with ``manifest.redacted_columns`` + (the snapshot-feature redactions) and ``manifest.relational_snapshot_safe`` + (the contract flag) the manifest fully describes what the writer + dropped without the consumer needing to consult package internals. + """ + if not relational_snapshot_safe: + return {"columns": {}, "omitted_tables": []} + return { + "columns": { + "leads": sorted(BANNED_LEAD_COLUMNS), + "opportunities": sorted(BANNED_OPP_COLUMNS), + }, + "omitted_tables": sorted(BANNED_TABLES), + } + + def write_manifest(manifest: dict[str, Any], bundle_root: Path) -> Path: """Serialise *manifest* to ``bundle_root/manifest.json`` and return the path.""" path = bundle_root / "manifest.json" diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index 86bff36..674b9f8 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -25,6 +25,9 @@ from leadforge.validation.relational_leakage import ( BANNED_TABLES, LeakageReport, + probe_banned_columns, + probe_banned_tables, + probe_deterministic_reconstruction, run_all_probes, ) @@ -279,16 +282,24 @@ def _check_exposure_redaction(root: Path, manifest: dict[str, Any]) -> list[str] def _check_relational_leakage(root: Path, manifest: dict[str, Any]) -> list[str]: - """Run :func:`run_all_probes` on snapshot-safe (public) bundles. + """Run the relational-leakage probes on snapshot-safe (public) bundles. Skips ``research_instructor`` bundles entirely — they retain the full hidden truth (label column, customers, subscriptions, - ``close_outcome``) by design, so the leakage probes would be a - false positive there. The bonus-model probe stays off (PR 3.3 - will calibrate per-tier bands). + ``close_outcome``) by design, so the probes would be a false + positive there. The bonus-model probe stays off (PR 3.3 will + calibrate per-tier bands). + + The structural probes (banned columns, banned tables, deterministic + join reconstruction) do not depend on ``snapshot_day``. They run + even if the manifest is missing or has a malformed ``snapshot_day`` + — those are exactly the cases where users *most* need leakage + detection, not least. Only the snapshot-window probe is skipped + when ``snapshot_day`` is unavailable, with an explicit error + surfaced so the gap is visible. Each :class:`~leadforge.validation.relational_leakage.LeakageFinding` - is rendered as one error string keeping the existing + is rendered as one error string, keeping the existing ``validate_bundle`` contract (return list of strings, empty = pass). """ mode_str = manifest.get("exposure_mode") @@ -296,18 +307,63 @@ def _check_relational_leakage(root: Path, manifest: dict[str, Any]) -> list[str] return [] snapshot_day = manifest.get("snapshot_day") - if snapshot_day is None: - return [ - "Cannot run relational-leakage probes: manifest.snapshot_day is null " - "but exposure_mode is student_public (snapshot-safe contract requires " - "a windowed snapshot)." - ] - if not isinstance(snapshot_day, int) or isinstance(snapshot_day, bool): - return [f"Manifest snapshot_day must be an int, got {type(snapshot_day).__name__}"] + snapshot_day_usable = ( + isinstance(snapshot_day, int) and not isinstance(snapshot_day, bool) and snapshot_day >= 0 + ) + errors: list[str] = [] try: - report: LeakageReport = run_all_probes(root, snapshot_day=snapshot_day) + if snapshot_day_usable and isinstance(snapshot_day, int): + report: LeakageReport = run_all_probes(root, snapshot_day=snapshot_day) + else: + # Run the structural subset that does not need ``snapshot_day``. + tables = _read_relational_tables(root) + findings = list(probe_banned_columns(tables)) + findings += list(probe_banned_tables(tables.keys())) + findings += list(probe_deterministic_reconstruction(tables)) + report = LeakageReport(findings=tuple(findings)) + errors.append( + "Relational leakage [snapshot_window] manifest.snapshot_day is " + f"missing or malformed ({snapshot_day!r}); skipping the snapshot-" + "window probe. Structural probes (banned columns / tables / " + "join reconstruction) ran normally." + ) except (FileNotFoundError, ValueError, LeadforgeError) as exc: - return [f"Relational-leakage probe failed: {type(exc).__name__}: {exc}"] + return errors + [f"Relational-leakage probe failed: {type(exc).__name__}: {exc}"] + + errors.extend( + f"Relational leakage [{f.channel}] {f.detail}: {f.message}" for f in report.findings + ) + return errors + - return [f"Relational leakage [{f.channel}] {f.detail}: {f.message}" for f in report.findings] +def _read_relational_tables(root: Path) -> dict[str, pd.DataFrame]: + """Read every public + banned-table parquet under ``/tables/``. + + Mirrors the read logic in + :func:`leadforge.validation.relational_leakage.run_all_probes` but + is reusable for the snapshot_day-missing path above. + """ + from leadforge.validation.relational_leakage import ( + BANNED_TABLES as _BANNED, + ) + + tables_dir = root / "tables" + if not tables_dir.is_dir() or not (tables_dir / "leads.parquet").exists(): + raise FileNotFoundError(f"missing tables/leads.parquet under {root}") + + public_tables = ( + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + ) + out: dict[str, pd.DataFrame] = {} + for name in (*public_tables, *_BANNED): + path = tables_dir / f"{name}.parquet" + if path.exists(): + out[name] = pd.read_parquet(path) + return out diff --git a/leadforge/validation/invariants.py b/leadforge/validation/invariants.py index a547c14..1278045 100644 --- a/leadforge/validation/invariants.py +++ b/leadforge/validation/invariants.py @@ -211,6 +211,18 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - "leads": set(BANNED_LEAD_COLUMNS), "opportunities": set(BANNED_OPP_COLUMNS), } + # Per-snapshot-filtered-table primary key. Used to verify the + # row-subset relationship cheaply and correctly under NaN: a left + # merge over all shared columns is fragile (NaN doesn't equal NaN + # in pandas, even on the same float), and the natural PK is enough + # to assert ``student.rows ⊆ instructor.rows`` since student rows + # were derived directly from instructor by row-filtering. + snapshot_filtered_pks: dict[str, str] = { + "touches": "touch_id", + "sessions": "session_id", + "sales_activities": "activity_id", + "opportunities": "opportunity_id", + } student_tables = ( {p.name for p in (student_bundle / "tables").glob("*.parquet")} @@ -273,18 +285,26 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - continue shared = [c for c in s_df.columns if c in i_df.columns] if is_snapshot_filtered and len(s_df) != len(i_df): - # Student is a row-subset; verify each student row appears - # verbatim in instructor (joined by the natural primary key - # of the table when one is available, else by full-row - # equality on the shared columns). - s_view = s_df[shared].reset_index(drop=True) - i_view = i_df[shared].reset_index(drop=True) - merged = s_view.merge(i_view, how="left", indicator=True) - unmatched = int((merged["_merge"] != "both").sum()) - if unmatched: + # Row-subset by primary key. Each student row was derived + # from instructor by row-filtering, so the PK relationship + # is the strongest invariant we can assert without + # depending on column-by-column equality (which is fragile + # under NaN). + pk = snapshot_filtered_pks.get(table_name) + if pk is None or pk not in s_df.columns or pk not in i_df.columns: + errors.append( + f"Table {table}: snapshot-filtered table missing expected " + f"primary key {pk!r}; cannot verify row-subset" + ) + continue + student_pks = set(s_df[pk].tolist()) + instructor_pks = set(i_df[pk].tolist()) + orphans = student_pks - instructor_pks + if orphans: + sample = sorted(orphans)[:3] errors.append( - f"Table {table}: {unmatched} student row(s) not present in instructor " - "(snapshot-safe export must keep instructor's row content verbatim)" + f"Table {table}: {len(orphans)} student {pk}(s) absent from instructor, " + f"e.g. {sample} (snapshot-safe export must be a row-subset)" ) else: s_shared = s_df[shared].reset_index(drop=True) diff --git a/tests/test_pr_2_2_integration.py b/tests/integration/test_snapshot_safe_bundle.py similarity index 59% rename from tests/test_pr_2_2_integration.py rename to tests/integration/test_snapshot_safe_bundle.py index f84877f..11ad772 100644 --- a/tests/test_pr_2_2_integration.py +++ b/tests/integration/test_snapshot_safe_bundle.py @@ -1,23 +1,28 @@ -"""PR 2.2 integration tests — snapshot-safe routing through the writer. +"""Integration tests for the snapshot-safe bundle write path (bundle schema v5). Covers the contract turned on in PR 2.2: ``student_public`` bundles route ``tables/`` through :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe` -(structural fix for the alpha-bundle reconstruction paths A-E), -``research_instructor`` bundles keep the full-horizon export, and the -manifest is self-describing via ``relational_snapshot_safe`` and -``bundle_schema_version == "5"``. - -Tests fall into four groups: - -* Round-trip on both modes — both bundles must validate cleanly. -* Manifest contract — version + ``relational_snapshot_safe`` flag. -* Negative — a tampered "public" bundle (instructor copy moved into - the public slot) must trip the relational-leakage probes through - ``validate_bundle``, with the expected channels populated. -* Hash determinism — two pinned-timestamp builds produce - byte-identical bundle content (same in-process check the slower - ``scripts/verify_hash_determinism.py`` runs). +(the structural fix against the alpha-bundle reconstruction paths +A-E), ``research_instructor`` bundles keep the full-horizon export, +and the manifest is self-describing via ``relational_snapshot_safe``, +``structural_redactions``, and ``bundle_schema_version == "5"``. + +Tests fall into three groups: + +* **Round-trip** — both modes write, both validate clean, on-disk + shape matches the contract (banned columns / banned tables drop in + public; both retained in instructor). +* **Manifest contract** — row counts match disk; banned tables are + not listed under ``manifest.tables``. +* **Negative** — a tampered "public" bundle (instructor copy moved + into the public slot) trips the relational-leakage probes through + ``validate_bundle``, with specific banned columns / tables / join + paths named in the findings (not just channel-set membership). + +Hash determinism is covered by +:class:`tests.validation.test_invariants.TestDeterminism` for both +modes — no need to duplicate it here. """ from __future__ import annotations @@ -30,7 +35,6 @@ import pytest from leadforge.api.generator import Generator -from leadforge.core.hashing import file_sha256 from leadforge.validation.bundle_checks import validate_bundle from leadforge.validation.relational_leakage import ( BANNED_LEAD_COLUMNS, @@ -43,26 +47,23 @@ ) _SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} -_PINNED_TS = "1970-01-01T00:00:00+00:00" -def _build( - mode: str, out: Path, seed: int = 42, *, generation_timestamp: str | None = None -) -> None: +def _build(mode: str, out: Path, seed: int = 42) -> None: gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) - gen.generate(**_SMALL).save(str(out), generation_timestamp=generation_timestamp) + gen.generate(**_SMALL).save(str(out)) @pytest.fixture(scope="module") def public_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: - out = tmp_path_factory.mktemp("pr22_public") + out = tmp_path_factory.mktemp("snapshot_safe_public") _build("student_public", out) return out @pytest.fixture(scope="module") def instructor_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: - out = tmp_path_factory.mktemp("pr22_instructor") + out = tmp_path_factory.mktemp("snapshot_safe_instructor") _build("research_instructor", out) return out @@ -117,24 +118,6 @@ def test_instructor_leads_keeps_banned_columns(self, instructor_bundle: Path) -> class TestManifestContract: - def test_public_manifest_declares_v5(self, public_bundle: Path) -> None: - manifest = json.loads((public_bundle / "manifest.json").read_text()) - assert manifest["bundle_schema_version"] == "5" - - def test_instructor_manifest_declares_v5(self, instructor_bundle: Path) -> None: - manifest = json.loads((instructor_bundle / "manifest.json").read_text()) - assert manifest["bundle_schema_version"] == "5" - - def test_public_manifest_relational_snapshot_safe_true(self, public_bundle: Path) -> None: - manifest = json.loads((public_bundle / "manifest.json").read_text()) - assert manifest["relational_snapshot_safe"] is True - - def test_instructor_manifest_relational_snapshot_safe_false( - self, instructor_bundle: Path - ) -> None: - manifest = json.loads((instructor_bundle / "manifest.json").read_text()) - assert manifest["relational_snapshot_safe"] is False - def test_public_manifest_table_row_counts_match_disk(self, public_bundle: Path) -> None: """Manifest row counts must come from the *post-redaction* dict so consumers reading the manifest see the truth on disk, not the @@ -172,12 +155,12 @@ def _make_tampered(self, instructor_bundle: Path, dest: Path) -> Path: manifest = json.loads((dest / "manifest.json").read_text()) manifest["exposure_mode"] = "student_public" # ``redacted_columns`` would mismatch the public expectation - # (instructor has []); align it so we test the relational leakage - # path specifically and not the redaction-set check. + # (instructor has []); align it so we test the relational + # leakage path specifically and not the redaction-set check. manifest["redacted_columns"] = ["current_stage", "is_sql"] - # Same reason: leave ``relational_snapshot_safe`` at False — that - # IS the lie this test exercises (manifest claims public, tables - # are full-horizon). + # Leave ``relational_snapshot_safe`` at False — that IS the + # lie this test exercises (manifest claims public, tables are + # full-horizon). (dest / "manifest.json").write_text(json.dumps(manifest, indent=2)) return dest @@ -191,54 +174,41 @@ def test_validate_surfaces_leakage_findings( f"tampered public bundle must surface relational-leakage errors; got {errors}" ) - def test_findings_cover_expected_channels( + def test_findings_name_specific_banned_columns( self, tmp_path: Path, instructor_bundle: Path ) -> None: - tampered = self._make_tampered(instructor_bundle, tmp_path / "tampered2") + """Channel membership isn't enough — assert the *details* point at + the actual banned columns / tables, so a future regression that + leaves one finding behind doesn't slip through.""" + tampered = self._make_tampered(instructor_bundle, tmp_path / "tampered_details") manifest = json.loads((tampered / "manifest.json").read_text()) report = run_all_probes(tampered, snapshot_day=manifest["snapshot_day"]) - channels = {f.channel for f in report.findings} - # An instructor bundle masquerading as public will trip: - # - banned columns (leads.converted_within_90_days, - # leads.conversion_timestamp, opportunities.close_outcome, - # opportunities.closed_at); - # - banned tables (customers, subscriptions); - # - join reconstruction (paths B/C/D fire because the - # conversion-conditional tables are present). - assert CHANNEL_BANNED_COLUMN in channels - assert CHANNEL_BANNED_TABLE in channels - assert CHANNEL_JOIN_RECONSTRUCTION in channels + details_by_channel: dict[str, set[str]] = {} + for f in report.findings: + details_by_channel.setdefault(f.channel, set()).add(f.detail) -# --------------------------------------------------------------------------- -# Hash determinism -# --------------------------------------------------------------------------- - - -class TestHashDeterminism: - """Pinning ``generation_timestamp`` should produce byte-identical bundle - content across runs. Mirrors the contract of - ``scripts/verify_hash_determinism.py`` for a single bundle.""" - - def _hash_tree(self, root: Path) -> dict[str, str]: - return { - str(p.relative_to(root)): file_sha256(p) for p in sorted(root.rglob("*")) if p.is_file() + # Every banned column must be named in a banned-column finding. + banned_col_details = details_by_channel.get(CHANNEL_BANNED_COLUMN, set()) + expected_banned_cols = {f"leads.{c}" for c in BANNED_LEAD_COLUMNS} | { + f"opportunities.{c}" for c in BANNED_OPP_COLUMNS } + assert expected_banned_cols <= banned_col_details, ( + f"banned-column findings missing entries: " + f"{sorted(expected_banned_cols - banned_col_details)}" + ) - @pytest.mark.parametrize("mode", ["student_public", "research_instructor"]) - def test_pinned_timestamp_byte_identical(self, tmp_path: Path, mode: str) -> None: - a = tmp_path / "a" - b = tmp_path / "b" - _build(mode, a, generation_timestamp=_PINNED_TS) - _build(mode, b, generation_timestamp=_PINNED_TS) - - ha = self._hash_tree(a) - hb = self._hash_tree(b) + # Every banned table must be named in a banned-table finding. + banned_tbl_details = details_by_channel.get(CHANNEL_BANNED_TABLE, set()) + assert set(BANNED_TABLES) <= banned_tbl_details, ( + f"banned-table findings missing entries: " + f"{sorted(set(BANNED_TABLES) - banned_tbl_details)}" + ) - assert set(ha.keys()) == set(hb.keys()), ( - f"file set differs across runs ({mode}): " - f"only_in_a={sorted(set(ha) - set(hb))} " - f"only_in_b={sorted(set(hb) - set(ha))}" + # Join-reconstruction must name at least path B (closed_won + # opportunities are in the tampered bundle). Paths C / D may + # also fire when customers / subscriptions are present. + join_details = details_by_channel.get(CHANNEL_JOIN_RECONSTRUCTION, set()) + assert "path_b_opportunity_won" in join_details, ( + f"join-reconstruction finding for path B missing; got {sorted(join_details)}" ) - diffs = [k for k in ha if ha[k] != hb[k]] - assert not diffs, f"hash mismatches across runs ({mode}): {diffs}" diff --git a/tests/render/test_bundle_schema_v5_contract.py b/tests/render/test_bundle_schema_v5_contract.py index ac50774..e199d4e 100644 --- a/tests/render/test_bundle_schema_v5_contract.py +++ b/tests/render/test_bundle_schema_v5_contract.py @@ -208,6 +208,27 @@ def test_manifest_records_relational_snapshot_safe( ) +def test_manifest_records_structural_redactions( + student_bundle: Path, instructor_bundle: Path +) -> None: + """v5 contract: ``manifest.structural_redactions`` enumerates the table-level + drops (columns + omitted tables) so the bundle is self-describing.""" + student = json.loads((student_bundle / "manifest.json").read_text()) + assert student["structural_redactions"] == { + "columns": { + "leads": ["conversion_timestamp", "converted_within_90_days"], + "opportunities": ["close_outcome", "closed_at"], + }, + "omitted_tables": ["customers", "subscriptions"], + }, "student_public must record the snapshot-safe drops in manifest.structural_redactions" + + instructor = json.loads((instructor_bundle / "manifest.json").read_text()) + assert instructor["structural_redactions"] == { + "columns": {}, + "omitted_tables": [], + }, "research_instructor must record an empty structural_redactions" + + def test_student_public_task_columns_match_v5_contract(student_bundle: Path) -> None: actual = _task_cols(student_bundle) assert actual == V5_TASK_COLUMNS_STUDENT_PUBLIC, ( diff --git a/tests/scripts/test_probe_relational_leakage.py b/tests/scripts/test_probe_relational_leakage.py index 57a3fd3..e063be6 100644 --- a/tests/scripts/test_probe_relational_leakage.py +++ b/tests/scripts/test_probe_relational_leakage.py @@ -233,11 +233,14 @@ def test_cli_smoke_reports_numeric_accuracy() -> None: @pytest.mark.skipif( not (_REPO_ROOT / "release" / "intermediate" / "tables" / "leads.parquet").exists(), - reason="alpha bundle not present (clean checkout)", + reason="release bundle not present (clean checkout)", ) -def test_cli_max_accuracy_gate_fails_on_alpha() -> None: - """Phase-2 CI gate flag: alpha bundles trivially exceed any sane threshold, - so ``--max-accuracy 0.65`` must exit 2.""" +def test_cli_max_accuracy_gate_passes_on_v5_bundle() -> None: + """Phase-2 CI gate: post-PR 2.2 (snapshot-safe v5) public bundles must + pass ``--max-accuracy 0.65``. This test was originally pointed at + alpha bundles and asserted exit 2 (gate failure) as evidence of the + leak; PR 2.2 fixed the leak, so the regenerated bundles must now exit + 0 instead.""" result = subprocess.run( # noqa: S603 — controlled args, sys.executable [ sys.executable, @@ -250,5 +253,5 @@ def test_cli_max_accuracy_gate_fails_on_alpha() -> None: text=True, check=False, ) - assert result.returncode == 2, result.stderr - assert "GATE FAILURE" in result.stderr + assert result.returncode == 0, result.stderr + assert "GATE FAILURE" not in result.stderr diff --git a/tests/validation/test_invariants.py b/tests/validation/test_invariants.py index 078548c..ba854ea 100644 --- a/tests/validation/test_invariants.py +++ b/tests/validation/test_invariants.py @@ -22,7 +22,7 @@ @pytest.fixture(scope="module") def determinism_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: - """Generate two bundles with the same seed and pinned timestamp.""" + """Generate two student_public bundles with the same seed and pinned timestamp.""" a = tmp_path_factory.mktemp("det_a") b = tmp_path_factory.mktemp("det_b") for out in (a, b): @@ -33,6 +33,21 @@ def determinism_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, return a, b +@pytest.fixture(scope="module") +def determinism_instructor_bundles( + tmp_path_factory: pytest.TempPathFactory, +) -> tuple[Path, Path]: + """Generate two research_instructor bundles with the same seed and pinned timestamp.""" + a = tmp_path_factory.mktemp("det_instructor_a") + b = tmp_path_factory.mktemp("det_instructor_b") + for out in (a, b): + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", seed=77, exposure_mode="research_instructor" + ) + gen.generate(**_SMALL).save(str(out), generation_timestamp=_PINNED_TIMESTAMP) + return a, b + + @pytest.fixture(scope="module") def exposure_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: """Generate student_public and research_instructor bundles.""" @@ -55,6 +70,16 @@ def test_same_seed_produces_identical_bundles( errors = check_determinism(a, b) assert errors == [] + def test_same_seed_produces_identical_instructor_bundles( + self, determinism_instructor_bundles: tuple[Path, Path] + ) -> None: + """Determinism must hold for ``research_instructor`` too — the + full-horizon export and the metadata/ artefacts (graph, latents, + mechanism summary) are part of the deterministic contract.""" + a, b = determinism_instructor_bundles + errors = check_determinism(a, b) + assert errors == [] + def test_different_seeds_differ(self, tmp_path: Path) -> None: a = tmp_path / "seed1" b = tmp_path / "seed2" From e692b19d2de2182fa802c97da30df6ce84d480c5 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 23:05:31 +0300 Subject: [PATCH 4/4] review(pr-2.2): strict bool on relational_snapshot_safe + focused monotonicity tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses both Copilot review comments on PR #64. Strict-bool guard (bundle_checks.py) - ``_check_fk_integrity`` no longer coerces ``manifest.relational_snapshot_safe`` via ``bool(...)``: a JSON string ``"false"`` was truthy and would silently suppress FK errors on a corrupted manifest. Require an actual JSON boolean and surface a typed error otherwise; default to False (i.e. don't trust a malformed flag to expand the expected- absent set). - New test in tests/validation/test_bundle_checks.py asserting a ``"false"`` string trips the strict check. Focused monotonicity tests (test_invariants.py) - New TestExposureMonotonicitySnapshotSafe class with 9 unit tests using synthetic parquet bundles in tmp_path: * omitted BANNED_TABLES (customers / subscriptions) is allowed * non-banned extra instructor table is rejected * dropped BANNED_LEAD_COLUMNS / BANNED_OPP_COLUMNS are allowed * non-banned extra leads column is rejected * PK-based row-subset on touches is allowed (student PK ⊂ instructor PK) * student PK absent from instructor is rejected * student row count > instructor is rejected * dropped PK column is caught by the column-diff check (with a clear test name documenting which path is reached) The new tests exercise the snapshot-safe branches directly without running the full generator, so a regression in the table-comparison logic now fails immediately with a localised error. All 1043 tests pass; ruff + mypy clean. --- leadforge/validation/bundle_checks.py | 17 +- tests/validation/test_bundle_checks.py | 20 +++ tests/validation/test_invariants.py | 212 +++++++++++++++++++++++++ 3 files changed, 247 insertions(+), 2 deletions(-) diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index 674b9f8..c92e27e 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -148,10 +148,23 @@ def _check_fk_integrity(tables: dict[str, pd.DataFrame], manifest: dict[str, Any # are intentionally absent — emitting "FK check skipped" for them # would be a false positive. The expected-absent set is the same # ``BANNED_TABLES`` constant the writer omits. - snapshot_safe = bool(manifest.get("relational_snapshot_safe", False)) + # + # Strict bool check: ``bool(...)`` would coerce malformed manifest + # values (the JSON string ``"false"`` is truthy; the int ``1`` would + # masquerade as snapshot-safe) and silently suppress real FK errors. + # Surface the bad value instead. + errors: list[str] = [] + raw_flag = manifest.get("relational_snapshot_safe", False) + if not isinstance(raw_flag, bool): + errors.append( + f"manifest.relational_snapshot_safe must be a JSON boolean, got " + f"{type(raw_flag).__name__}={raw_flag!r}" + ) + snapshot_safe = False + else: + snapshot_safe = raw_flag expected_absent = set(BANNED_TABLES) if snapshot_safe else set() - errors: list[str] = [] for fk in ALL_CONSTRAINTS: child_df = tables.get(fk.child_table) parent_df = tables.get(fk.parent_table) diff --git a/tests/validation/test_bundle_checks.py b/tests/validation/test_bundle_checks.py index 31d3e51..4f1679a 100644 --- a/tests/validation/test_bundle_checks.py +++ b/tests/validation/test_bundle_checks.py @@ -131,3 +131,23 @@ def test_missing_required_file(self, tmp_path: Path, valid_bundle: Path) -> None errors = validate_bundle(corrupt) assert any("dataset_card.md" in e for e in errors) + + def test_relational_snapshot_safe_must_be_a_bool( + self, tmp_path: Path, valid_bundle: Path + ) -> None: + """``manifest.relational_snapshot_safe`` is consulted to decide + whether ``customers`` / ``subscriptions`` are *expected* absent. + A bare ``bool(...)`` coercion would make the JSON string + ``"false"`` truthy, silently suppressing FK errors on a + corrupted manifest. Surface the type problem instead.""" + corrupt = tmp_path / "bad_flag" + shutil.copytree(valid_bundle, corrupt) + manifest = json.loads((corrupt / "manifest.json").read_text()) + # JSON string "false" — truthy under bool() but invalid. + manifest["relational_snapshot_safe"] = "false" + (corrupt / "manifest.json").write_text(json.dumps(manifest, indent=2)) + + errors = validate_bundle(corrupt, include_realism=False) + assert any("relational_snapshot_safe" in e and "boolean" in e for e in errors), ( + f"expected strict-bool error; got: {errors}" + ) diff --git a/tests/validation/test_invariants.py b/tests/validation/test_invariants.py index ba854ea..f125b3e 100644 --- a/tests/validation/test_invariants.py +++ b/tests/validation/test_invariants.py @@ -112,6 +112,218 @@ def test_instructor_without_metadata_fails(self, exposure_bundles: tuple[Path, P assert any("missing metadata" in e for e in errors) +# --------------------------------------------------------------------------- +# check_exposure_monotonicity — focused unit tests for the v5 (PR 2.2) +# snapshot-safe branches. These build minimal synthetic bundles so the +# table-comparison logic is exercised independently of generation. +# --------------------------------------------------------------------------- + + +class TestExposureMonotonicitySnapshotSafe: + """Exercise the table-comparison branches added in PR 2.2: + omitted ``BANNED_TABLES``, dropped ``BANNED_LEAD_COLUMNS`` / + ``BANNED_OPP_COLUMNS``, and PK-based row-subset on + ``SNAPSHOT_FILTERED_TABLES``.""" + + @staticmethod + def _shell(root: Path, *, has_metadata: bool) -> Path: + """Write the minimum scaffolding ``check_exposure_monotonicity`` requires + before it reaches the table-comparison logic. Feature dictionary is + written empty so the subset check trivially passes.""" + import pandas as pd + + root.mkdir(parents=True, exist_ok=True) + (root / "manifest.json").write_text("{}") + (root / "dataset_card.md").write_text("") + # Identical empty FD on both sides → subset check trivially passes. + pd.DataFrame({"name": [], "dtype": []}).to_csv(root / "feature_dictionary.csv", index=False) + if has_metadata: + (root / "metadata").mkdir() + (root / "tables").mkdir() + return root + + @staticmethod + def _write_parquet(root: Path, name: str, df) -> None: + df.to_parquet(root / "tables" / f"{name}.parquet", index=False) + + @pytest.fixture + def student_root(self, tmp_path: Path) -> Path: + return self._shell(tmp_path / "student", has_metadata=False) + + @pytest.fixture + def instructor_root(self, tmp_path: Path) -> Path: + return self._shell(tmp_path / "instructor", has_metadata=True) + + def test_omitted_banned_tables_pass(self, student_root: Path, instructor_root: Path) -> None: + """``customers`` / ``subscriptions`` in instructor only is the + expected snapshot-safe contract — must NOT raise an error.""" + import pandas as pd + + for root in (student_root, instructor_root): + self._write_parquet(root, "accounts", pd.DataFrame({"account_id": ["a1"]})) + self._write_parquet( + instructor_root, + "customers", + pd.DataFrame({"customer_id": ["c1"], "opportunity_id": ["o1"]}), + ) + self._write_parquet( + instructor_root, + "subscriptions", + pd.DataFrame({"subscription_id": ["s1"], "customer_id": ["c1"]}), + ) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_unexpected_extra_instructor_table_fails( + self, student_root: Path, instructor_root: Path + ) -> None: + """A NON-banned extra table on instructor must still be flagged — + snapshot-safe doesn't license arbitrary instructor-only tables.""" + import pandas as pd + + for root in (student_root, instructor_root): + self._write_parquet(root, "accounts", pd.DataFrame({"account_id": ["a1"]})) + self._write_parquet(instructor_root, "rogue", pd.DataFrame({"x": [1]})) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("rogue.parquet" in e and "instructor but not student" in e for e in errors) + + def test_dropped_banned_lead_columns_pass( + self, student_root: Path, instructor_root: Path + ) -> None: + """Public ``leads`` drops ``converted_within_90_days`` / + ``conversion_timestamp`` — must NOT raise.""" + import pandas as pd + + student_leads = pd.DataFrame({"lead_id": ["l1"], "lead_created_at": ["2024-01-01"]}) + instructor_leads = pd.DataFrame( + { + "lead_id": ["l1"], + "lead_created_at": ["2024-01-01"], + "converted_within_90_days": [True], + "conversion_timestamp": ["2024-02-01"], + } + ) + self._write_parquet(student_root, "leads", student_leads) + self._write_parquet(instructor_root, "leads", instructor_leads) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_dropped_banned_opp_columns_pass( + self, student_root: Path, instructor_root: Path + ) -> None: + """Public ``opportunities`` drops ``close_outcome`` / ``closed_at`` — + must NOT raise (and must still respect row-equality on shared + columns since the table size is below the snapshot-filter threshold + in this synthetic case).""" + import pandas as pd + + student_opps = pd.DataFrame( + { + "opportunity_id": ["o1"], + "lead_id": ["l1"], + "created_at": ["2024-01-01"], + } + ) + instructor_opps = pd.DataFrame( + { + "opportunity_id": ["o1"], + "lead_id": ["l1"], + "created_at": ["2024-01-01"], + "close_outcome": ["closed_won"], + "closed_at": ["2024-02-01"], + } + ) + self._write_parquet(student_root, "opportunities", student_opps) + self._write_parquet(instructor_root, "opportunities", instructor_opps) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_unexpected_extra_instructor_lead_column_fails( + self, student_root: Path, instructor_root: Path + ) -> None: + """A non-banned, non-redacted extra column on instructor's leads + must still trip the column-diff check.""" + import pandas as pd + + student_leads = pd.DataFrame({"lead_id": ["l1"]}) + instructor_leads = pd.DataFrame({"lead_id": ["l1"], "rogue_col": [42]}) + self._write_parquet(student_root, "leads", student_leads) + self._write_parquet(instructor_root, "leads", instructor_leads) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("leads.parquet" in e and "rogue_col" in e for e in errors) + + def test_pk_row_subset_pass(self, student_root: Path, instructor_root: Path) -> None: + """Student touches PK ⊂ instructor touches PK — must NOT raise.""" + import pandas as pd + + instructor_touches = pd.DataFrame( + { + "touch_id": ["t1", "t2", "t3", "t4"], + "lead_id": ["l1", "l1", "l1", "l1"], + } + ) + # Student has a snapshot-window subset (rows for t1, t2 only). + student_touches = instructor_touches.iloc[:2].reset_index(drop=True) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert errors == [], errors + + def test_pk_orphan_row_fails(self, student_root: Path, instructor_root: Path) -> None: + """Student touch_id not present in instructor — must raise. The PK + branch fires when row counts differ; instructor strictly larger + with student carrying an orphan PK is the regression case.""" + import pandas as pd + + instructor_touches = pd.DataFrame( + {"touch_id": ["t1", "t2", "t3"], "lead_id": ["l1", "l1", "l1"]} + ) + student_touches = pd.DataFrame( + {"touch_id": ["t1", "t99"], "lead_id": ["l1", "l1"]} # t99 is the orphan + ) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("touches.parquet" in e and "absent from instructor" in e for e in errors), ( + f"expected PK-orphan error; got: {errors}" + ) + + def test_student_has_more_rows_than_instructor_fails( + self, student_root: Path, instructor_root: Path + ) -> None: + """Snapshot-safe must be a row-subset; student strictly larger means + the writer leaked extra rows somewhere — must raise.""" + import pandas as pd + + instructor_touches = pd.DataFrame({"touch_id": ["t1"], "lead_id": ["l1"]}) + student_touches = pd.DataFrame( + {"touch_id": ["t1", "t2", "t3"], "lead_id": ["l1", "l1", "l1"]} + ) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any("touches.parquet" in e and "more rows than instructor" in e for e in errors) + + def test_missing_pk_column_surfaces_an_error( + self, student_root: Path, instructor_root: Path + ) -> None: + """Student dropping ``touch_id`` is structurally invalid — the + column-diff check catches it (``touch_id`` is not in the + snapshot-safe allowlist for ``touches``), surfacing it as a + non-redacted-column error before the PK-set comparison runs.""" + import pandas as pd + + instructor_touches = pd.DataFrame({"touch_id": ["t1", "t2"], "lead_id": ["l1", "l1"]}) + student_touches = pd.DataFrame({"lead_id": ["l1"]}) + self._write_parquet(student_root, "touches", student_touches) + self._write_parquet(instructor_root, "touches", instructor_touches) + errors = check_exposure_monotonicity(student_root, instructor_root) + assert any( + "touches.parquet" in e and "touch_id" in e and "non-redacted columns" in e + for e in errors + ), f"expected column-diff error naming touch_id; got: {errors}" + + def _make_synthetic_bundle( root: Path, files: dict[str, str | bytes],