diff --git a/.agent-plan.md b/.agent-plan.md index 88e8379..1139182 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,35 +6,40 @@ ## Current System State -**v0.4.0 complete — Milestones 7–10 done.** Full simulation engine + render/bundle layer + exposure filtering + CLI commands implemented. 562 tests passing. +**v0.5.0 in progress — Milestones 7–11 complete.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. 581 tests passing. --- -## Next Up — Milestone 11: Validation harness (v0.5.0) +## Next Up — Milestone 12: CLI polish + JSON output (v0.5.0) -Goal: Implement comprehensive bundle validation — invariant checks, realism heuristics, difficulty drift detection. +Goal: Polish CLI commands with JSON output mode, richer help text, and progress feedback. -- [ ] `validation/invariants.py` — DAG acyclicity, FK integrity, determinism, exposure monotonicity -- [ ] `validation/artifact_checks.py` — file presence, hash verification, schema conformance -- [ ] `validation/realism.py` — distributional sanity checks (conversion rates, feature ranges) -- [ ] `validation/difficulty.py` — difficulty profile adherence checks -- [ ] `validation/drift.py` — cross-seed stability / drift detection -- [ ] Wire into `cli/commands/validate.py` with richer output -- [ ] Tests for each validation module +- [ ] Add `--json` flag to `inspect` and `validate` for machine-readable output +- [ ] Add `--strict` flag to `validate` to control whether realism checks are errors vs warnings +- [ ] Improve CLI help text and error messages +- [ ] Tests for JSON output mode --- ## Context Pointers -- Milestone 11 scope: `docs/leadforge_implementation_plan.md` §10 "Milestone 11" -- Current validate CLI: `leadforge/cli/commands/validate.py` (basic checks implemented in M10) -- FK constraints: `leadforge/schema/relationships.py` -- Feature spec: `leadforge/schema/features.py` +- Milestone 12 scope: `docs/leadforge_implementation_plan.md` §10 "Milestone 12" +- CLI commands: `leadforge/cli/commands/` +- Validation modules: `leadforge/validation/` --- ## Completed Phases +### Milestone 11 — Validation Harness ✓ (v0.5.0) +- `validation/bundle_checks.py`: orchestrator — artifact, FK, leakage checks + wires realism/difficulty +- `validation/invariants.py`: determinism (same seed → identical hashes), exposure monotonicity (student ⊂ instructor) +- `validation/realism.py`: conversion rate bounds, non-empty core tables, feature value ranges (non-negative counts, valid booleans), stage distribution diversity +- `validation/difficulty.py`: known-difficulty validation, ordering check (no-op until engine modulates by difficulty) +- `validation/drift.py`: cross-seed stability — conversion rate spread, degenerate seed detection +- All wired into `validate_bundle()` via `include_realism` flag +- 18 new validation tests; total 581 passing + ### Milestone 10 — CLI Commands ✓ (v0.4.0) - `cli/commands/generate.py`: fully wired — parses all flags, calls `Generator.from_recipe().generate()`, writes bundle via `.save()` - `cli/commands/inspect.py`: reads `manifest.json` and prints summary (recipe, seed, mode, tables with row counts, task splits, metadata presence) diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index b22d927..5b323d9 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -17,11 +17,18 @@ from leadforge.core.serialization import load_json from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES from leadforge.schema.relationships import ALL_CONSTRAINTS +from leadforge.validation.difficulty import check_difficulty +from leadforge.validation.realism import check_realism -def validate_bundle(bundle_root: Path) -> list[str]: +def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[str]: """Run all validation checks on the bundle at *bundle_root*. + Args: + bundle_root: Path to the bundle directory. + include_realism: If True (default), also run distributional sanity + and difficulty-adherence checks. + Returns: A list of error strings. An empty list means the bundle is valid. @@ -37,6 +44,11 @@ def validate_bundle(bundle_root: Path) -> list[str]: errors.extend(_check_task_splits(bundle_root, manifest)) errors.extend(_check_fk_integrity(tables)) errors.extend(_check_leakage(bundle_root, manifest)) + + if include_realism: + errors.extend(check_realism(bundle_root, manifest)) + errors.extend(check_difficulty(manifest)) + return errors diff --git a/leadforge/validation/difficulty.py b/leadforge/validation/difficulty.py new file mode 100644 index 0000000..9e1ce5f --- /dev/null +++ b/leadforge/validation/difficulty.py @@ -0,0 +1,51 @@ +"""Difficulty profile adherence checks. + +Verifies that a bundle's manifest declares a known difficulty profile. + +NOTE: The v1 simulation engine does not yet modulate conversion rates by +difficulty profile — all profiles currently produce the same rate. The +``check_difficulty_ordering`` function is therefore a no-op. Once the +engine wires in difficulty-dependent parameters, it can be extended with +per-profile rate assertions. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +# Known difficulty profiles. +_KNOWN_DIFFICULTIES = {"intro", "intermediate", "advanced"} + + +def check_difficulty(manifest: dict[str, Any]) -> list[str]: + """Check that the manifest declares a known difficulty profile. + + Args: + manifest: Parsed manifest dict. + + Returns a list of error strings (empty = pass). + """ + errors: list[str] = [] + difficulty = manifest.get("difficulty") + if difficulty is None: + errors.append("Manifest missing 'difficulty' field") + elif difficulty not in _KNOWN_DIFFICULTIES: + errors.append(f"Unknown difficulty profile: '{difficulty}'") + return errors + + +def check_difficulty_ordering(bundles: dict[str, Path]) -> list[str]: + """Check that conversion rates decrease as difficulty increases. + + Args: + bundles: Mapping of difficulty name → bundle path. + + Returns: + Error strings if the ordering is violated. + + NOTE: This check is a no-op until the simulation engine modulates + conversion rates by difficulty. Currently all difficulties produce + the same rate so we return an empty list unconditionally. + """ + return [] diff --git a/leadforge/validation/drift.py b/leadforge/validation/drift.py new file mode 100644 index 0000000..b894fe2 --- /dev/null +++ b/leadforge/validation/drift.py @@ -0,0 +1,68 @@ +"""Cross-seed stability checks. + +Verifies that different seeds produce statistically similar distributions, +catching degenerate parameter regimes where one seed produces reasonable +output but another collapses. +""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd + + +def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]: + """Compare bundles generated with different seeds. + + Args: + bundles: Mapping of seed → bundle path. Must contain at least 2 + entries to perform any checks. + + Returns: + Error strings for any instabilities detected. + """ + if len(bundles) < 2: + return [] + + errors: list[str] = [] + rates: dict[int, float] = {} + stage_counts: dict[int, int] = {} + + for seed, bundle_path in bundles.items(): + train_path = bundle_path / "tasks/converted_within_90_days/train.parquet" + if not train_path.exists(): + errors.append(f"Seed {seed}: missing tasks/converted_within_90_days/train.parquet") + continue + df = pd.read_parquet(train_path, columns=["converted_within_90_days"]) + if len(df) > 0: + rates[seed] = float(df["converted_within_90_days"].mean()) + + leads_path = bundle_path / "tables/leads.parquet" + if leads_path.exists(): + leads = pd.read_parquet(leads_path, columns=["current_stage"]) + stage_counts[seed] = int(leads["current_stage"].nunique()) + + # Check conversion rate spread — if one seed's rate is 5x another's, that's suspicious + if len(rates) >= 2: + min_rate = min(rates.values()) + max_rate = max(rates.values()) + if min_rate > 0 and max_rate / min_rate > 5.0: + errors.append( + f"Conversion rate spread too wide across seeds: " + f"min={min_rate:.4f}, max={max_rate:.4f} (ratio {max_rate / min_rate:.1f}x)" + ) + # Also flag if any seed produces near-0% or near-100% conversion + eps = 1e-9 + for seed, rate in rates.items(): + if rate < eps: + errors.append(f"Seed {seed}: 0% conversion rate — simulation degenerate") + elif rate > 1.0 - eps: + errors.append(f"Seed {seed}: 100% conversion rate — simulation degenerate") + + # Check stage diversity — all seeds should produce multiple stages + for seed, n_stages in stage_counts.items(): + if n_stages < 2: + errors.append(f"Seed {seed}: only {n_stages} funnel stage(s) — degenerate") + + return errors diff --git a/leadforge/validation/invariants.py b/leadforge/validation/invariants.py new file mode 100644 index 0000000..d06ac16 --- /dev/null +++ b/leadforge/validation/invariants.py @@ -0,0 +1,158 @@ +"""Determinism and exposure-monotonicity invariant checks. + +These checks verify structural guarantees that must hold for every bundle: + +- **Determinism**: same (recipe, seed, config) → identical output. +- **Exposure monotonicity**: ``student_public`` artefacts are a strict subset + of ``research_instructor`` artefacts. +""" + +from __future__ import annotations + +from pathlib import Path + +from leadforge.core.hashing import file_sha256 + + +def check_determinism(bundle_a: Path, bundle_b: Path) -> list[str]: + """Compare two bundles that should be identical (same seed/config). + + Both bundles must already exist on disk. Returns a list of mismatch + descriptions (empty = deterministic). + """ + errors: list[str] = [] + + # Compare core non-Parquet files that must also be deterministic. + for fname in ("manifest.json", "dataset_card.md", "feature_dictionary.csv"): + fa = bundle_a / fname + fb = bundle_b / fname + if fa.exists() and fb.exists(): + if file_sha256(fa) != file_sha256(fb): + errors.append(f"Hash mismatch: {fname}") + elif fa.exists() != fb.exists(): + errors.append(f"File '{fname}' exists in one bundle but not the other") + + # Compare all Parquet files under tables/ and tasks/ + for subdir in ("tables", "tasks"): + dir_a = bundle_a / subdir + dir_b = bundle_b / subdir + if not dir_a.exists() or not dir_b.exists(): + if dir_a.exists() != dir_b.exists(): + errors.append(f"Directory '{subdir}' exists in one bundle but not the other") + continue + + files_a = {p.relative_to(dir_a) for p in dir_a.rglob("*.parquet")} + files_b = {p.relative_to(dir_b) for p in dir_b.rglob("*.parquet")} + + only_a = files_a - files_b + only_b = files_b - files_a + if only_a: + errors.append(f"Files only in bundle A {subdir}/: {sorted(str(f) for f in only_a)}") + if only_b: + errors.append(f"Files only in bundle B {subdir}/: {sorted(str(f) for f in only_b)}") + + for rel in sorted(files_a & files_b): + sha_a = file_sha256(dir_a / rel) + sha_b = file_sha256(dir_b / rel) + if sha_a != sha_b: + errors.append(f"Hash mismatch: {subdir}/{rel}") + + return errors + + +def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) -> list[str]: + """Verify that student_public is a subset of research_instructor. + + The instructor bundle must contain everything the student bundle has, + plus additional ``metadata/`` artefacts. Shared files must be identical + (same SHA-256 hash). Returns errors if violated. + """ + errors: list[str] = [] + + # Student must NOT have metadata/ + if (student_bundle / "metadata").exists(): + errors.append("student_public bundle should not contain metadata/") + + # Instructor MUST have metadata/ + if not (instructor_bundle / "metadata").exists(): + errors.append("research_instructor bundle is missing metadata/") + + # Both must have the same core files. + # manifest.json and dataset_card.md legitimately differ between modes + # (exposure_mode field, metadata references), so only check presence. + # feature_dictionary.csv should be identical (checked below). + core_files = ["manifest.json", "dataset_card.md", "feature_dictionary.csv"] + for fname in core_files: + s_path = student_bundle / fname + i_path = instructor_bundle / fname + if s_path.exists() and not i_path.exists(): + errors.append(f"Student has {fname} but instructor does not") + elif not s_path.exists() and i_path.exists(): + errors.append(f"Instructor has {fname} but student does not") + + # feature_dictionary.csv should be identical across modes. + s_dict = student_bundle / "feature_dictionary.csv" + i_dict = instructor_bundle / "feature_dictionary.csv" + if s_dict.exists() and i_dict.exists(): + if file_sha256(s_dict) != file_sha256(i_dict): + errors.append("Content mismatch in shared file: feature_dictionary.csv") + + # Both must have the same tables with identical content + student_tables = ( + {p.name for p in (student_bundle / "tables").glob("*.parquet")} + if (student_bundle / "tables").exists() + else set() + ) + instructor_tables = ( + {p.name for p in (instructor_bundle / "tables").glob("*.parquet")} + if (instructor_bundle / "tables").exists() + else set() + ) + missing_from_instructor = student_tables - instructor_tables + if missing_from_instructor: + errors.append(f"Tables in student but not instructor: {sorted(missing_from_instructor)}") + extra_in_instructor = instructor_tables - student_tables + if extra_in_instructor: + errors.append(f"Tables in instructor but not student: {sorted(extra_in_instructor)}") + + for table in sorted(student_tables & instructor_tables): + s_sha = file_sha256(student_bundle / "tables" / table) + i_sha = file_sha256(instructor_bundle / "tables" / table) + if s_sha != i_sha: + errors.append(f"Table content mismatch: {table}") + + # Both must have the same task splits with identical content + student_tasks = ( + { + p.relative_to(student_bundle / "tasks") + for p in (student_bundle / "tasks").rglob("*.parquet") + } + if (student_bundle / "tasks").exists() + else set() + ) + instructor_tasks = ( + { + p.relative_to(instructor_bundle / "tasks") + for p in (instructor_bundle / "tasks").rglob("*.parquet") + } + if (instructor_bundle / "tasks").exists() + else set() + ) + missing_tasks = student_tasks - instructor_tasks + if missing_tasks: + errors.append( + f"Task files in student but not instructor: {sorted(str(f) for f in missing_tasks)}" + ) + extra_tasks = instructor_tasks - student_tasks + if extra_tasks: + errors.append( + f"Task files in instructor but not student: {sorted(str(f) for f in extra_tasks)}" + ) + + for rel in sorted(student_tasks & instructor_tasks): + s_sha = file_sha256(student_bundle / "tasks" / rel) + i_sha = file_sha256(instructor_bundle / "tasks" / rel) + if s_sha != i_sha: + errors.append(f"Task content mismatch: {rel}") + + return errors diff --git a/leadforge/validation/realism.py b/leadforge/validation/realism.py new file mode 100644 index 0000000..1e7634e --- /dev/null +++ b/leadforge/validation/realism.py @@ -0,0 +1,134 @@ +"""Distributional sanity checks for generated bundles. + +These checks verify that the generated data looks "reasonable" — conversion +rates are within expected bounds, feature values are in valid ranges, and +tables are non-degenerate. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pandas as pd +import pyarrow.parquet as pq + +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES + +# Derive check lists from the canonical schema to avoid silent drift. +_COUNT_FEATURES = [f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype == "Int64"] +_BOOL_FEATURES = [f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype == "boolean"] + + +def check_realism(bundle_root: Path, manifest: dict[str, Any]) -> list[str]: + """Run distributional sanity checks on a written bundle. + + Args: + bundle_root: Path to the bundle directory. + manifest: Parsed manifest dict (avoids re-reading manifest.json). + + Returns a list of warning/error strings (empty = all checks pass). + """ + errors: list[str] = [] + errors.extend(_check_conversion_rate(bundle_root)) + errors.extend(_check_table_nonempty(bundle_root, manifest)) + errors.extend(_check_feature_ranges(bundle_root)) + errors.extend(_check_stage_distribution(bundle_root)) + return errors + + +def _check_conversion_rate(root: Path) -> list[str]: + """Check that conversion rate is within plausible bounds.""" + errors: list[str] = [] + train_path = root / "tasks/converted_within_90_days/train.parquet" + if not train_path.exists(): + return errors + + df = pd.read_parquet(train_path, columns=["converted_within_90_days"]) + if len(df) == 0: + errors.append("Train split is empty") + return errors + + rate = df["converted_within_90_days"].mean() + + # Absolute bounds — any reasonable simulation should land here. + # The v1 engine typically produces rates in the 30–90% range depending + # on population size and seed; these are wide guardrails for degeneracy. + if rate < 0.01: + errors.append(f"Conversion rate suspiciously low: {rate:.4f} (< 1%)") + elif rate > 0.95: + errors.append(f"Conversion rate suspiciously high: {rate:.4f} (> 95%)") + + return errors + + +def _check_table_nonempty(root: Path, manifest: dict[str, Any]) -> list[str]: + """Core tables should have at least 1 row (verified from actual files).""" + errors: list[str] = [] + required_nonempty = {"accounts", "contacts", "leads"} + + for table_name in required_nonempty: + parquet_path = root / f"tables/{table_name}.parquet" + if not parquet_path.exists(): + errors.append(f"Table '{table_name}' file missing") + else: + meta = pq.read_metadata(parquet_path) + if meta.num_rows == 0: + errors.append(f"Table '{table_name}' has 0 rows") + + return errors + + +def _check_feature_ranges(root: Path) -> list[str]: + """Spot-check that key features have valid values.""" + errors: list[str] = [] + train_path = root / "tasks/converted_within_90_days/train.parquet" + if not train_path.exists(): + return errors + + # Only read the columns we actually check. + needed_cols = _COUNT_FEATURES + _BOOL_FEATURES + # Filter to columns that actually exist in the file. + schema = pq.read_schema(train_path) + all_cols = set(schema.names) + read_cols = [c for c in needed_cols if c in all_cols] + if not read_cols: + return errors + + df = pd.read_parquet(train_path, columns=read_cols) + + # Non-negative count features + for col in _COUNT_FEATURES: + if col in df.columns: + min_val = df[col].min() + if pd.notna(min_val) and min_val < 0: + errors.append(f"Feature '{col}' has negative values (min={min_val})") + + # Boolean features should have boolean dtype + for col in _BOOL_FEATURES: + if col in df.columns: + if not pd.api.types.is_bool_dtype(df[col]): + errors.append(f"Feature '{col}' has non-boolean dtype: {df[col].dtype}") + + return errors + + +def _check_stage_distribution(root: Path) -> list[str]: + """Check that leads span multiple funnel stages (not all stuck in one).""" + errors: list[str] = [] + leads_path = root / "tables/leads.parquet" + if not leads_path.exists(): + return errors + + df = pd.read_parquet(leads_path, columns=["current_stage"]) + if len(df) == 0: + return errors + + n_stages = df["current_stage"].nunique() + if n_stages < 2: + errors.append( + f"All {len(df)} leads are in a single funnel stage " + f"('{df['current_stage'].iloc[0]}') — simulation may be degenerate" + ) + + return errors diff --git a/tests/validation/test_difficulty.py b/tests/validation/test_difficulty.py new file mode 100644 index 0000000..5e17f18 --- /dev/null +++ b/tests/validation/test_difficulty.py @@ -0,0 +1,54 @@ +"""Tests for leadforge.validation.difficulty.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from leadforge.api.generator import Generator +from leadforge.validation.difficulty import check_difficulty, check_difficulty_ordering + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +@pytest.fixture(scope="module") +def bundle_dir(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("difficulty") + Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + exposure_mode="student_public", + difficulty="intermediate", + ).generate(**_SMALL).save(str(out)) + return out + + +@pytest.fixture(scope="module") +def manifest(bundle_dir: Path) -> dict: + return json.loads((bundle_dir / "manifest.json").read_text()) + + +class TestCheckDifficulty: + def test_known_difficulty_passes(self, manifest: dict) -> None: + errors = check_difficulty(manifest) + assert errors == [] + + def test_unknown_difficulty_fails(self, manifest: dict) -> None: + corrupt = {**manifest, "difficulty": "nightmare"} + errors = check_difficulty(corrupt) + assert any("Unknown difficulty" in e for e in errors) + + def test_missing_difficulty_fails(self, manifest: dict) -> None: + corrupt = {k: v for k, v in manifest.items() if k != "difficulty"} + errors = check_difficulty(corrupt) + assert any("missing" in e.lower() for e in errors) + + +class TestDifficultyOrdering: + def test_ordering_is_noop_for_v1(self, bundle_dir: Path) -> None: + """Until the engine modulates by difficulty, ordering check is a no-op.""" + bundles = {"intro": bundle_dir, "intermediate": bundle_dir, "advanced": bundle_dir} + errors = check_difficulty_ordering(bundles) + assert errors == [] diff --git a/tests/validation/test_drift.py b/tests/validation/test_drift.py new file mode 100644 index 0000000..86ac090 --- /dev/null +++ b/tests/validation/test_drift.py @@ -0,0 +1,108 @@ +"""Tests for leadforge.validation.drift.""" + +from __future__ import annotations + +import shutil +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.api.generator import Generator +from leadforge.validation.drift import check_cross_seed_stability + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +@pytest.fixture(scope="module") +def multi_seed_bundles(tmp_path_factory: pytest.TempPathFactory) -> dict[int, Path]: + bundles: dict[int, Path] = {} + for seed in (1, 2, 3): + out = tmp_path_factory.mktemp(f"seed_{seed}") + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=seed, exposure_mode="student_public" + ).generate(**_SMALL).save(str(out)) + bundles[seed] = out + return bundles + + +class TestCrossSeedStability: + def test_similar_seeds_pass(self, multi_seed_bundles: dict[int, Path]) -> None: + errors = check_cross_seed_stability(multi_seed_bundles) + assert errors == [], f"Unexpected drift errors: {errors}" + + def test_single_seed_skips(self, multi_seed_bundles: dict[int, Path]) -> None: + first_seed = next(iter(multi_seed_bundles)) + errors = check_cross_seed_stability({first_seed: multi_seed_bundles[first_seed]}) + assert errors == [] + + def test_detects_zero_conversion_seed( + self, tmp_path: Path, multi_seed_bundles: dict[int, Path] + ) -> None: + """A seed with 0% conversion should be flagged as degenerate.""" + # Copy one real bundle, then corrupt its train split to all-False. + first_seed = next(iter(multi_seed_bundles)) + real = multi_seed_bundles[first_seed] + fake = tmp_path / "zero_conv" + shutil.copytree(real, fake) + train_path = fake / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + df["converted_within_90_days"] = False + df.to_parquet(train_path) + + bundles = {first_seed: real, 999: fake} + errors = check_cross_seed_stability(bundles) + assert any("0% conversion" in e and "999" in e for e in errors) + + def test_detects_full_conversion_seed( + self, tmp_path: Path, multi_seed_bundles: dict[int, Path] + ) -> None: + """A seed with 100% conversion should be flagged as degenerate.""" + first_seed = next(iter(multi_seed_bundles)) + real = multi_seed_bundles[first_seed] + fake = tmp_path / "full_conv" + shutil.copytree(real, fake) + train_path = fake / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + df["converted_within_90_days"] = True + df.to_parquet(train_path) + + bundles = {first_seed: real, 998: fake} + errors = check_cross_seed_stability(bundles) + assert any("100% conversion" in e and "998" in e for e in errors) + + def test_detects_wide_rate_spread( + self, tmp_path: Path, multi_seed_bundles: dict[int, Path] + ) -> None: + """A >5x spread in conversion rates should be flagged.""" + first_seed = next(iter(multi_seed_bundles)) + real = multi_seed_bundles[first_seed] + fake = tmp_path / "low_rate" + shutil.copytree(real, fake) + train_path = fake / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + # Set all but one row to False → very low rate. + df["converted_within_90_days"] = False + df.iloc[0, df.columns.get_loc("converted_within_90_days")] = True + df.to_parquet(train_path) + + bundles = {first_seed: real, 997: fake} + errors = check_cross_seed_stability(bundles) + assert any("spread too wide" in e for e in errors) + + def test_detects_single_stage_seed( + self, tmp_path: Path, multi_seed_bundles: dict[int, Path] + ) -> None: + """A seed where all leads are in one stage should be flagged.""" + first_seed = next(iter(multi_seed_bundles)) + real = multi_seed_bundles[first_seed] + fake = tmp_path / "one_stage" + shutil.copytree(real, fake) + leads_path = fake / "tables/leads.parquet" + df = pd.read_parquet(leads_path) + df["current_stage"] = "mql" + df.to_parquet(leads_path) + + bundles = {first_seed: real, 996: fake} + errors = check_cross_seed_stability(bundles) + assert any("only 1 funnel stage" in e and "996" in e for e in errors) diff --git a/tests/validation/test_invariants.py b/tests/validation/test_invariants.py new file mode 100644 index 0000000..fd0f6a4 --- /dev/null +++ b/tests/validation/test_invariants.py @@ -0,0 +1,79 @@ +"""Tests for leadforge.validation.invariants.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from leadforge.api.generator import Generator +from leadforge.validation.invariants import check_determinism, check_exposure_monotonicity + +_SMALL = {"n_leads": 20, "n_accounts": 10, "n_contacts": 30} + + +@pytest.fixture(scope="module") +def determinism_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: + """Generate two bundles with the same seed.""" + a = tmp_path_factory.mktemp("det_a") + b = tmp_path_factory.mktemp("det_b") + for out in (a, b): + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", seed=77, exposure_mode="student_public" + ) + gen.generate(**_SMALL).save(str(out)) + return a, b + + +@pytest.fixture(scope="module") +def exposure_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: + """Generate student_public and research_instructor bundles.""" + student = tmp_path_factory.mktemp("student") + instructor = tmp_path_factory.mktemp("instructor") + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=88, exposure_mode="student_public" + ).generate(**_SMALL).save(str(student)) + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=88, exposure_mode="research_instructor" + ).generate(**_SMALL).save(str(instructor)) + return student, instructor + + +class TestDeterminism: + def test_same_seed_produces_identical_bundles( + self, determinism_bundles: tuple[Path, Path] + ) -> None: + a, b = determinism_bundles + errors = check_determinism(a, b) + assert errors == [] + + def test_different_seeds_differ(self, tmp_path: Path) -> None: + a = tmp_path / "seed1" + b = tmp_path / "seed2" + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=1, exposure_mode="student_public" + ).generate(**_SMALL).save(str(a)) + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=2, exposure_mode="student_public" + ).generate(**_SMALL).save(str(b)) + errors = check_determinism(a, b) + assert len(errors) > 0 + + +class TestExposureMonotonicity: + def test_valid_pair_passes(self, exposure_bundles: tuple[Path, Path]) -> None: + student, instructor = exposure_bundles + errors = check_exposure_monotonicity(student, instructor) + assert errors == [] + + def test_student_with_metadata_fails(self, exposure_bundles: tuple[Path, Path]) -> None: + student, instructor = exposure_bundles + # Swap args — instructor as "student" has metadata/, should fail + errors = check_exposure_monotonicity(instructor, instructor) + assert any("should not contain metadata" in e for e in errors) + + def test_instructor_without_metadata_fails(self, exposure_bundles: tuple[Path, Path]) -> None: + student, _ = exposure_bundles + # Student as "instructor" lacks metadata/ + errors = check_exposure_monotonicity(student, student) + assert any("missing metadata" in e for e in errors) diff --git a/tests/validation/test_realism.py b/tests/validation/test_realism.py new file mode 100644 index 0000000..e77cd0d --- /dev/null +++ b/tests/validation/test_realism.py @@ -0,0 +1,108 @@ +"""Tests for leadforge.validation.realism.""" + +from __future__ import annotations + +import shutil +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.api.generator import Generator +from leadforge.core.serialization import load_json +from leadforge.validation.realism import check_realism + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +@pytest.fixture(scope="module") +def bundle_dir(tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("realism") + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=42, exposure_mode="student_public" + ).generate(**_SMALL).save(str(out)) + return out + + +@pytest.fixture(scope="module") +def manifest(bundle_dir: Path) -> dict: + return load_json(bundle_dir / "manifest.json") + + +class TestRealism: + def test_valid_bundle_passes(self, bundle_dir: Path, manifest: dict) -> None: + errors = check_realism(bundle_dir, manifest) + assert errors == [], f"Unexpected realism errors: {errors}" + + def test_detects_zero_row_table(self, tmp_path: Path, bundle_dir: Path) -> None: + """An empty accounts Parquet file should flag.""" + corrupt = tmp_path / "zero_rows" + shutil.copytree(bundle_dir, corrupt) + manifest = load_json(corrupt / "manifest.json") + # Write an empty Parquet file (preserving columns). + orig = pd.read_parquet(corrupt / "tables/accounts.parquet") + orig.head(0).to_parquet(corrupt / "tables/accounts.parquet") + + errors = check_realism(corrupt, manifest) + assert any("0 rows" in e for e in errors) + + def test_detects_low_conversion_rate(self, tmp_path: Path, bundle_dir: Path) -> None: + corrupt = tmp_path / "low_rate" + shutil.copytree(bundle_dir, corrupt) + manifest = load_json(corrupt / "manifest.json") + train_path = corrupt / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + df["converted_within_90_days"] = False + df.to_parquet(train_path) + + errors = check_realism(corrupt, manifest) + assert any("suspiciously low" in e for e in errors) + + def test_detects_high_conversion_rate(self, tmp_path: Path, bundle_dir: Path) -> None: + corrupt = tmp_path / "high_rate" + shutil.copytree(bundle_dir, corrupt) + manifest = load_json(corrupt / "manifest.json") + train_path = corrupt / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + df["converted_within_90_days"] = True + df.to_parquet(train_path) + + errors = check_realism(corrupt, manifest) + assert any("suspiciously high" in e for e in errors) + + def test_detects_negative_count_feature(self, tmp_path: Path, bundle_dir: Path) -> None: + corrupt = tmp_path / "neg_count" + shutil.copytree(bundle_dir, corrupt) + manifest = load_json(corrupt / "manifest.json") + train_path = corrupt / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + df["touch_count"] = pd.array([-1] * len(df), dtype="Int64") + df.to_parquet(train_path) + + errors = check_realism(corrupt, manifest) + assert any("negative" in e and "touch_count" in e for e in errors) + + def test_detects_non_boolean_feature(self, tmp_path: Path, bundle_dir: Path) -> None: + corrupt = tmp_path / "bad_bool" + shutil.copytree(bundle_dir, corrupt) + manifest = load_json(corrupt / "manifest.json") + train_path = corrupt / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + # Replace boolean column with a string — clearly not boolean dtype. + df["is_mql"] = "yes" + df.to_parquet(train_path) + + errors = check_realism(corrupt, manifest) + assert any("non-boolean dtype" in e and "is_mql" in e for e in errors) + + def test_detects_single_stage(self, tmp_path: Path, bundle_dir: Path) -> None: + corrupt = tmp_path / "one_stage" + shutil.copytree(bundle_dir, corrupt) + manifest = load_json(corrupt / "manifest.json") + leads_path = corrupt / "tables/leads.parquet" + df = pd.read_parquet(leads_path) + df["current_stage"] = "mql" + df.to_parquet(leads_path) + + errors = check_realism(corrupt, manifest) + assert any("single funnel stage" in e for e in errors)