Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .agent-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,17 +312,17 @@ Documentation + CI:
| M14: Notebook 4 (recipe customization) | Discarded | Premature |
| M15: Docs polish + v1.0 release | **Done** | README, CHANGELOG, version bump to 1.0.0 complete; architecture diagram and notebooks remain post-v1 |

### v7 follow-up: pipeline deduplication + LatentDecayIntensity cleanup
### v7 follow-up: pipeline deduplication + LatentDecayIntensity cleanup

From self-review of PR #50. Should be a single follow-up PR.
From self-review of PR #50. Completed in a single follow-up PR.

| Item | Description |
| Item | Status |
|---|---|
| Extract shared pipeline functions | Move `subsample`, `inject_missingness`, `derive_features`, `softcap_expected_acv`, `assign_acquisition_wave`, `rename_and_select` into `leadforge/pipelines/common.py`. v5/v6/v7 modules import from common + add version-specific logic. |
| Extract shared ML pipeline | Canonical sklearn pipeline (preprocessor + LR) used by validators and eval scripts should be a single shared utility. |
| Deduplicate feature lists | `CAT_FEATURES`, `NUM_FEATURES`, `BINARY_FEATURES` defined once (in pipeline module or shared constants), imported by validators and eval scripts. |
| Group followup params into dataclass | Introduce `FollowupRampConfig(boost_after_day, boost_factor, ramp_days, latent_weights)` in `mechanisms/counts.py`. Replace 4 constructor params with `followup: FollowupRampConfig | None = None`. |
| Fix `subsample` silent short-return | `subsample()` can return fewer than `n` rows when there aren't enough negatives. Should raise `ValueError` instead. Fix in the shared `common.py`. |
| Extract shared pipeline functions | ✓ `leadforge/pipelines/common.py` — `subsample`, `inject_missingness_v6`, `derive_features`, `softcap_expected_acv`, `assign_acquisition_wave`, `rename_and_select`. v5/v6/v7 modules import from common. |
| Extract shared ML pipeline | ✓ `leadforge/pipelines/ml.py` — `build_baseline_pipeline`, `build_preprocessor`, `fit_evaluate`, `get_feature_cols`, `sanitize_categoricals`. Used by validators and eval scripts. |
| Deduplicate feature lists | ✓ `CAT_FEATURES`, `NUM_FEATURES`, `BINARY_FEATURES` are defined once in `leadforge/pipelines/common.py` and imported by validators and eval scripts. |
| Group followup params into dataclass | ✓ `FollowupRampConfig` frozen dataclass in `mechanisms/counts.py`. `LatentDecayIntensity` accepts `followup: FollowupRampConfig | None`. The legacy `followup_*` params are still accepted for backward compatibility, but are ignored when `followup` is provided. |
| Fix `subsample` silent short-return | ✓ `subsample()` now raises `ValueError` when there are insufficient negatives. |

### From post-v1 list

Expand Down
94 changes: 67 additions & 27 deletions leadforge/mechanisms/counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,40 @@

import math
import random
from dataclasses import dataclass, field
from typing import Any

from leadforge.mechanisms.base import Mechanism, MechanismContext


@dataclass(frozen=True)
class FollowupRampConfig:
    """Configuration for the follow-up ramp on :class:`LatentDecayIntensity`.

    Groups the four follow-up parameters into a single cohesive unit.

    Attributes:
        boost_after_day: Day after which latent modulation ramps up.
        boost_factor: Multiplier applied to ``boost`` at the end of the ramp.
        ramp_days: Number of days over which the ramp transitions linearly.
        latent_weights: Optional separate latent weights used after the
            followup day. The mapping is copied on construction, and
            ``None`` is normalised to an empty mapping.

    Raises:
        ValueError: If ``boost_after_day`` is negative, ``boost_factor`` is
            below ``1.0``, or ``ramp_days`` is below ``1``.
    """

    boost_after_day: int
    boost_factor: float = 1.0
    ramp_days: int = 10
    latent_weights: dict[str, float] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if self.boost_after_day < 0:
            raise ValueError(f"boost_after_day must be non-negative, got {self.boost_after_day}")
        if self.boost_factor < 1.0:
            raise ValueError(f"boost_factor must be >= 1.0, got {self.boost_factor}")
        if self.ramp_days < 1:
            raise ValueError(f"ramp_days must be >= 1, got {self.ramp_days}")
        # Snapshot the weights so later mutation of the caller's dict cannot
        # change this frozen config. ``object.__setattr__`` is required
        # because frozen dataclasses block ordinary attribute assignment.
        object.__setattr__(self, "latent_weights", dict(self.latent_weights or {}))


