Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .agent-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ First public dataset release: `leadforge-b2b-lead-scoring`. Three difficulty tie
- [ ] Upload to Kaggle and HuggingFace
- [ ] Announce

### Difficulty modulation ✓ (PR pending)

- [x] `leadforge/core/models.py` — `DifficultyParams` frozen dataclass; optional field on `GenerationConfig`
- [x] `leadforge/mechanisms/policies.py` — `assign_mechanisms()` accepts `difficulty_params`; per-motif calibration computes target daily hazard from conversion_rate_range; signal_strength scales LatentScore weights
- [x] `leadforge/simulation/engine.py` — threads `difficulty_params` to `assign_mechanisms()`; churn rate modulated by committee_friction
- [x] `leadforge/render/snapshots.py` — `_apply_difficulty_distortions()` injects Gaussian noise (noise_scale) and outliers (outlier_rate) into float features, and MCAR missingness (missing_rate) into all numeric features
- [x] `leadforge/api/generator.py` — constructs `DifficultyParams` from profile YAML, attaches to config
- [x] `leadforge/api/bundle.py` — passes difficulty_params and seed to `build_snapshot()`
- [x] `leadforge/validation/difficulty.py` — real `check_difficulty_ordering()` validates rates are in declared range and ordered
- [x] `tests/test_difficulty_modulation.py` — 13 tests (unit, integration, determinism, distortions)
- [x] Calibration across 20 seeds × 5 motif families: intro mean 43%, intermediate mean 22%, advanced mean 9%
- [x] All 865 tests pass

### Known issue: `current_stage` leakage at 90-day horizon

The full bundle snapshot includes `current_stage`, which at day 90 contains terminal stages (`closed_won`/`closed_lost`) and therefore perfectly encodes the label. The flat CSV export drops it; the Parquet task splits retain it with documentation. A proper fix (windowed snapshot or column redaction in the exposure layer) is deferred.
Expand Down
8 changes: 7 additions & 1 deletion leadforge/api/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ def write_bundle(
# ------------------------------------------------------------------
# 2. Snapshot + task splits → tasks/
# ------------------------------------------------------------------
snapshot = build_snapshot(result, population, horizon_days=config.horizon_days)
snapshot = build_snapshot(
result,
population,
horizon_days=config.horizon_days,
difficulty_params=config.difficulty_params,
seed=config.seed,
)
task = task_manifest_for_config(config.primary_task, config.label_window_days)
task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task)

Expand Down
33 changes: 32 additions & 1 deletion leadforge/api/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any

from leadforge.core.enums import DifficultyProfile, ExposureMode
from leadforge.core.models import GenerationConfig, WorldBundle, WorldSpec
from leadforge.core.models import DifficultyParams, GenerationConfig, WorldBundle, WorldSpec
from leadforge.core.rng import RNGRoot
from leadforge.core.sentinels import _MISSING

