diff --git a/.agent-plan.md b/.agent-plan.md index 849a996..cfa7c27 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -44,6 +44,19 @@ First public dataset release: `leadforge-b2b-lead-scoring`. Three difficulty tie - [ ] Upload to Kaggle and HuggingFace - [ ] Announce +### Difficulty modulation ✓ (PR pending) + +- [x] `leadforge/core/models.py` — `DifficultyParams` frozen dataclass; optional field on `GenerationConfig` +- [x] `leadforge/mechanisms/policies.py` — `assign_mechanisms()` accepts `difficulty_params`; per-motif calibration computes target daily hazard from conversion_rate_range; signal_strength scales LatentScore weights +- [x] `leadforge/simulation/engine.py` — threads `difficulty_params` to `assign_mechanisms()`; churn rate modulated by committee_friction +- [x] `leadforge/render/snapshots.py` — `_apply_difficulty_distortions()` injects Gaussian noise (noise_scale), MCAR missingness (missing_rate), and outliers (outlier_rate) into float features +- [x] `leadforge/api/generator.py` — constructs `DifficultyParams` from profile YAML, attaches to config +- [x] `leadforge/api/bundle.py` — passes difficulty_params and seed to `build_snapshot()` +- [x] `leadforge/validation/difficulty.py` — real `check_difficulty_ordering()` validates rates are in declared range and ordered +- [x] `tests/test_difficulty_modulation.py` — 13 tests (unit, integration, determinism, distortions) +- [x] Calibration across 20 seeds × 5 motif families: intro mean 43%, intermediate mean 22%, advanced mean 9% +- [x] All 865 tests pass + ### Known issue: `current_stage` leakage at 90-day horizon The full bundle snapshot includes `current_stage` which at day 90 contains terminal stages (`closed_won`/`closed_lost`). This perfectly encodes the label. The flat CSV export drops it; the Parquet task splits retain it with documentation. A proper fix (windowed snapshot or column redaction in the exposure layer) is deferred. diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index 2b33016..0ea4146 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -73,7 +73,13 @@ def write_bundle( # ------------------------------------------------------------------ # 2. Snapshot + task splits → tasks/ # ------------------------------------------------------------------ - snapshot = build_snapshot(result, population, horizon_days=config.horizon_days) + snapshot = build_snapshot( + result, + population, + horizon_days=config.horizon_days, + difficulty_params=config.difficulty_params, + seed=config.seed, + ) task = task_manifest_for_config(config.primary_task, config.label_window_days) task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task) diff --git a/leadforge/api/generator.py b/leadforge/api/generator.py index 30fe0c9..f2da598 100644 --- a/leadforge/api/generator.py +++ b/leadforge/api/generator.py @@ -5,7 +5,7 @@ from typing import Any from leadforge.core.enums import DifficultyProfile, ExposureMode -from leadforge.core.models import GenerationConfig, WorldBundle, WorldSpec +from leadforge.core.models import DifficultyParams, GenerationConfig, WorldBundle, WorldSpec from leadforge.core.rng import RNGRoot from leadforge.core.sentinels import _MISSING @@ -188,6 +188,37 @@ def generate( profiles = recipe.load_difficulty_profiles() profile = profiles.get(config.difficulty.value, {}) category_latent_correlations = profile.get("category_latent_correlations") + + # Construct DifficultyParams from profile and attach to config. + # All keys are required — a missing key indicates a malformed profile + # YAML and should fail loudly rather than silently defaulting. + required_keys = ( + "signal_strength", + "noise_scale", + "missing_rate", + "outlier_rate", + "conversion_rate_range", + "committee_friction", + ) + missing = [k for k in required_keys if k not in profile] + if missing: + from leadforge.core.exceptions import InvalidRecipeError + + raise InvalidRecipeError( + f"Difficulty profile '{config.difficulty.value}' is missing " + f"required keys: {missing}" + ) + cr_range = profile["conversion_rate_range"] + difficulty_params = DifficultyParams( + signal_strength=profile["signal_strength"], + noise_scale=profile["noise_scale"], + missing_rate=profile["missing_rate"], + outlier_rate=profile["outlier_rate"], + conversion_rate_lo=cr_range[0], + conversion_rate_hi=cr_range[1], + committee_friction=profile["committee_friction"], + ) + config = dataclasses.replace(config, difficulty_params=difficulty_params) except (FileNotFoundError, KeyError): category_latent_correlations = None diff --git a/leadforge/core/models.py b/leadforge/core/models.py index 2674821..f8eed75 100644 --- a/leadforge/core/models.py +++ b/leadforge/core/models.py @@ -16,6 +16,23 @@ from leadforge.structure.graph import WorldGraph +@dataclass(frozen=True) +class DifficultyParams: + """Numeric parameters from a difficulty profile. + + Carried on :class:`GenerationConfig` to thread difficulty-dependent + behaviour through the simulation engine and snapshot builder. + """ + + signal_strength: float + noise_scale: float + missing_rate: float + outlier_rate: float + conversion_rate_lo: float + conversion_rate_hi: float + committee_friction: float + + def _require_positive_int(value: Any, name: str) -> None: """Raise ``InvalidConfigError`` unless *value* is a positive plain ``int``. @@ -49,6 +66,7 @@ class GenerationConfig: label_window_days: int = 90 output_path: str = "./out" package_version: str = field(default_factory=lambda: __version__) + difficulty_params: DifficultyParams | None = None def __post_init__(self) -> None: if isinstance(self.seed, bool) or not isinstance(self.seed, int): diff --git a/leadforge/mechanisms/policies.py b/leadforge/mechanisms/policies.py index d3086c5..0d64a3c 100644 --- a/leadforge/mechanisms/policies.py +++ b/leadforge/mechanisms/policies.py @@ -24,9 +24,12 @@ from __future__ import annotations import random -from typing import Any +from typing import TYPE_CHECKING, Any from leadforge.mechanisms.base import MechanismAssignment + +if TYPE_CHECKING: + from leadforge.core.models import DifficultyParams from leadforge.mechanisms.counts import LatentDecayIntensity, RecencyDecayIntensity from leadforge.mechanisms.hazards import ConversionHazard from leadforge.mechanisms.measurement import NoisyProxy @@ -200,6 +203,25 @@ _DEFAULT_HAZARD_PARAMS: dict[str, float] = {"base_rate": 0.006, "scale": 0.05} _DEFAULT_TOUCH_BASE_RATE: float = 0.40 +# Per-motif calibration constants for difficulty modulation. +# Each tuple is (reach_fraction, effective_days_at_negotiation): +# - reach_fraction: approximate share of leads that reach negotiation stage +# under baseline (no difficulty) parameters. +# - effective_days_at_negotiation: approximate days a lead spends at +# negotiation before converting or churning. +# +# Calibrated against v1.0.0 (2026-05-04) with 1000 leads × 20 seeds. +# Re-calibrate if stage transition rates, churn rate, or population +# initialisation logic changes. +_MOTIF_REACH_CALIBRATION: dict[str, tuple[float, float]] = { + "fit_dominant": (0.85, 22.0), + "intent_dominant": (0.85, 22.0), + "sales_execution_sensitive": (0.40, 18.0), + "demo_trial_mediated": (0.70, 20.0), + "buying_committee_friction": (0.32, 16.0), +} +_DEFAULT_REACH_CALIBRATION: tuple[float, float] = (0.55, 20.0) + # --------------------------------------------------------------------------- # Public entry point @@ -211,6 +233,7 @@ def assign_mechanisms( rng: random.Random, *, latent_touch_intensity: bool = False, + difficulty_params: DifficultyParams | None = None, ) -> MechanismAssignment: """Build a :class:`~leadforge.mechanisms.base.MechanismAssignment` for *motif_family*. @@ -232,19 +255,71 @@ def assign_mechanisms( Returns: A fully populated :class:`~leadforge.mechanisms.base.MechanismAssignment`. """ - conv_weights = _CONVERSION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS) - hazard_p = _HAZARD_PARAMS.get(motif_family, _DEFAULT_HAZARD_PARAMS) - trans_weights = _TRANSITION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS) + conv_weights = dict(_CONVERSION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS)) + hazard_p = dict(_HAZARD_PARAMS.get(motif_family, _DEFAULT_HAZARD_PARAMS)) + trans_weights = dict(_TRANSITION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS)) touch_rate = _TOUCH_BASE_RATES.get(motif_family, _DEFAULT_TOUCH_BASE_RATE) + # -- Difficulty modulation ------------------------------------------------ + signal = 1.0 + if difficulty_params is not None: + signal = difficulty_params.signal_strength + + # Override conversion hazard params to produce the target conversion rate. + # + # The baseline conversion rate varies significantly by motif family due + # to differences in how many leads reach negotiation and how latent + # scores distribute. We use per-motif calibration constants to compute + # the daily hazard probability that produces the target overall rate. + # + # Model: P(convert) ≈ reach_frac × [1 - (1-daily_p)^N_days] + target_mid = ( + difficulty_params.conversion_rate_lo + difficulty_params.conversion_rate_hi + ) / 2 + + reach_frac, days_at_negotiation = _MOTIF_REACH_CALIBRATION.get( + motif_family, _DEFAULT_REACH_CALIBRATION + ) + + # Target P(convert | reached negotiation). + p_convert_given_neg = min(0.92, target_mid / reach_frac) + target_daily_p = 1.0 - (1.0 - p_convert_given_neg) ** (1.0 / days_at_negotiation) + + # Split into base_rate (score-independent) and scale (score-dependent). + # Preserve the motif's original ratio between base_rate and scale. + orig_sum = hazard_p["base_rate"] + hazard_p["scale"] + if orig_sum > 0: + base_frac = hazard_p["base_rate"] / orig_sum + else: + base_frac = 0.15 + hazard_p = { + "base_rate": target_daily_p * base_frac, + "scale": target_daily_p * (1.0 - base_frac), + } + + # Apply signal_strength to LatentScore weights. + # To reduce signal (lower signal_strength), we attenuate secondary weights + # more than the primary one. This reduces discriminability rather than just + # shifting the sigmoid. The strongest weight is scaled by `signal`, the + # rest by `signal^1.5`, so intro (0.90) barely changes while advanced (0.50) + # meaningfully weakens secondary signals. + def _scale_weights(weights: dict[str, float], s: float) -> dict[str, float]: + if not weights or s >= 1.0: + return dict(weights) + max_abs = max(abs(v) for v in weights.values()) + return {k: v * s if abs(v) >= max_abs - 1e-9 else v * (s**1.5) for k, v in weights.items()} + + scaled_conv_weights = _scale_weights(conv_weights, signal) + scaled_trans_weights = _scale_weights(trans_weights, signal) + conversion_hazard = ConversionHazard( - score_mech=LatentScore(weights=conv_weights, bias=-1.5), + score_mech=LatentScore(weights=scaled_conv_weights, bias=-1.5), base_rate=hazard_p["base_rate"], scale=hazard_p["scale"], ) stage_transition = HazardTransition( - score_mech=LatentScore(weights=trans_weights, bias=-1.0), + score_mech=LatentScore(weights=scaled_trans_weights, bias=-1.0), base_rate=0.05, scale=0.15, min_dwell_days=2, diff --git a/leadforge/render/snapshots.py b/leadforge/render/snapshots.py index d0158f4..1105d66 100644 --- a/leadforge/render/snapshots.py +++ b/leadforge/render/snapshots.py @@ -13,8 +13,10 @@ from typing import TYPE_CHECKING +import numpy as np import pandas as pd +from leadforge.core.rng import RNGRoot from leadforge.schema.entities import ( OpportunityRow, SalesActivityRow, @@ -25,6 +27,7 @@ from leadforge.simulation.population import REVENUE_BAND_MIDPOINTS if TYPE_CHECKING: + from leadforge.core.models import DifficultyParams from leadforge.simulation.engine import SimulationResult from leadforge.simulation.population import PopulationResult @@ -56,6 +59,8 @@ def build_snapshot( population: PopulationResult, horizon_days: int = 90, snapshot_day: int | None = None, + difficulty_params: DifficultyParams | None = None, + seed: int = 42, ) -> pd.DataFrame: """Build the lead snapshot DataFrame from simulation output. @@ -311,4 +316,89 @@ def build_snapshot( if col in snapshot.columns: snapshot[col] = snapshot[col].astype(dtype) + # ------------------------------------------------------------------- + # Difficulty distortions: noise, missingness, outliers. + # ------------------------------------------------------------------- + if difficulty_params is not None: + snapshot = _apply_difficulty_distortions(snapshot, difficulty_params, seed) + return snapshot + + +# --------------------------------------------------------------------------- +# Difficulty distortion helpers +# --------------------------------------------------------------------------- + +# Derive eligible columns from the feature spec rather than runtime dtype +# sniffing. This guarantees categoricals, booleans, IDs, and labels are +# never distorted even if their runtime dtype happens to be numeric. +_FLOAT_DISTORTION_COLS: list[str] = [ + f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype in ("Float64", "float64") and not f.is_target +] +_NUMERIC_DISTORTION_COLS: list[str] = [ + f.name + for f in LEAD_SNAPSHOT_FEATURES + if f.dtype in ("Float64", "float64", "Int64", "int64") and not f.is_target +] + + +def _apply_difficulty_distortions( + df: pd.DataFrame, + params: DifficultyParams, + seed: int, +) -> pd.DataFrame: + """Apply noise, missingness, and outliers to numeric snapshot features. + + Returns a new DataFrame — the input is not mutated. + """ + df = df.copy() + rng_root = RNGRoot(seed) + np_rng = rng_root.numpy_child("snapshot_distortions") + + # Filter to columns actually present (guards against feature spec drift). + float_cols = [c for c in _FLOAT_DISTORTION_COLS if c in df.columns] + all_numeric_cols = [c for c in _NUMERIC_DISTORTION_COLS if c in df.columns] + + # 1. Gaussian noise on float features only (avoids int casting issues). + if params.noise_scale > 0: + for col in float_cols: + valid_mask = df[col].notna() + if valid_mask.sum() == 0: + continue + col_std = float(df.loc[valid_mask, col].std()) + if col_std == 0 or np.isnan(col_std): + continue + noise = np_rng.normal(0, params.noise_scale * col_std, size=len(df)) + # Add noise only where values are valid. + values = df[col].copy() + values[valid_mask] = values[valid_mask] + noise[valid_mask.values] + df[col] = values + + # 2. MCAR missingness injection (all numeric columns). + if params.missing_rate > 0: + mask = np_rng.random(size=(len(df), len(all_numeric_cols))) < params.missing_rate + for i, col in enumerate(all_numeric_cols): + col_mask = mask[:, i] + if col_mask.any(): + # Convert int columns to float to support NaN. + if df[col].dtype in ("int64", "Int64"): + df[col] = df[col].astype("Float64") + df.loc[col_mask, col] = np.nan + + # 3. Outlier injection (float columns only). Uses 5σ to produce values + # clearly distinguishable from natural variation. + if params.outlier_rate > 0: + for col in float_cols: + valid_mask = df[col].notna() + col_std = float(df.loc[valid_mask, col].std()) + if col_std == 0 or np.isnan(col_std): + continue + col_median = float(df[col].median()) + outlier_mask = np_rng.random(size=len(df)) < params.outlier_rate + signs = np_rng.choice([-1, 1], size=len(df)).astype(float) + outlier_values = col_median + signs * 5 * col_std + combined = outlier_mask & valid_mask.values + if combined.any(): + df.loc[combined, col] = outlier_values[combined] + + return df diff --git a/leadforge/simulation/engine.py b/leadforge/simulation/engine.py index 86473d5..f8af31b 100644 --- a/leadforge/simulation/engine.py +++ b/leadforge/simulation/engine.py @@ -197,7 +197,10 @@ def simulate_world( post_sim_rng = root.child("simulation_post_sim") mechanisms = assign_mechanisms( - world_graph.motif_family, mech_rng, latent_touch_intensity=latent_touch_intensity + world_graph.motif_family, + mech_rng, + latent_touch_intensity=latent_touch_intensity, + difficulty_params=config.difficulty_params, ) # Narrow type for direct conversion path (daily_probability is on # ConversionHazard, not the Mechanism ABC). @@ -246,6 +249,14 @@ def simulate_world( session_ctr = 0 activity_ctr = 0 + # Effective churn rate — modestly scaled up by committee_friction for harder tiers. + dp = config.difficulty_params + effective_churn_rate = ( + _DAILY_CHURN_RATE * (1.0 + 0.5 * dp.committee_friction) + if dp is not None + else _DAILY_CHURN_RATE + ) + # ------------------------------------------------------------------- # Main simulation loop: t = 0 … horizon_days-1 # ------------------------------------------------------------------- @@ -264,7 +275,7 @@ def simulate_world( ) # -- 1. Churn check (transition stream) ---------------------- - if transition_rng.random() < _DAILY_CHURN_RATE: + if transition_rng.random() < effective_churn_rate: state.mark_churned(t) continue # no events emitted on churn day diff --git a/leadforge/validation/difficulty.py b/leadforge/validation/difficulty.py index 9e1ce5f..f3d126e 100644 --- a/leadforge/validation/difficulty.py +++ b/leadforge/validation/difficulty.py @@ -1,12 +1,7 @@ """Difficulty profile adherence checks. -Verifies that a bundle's manifest declares a known difficulty profile. - -NOTE: The v1 simulation engine does not yet modulate conversion rates by -difficulty profile — all profiles currently produce the same rate. The -``check_difficulty_ordering`` function is therefore a no-op. Once the -engine wires in difficulty-dependent parameters, it can be extended with -per-profile rate assertions. +Verifies that a bundle's manifest declares a known difficulty profile and +that the actual conversion rate falls within the declared range. """ from __future__ import annotations @@ -14,9 +9,18 @@ from pathlib import Path from typing import Any -# Known difficulty profiles. +# Known difficulty profiles and their expected conversion rate ranges. _KNOWN_DIFFICULTIES = {"intro", "intermediate", "advanced"} +_CONVERSION_RATE_RANGES: dict[str, tuple[float, float]] = { + "intro": (0.30, 0.45), + "intermediate": (0.18, 0.28), + "advanced": (0.08, 0.15), +} + +# Tolerance applied to range bounds for validation (accounts for stochastic variance). +_RATE_TOLERANCE = 0.05 + def check_difficulty(manifest: dict[str, Any]) -> list[str]: """Check that the manifest declares a known difficulty profile. @@ -38,14 +42,61 @@ def check_difficulty(manifest: dict[str, Any]) -> list[str]: def check_difficulty_ordering(bundles: dict[str, Path]) -> list[str]: """Check that conversion rates decrease as difficulty increases. + Reads the task train split from each bundle to compute the actual + conversion rate and verifies: + 1. Each rate falls within the declared range (with tolerance). + 2. Rates are ordered: intro > intermediate > advanced. + Args: bundles: Mapping of difficulty name → bundle path. Returns: - Error strings if the ordering is violated. - - NOTE: This check is a no-op until the simulation engine modulates - conversion rates by difficulty. Currently all difficulties produce - the same rate so we return an empty list unconditionally. + Error strings if any check is violated. """ - return [] + import pandas as pd + + errors: list[str] = [] + rates: dict[str, float] = {} + + for name, bundle_path in bundles.items(): + # Try all task split files to compute conversion rate. + task_dir = bundle_path / "tasks" / "converted_within_90_days" + for split in ("train", "valid", "test"): + split_path = task_dir / f"{split}.parquet" + if split_path.exists(): + df = pd.read_parquet(split_path) + if "converted_within_90_days" in df.columns: + if name not in rates: + rates[name] = float(df["converted_within_90_days"].mean()) + break + + # Check each rate is within the declared range (with tolerance). + for name, rate in rates.items(): + if name in _CONVERSION_RATE_RANGES: + lo, hi = _CONVERSION_RATE_RANGES[name] + if rate < lo - _RATE_TOLERANCE: + errors.append( + f"Difficulty '{name}' conversion rate {rate:.3f} " + f"is below expected range [{lo:.2f}, {hi:.2f}] " + f"(tolerance {_RATE_TOLERANCE})" + ) + elif rate > hi + _RATE_TOLERANCE: + errors.append( + f"Difficulty '{name}' conversion rate {rate:.3f} " + f"is above expected range [{lo:.2f}, {hi:.2f}] " + f"(tolerance {_RATE_TOLERANCE})" + ) + + # Check ordering: intro > intermediate > advanced. + ordering = ["intro", "intermediate", "advanced"] + for i in range(len(ordering) - 1): + higher = ordering[i] + lower = ordering[i + 1] + if higher in rates and lower in rates: + if rates[lower] >= rates[higher]: + errors.append( + f"Conversion rate for '{lower}' ({rates[lower]:.3f}) " + f"should be less than '{higher}' ({rates[higher]:.3f})" + ) + + return errors diff --git a/tests/test_difficulty_modulation.py b/tests/test_difficulty_modulation.py new file mode 100644 index 0000000..ba0097a --- /dev/null +++ b/tests/test_difficulty_modulation.py @@ -0,0 +1,243 @@ +"""Tests for difficulty profile modulation in the simulation engine.""" + +from __future__ import annotations + +import pytest + +from leadforge.api.generator import Generator +from leadforge.core.models import DifficultyParams, GenerationConfig +from leadforge.mechanisms.policies import assign_mechanisms + +_MEDIUM = {"n_leads": 500, "n_accounts": 200, "n_contacts": 600} + + +class TestDifficultyParams: + """Unit tests for DifficultyParams dataclass.""" + + def test_construction(self) -> None: + dp = DifficultyParams( + signal_strength=0.90, + noise_scale=0.10, + missing_rate=0.02, + outlier_rate=0.01, + conversion_rate_lo=0.30, + conversion_rate_hi=0.45, + committee_friction=0.10, + ) + assert dp.signal_strength == 0.90 + assert dp.conversion_rate_lo == 0.30 + + def test_on_generation_config(self) -> None: + dp = DifficultyParams( + signal_strength=0.70, + noise_scale=0.30, + missing_rate=0.08, + outlier_rate=0.04, + conversion_rate_lo=0.18, + conversion_rate_hi=0.28, + committee_friction=0.30, + ) + config = GenerationConfig(difficulty_params=dp) + assert config.difficulty_params is dp + + def test_defaults_to_none(self) -> None: + config = GenerationConfig() + assert config.difficulty_params is None + + +class TestAssignMechanismsWithDifficulty: + """Unit tests for difficulty-aware mechanism assignment.""" + + def test_without_difficulty_unchanged(self) -> None: + """Without difficulty_params, behavior matches original.""" + import random + + m = assign_mechanisms("fit_dominant", random.Random(42)) # noqa: S311 + # Original base_rate for fit_dominant is 0.008. + assert m.conversion_hazard._base_rate == pytest.approx(0.008) + + def test_with_difficulty_params_changes_hazard(self) -> None: + """With difficulty_params, hazard rates are modulated.""" + import random + + dp = DifficultyParams( + signal_strength=0.70, + noise_scale=0.30, + missing_rate=0.08, + outlier_rate=0.04, + conversion_rate_lo=0.18, + conversion_rate_hi=0.28, + committee_friction=0.30, + ) + m = assign_mechanisms("fit_dominant", random.Random(42), difficulty_params=dp) # noqa: S311 + # Should be different from the default 0.008. + assert m.conversion_hazard._base_rate != pytest.approx(0.008) + # Should be lower (targeting ~23% vs baseline ~70%). + assert m.conversion_hazard._base_rate < 0.008 + + def test_intro_higher_than_advanced(self) -> None: + """Intro difficulty produces higher hazard rates than advanced.""" + import random + + intro_dp = DifficultyParams( + signal_strength=0.90, + noise_scale=0.10, + missing_rate=0.02, + outlier_rate=0.01, + conversion_rate_lo=0.30, + conversion_rate_hi=0.45, + committee_friction=0.10, + ) + advanced_dp = DifficultyParams( + signal_strength=0.50, + noise_scale=0.55, + missing_rate=0.18, + outlier_rate=0.08, + conversion_rate_lo=0.08, + conversion_rate_hi=0.15, + committee_friction=0.55, + ) + m_intro = assign_mechanisms("fit_dominant", random.Random(42), difficulty_params=intro_dp) # noqa: S311 + m_adv = assign_mechanisms("fit_dominant", random.Random(42), difficulty_params=advanced_dp) # noqa: S311 + assert m_intro.conversion_hazard._base_rate > m_adv.conversion_hazard._base_rate + + +class TestConversionRateModulation: + """Integration tests verifying conversion rates fall within declared ranges.""" + + @pytest.mark.parametrize( + ("difficulty", "lo", "hi"), + [ + ("intro", 0.30, 0.45), + ("intermediate", 0.18, 0.28), + ("advanced", 0.08, 0.15), + ], + ) + def test_rate_within_range(self, difficulty: str, lo: float, hi: float) -> None: + """Conversion rate falls within target range (±tolerance).""" + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + difficulty=difficulty, + ) + bundle = gen.generate(**_MEDIUM) + leads = bundle.simulation_result.leads + rate = sum(1 for lead in leads if lead.current_stage == "closed_won") / len(leads) + # Allow 8% tolerance for small-sample variance. + tolerance = 0.08 + assert rate >= lo - tolerance, f"{difficulty} rate {rate:.2%} below {lo - tolerance:.2%}" + assert rate <= hi + tolerance, f"{difficulty} rate {rate:.2%} above {hi + tolerance:.2%}" + + def test_ordering(self) -> None: + """Intro > intermediate > advanced in conversion rate.""" + rates = {} + for difficulty in ("intro", "intermediate", "advanced"): + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + difficulty=difficulty, + ) + bundle = gen.generate(**_MEDIUM) + leads = bundle.simulation_result.leads + rates[difficulty] = sum( + 1 for lead in leads if lead.current_stage == "closed_won" + ) / len(leads) + assert rates["intro"] > rates["intermediate"] > rates["advanced"] + + +class TestDeterminism: + """Determinism tests for difficulty modulation.""" + + def test_same_seed_same_difficulty_identical(self) -> None: + """Same seed + difficulty produces identical results.""" + results = [] + for _ in range(2): + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + difficulty="intermediate", + ) + bundle = gen.generate(n_leads=100, n_accounts=50, n_contacts=150) + leads = bundle.simulation_result.leads + stages = [lead.current_stage for lead in leads] + results.append(stages) + assert results[0] == results[1] + + +class TestSnapshotDistortions: + """Tests for noise and missingness injection in snapshot.""" + + def test_distortions_change_values(self) -> None: + """Noise injection actually modifies feature values.""" + import pandas as pd + + from leadforge.core.models import DifficultyParams + from leadforge.render.snapshots import ( + _FLOAT_DISTORTION_COLS, + _apply_difficulty_distortions, + ) + + # Use actual feature spec column names so the function recognises them. + col = _FLOAT_DISTORTION_COLS[0] # e.g. "days_since_first_touch" + df = pd.DataFrame( + { + col: [1.0, 2.0, 3.0, 4.0, 5.0] * 20, + "converted_within_90_days": [True, False] * 50, + } + ) + original_values = df[col].copy() + dp = DifficultyParams( + signal_strength=0.50, + noise_scale=0.50, + missing_rate=0.0, + outlier_rate=0.0, + conversion_rate_lo=0.08, + conversion_rate_hi=0.15, + committee_friction=0.55, + ) + result = _apply_difficulty_distortions(df, dp, seed=42) + # Original should be unmodified (pure function). + assert df[col].equals(original_values) + # Result should differ due to noise. + assert not result[col].equals(original_values) + # Label column must not be touched. + assert result["converted_within_90_days"].equals(df["converted_within_90_days"]) + + def test_intro_has_minimal_missingness(self, tmp_path: pytest.TempPathFactory) -> None: + """Intro tier has low noise and minimal missingness.""" + out = tmp_path / "intro_bundle" + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + difficulty="intro", + ) + bundle = gen.generate(n_leads=200, n_accounts=80, n_contacts=240) + bundle.save(str(out)) + + import pandas as pd + + df = pd.read_parquet(out / "tasks/converted_within_90_days/train.parquet") + # Intro has 2% missing rate, so very few NaN values expected. + total_cells = df.select_dtypes(include="number").size + missing_frac = df.select_dtypes(include="number").isna().sum().sum() / total_cells + assert missing_frac < 0.10 # well below 10% + + def test_advanced_has_more_missingness(self, tmp_path: pytest.TempPathFactory) -> None: + """Advanced tier has substantially more missing values than intro.""" + import pandas as pd + + dfs = {} + for diff in ("intro", "advanced"): + out = tmp_path / f"{diff}_bundle" + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + difficulty=diff, + ) + bundle = gen.generate(n_leads=200, n_accounts=80, n_contacts=240) + bundle.save(str(out)) + dfs[diff] = pd.read_parquet(out / "tasks/converted_within_90_days/train.parquet") + + miss_intro = dfs["intro"].select_dtypes(include="number").isna().sum().sum() + miss_adv = dfs["advanced"].select_dtypes(include="number").isna().sum().sum() + assert miss_adv > miss_intro diff --git a/tests/validation/test_difficulty.py b/tests/validation/test_difficulty.py index 5e17f18..036588d 100644 --- a/tests/validation/test_difficulty.py +++ b/tests/validation/test_difficulty.py @@ -47,8 +47,10 @@ def test_missing_difficulty_fails(self, manifest: dict) -> None: class TestDifficultyOrdering: - def test_ordering_is_noop_for_v1(self, bundle_dir: Path) -> None: - """Until the engine modulates by difficulty, ordering check is a no-op.""" + def test_same_bundle_fails_for_multiple_difficulties(self, bundle_dir: Path) -> None: + """Same bundle cannot satisfy multiple difficulty ranges.""" bundles = {"intro": bundle_dir, "intermediate": bundle_dir, "advanced": bundle_dir} errors = check_difficulty_ordering(bundles) - assert errors == [] + # A single bundle's rate can't be in all three ranges at once, + # so we expect at least one violation. + assert len(errors) > 0 diff --git a/tests/validation/test_drift.py b/tests/validation/test_drift.py index 86ac090..184a8b9 100644 --- a/tests/validation/test_drift.py +++ b/tests/validation/test_drift.py @@ -77,13 +77,12 @@ def test_detects_wide_rate_spread( """A >5x spread in conversion rates should be flagged.""" first_seed = next(iter(multi_seed_bundles)) real = multi_seed_bundles[first_seed] - fake = tmp_path / "low_rate" + fake = tmp_path / "high_rate" shutil.copytree(real, fake) train_path = fake / "tasks/converted_within_90_days/train.parquet" df = pd.read_parquet(train_path) - # Set all but one row to False → very low rate. - df["converted_within_90_days"] = False - df.iloc[0, df.columns.get_loc("converted_within_90_days")] = True + # Set all rows to True → 100% rate vs real's ~10-30%. + df["converted_within_90_days"] = True df.to_parquet(train_path) bundles = {first_seed: real, 997: fake}