class PoissonIntensity(Mechanism):
"""Poisson-distributed event count driven by latent traits.

Expand Down Expand Up @@ -145,8 +174,8 @@ class LatentDecayIntensity(Mechanism):

where ``latent_multiplier = sum(weight_i * latents[key_i])``.

After ``followup_boost_after_day``, the effective boost ramps linearly from
``boost`` to ``boost * followup_boost_factor`` over ``followup_ramp_days``.
After ``followup.boost_after_day``, the effective boost ramps linearly from
``boost`` to ``boost * followup.boost_factor`` over ``followup.ramp_days``.
This models sales teams increasing follow-up intensity for leads that show
strong latent signals (engagement, fit, intent) — a causally legitimate
amplification of the latent → touch pathway.
Expand All @@ -159,18 +188,12 @@ class LatentDecayIntensity(Mechanism):
latent_weights: Mapping of latent-key → weight for the multiplier.
boost: Scaling factor for the latent multiplier (controls how much
latent traits amplify touch intensity).
followup_boost_after_day: Day after which latent modulation ramps up.
Set to ``None`` (default) to disable the ramp.
followup_boost_factor: Multiplier applied to ``boost`` at the end of
the ramp period. E.g. ``3.0`` means the effective boost is
``boost * 3.0`` once the ramp completes.
followup_ramp_days: Number of days over which the ramp transitions
linearly from ``boost`` to ``boost * followup_boost_factor``.
followup_latent_weights: Optional separate latent weights used after
the followup day. Models sales teams responding to *different*
latent signals during the follow-up period (e.g. prioritizing
authority and budget over raw engagement). Blended with the
base weights during the ramp period.
followup: Optional :class:`FollowupRampConfig` grouping the ramp
parameters. Set to ``None`` (default) to disable the ramp.
followup_boost_after_day: **Deprecated** — use ``followup`` instead.
followup_boost_factor: **Deprecated** — use ``followup`` instead.
followup_ramp_days: **Deprecated** — use ``followup`` instead.
followup_latent_weights: **Deprecated** — use ``followup`` instead.
"""

