diff --git a/.agent-plan.md b/.agent-plan.md index dc9e7e9..4ed3a55 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,22 +6,22 @@ ## Current System State -**v0.3.0 in progress — Milestone 6 complete (PR open).** Full mechanism library implemented: -base ABC + context, static draws, influence transforms, latent scoring, conversion hazard, -stage transitions, count intensities, categorical influence, measurement proxies, and -motif-family-aware mechanism assignment. 437 tests passing. +**v0.4.0 in progress — Milestone 7 complete (PR open).** Full simulation engine implemented: +per-lead mutable state, 90-day daily-step loop, touch/session/sales-activity emission, +HazardTransition stage advancement, ConversionHazard final-close, and post-conversion +opportunity/customer/subscription creation. 490 tests passing. --- -## Active Task Breakdown — Milestone 7: Simulation Engine (v0.3.0) +## Active Task Breakdown — Milestone 8: Observation Model (v0.4.0) -Goal: Run the first real evolving world and derive conversion outcomes from events. +Goal: Transform the hidden simulated world into realistic CRM-like observations. -- [ ] **1. World state** (`simulation/state.py`) — per-lead mutable state during simulation -- [ ] **2. Simulation engine** (`simulation/engine.py`) — daily step loop, 90-day horizon -- [ ] **3. Event generation** — touches/sessions derived from count mechanisms -- [ ] **4. Stage advancement** — HazardTransition drives funnel progression -- [ ] **5. Conversion derivation** — ConversionHazard fires event; sets `converted_within_90_days` +- [ ] **1. Snapshot builder** (`render/snapshots.py`) — lead-anchored flat feature snapshot +- [ ] **2. Relational export** (`render/relational.py`) — DataFrame per table from SimulationResult +- [ ] **3. Task export** (`render/tasks.py`) — train/valid/test Parquet split for `converted_within_90_days` +- [ ] **4. Manifest builder** (`render/manifests.py`) — bundle manifest.json +- [ ] **5. Bundle writer** (`api/bundle.py`) — `WorldBundle.save(path)` --- @@ -35,7 +35,17 @@ Goal: Run the first real evolving world and derive conversion outcomes from even ## Completed Phases -### Milestone 6 — Mechanism Layer v1 ✓ (v0.3.0 in PR) +### Milestone 7 — Simulation Engine ✓ (v0.4.0 in PR) +- `simulation/state.py`: `LeadSimState` — per-lead mutable state (stage, dwell, converted, churned, sql_day) +- `simulation/engine.py`: `simulate_world()` — 90-day daily-step loop; `SimulationResult` output type + - Churn: daily 0.4% probability → `closed_lost` + - Stage advance: `HazardTransition` drives mql → … → negotiation + - Conversion: `ConversionHazard` fires from `negotiation` → `closed_won` + - Touch emission: `RecencyDecayIntensity` per day; session (30% of touch-days) and sales-activity (20% at sql+) emission + - Post-sim: `OpportunityRow` for sql+ leads; `CustomerRow` + `SubscriptionRow` for `closed_won` leads +- 45 new tests; total 490 passing + +### Milestone 6 — Mechanism Layer v1 ✓ (v0.3.0 merged) - `mechanisms/base.py`: `Mechanism` ABC, `MechanismContext`, `MechanismSummary`, `MechanismAssignment` - `mechanisms/static.py`: `CategoricalDraw`, `BoundedNumericDraw`, `MixtureDraw` - `mechanisms/influence.py`: `AdditiveInfluence`, `LogisticInfluence`, `SaturatingInfluence`, `ThresholdInfluence`, `InteractionTerm` diff --git a/leadforge/simulation/engine.py b/leadforge/simulation/engine.py new file mode 100644 index 0000000..21eaafc --- /dev/null +++ b/leadforge/simulation/engine.py @@ -0,0 +1,439 @@ +"""Discrete-time simulation engine — 90-day hybrid world evolution. + +:func:`simulate_world` is the single public entry point. It iterates daily +steps for every lead in the population, driven by the +:class:`~leadforge.mechanisms.base.MechanismAssignment` produced by +:func:`~leadforge.mechanisms.policies.assign_mechanisms`, and emits the full +set of relational event rows. + +Simulation contract +------------------- +- All randomness is derived from named substreams of ``RNGRoot(config.seed)``, + making every run fully deterministic given ``(config, population, world_graph)``. +- ``converted_within_90_days`` is **event-derived** — it becomes ``True`` only + when a lead's simulated trajectory reaches the ``closed_won`` terminal stage. +- Stage advancement is driven by :class:`~leadforge.mechanisms.transitions.HazardTransition` + (mql → … → negotiation); final conversion is driven by + :class:`~leadforge.mechanisms.hazards.ConversionHazard` (negotiation → closed_won). +- A small daily churn probability independently moves any non-terminal lead to + ``closed_lost``. + +RNG substreams +-------------- +Three named substreams keep unrelated outputs stable when any single part of +the simulation logic changes: + +- ``simulation_transitions`` — churn and stage/conversion hazard rolls. +- ``simulation_events`` — touch, session, and sales-activity emission. +- ``simulation_post_sim`` — ACV sampling for post-loop entity creation. + +Post-simulation entity creation +-------------------------------- +- An :class:`~leadforge.schema.entities.OpportunityRow` is created for every + lead that reached ``sql`` or any deeper stage. +- :class:`~leadforge.schema.entities.CustomerRow` and + :class:`~leadforge.schema.entities.SubscriptionRow` are created only for + converted leads (``closed_won``). +""" + +from __future__ import annotations + +import random +from dataclasses import dataclass, field +from datetime import date, timedelta + +from leadforge.core.ids import ID_PREFIXES, make_id +from leadforge.core.models import GenerationConfig +from leadforge.core.rng import RNGRoot +from leadforge.mechanisms.base import MechanismContext +from leadforge.mechanisms.policies import assign_mechanisms +from leadforge.mechanisms.transitions import StageSequence +from leadforge.schema.entities import ( + CustomerRow, + LeadRow, + OpportunityRow, + SalesActivityRow, + SessionRow, + SubscriptionRow, + TouchRow, +) +from leadforge.simulation.population import PopulationResult +from leadforge.simulation.state import LeadSimState +from leadforge.structure.graph import WorldGraph + +# --------------------------------------------------------------------------- +# Internal constants +# --------------------------------------------------------------------------- + +# Daily churn probability from any active stage. +_DAILY_CHURN_RATE = 0.004 + +# Funnel stages that imply meaningful sales engagement → opportunity creation. +_SQL_OR_DEEPER = frozenset( + { + "sql", + "demo_scheduled", + "demo_completed", + "proposal_sent", + "negotiation", + "closed_won", + } +) + +# Stages where a sales rep is actively working the deal. +_SALES_ACTIVE_STAGES = frozenset( + { + "sal", + "sql", + "demo_scheduled", + "demo_completed", + "proposal_sent", + "negotiation", + } +) + +# Touch / session / activity catalogues. +_TOUCH_TYPES = ("email", "call", "linkedin_message", "content_download", "webinar") +_SESSION_TYPES = ("website", "pricing_page", "demo_page") +_ACTIVITY_TYPES = ("call", "email", "meeting", "demo") +_ACTIVITY_OUTCOMES = ( + "connected", + "no_answer", + "left_voicemail", + "meeting_set", + "demo_completed", +) + +# ACV range (lo, hi) in USD by account employee band. +_EMPLOYEE_ACV_RANGES: dict[str, tuple[int, int]] = { + "200-499": (15_000, 50_000), + "500-999": (30_000, 80_000), + "1000-1999": (50_000, 120_000), + "2000+": (80_000, 200_000), +} +_DEFAULT_ACV_RANGE: tuple[int, int] = (20_000, 60_000) + + +# --------------------------------------------------------------------------- +# Public output type +# --------------------------------------------------------------------------- + + +@dataclass +class SimulationResult: + """Fully simulated world output, ready for the rendering layer. + + All lists are in insertion order (chronological within each lead, + ascending lead-index across leads). + + Args: + leads: Updated :class:`~leadforge.schema.entities.LeadRow` list + with simulation outcomes filled in. + """ + + leads: list[LeadRow] + touches: list[TouchRow] = field(default_factory=list) + sessions: list[SessionRow] = field(default_factory=list) + sales_activities: list[SalesActivityRow] = field(default_factory=list) + opportunities: list[OpportunityRow] = field(default_factory=list) + customers: list[CustomerRow] = field(default_factory=list) + subscriptions: list[SubscriptionRow] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def simulate_world( + config: GenerationConfig, + population: PopulationResult, + world_graph: WorldGraph, +) -> SimulationResult: + """Run the discrete-time simulation for all leads in *population*. + + Iterates ``config.horizon_days`` daily steps. On each step, every + non-terminal lead is processed in population order: churn check, stage + advance or final-close check, then event emission (touches, sessions, + sales activities). After the main loop, post-conversion entities are + created. + + Args: + config: Fully resolved generation configuration (counts, seed, + horizon). + population: Output of + :func:`~leadforge.simulation.population.build_population`. + world_graph: The sampled hidden world graph; its ``motif_family`` + attribute selects the appropriate mechanism parameters. + + Returns: + A :class:`SimulationResult` with all nine relational tables + populated. + """ + root = RNGRoot(config.seed) + mech_rng = root.child("mechanisms") + transition_rng = root.child("simulation_transitions") + event_rng = root.child("simulation_events") + post_sim_rng = root.child("simulation_post_sim") + + mechanisms = assign_mechanisms(world_graph.motif_family, mech_rng) + stage_seq = StageSequence() + + # Build lookup indexes. + account_by_id = {a.account_id: a for a in population.accounts} + contact_by_id = {c.contact_id: c for c in population.contacts} + + # Merge latent traits per lead: account + contact + lead latents. + lat = population.latent_state + merged_latents: dict[str, dict[str, float]] = {} + for lead in population.leads: + contact = contact_by_id[lead.contact_id] + merged: dict[str, float] = {} + merged.update(lat.account_latents.get(lead.account_id, {})) + merged.update(lat.contact_latents.get(contact.contact_id, {})) + merged.update(lat.lead_latents.get(lead.lead_id, {})) + merged_latents[lead.lead_id] = merged + + # Precompute lead creation dates to avoid repeated ISO parsing inside + # the hot day × lead loop. + lead_dates: dict[str, date] = { + lead.lead_id: date.fromisoformat(lead.lead_created_at) for lead in population.leads + } + + # Initialise per-lead mutable state. + states: dict[str, LeadSimState] = { + lead.lead_id: LeadSimState( + lead_id=lead.lead_id, + current_stage=lead.current_stage, + # Track leads already at sql+ from population initialisation. + sql_day=0 if lead.current_stage in _SQL_OR_DEEPER else None, + ) + for lead in population.leads + } + + # Event row buffers and counters. + touches: list[TouchRow] = [] + sessions: list[SessionRow] = [] + sales_activities: list[SalesActivityRow] = [] + touch_ctr = 0 + session_ctr = 0 + activity_ctr = 0 + + # ------------------------------------------------------------------- + # Main simulation loop: t = 0 … horizon_days-1 + # ------------------------------------------------------------------- + for t in range(config.horizon_days): + for lead in population.leads: + state = states[lead.lead_id] + if state.is_terminal: + continue + + latents = merged_latents[lead.lead_id] + ctx = MechanismContext( + latents=latents, + stage=state.current_stage, + t=t, + extra={"dwell_days": state.dwell_days}, + ) + + # -- 1. Churn check (transition stream) ---------------------- + if transition_rng.random() < _DAILY_CHURN_RATE: + state.mark_churned(t) + continue # no events emitted on churn day + + # -- 2. Stage advance or conversion check (transition stream) - + if state.current_stage == "negotiation": + # Final close: ConversionHazard decides closed_won. + if mechanisms.conversion_hazard.sample(ctx, transition_rng): + state.mark_converted(t) + # Fall through to emit events on conversion day. + else: + # Funnel advancement: HazardTransition advances the stage. + if mechanisms.stage_transition.sample(ctx, transition_rng): + next_s = stage_seq.next_stage(state.current_stage) + if next_s is not None: + state.advance_stage(next_s, t) + + # -- 3. Touches (event stream) -------------------------------- + event_date = (lead_dates[lead.lead_id] + timedelta(days=t)).isoformat() + + n_touches = mechanisms.touch_intensity.sample(ctx, event_rng) + for _ in range(n_touches): + touch_ctr += 1 + touches.append( + TouchRow( + touch_id=make_id(ID_PREFIXES["touch"], touch_ctr), + lead_id=lead.lead_id, + touch_timestamp=event_date, + touch_type=event_rng.choice(_TOUCH_TYPES), + touch_channel=lead.first_touch_channel, + touch_direction="inbound" + if lead.first_touch_channel == "inbound_marketing" + else "outbound", + campaign_id=None, + ) + ) + + # -- 4. Sessions (≈30 % of touch-days, event stream) ---------- + if n_touches > 0 and event_rng.random() < 0.30: + session_ctr += 1 + at_demo_stage = state.current_stage in { + "demo_scheduled", + "demo_completed", + } + at_late_stage = state.current_stage in _SQL_OR_DEEPER + sessions.append( + SessionRow( + session_id=make_id(ID_PREFIXES["session"], session_ctr), + lead_id=lead.lead_id, + session_timestamp=event_date, + session_type=event_rng.choice(_SESSION_TYPES), + page_views=event_rng.randint(1, 10), + pricing_page_views=event_rng.randint(0, 2) if at_late_stage else 0, + demo_page_views=event_rng.randint(0, 2) if at_demo_stage else 0, + session_duration_seconds=event_rng.randint(60, 600), + ) + ) + + # -- 5. Sales activities (≈20 % of active-stage days) --------- + if state.current_stage in _SALES_ACTIVE_STAGES and event_rng.random() < 0.20: + activity_ctr += 1 + sales_activities.append( + SalesActivityRow( + activity_id=make_id(ID_PREFIXES["sales_activity"], activity_ctr), + lead_id=lead.lead_id, + rep_id=lead.owner_rep_id, + activity_timestamp=event_date, + activity_type=event_rng.choice(_ACTIVITY_TYPES), + activity_outcome=event_rng.choice(_ACTIVITY_OUTCOMES), + ) + ) + + # -- 6. Advance dwell counter --------------------------------- + state.dwell_days += 1 + + # ------------------------------------------------------------------- + # Post-simulation: build final entity rows + # ------------------------------------------------------------------- + updated_leads: list[LeadRow] = [] + opportunities: list[OpportunityRow] = [] + customers: list[CustomerRow] = [] + subscriptions: list[SubscriptionRow] = [] + opp_ctr = 0 + cust_ctr = 0 + sub_ctr = 0 + + for lead in population.leads: + state = states[lead.lead_id] + lead_date = lead_dates[lead.lead_id] + + is_sql = state.sql_day is not None or state.current_stage in _SQL_OR_DEEPER + conv_ts: str | None = None + if state.converted and state.conversion_day is not None: + conv_ts = (lead_date + timedelta(days=state.conversion_day)).isoformat() + + updated_leads.append( + LeadRow( + lead_id=lead.lead_id, + contact_id=lead.contact_id, + account_id=lead.account_id, + lead_created_at=lead.lead_created_at, + lead_source=lead.lead_source, + first_touch_channel=lead.first_touch_channel, + current_stage=state.current_stage, + owner_rep_id=lead.owner_rep_id, + is_mql=True, # all leads start at mql + is_sql=is_sql, + converted_within_90_days=state.converted, + conversion_timestamp=conv_ts, + ) + ) + + # Opportunity: created when lead first reached sql or deeper. + if is_sql: + opp_ctr += 1 + opp_id = make_id(ID_PREFIXES["opportunity"], opp_ctr) + opp_day = state.sql_day if state.sql_day is not None else 0 + opp_created_at = (lead_date + timedelta(days=opp_day)).isoformat() + + close_outcome: str | None = None + closed_at: str | None = None + if state.converted: + close_outcome = "closed_won" + closed_at = conv_ts + elif state.churned and state.churn_day is not None: + close_outcome = "closed_lost" + closed_at = (lead_date + timedelta(days=state.churn_day)).isoformat() + + acct = account_by_id.get(lead.account_id) + emp_band = acct.employee_band if acct else "" + acv = _sample_acv(post_sim_rng, emp_band) + + opp = OpportunityRow( + opportunity_id=opp_id, + lead_id=lead.lead_id, + created_at=opp_created_at, + stage=state.current_stage, + estimated_acv=acv, + close_outcome=close_outcome, + closed_at=closed_at, + ) + opportunities.append(opp) + + # Customer + subscription: converted leads only. + # state.conversion_day is always set when state.converted is True, + # so conv_ts_str is computed as a plain str (not str | None) here. + if state.converted and state.conversion_day is not None: + conv_ts_str = (lead_date + timedelta(days=state.conversion_day)).isoformat() + cust_ctr += 1 + cust_id = make_id(ID_PREFIXES["customer"], cust_ctr) + customers.append( + CustomerRow( + customer_id=cust_id, + opportunity_id=opp_id, + account_id=lead.account_id, + customer_start_at=conv_ts_str, + ) + ) + + sub_ctr += 1 + sub_id = make_id(ID_PREFIXES["subscription"], sub_ctr) + subscriptions.append( + SubscriptionRow( + subscription_id=sub_id, + customer_id=cust_id, + plan_name=_plan_from_acv(acv), + subscription_start_at=conv_ts_str, + subscription_status="active", + ) + ) + + return SimulationResult( + leads=updated_leads, + touches=touches, + sessions=sessions, + sales_activities=sales_activities, + opportunities=opportunities, + customers=customers, + subscriptions=subscriptions, + ) + + +# --------------------------------------------------------------------------- +# Private helpers +# --------------------------------------------------------------------------- + + +def _sample_acv(rng: random.Random, employee_band: str) -> int: + """Draw a random ACV (USD) appropriate for *employee_band*.""" + lo, hi = _EMPLOYEE_ACV_RANGES.get(employee_band, _DEFAULT_ACV_RANGE) + return rng.randint(lo, hi) + + +def _plan_from_acv(acv: int) -> str: + """Map ACV to a subscription plan tier name.""" + if acv < 30_000: + return "starter" + if acv < 80_000: + return "growth" + return "enterprise" diff --git a/leadforge/simulation/state.py b/leadforge/simulation/state.py new file mode 100644 index 0000000..74e73e3 --- /dev/null +++ b/leadforge/simulation/state.py @@ -0,0 +1,92 @@ +"""Per-lead mutable state for the discrete-time simulation engine. + +:class:`LeadSimState` is the only mutable object touched by +:func:`~leadforge.simulation.engine.simulate_world`. After the simulation +loop completes, the final state of each instance is used to populate the +:class:`~leadforge.schema.entities.LeadRow` and any post-conversion entity +rows (opportunity, customer, subscription). +""" + +from __future__ import annotations + +from dataclasses import dataclass + +# Funnel stages that are at or past the SQL qualification gate. +# Used by advance_stage() to record sql_day regardless of the exact +# stage name, so opportunity timestamps are correct even if the stage +# sequence evolves in future milestones. +_SQL_OR_DEEPER: frozenset[str] = frozenset( + { + "sql", + "demo_scheduled", + "demo_completed", + "proposal_sent", + "negotiation", + "closed_won", + } +) + + +@dataclass +class LeadSimState: + """Mutable simulation state for one lead across the 90-day horizon. + + The engine updates this object on each daily step. It is never written + to disk directly — the final state is used to populate relational rows. + + Args: + lead_id: Stable opaque lead identifier. + current_stage: Funnel stage at initialisation (typically ``"mql"``). + """ + + lead_id: str + current_stage: str + dwell_days: int = 0 + """Days spent in the current stage; reset to 0 on each stage advance.""" + + converted: bool = False + conversion_day: int | None = None + """0-based day index within the simulation horizon when conversion fired.""" + + churned: bool = False + churn_day: int | None = None + """0-based day index when the lead was marked ``closed_lost``.""" + + sql_day: int | None = None + """First day the lead entered ``sql`` or any deeper funnel stage. + Used to anchor opportunity creation timestamps.""" + + @property + def is_terminal(self) -> bool: + """``True`` once the lead has converted or churned.""" + return self.converted or self.churned + + def advance_stage(self, new_stage: str, day: int) -> None: + """Transition to *new_stage* on *day*, resetting the dwell counter. + + Records the first time the lead enters ``sql`` or any deeper stage + (``demo_scheduled``, ``demo_completed``, ``proposal_sent``, + ``negotiation``, ``closed_won``) so the engine can create an + opportunity row at the correct timestamp regardless of which + qualifying stage is reached first. + + Args: + new_stage: The funnel stage to transition into. + day: Current 0-based day index in the simulation horizon. + """ + self.current_stage = new_stage + self.dwell_days = 0 + if new_stage in _SQL_OR_DEEPER and self.sql_day is None: + self.sql_day = day + + def mark_converted(self, day: int) -> None: + """Record a ``closed_won`` conversion event on *day*.""" + self.converted = True + self.conversion_day = day + self.current_stage = "closed_won" + + def mark_churned(self, day: int) -> None: + """Record a ``closed_lost`` churn event on *day*.""" + self.churned = True + self.churn_day = day + self.current_stage = "closed_lost" diff --git a/tests/simulation/test_engine.py b/tests/simulation/test_engine.py new file mode 100644 index 0000000..cc26231 --- /dev/null +++ b/tests/simulation/test_engine.py @@ -0,0 +1,384 @@ +"""Tests for simulation/engine.py and simulation/state.py.""" + +from __future__ import annotations + +import pytest + +from leadforge.core.models import GenerationConfig +from leadforge.schema.entities import ( + CustomerRow, + LeadRow, + OpportunityRow, + SalesActivityRow, + SessionRow, + SubscriptionRow, + TouchRow, +) +from leadforge.simulation.engine import SimulationResult, _plan_from_acv, simulate_world +from leadforge.simulation.population import build_population +from leadforge.simulation.state import LeadSimState +from leadforge.structure.sampler import sample_hidden_graph + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_config(seed: int = 42, n_leads: int = 50) -> GenerationConfig: + """Return a small GenerationConfig suitable for unit tests.""" + return GenerationConfig(seed=seed, n_accounts=20, n_contacts=60, n_leads=n_leads) + + +def _make_narrative(): + """Return the default recipe narrative via Generator.""" + from leadforge.api.generator import Generator + + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=42) + assert gen.world_spec.narrative is not None + return gen.world_spec.narrative + + +def _run_sim(seed: int = 42, n_leads: int = 50, motif: str | None = None) -> SimulationResult: + config = _make_config(seed=seed, n_leads=n_leads) + narrative = _make_narrative() + graph = sample_hidden_graph(seed, motif_family_name=motif) + pop = build_population(config, narrative, graph) + return simulate_world(config, pop, graph) + + +# --------------------------------------------------------------------------- +# LeadSimState unit tests +# --------------------------------------------------------------------------- + + +class TestLeadSimState: + def test_initial_not_terminal(self) -> None: + s = LeadSimState("lead_000001", "mql") + assert not s.is_terminal + + def test_mark_converted(self) -> None: + s = LeadSimState("lead_000001", "mql") + s.mark_converted(10) + assert s.converted + assert s.conversion_day == 10 + assert s.current_stage == "closed_won" + assert s.is_terminal + + def test_mark_churned(self) -> None: + s = LeadSimState("lead_000001", "mql") + s.mark_churned(5) + assert s.churned + assert s.churn_day == 5 + assert s.current_stage == "closed_lost" + assert s.is_terminal + + def test_advance_stage_resets_dwell(self) -> None: + s = LeadSimState("lead_000001", "mql") + s.dwell_days = 7 + s.advance_stage("sal", 7) + assert s.current_stage == "sal" + assert s.dwell_days == 0 + + def test_advance_stage_records_sql_day(self) -> None: + s = LeadSimState("lead_000001", "sal") + s.advance_stage("sql", 12) + assert s.sql_day == 12 + + def test_advance_stage_sql_day_not_overwritten(self) -> None: + s = LeadSimState("lead_000001", "sql") + s.sql_day = 5 + # Advancing to a deeper stage should not overwrite sql_day. + s.advance_stage("demo_scheduled", 8) + assert s.sql_day == 5 + + +# --------------------------------------------------------------------------- +# SimulationResult structure +# --------------------------------------------------------------------------- + + +class TestSimulationResultTypes: + def test_result_contains_correct_types(self) -> None: + result = _run_sim() + assert all(isinstance(r, LeadRow) for r in result.leads) + assert all(isinstance(r, TouchRow) for r in result.touches) + assert all(isinstance(r, SessionRow) for r in result.sessions) + assert all(isinstance(r, SalesActivityRow) for r in result.sales_activities) + assert all(isinstance(r, OpportunityRow) for r in result.opportunities) + assert all(isinstance(r, CustomerRow) for r in result.customers) + assert all(isinstance(r, SubscriptionRow) for r in result.subscriptions) + + def test_lead_count_preserved(self) -> None: + result = _run_sim(n_leads=50) + assert len(result.leads) == 50 + + def test_touches_non_empty(self) -> None: + # With 50 leads over 90 days, some touches must be emitted. + result = _run_sim(n_leads=50) + assert len(result.touches) > 0 + + def test_sessions_non_empty(self) -> None: + result = _run_sim(n_leads=50) + assert len(result.sessions) > 0 + + +# --------------------------------------------------------------------------- +# Determinism +# --------------------------------------------------------------------------- + + +class TestDeterminism: + def test_same_seed_same_result(self) -> None: + r1 = _run_sim(seed=7) + r2 = _run_sim(seed=7) + assert len(r1.leads) == len(r2.leads) + assert len(r1.touches) == len(r2.touches) + assert len(r1.sessions) == len(r2.sessions) + assert len(r1.opportunities) == len(r2.opportunities) + assert [row.converted_within_90_days for row in r1.leads] == [ + row.converted_within_90_days for row in r2.leads + ] + + def test_different_seeds_differ(self) -> None: + r1 = _run_sim(seed=1, n_leads=200) + r2 = _run_sim(seed=2, n_leads=200) + sig1 = [ + (row.lead_id, row.converted_within_90_days, row.current_stage, row.conversion_timestamp) + for row in r1.leads + ] + sig2 = [ + (row.lead_id, row.converted_within_90_days, row.current_stage, row.conversion_timestamp) + for row in r2.leads + ] + assert ( + sig1 != sig2 + or len(r1.touches) != len(r2.touches) + or len(r1.sessions) != len(r2.sessions) + or len(r1.opportunities) != len(r2.opportunities) + ) + + +# --------------------------------------------------------------------------- +# Lead outcomes +# --------------------------------------------------------------------------- + + +class TestLeadOutcomes: + def test_converted_within_90_days_is_bool(self) -> None: + result = _run_sim() + assert all(isinstance(row.converted_within_90_days, bool) for row in result.leads) + + def test_all_leads_are_mql(self) -> None: + result = _run_sim() + assert all(row.is_mql for row in result.leads) + + def test_converted_leads_have_timestamp(self) -> None: + result = _run_sim(n_leads=100) + for lead in result.leads: + if lead.converted_within_90_days: + assert lead.conversion_timestamp is not None + else: + assert lead.conversion_timestamp is None + + def test_converted_leads_at_closed_won(self) -> None: + result = _run_sim(n_leads=100) + for lead in result.leads: + if lead.converted_within_90_days: + assert lead.current_stage == "closed_won" + + def test_some_leads_convert(self) -> None: + result = _run_sim(n_leads=200) + n_conv = sum(row.converted_within_90_days for row in result.leads) + assert n_conv > 0, "Expected at least one conversion in 200-lead sim" + + def test_some_leads_do_not_convert(self) -> None: + result = _run_sim(n_leads=200) + n_not_conv = sum(not row.converted_within_90_days for row in result.leads) + assert n_not_conv > 0, "Expected at least one non-conversion in 200-lead sim" + + def test_sql_leads_are_flagged(self) -> None: + result = _run_sim(n_leads=100) + for lead in result.leads: + if lead.current_stage in { + "sql", + "demo_scheduled", + "demo_completed", + "proposal_sent", + "negotiation", + "closed_won", + }: + assert lead.is_sql + + def test_converted_leads_also_sql(self) -> None: + result = _run_sim(n_leads=100) + for lead in result.leads: + if lead.converted_within_90_days: + assert lead.is_sql + + +# --------------------------------------------------------------------------- +# Opportunities +# --------------------------------------------------------------------------- + + +class TestOpportunities: + def test_sql_leads_have_opportunity(self) -> None: + result = _run_sim(n_leads=100) + opp_lead_ids = {o.lead_id for o in result.opportunities} + for lead in result.leads: + if lead.is_sql: + assert lead.lead_id in opp_lead_ids + + def test_non_sql_leads_no_opportunity(self) -> None: + result = _run_sim(n_leads=100) + opp_lead_ids = {o.lead_id for o in result.opportunities} + for lead in result.leads: + if not lead.is_sql: + assert lead.lead_id not in opp_lead_ids + + def test_opportunity_acv_positive(self) -> None: + result = _run_sim(n_leads=100) + assert all(o.estimated_acv > 0 for o in result.opportunities) + + def test_converted_opportunity_has_close_outcome(self) -> None: + result = _run_sim(n_leads=100) + converted_ids = {row.lead_id for row in result.leads if row.converted_within_90_days} + for opp in result.opportunities: + if opp.lead_id in converted_ids: + assert opp.close_outcome == "closed_won" + assert opp.closed_at is not None + + def test_opportunity_ids_unique(self) -> None: + result = _run_sim(n_leads=100) + ids = [o.opportunity_id for o in result.opportunities] + assert len(ids) == len(set(ids)) + + +# --------------------------------------------------------------------------- +# Customers and subscriptions +# --------------------------------------------------------------------------- + + +class TestCustomersAndSubscriptions: + def test_customer_per_conversion(self) -> None: + result = _run_sim(n_leads=100) + n_conv = sum(row.converted_within_90_days for row in result.leads) + assert len(result.customers) == n_conv + + def test_subscription_per_customer(self) -> None: + result = _run_sim(n_leads=100) + assert len(result.subscriptions) == len(result.customers) + + def test_customer_account_fk(self) -> None: + config = _make_config(n_leads=50) + narrative = _make_narrative() + graph = sample_hidden_graph(42) + pop = build_population(config, narrative, graph) + result = simulate_world(config, pop, graph) + acct_ids = {a.account_id for a in pop.accounts} + for cust in result.customers: + assert cust.account_id in acct_ids + + def test_subscription_status_active(self) -> None: + result = _run_sim(n_leads=100) + assert all(s.subscription_status == "active" for s in result.subscriptions) + + def test_subscription_plan_valid(self) -> None: + result = _run_sim(n_leads=100) + valid_plans = {"starter", "growth", "enterprise"} + assert all(s.plan_name in valid_plans for s in result.subscriptions) + + +# --------------------------------------------------------------------------- +# Touch / session / activity integrity +# --------------------------------------------------------------------------- + + +class TestEventIntegrity: + def test_touch_lead_fk(self) -> None: + config = _make_config(n_leads=50) + narrative = _make_narrative() + graph = sample_hidden_graph(42) + pop = build_population(config, narrative, graph) + result = simulate_world(config, pop, graph) + lead_ids = {row.lead_id for row in result.leads} + for touch in result.touches: + assert touch.lead_id in lead_ids + + def test_session_lead_fk(self) -> None: + config = _make_config(n_leads=50) + narrative = _make_narrative() + graph = sample_hidden_graph(42) + pop = build_population(config, narrative, graph) + result = simulate_world(config, pop, graph) + lead_ids = {row.lead_id for row in result.leads} + for sess in result.sessions: + assert sess.lead_id in lead_ids + + def test_activity_rep_id_non_empty(self) -> None: + result = _run_sim(n_leads=100) + for act in result.sales_activities: + assert act.rep_id + + def test_touch_ids_unique(self) -> None: + result = _run_sim(n_leads=50) + ids = [t.touch_id for t in result.touches] + assert len(ids) == len(set(ids)) + + def test_session_duration_positive(self) -> None: + result = _run_sim(n_leads=50) + assert all(s.session_duration_seconds > 0 for s in result.sessions) + + def test_session_page_views_positive(self) -> None: + result = _run_sim(n_leads=50) + assert all(s.page_views > 0 for s in result.sessions) + + +# --------------------------------------------------------------------------- +# Motif family variation +# --------------------------------------------------------------------------- + + +class TestMotifVariation: + @pytest.mark.parametrize( + "motif", + [ + "fit_dominant", + "intent_dominant", + "sales_execution_sensitive", + "demo_trial_mediated", + "buying_committee_friction", + ], + ) + def test_all_motifs_complete_without_error(self, motif: str) -> None: + result = _run_sim(n_leads=30, motif=motif) + assert len(result.leads) == 30 + # fit_dominant should have higher conversion than buying_committee_friction + # — don't assert exact ordering since it's stochastic at n=30. + + def test_fit_dominant_higher_conversion_than_friction(self) -> None: + # Law of large numbers: fit_dominant should convert more than friction. + fit = _run_sim(seed=99, n_leads=300, motif="fit_dominant") + friction = _run_sim(seed=99, n_leads=300, motif="buying_committee_friction") + fit_rate = sum(row.converted_within_90_days for row in fit.leads) / 300 + fric_rate = sum(row.converted_within_90_days for row in friction.leads) / 300 + assert fit_rate > fric_rate + + +# --------------------------------------------------------------------------- +# _plan_from_acv helper +# --------------------------------------------------------------------------- + + +class TestPlanFromAcv: + def test_starter(self) -> None: + assert _plan_from_acv(10_000) == "starter" + assert _plan_from_acv(29_999) == "starter" + + def test_growth(self) -> None: + assert _plan_from_acv(30_000) == "growth" + assert _plan_from_acv(79_999) == "growth" + + def test_enterprise(self) -> None: + assert _plan_from_acv(80_000) == "enterprise" + assert _plan_from_acv(200_000) == "enterprise"