Expand Down Expand Up @@ -188,6 +188,37 @@ def generate(
profiles = recipe.load_difficulty_profiles()
profile = profiles.get(config.difficulty.value, {})
category_latent_correlations = profile.get("category_latent_correlations")

# Construct DifficultyParams from profile and attach to config.
# All keys are required — a missing key indicates a malformed profile
# YAML and should fail loudly rather than silently defaulting.
required_keys = (
"signal_strength",
"noise_scale",
"missing_rate",
"outlier_rate",
"conversion_rate_range",
"committee_friction",
)
missing = [k for k in required_keys if k not in profile]
if missing:
from leadforge.core.exceptions import InvalidRecipeError

raise InvalidRecipeError(
f"Difficulty profile '{config.difficulty.value}' is missing "
f"required keys: {missing}"
)
cr_range = profile["conversion_rate_range"]
difficulty_params = DifficultyParams(
signal_strength=profile["signal_strength"],
noise_scale=profile["noise_scale"],
missing_rate=profile["missing_rate"],
outlier_rate=profile["outlier_rate"],
conversion_rate_lo=cr_range[0],
conversion_rate_hi=cr_range[1],
committee_friction=profile["committee_friction"],
)
config = dataclasses.replace(config, difficulty_params=difficulty_params)
except (FileNotFoundError, KeyError):
category_latent_correlations = None

Expand Down
18 changes: 18 additions & 0 deletions leadforge/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@
from leadforge.structure.graph import WorldGraph


@dataclass(frozen=True)
class DifficultyParams:
    """Numeric parameters from a difficulty profile.

    Carried on :class:`GenerationConfig` to thread difficulty-dependent
    behaviour through the simulation engine and snapshot builder.

    The dataclass is frozen, so instances cannot be modified after
    construction.  No range validation is performed here; values are taken
    as supplied by the caller.
    """

    # Multiplier applied to LatentScore weights when mechanisms are assigned;
    # values >= 1.0 leave the weights unchanged.
    signal_strength: float
    # Std-dev of the Gaussian noise added to float snapshot features, expressed
    # as a multiple of each column's own standard deviation; 0 disables noise.
    noise_scale: float
    # Per-cell probability of blanking a numeric snapshot feature; 0 disables.
    missing_rate: float
    # Per-cell probability of replacing a float snapshot feature with an
    # outlier value; 0 disables.
    outlier_rate: float
    # Lower and upper bound of the target conversion rate; the midpoint is
    # used as the calibration target for the conversion hazard.
    conversion_rate_lo: float
    conversion_rate_hi: float
    # Scales the simulation's daily churn rate by (1 + 0.5 * committee_friction).
    committee_friction: float


def _require_positive_int(value: Any, name: str) -> None:
"""Raise ``InvalidConfigError`` unless *value* is a positive plain ``int``.

Expand Down Expand Up @@ -49,6 +66,7 @@ class GenerationConfig:
label_window_days: int = 90
output_path: str = "./out"
package_version: str = field(default_factory=lambda: __version__)
difficulty_params: DifficultyParams | None = None

def __post_init__(self) -> None:
if isinstance(self.seed, bool) or not isinstance(self.seed, int):
Expand Down
87 changes: 81 additions & 6 deletions leadforge/mechanisms/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
from __future__ import annotations

import random
from typing import Any
from typing import TYPE_CHECKING, Any

from leadforge.mechanisms.base import MechanismAssignment

if TYPE_CHECKING:
from leadforge.core.models import DifficultyParams
from leadforge.mechanisms.counts import LatentDecayIntensity, RecencyDecayIntensity
from leadforge.mechanisms.hazards import ConversionHazard
from leadforge.mechanisms.measurement import NoisyProxy
Expand Down Expand Up @@ -200,6 +203,25 @@
_DEFAULT_HAZARD_PARAMS: dict[str, float] = {"base_rate": 0.006, "scale": 0.05}
_DEFAULT_TOUCH_BASE_RATE: float = 0.40

# Per-motif calibration constants for difficulty modulation.
# Each tuple is (reach_fraction, effective_days_at_negotiation):
# - reach_fraction: approximate share of leads that reach negotiation stage
# under baseline (no difficulty) parameters.
# - effective_days_at_negotiation: approximate days a lead spends at
# negotiation before converting or churning.
#
# The two numbers feed the approximation used when a difficulty profile is
# active:
#   P(convert) ≈ reach_fraction × [1 - (1 - daily_p) ** effective_days]
# which is solved for daily_p given the profile's target conversion rate.
#
# Calibrated against v1.0.0 (2026-05-04) with 1000 leads × 20 seeds.
# Re-calibrate if stage transition rates, churn rate, or population
# initialisation logic changes.
_MOTIF_REACH_CALIBRATION: dict[str, tuple[float, float]] = {
    "fit_dominant": (0.85, 22.0),
    "intent_dominant": (0.85, 22.0),
    "sales_execution_sensitive": (0.40, 18.0),
    "demo_trial_mediated": (0.70, 20.0),
    "buying_committee_friction": (0.32, 16.0),
}
# Fallback (reach_fraction, effective_days_at_negotiation) for any motif
# family that has no entry in the table above.
_DEFAULT_REACH_CALIBRATION: tuple[float, float] = (0.55, 20.0)


# ---------------------------------------------------------------------------
# Public entry point
Expand All @@ -211,6 +233,7 @@ def assign_mechanisms(
rng: random.Random,
*,
latent_touch_intensity: bool = False,
difficulty_params: DifficultyParams | None = None,
) -> MechanismAssignment:
"""Build a :class:`~leadforge.mechanisms.base.MechanismAssignment` for *motif_family*.

Expand All @@ -232,19 +255,71 @@ def assign_mechanisms(
Returns:
A fully populated :class:`~leadforge.mechanisms.base.MechanismAssignment`.
"""
conv_weights = _CONVERSION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS)
hazard_p = _HAZARD_PARAMS.get(motif_family, _DEFAULT_HAZARD_PARAMS)
trans_weights = _TRANSITION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS)
conv_weights = dict(_CONVERSION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS))
hazard_p = dict(_HAZARD_PARAMS.get(motif_family, _DEFAULT_HAZARD_PARAMS))
trans_weights = dict(_TRANSITION_SCORE_WEIGHTS.get(motif_family, _DEFAULT_CONVERSION_WEIGHTS))
touch_rate = _TOUCH_BASE_RATES.get(motif_family, _DEFAULT_TOUCH_BASE_RATE)

# -- Difficulty modulation ------------------------------------------------
signal = 1.0
if difficulty_params is not None:
signal = difficulty_params.signal_strength

# Override conversion hazard params to produce the target conversion rate.
#
# The baseline conversion rate varies significantly by motif family due
# to differences in how many leads reach negotiation and how latent
# scores distribute. We use per-motif calibration constants to compute
# the daily hazard probability that produces the target overall rate.
#
# Model: P(convert) ≈ reach_frac × [1 - (1-daily_p)^N_days]
target_mid = (
difficulty_params.conversion_rate_lo + difficulty_params.conversion_rate_hi
) / 2

reach_frac, days_at_negotiation = _MOTIF_REACH_CALIBRATION.get(
motif_family, _DEFAULT_REACH_CALIBRATION
)

# Target P(convert | reached negotiation).
p_convert_given_neg = min(0.92, target_mid / reach_frac)
target_daily_p = 1.0 - (1.0 - p_convert_given_neg) ** (1.0 / days_at_negotiation)

# Split into base_rate (score-independent) and scale (score-dependent).
# Preserve the motif's original ratio between base_rate and scale.
orig_sum = hazard_p["base_rate"] + hazard_p["scale"]
if orig_sum > 0:
base_frac = hazard_p["base_rate"] / orig_sum
else:
base_frac = 0.15
hazard_p = {
"base_rate": target_daily_p * base_frac,
"scale": target_daily_p * (1.0 - base_frac),
}

# Apply signal_strength to LatentScore weights.
# To reduce signal (lower signal_strength), we attenuate secondary weights
# more than the primary one. This reduces discriminability rather than just
# shifting the sigmoid. The strongest weight is scaled by `signal`, the
# rest by `signal^1.5`, so intro (0.90) barely changes while advanced (0.50)
# meaningfully weakens secondary signals.
def _scale_weights(weights: dict[str, float], s: float) -> dict[str, float]:
if not weights or s >= 1.0:
return dict(weights)
max_abs = max(abs(v) for v in weights.values())
return {k: v * s if abs(v) >= max_abs - 1e-9 else v * (s**1.5) for k, v in weights.items()}

scaled_conv_weights = _scale_weights(conv_weights, signal)
scaled_trans_weights = _scale_weights(trans_weights, signal)

conversion_hazard = ConversionHazard(
score_mech=LatentScore(weights=conv_weights, bias=-1.5),
score_mech=LatentScore(weights=scaled_conv_weights, bias=-1.5),
base_rate=hazard_p["base_rate"],
scale=hazard_p["scale"],
)

stage_transition = HazardTransition(
score_mech=LatentScore(weights=trans_weights, bias=-1.0),
score_mech=LatentScore(weights=scaled_trans_weights, bias=-1.0),
base_rate=0.05,
scale=0.15,
min_dwell_days=2,
Expand Down
90 changes: 90 additions & 0 deletions leadforge/render/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from leadforge.core.rng import RNGRoot
from leadforge.schema.entities import (
OpportunityRow,
SalesActivityRow,
Expand All @@ -25,6 +27,7 @@
from leadforge.simulation.population import REVENUE_BAND_MIDPOINTS

if TYPE_CHECKING:
from leadforge.core.models import DifficultyParams
from leadforge.simulation.engine import SimulationResult
from leadforge.simulation.population import PopulationResult

Expand Down Expand Up @@ -56,6 +59,8 @@ def build_snapshot(
population: PopulationResult,
horizon_days: int = 90,
snapshot_day: int | None = None,
difficulty_params: DifficultyParams | None = None,
seed: int = 42,
) -> pd.DataFrame:
"""Build the lead snapshot DataFrame from simulation output.

Expand Down Expand Up @@ -311,4 +316,89 @@ def build_snapshot(
if col in snapshot.columns:
snapshot[col] = snapshot[col].astype(dtype)

# -------------------------------------------------------------------
# Difficulty distortions: noise, missingness, outliers.
# -------------------------------------------------------------------
if difficulty_params is not None:
snapshot = _apply_difficulty_distortions(snapshot, difficulty_params, seed)

return snapshot


# ---------------------------------------------------------------------------
# Difficulty distortion helpers
# ---------------------------------------------------------------------------

# Derive eligible columns from the feature spec rather than runtime dtype
# sniffing. This guarantees categoricals, booleans, IDs, and labels are
# never distorted even if their runtime dtype happens to be numeric.

# Non-target features declared as float: eligible for Gaussian noise and
# outlier injection.
_FLOAT_DISTORTION_COLS: list[str] = [
    f.name for f in LEAD_SNAPSHOT_FEATURES if f.dtype in ("Float64", "float64") and not f.is_target
]
# Non-target features declared as float or integer: eligible for
# missingness injection.
_NUMERIC_DISTORTION_COLS: list[str] = [
    f.name
    for f in LEAD_SNAPSHOT_FEATURES
    if f.dtype in ("Float64", "float64", "Int64", "int64") and not f.is_target
]


def _apply_difficulty_distortions(
    df: pd.DataFrame,
    params: DifficultyParams,
    seed: int,
) -> pd.DataFrame:
    """Apply noise, missingness, and outliers to numeric snapshot features.

    The three distortions run in a fixed order and all draw from one NumPy
    generator obtained via ``RNGRoot(seed).numpy_child("snapshot_distortions")``:

    1. Gaussian noise on float columns, with standard deviation
       ``params.noise_scale`` times the column's own standard deviation.
    2. Missingness on all numeric columns: each cell is blanked
       independently with probability ``params.missing_rate``.
    3. Outliers on float columns: each still-valid cell is replaced, with
       probability ``params.outlier_rate``, by ``median ± 5 × std`` of the
       column.

    A step is skipped entirely when its rate/scale is not positive.  In
    steps 1 and 3 a column is skipped (before any random draw) when it has
    fewer than two non-missing values or zero spread, because no meaningful
    sample standard deviation exists for it.

    Args:
        df: Snapshot frame to distort.
        params: Difficulty parameters supplying ``noise_scale``,
            ``missing_rate`` and ``outlier_rate``.
        seed: Seed handed to ``RNGRoot``.

    Returns:
        A new DataFrame — the input is not mutated.
    """
    df = df.copy()
    rng_root = RNGRoot(seed)
    np_rng = rng_root.numpy_child("snapshot_distortions")

    # Filter to columns actually present (guards against feature spec drift).
    float_cols = [c for c in _FLOAT_DISTORTION_COLS if c in df.columns]
    all_numeric_cols = [c for c in _NUMERIC_DISTORTION_COLS if c in df.columns]

    # 1. Gaussian noise on float features only (avoids int casting issues).
    if params.noise_scale > 0:
        for col in float_cols:
            valid_mask = df[col].notna()
            # The sample std needs at least two observations.  On nullable
            # Float64 columns it may otherwise come back as pd.NA, which
            # float() rejects with a TypeError.
            if valid_mask.sum() < 2:
                continue
            col_std = float(df.loc[valid_mask, col].std())
            if col_std == 0 or np.isnan(col_std):
                continue
            noise = np_rng.normal(0, params.noise_scale * col_std, size=len(df))
            # Add noise only where values are valid.
            values = df[col].copy()
            values[valid_mask] = values[valid_mask] + noise[valid_mask.values]
            df[col] = values

    # 2. MCAR missingness injection (all numeric columns).
    if params.missing_rate > 0:
        # One draw for the whole (rows x columns) grid keeps the random
        # stream independent of which columns end up with hits.
        mask = np_rng.random(size=(len(df), len(all_numeric_cols))) < params.missing_rate
        for i, col in enumerate(all_numeric_cols):
            col_mask = mask[:, i]
            if col_mask.any():
                # Convert int columns to float to support NaN.
                if df[col].dtype in ("int64", "Int64"):
                    df[col] = df[col].astype("Float64")
                df.loc[col_mask, col] = np.nan

    # 3. Outlier injection (float columns only). Uses 5σ to produce values
    # clearly distinguishable from natural variation.
    if params.outlier_rate > 0:
        for col in float_cols:
            valid_mask = df[col].notna()
            # Same guard as the noise step: a column that is (almost)
            # entirely missing — naturally or after step 2 — has no usable
            # std, and must be skipped rather than crash on float(pd.NA).
            if valid_mask.sum() < 2:
                continue
            col_std = float(df.loc[valid_mask, col].std())
            if col_std == 0 or np.isnan(col_std):
                continue
            col_median = float(df[col].median())
            outlier_mask = np_rng.random(size=len(df)) < params.outlier_rate
            signs = np_rng.choice([-1, 1], size=len(df)).astype(float)
            outlier_values = col_median + signs * 5 * col_std
            # Never resurrect a missing cell as an outlier.
            combined = outlier_mask & valid_mask.values
            if combined.any():
                df.loc[combined, col] = outlier_values[combined]

    return df
15 changes: 13 additions & 2 deletions leadforge/simulation/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ def simulate_world(
post_sim_rng = root.child("simulation_post_sim")

mechanisms = assign_mechanisms(
world_graph.motif_family, mech_rng, latent_touch_intensity=latent_touch_intensity
world_graph.motif_family,
mech_rng,
latent_touch_intensity=latent_touch_intensity,
difficulty_params=config.difficulty_params,
)
# Narrow type for direct conversion path (daily_probability is on
# ConversionHazard, not the Mechanism ABC).
Expand Down Expand Up @@ -246,6 +249,14 @@ def simulate_world(
session_ctr = 0
activity_ctr = 0

# Effective churn rate — modestly scaled up by committee_friction for harder tiers.
dp = config.difficulty_params
effective_churn_rate = (
_DAILY_CHURN_RATE * (1.0 + 0.5 * dp.committee_friction)
if dp is not None
else _DAILY_CHURN_RATE
)

# -------------------------------------------------------------------
# Main simulation loop: t = 0 … horizon_days-1
# -------------------------------------------------------------------
Expand All @@ -264,7 +275,7 @@ def simulate_world(
)

# -- 1. Churn check (transition stream) ----------------------
if transition_rng.random() < _DAILY_CHURN_RATE:
if transition_rng.random() < effective_churn_rate:
state.mark_churned(t)
continue # no events emitted on churn day

Expand Down
Loading
Loading