def __init__(
Expand All @@ -180,6 +203,8 @@ def __init__(
floor_rate: float = 0.01,
latent_weights: dict[str, float] | None = None,
boost: float = 0.8,
followup: FollowupRampConfig | None = None,
# Legacy params — kept for backward compatibility during transition
followup_boost_after_day: int | None = None,
followup_boost_factor: float = 1.0,
followup_ramp_days: int = 10,
Expand All @@ -191,25 +216,40 @@ def __init__(
raise ValueError(f"decay_factor must be in (0, 1], got {decay_factor}")
if floor_rate < 0:
raise ValueError(f"floor_rate must be non-negative, got {floor_rate}")
if followup_boost_after_day is not None and followup_boost_after_day < 0:
raise ValueError(
f"followup_boost_after_day must be non-negative, got {followup_boost_after_day}"

# Resolve followup config: prefer the dataclass, fall back to legacy params
if followup is not None:
Comment on lines +220 to +221
# Validation is handled by FollowupRampConfig.__post_init__
self._followup_after: int | None = followup.boost_after_day
self._followup_factor = followup.boost_factor
self._followup_ramp = followup.ramp_days
self._followup_latent_weights: dict[str, float] | None = (
dict(followup.latent_weights) if followup.latent_weights else None
)
else:
# Legacy path
if followup_boost_after_day is not None and followup_boost_after_day < 0:
raise ValueError(
f"followup_boost_after_day must be non-negative, got {followup_boost_after_day}"
)
if followup_boost_factor < 1.0:
raise ValueError(
f"followup_boost_factor must be >= 1.0, got {followup_boost_factor}"
)
if followup_ramp_days < 1:
raise ValueError(f"followup_ramp_days must be >= 1, got {followup_ramp_days}")
self._followup_after = followup_boost_after_day
self._followup_factor = followup_boost_factor
self._followup_ramp = followup_ramp_days
self._followup_latent_weights = (
dict(followup_latent_weights) if followup_latent_weights else None
)
if followup_boost_factor < 1.0:
raise ValueError(f"followup_boost_factor must be >= 1.0, got {followup_boost_factor}")
if followup_ramp_days < 1:
raise ValueError(f"followup_ramp_days must be >= 1, got {followup_ramp_days}")

self._base_rate = base_rate
self._decay = decay_factor
self._floor = floor_rate
self._latent_weights: dict[str, float] = dict(latent_weights) if latent_weights else {}
self._boost = boost
self._followup_after: int | None = followup_boost_after_day
self._followup_factor = followup_boost_factor
self._followup_ramp = followup_ramp_days
self._followup_latent_weights: dict[str, float] | None = (
dict(followup_latent_weights) if followup_latent_weights else None
)

@property
def name(self) -> str:
Expand Down
16 changes: 11 additions & 5 deletions leadforge/mechanisms/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@

if TYPE_CHECKING:
from leadforge.core.models import DifficultyParams
from leadforge.mechanisms.counts import LatentDecayIntensity, RecencyDecayIntensity
from leadforge.mechanisms.counts import (
FollowupRampConfig,
LatentDecayIntensity,
RecencyDecayIntensity,
)
from leadforge.mechanisms.hazards import ConversionHazard
from leadforge.mechanisms.measurement import NoisyProxy
from leadforge.mechanisms.scores import LatentScore
Expand Down Expand Up @@ -342,10 +346,12 @@ def _scale_weights(weights: dict[str, float], s: float) -> dict[str, float]:
floor_rate=0.02,
latent_weights=touch_latent_w,
boost=1.2,
followup_boost_after_day=20,
followup_boost_factor=10.0,
followup_ramp_days=10,
followup_latent_weights=followup_latent_w,
followup=FollowupRampConfig(
boost_after_day=20,
boost_factor=10.0,
ramp_days=10,
latent_weights=followup_latent_w,
),
)
else:
touch_intensity = RecencyDecayIntensity(
Expand Down
81 changes: 19 additions & 62 deletions leadforge/pipelines/build_v5.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,23 @@

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd

from leadforge.core.rng import RNGRoot
from leadforge.pipelines.common import (
ACV_CAP,
ACV_FLOOR,
SUBSAMPLE_N,
TARGET_RATE,
subsample,
)
from leadforge.pipelines.common import (
derive_features as _derive_features,
)
from leadforge.pipelines.common import (
rename_and_select as _rename_and_select_generic,
)

__all__ = [
"ACV_CAP",
Expand All @@ -38,12 +49,6 @@
SEED = 42
N_LEADS = 5000
SNAPSHOT_DAY = 10
SUBSAMPLE_N = 1000
TARGET_RATE = 0.30

# Narrative-consistent ACV bounds (from narrative.yaml: $18k–$120k).
ACV_FLOOR = 18_000.0
ACV_CAP = 120_000.0

# v5 column set: 18 features + 1 target = 19 columns.
FINAL_COLUMNS = [
Expand Down Expand Up @@ -89,10 +94,7 @@

def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame:
"""Derive binary features for the v5 column set."""
df = df.copy()
df["opportunity_created"] = df["opportunity_created"].astype(int)
df["demo_completed"] = (df["demo_page_views"] > 0).astype(int)
return df
return _derive_features(df)


def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -112,56 +114,11 @@ def rename_and_select(
label_column: Source column for the binary label. Defaults to
``"converted_within_90_days"`` for backward compatibility.
"""
if label_column not in df.columns:
raise ValueError(
f"Label column {label_column!r} not found. Available: {sorted(df.columns)}"
)
if label_column == "converted_within_90_days":
rename_map = RENAME_MAP
else:
rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"}
rename_map[label_column] = "converted"
df = df.rename(columns=rename_map)
df["converted"] = df["converted"].astype(int)
missing = [c for c in FINAL_COLUMNS if c not in df.columns]
if missing:
raise ValueError(
f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}"
)
return df[FINAL_COLUMNS]


def subsample(
df: pd.DataFrame,
seed: int,
n: int = SUBSAMPLE_N,
target_rate: float = TARGET_RATE,
) -> pd.DataFrame:
"""Stratified subsample to n rows at target_rate conversion."""
rng = RNGRoot(seed).numpy_child("subsample")
positives = df[df["converted"] == 1]
negatives = df[df["converted"] == 0]
n_pos = int(n * target_rate)
n_neg = n - n_pos

if len(positives) < n_pos:
warnings.warn(
f"only {len(positives)} positives available, need {n_pos}",
stacklevel=2,
)
n_pos = len(positives)
n_neg = n - n_pos
if len(negatives) < n_neg:
warnings.warn(
f"only {len(negatives)} negatives available, need {n_neg}",
stacklevel=2,
)
n_neg = len(negatives)

pos_sample = positives.sample(n=n_pos, random_state=rng)
neg_sample = negatives.sample(n=n_neg, random_state=rng)
return (
pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True)
return _rename_and_select_generic(
df,
rename_map=RENAME_MAP,
final_columns=FINAL_COLUMNS,
label_column=label_column,
)


Expand Down
Loading
Loading