From 3d3e0b502066aa61b3ffa2472a5988294a62cd8a Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 15 Jun 2026 18:10:55 +0300 Subject: [PATCH 1/2] refactor: shared bundle-writing orchestrator [LTV-Pn.4d] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Final sub-PR of the split LTV-Pn.4 (and the last carried M2 cleanup, #1). Both schemes' write_bundle performed the same on-disk sequence — mkdir → relational tables → task splits → dataset card → feature dictionary → exposure metadata → manifest — differing only in the *content*. Lift that sequence into one place. - New render/bundle.py: write_bundle_envelope(bundle, root, *, relational, tasks, dataset_card, feature_specs, generation_scheme, redacted, motif_family, relational_snapshot_safe, structural_redactions, extra_fields, generation_timestamp) + a TaskExport(manifest, frame) record. It owns the I/O and the file-ordering the manifest's hashing depends on; it contains no scheme-specific logic. - LeadScoringScheme.write_bundle and LifecycleScheme.write_bundle now compute only their scheme-specific content (which relational frames + snapshot-safe projection, which task exports, which card, which visible features, which manifest params) and delegate to the envelope. - The dataset card needs table row counts; each scheme computes {name: len(df)} on its final frames (identical to write_relational_tables' counts — redaction drops columns, not rows) and renders the card before delegating. Byte-identity: all FOUR bundles — lead_scoring + lifecycle, each in research_instructor + student_public — verified byte-identical against a pre-refactor SHA-256 reference of every file. The orchestration moved; no output changed. Tests: new tests/render/test_bundle_envelope.py pins the envelope contract (all artefacts written, manifest passthrough fields, multiple task dirs, no metadata in snapshot-safe mode) on a minimal bundle. Full suite 1877 passed / 51 skipped; ruff + mypy clean. Co-Authored-By: Claude Opus 4.8 --- .agent-plan.md | 8 +- docs/ltv/roadmap.md | 23 ++-- leadforge/render/bundle.py | 116 +++++++++++++++++++++ leadforge/schemes/lead_scoring/__init__.py | 81 +++++--------- leadforge/schemes/lifecycle/__init__.py | 94 ++++++----------- tests/render/test_bundle_envelope.py | 106 +++++++++++++++++++ 6 files changed, 300 insertions(+), 128 deletions(-) create mode 100644 leadforge/render/bundle.py create mode 100644 tests/render/test_bundle_envelope.py diff --git a/.agent-plan.md b/.agent-plan.md index 004fff1..cdd4489 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -89,8 +89,12 @@ metadata; difficulty_params threaded; student_public refused until 4c) opened as **#126** (merged). `LTV-Pn.4c` (student_public snapshot-safety — public relational projection: event tables ≤ observation_date, subscriptions stateful/terminal columns dropped; manifest flags; CLAUDE.md clause; -lead-scoring byte-identical) opened as **#127**. Next: `Pn.4d` (shared bundle -orchestrator), `LTV-Po` (recipe; also recipe-driven difficulty resolution). +lead-scoring byte-identical) opened as **#127** (merged). `LTV-Pn.4d` (shared +bundle orchestrator — `render/bundle.py` `write_bundle_envelope`; both schemes +delegate bundle I/O; carried cleanup #1 discharged; all four bundles +byte-identical) opened as **#128** — **completes LTV-Pn.4**. Next: `LTV-Po` +(the `b2b_saas_ltv_v1` recipe assets + end-to-end `Generator.from_recipe(...)`; +also recipe-driven difficulty resolution + the narrative-consumption decision). Note: `validate_bundle` is lead-scoring-coupled — scheme-aware validation is `LTV-Pp`. diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index 4307904..5b9c2e5 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -46,7 +46,7 @@ protocol + registry, with the package physically reorganized into | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) | | `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) | | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl), #120 (Pm) | -| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3), #125 (Pn.4a), #126 (Pn.4b), #127 (Pn.4c) | +| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3), #125 (Pn.4a), #126 (Pn.4b), #127 (Pn.4c), #128 (Pn.4d) | | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | | `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | | @@ -368,11 +368,15 @@ methods, then public-safety, then the carried orchestrator cleanup: public early task) — flag for `LTV-Po`/design-doc update; tension noted against D8's "first-class early-pLTV". - Labels: `type: feature`, `layer: exposure`, `layer: render`, `layer: docs` -- [ ] **`LTV-Pn.4d`** — `refactor: shared bundle orchestrator`. With both - schemes' `write_bundle` in hand, lift the shared orchestrator (mkdir → - relational → tasks → card → dict → exposure → manifest) with scheme render - hooks out of the two implementations (carried cleanup #1). Both bundles - byte-identical. +- [x] **`LTV-Pn.4d`** — `refactor: shared bundle orchestrator` (**PR #128**). + New `render/bundle.py` `write_bundle_envelope` runs the shared on-disk + sequence (mkdir → relational → task splits → card → dict → exposure → + manifest) given each scheme's already-computed content (final dfs, a + `TaskExport(manifest, frame)` list, rendered card, visible features, manifest + params). Both schemes' `write_bundle` now compute only their scheme-specific + content and delegate the I/O; carried cleanup #1 discharged. **All four + bundles (lead_scoring + lifecycle × instructor + public) verified + byte-identical** via the full-bundle SHA-256 harness. - Labels: `type: refactor`, `layer: render`, `layer: api` - [ ] **`LTV-Po`** — `feat(recipes): b2b_saas_ltv_v1 recipe assets`. The three recipe YAMLs (`scheme: lifecycle`); register in the recipe registry; @@ -427,10 +431,9 @@ The peer-schemes reorg deliberately defers a few cleanups to keep each M2 PR byte-identical and reviewable. They are tracked here and discharged in **`LTV-Pn.1`/`LTV-Pn.2`** (M6), where the manifest/exposure generalization makes them clean: -1. **Shared render orchestration** — `LTV-Pe` left each scheme owning its full - `write_bundle`; only `write_relational_tables` is shared. A shared bundle - orchestrator with scheme render hooks lands in **`LTV-Pn.4`**, once the - lifecycle `write_bundle` exists to reveal the real shared shape. +1. ~~**Shared render orchestration**~~ — **Done** (`LTV-Pn.4d`): + `render/bundle.py` `write_bundle_envelope` is the shared orchestrator; both + schemes delegate their bundle I/O to it. 2. ~~**`build_manifest` / `apply_exposure` are lead-scoring-coupled**~~ — **Done** (`build_manifest` in `LTV-Pn.1`; `apply_exposure` in `LTV-Pn.2` via the `write_metadata` scheme hook). diff --git a/leadforge/render/bundle.py b/leadforge/render/bundle.py new file mode 100644 index 0000000..a9bffaf --- /dev/null +++ b/leadforge/render/bundle.py @@ -0,0 +1,116 @@ +"""Shared bundle-writing envelope for all generation schemes (LTV-Pn.4d). + +Every scheme's ``write_bundle`` performs the same on-disk sequence — create the +root, write the relational tables, split each task, write the dataset card and +feature dictionary, apply exposure-mode metadata, and build + write the +manifest. Only the *content* differs (which tables, which tasks, which card), +and each scheme computes that content itself. + +:func:`write_bundle_envelope` is that shared sequence. A scheme computes its +final, exposure-projected relational frames, its per-task ``(manifest, frame)`` +exports, its rendered dataset card, and its visible feature catalog, then hands +them here. This keeps the I/O orchestration — and the file-ordering that the +manifest's hashing depends on — in one place, with no scheme-specific logic. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from leadforge.exposure.modes import apply_exposure +from leadforge.render.manifests import build_manifest, write_manifest +from leadforge.render.relational_io import write_relational_tables +from leadforge.render.tasks import write_task_splits +from leadforge.schema.dictionaries import write_feature_dictionary + +if TYPE_CHECKING: + from collections.abc import Collection, Sequence + from pathlib import Path + + import pandas as pd + + from leadforge.core.models import WorldBundle + from leadforge.schema.features import FeatureSpec + from leadforge.schema.tasks import TaskManifest + +__all__ = ["TaskExport", "write_bundle_envelope"] + + +@dataclass(frozen=True) +class TaskExport: + """One task's manifest plus the (already exposure-projected) snapshot frame + to split into train/valid/test.""" + + manifest: TaskManifest + frame: pd.DataFrame + + +def write_bundle_envelope( + bundle: WorldBundle, + root: Path, + *, + relational: dict[str, pd.DataFrame], + tasks: Sequence[TaskExport], + dataset_card: str, + feature_specs: Sequence[FeatureSpec], + generation_scheme: str, + redacted: Collection[str] = frozenset(), + motif_family: str | None = None, + relational_snapshot_safe: bool = False, + structural_redactions: dict[str, Any] | None = None, + extra_fields: dict[str, Any] | None = None, + generation_timestamp: str | None = None, +) -> None: + """Write *bundle* to *root* given the scheme's already-computed content. + + Steps (order fixed — the manifest hashes the written files, so they must + exist first): relational tables → task splits → dataset card → feature + dictionary → exposure metadata → manifest. + + Args: + bundle: The fully populated bundle (its ``spec.config`` supplies seed, + exposure mode, and the manifest's provenance fields). + root: Destination directory (created if absent). + relational: ``{table_name: DataFrame}`` already projected for the + exposure mode (snapshot-safe where required). + tasks: One :class:`TaskExport` per task directory to write. + dataset_card: Rendered ``dataset_card.md`` contents. + feature_specs: The visible feature catalog for ``feature_dictionary.csv``. + generation_scheme: Producing scheme name (recorded in the manifest). + redacted: Columns to drop from every written table/split (lead-scoring + feature redactions; empty for schemes without column redaction). + motif_family / relational_snapshot_safe / structural_redactions / + extra_fields: Passed through to :func:`build_manifest`. + generation_timestamp: ISO timestamp; defaults to now in the manifest. + """ + config = bundle.spec.config + root.mkdir(parents=True, exist_ok=True) + + table_row_counts = write_relational_tables(relational, root / "tables", redacted=redacted) + + task_row_counts: dict[str, dict[str, int]] = {} + for export in tasks: + task_row_counts[export.manifest.task_id] = write_task_splits( + export.frame, root / "tasks", seed=config.seed, task=export.manifest + ) + + (root / "dataset_card.md").write_text(dataset_card) + write_feature_dictionary(root / "feature_dictionary.csv", features=tuple(feature_specs)) + + apply_exposure(bundle, root, config.exposure_mode) + + manifest = build_manifest( + config=config, + generation_scheme=generation_scheme, + motif_family=motif_family, + table_row_counts=table_row_counts, + task_row_counts=task_row_counts, + bundle_root=root, + generation_timestamp=generation_timestamp, + redacted_columns=sorted(redacted), + relational_snapshot_safe=relational_snapshot_safe, + structural_redactions=structural_redactions, + extra_fields=extra_fields, + ) + write_manifest(manifest, root) diff --git a/leadforge/schemes/lead_scoring/__init__.py b/leadforge/schemes/lead_scoring/__init__.py index 03fdfca..870abad 100644 --- a/leadforge/schemes/lead_scoring/__init__.py +++ b/leadforge/schemes/lead_scoring/__init__.py @@ -164,11 +164,8 @@ def write_bundle( from pathlib import Path from leadforge.exposure.filters import get_filter - from leadforge.exposure.modes import apply_exposure from leadforge.narrative.dataset_card import render_dataset_card - from leadforge.render.manifests import build_manifest, write_manifest - from leadforge.render.relational_io import write_relational_tables - from leadforge.schema.dictionaries import write_feature_dictionary + from leadforge.render.bundle import TaskExport, write_bundle_envelope from leadforge.schemes.lead_scoring.artifacts import LeadScoringArtifacts from leadforge.schemes.lead_scoring.features import ( LEAD_SNAPSHOT_FEATURES, @@ -179,7 +176,6 @@ def write_bundle( to_dataframes_snapshot_safe, ) from leadforge.schemes.lead_scoring.render.snapshots import build_snapshot - from leadforge.schemes.lead_scoring.render.tasks import write_task_splits from leadforge.schemes.lead_scoring.tasks import task_manifest_for_config artifacts = bundle.artifacts @@ -189,30 +185,18 @@ def write_bundle( "Call Generator.generate() first." ) - root = Path(path) - root.mkdir(parents=True, exist_ok=True) - config = bundle.spec.config result = artifacts.simulation_result population = artifacts.population world_graph = artifacts.world_graph - # The redaction set comes from the canonical feature spec — the same - # source of truth the validator uses. It is applied uniformly to - # every published parquet file (relational tables AND task splits) so - # users doing feature engineering off the raw tables (per the - # README's "Option 3") cannot trivially reintroduce a redacted - # column by joining ``tables/leads.parquet`` to their feature set. + # The redaction set comes from the canonical feature spec — applied to + # every published parquet (relational tables AND task splits) so a user + # cannot reintroduce a redacted column by joining the raw tables. redacted = redacted_columns_for(config.exposure_mode) bundle_filter = get_filter(config.exposure_mode) - # ------------------------------------------------------------------ - # 1. Relational tables → tables/ - # - # The lead-scoring *shape* (9 tables; snapshot-safe projection for - # student_public) is decided here; the redaction-drop + parquet-write + - # row-count loop is the shared, scheme-agnostic envelope step. - # ------------------------------------------------------------------ + # Relational shape (9 tables; snapshot-safe projection for public). dfs = to_dataframes(result, population) if bundle_filter.relational_snapshot_safe: if config.snapshot_day is None: @@ -224,11 +208,11 @@ def write_bundle( "pass it explicitly." ) dfs = to_dataframes_snapshot_safe(dfs, snapshot_day=config.snapshot_day) - table_row_counts = write_relational_tables(dfs, root / "tables", redacted=redacted) + # Row counts for the dataset card == write_relational_tables' counts + # (redaction drops columns, not rows). + table_counts = {name: len(df) for name, df in dfs.items()} - # ------------------------------------------------------------------ - # 2. Snapshot + task splits → tasks/ - # ------------------------------------------------------------------ + # Lead snapshot + single task, redacted to the exposure mode. snapshot = build_snapshot( result, population, @@ -242,43 +226,28 @@ def write_bundle( if drop_cols: snapshot = snapshot.drop(columns=drop_cols) visible_features = tuple(f for f in LEAD_SNAPSHOT_FEATURES if f.name not in redacted) - task = task_manifest_for_config(config.primary_task, config.label_window_days) - task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task) - - # ------------------------------------------------------------------ - # 3. Dataset card and feature dictionary - # ------------------------------------------------------------------ - (root / "dataset_card.md").write_text( - render_dataset_card( - bundle.spec, - task_manifest=task, - table_counts=table_row_counts, - features=visible_features, - ) + + dataset_card = render_dataset_card( + bundle.spec, + task_manifest=task, + table_counts=table_counts, + features=visible_features, ) - write_feature_dictionary(root / "feature_dictionary.csv", features=visible_features) - - # ------------------------------------------------------------------ - # 4. Exposure metadata (research_instructor only) - # ------------------------------------------------------------------ - apply_exposure(bundle, root, config.exposure_mode) - - # ------------------------------------------------------------------ - # 5. Manifest - # ------------------------------------------------------------------ - manifest = build_manifest( - config=config, + + write_bundle_envelope( + bundle, + Path(path), + relational=dfs, + tasks=[TaskExport(manifest=task, frame=snapshot)], + dataset_card=dataset_card, + feature_specs=visible_features, generation_scheme=self.name, + redacted=redacted, motif_family=world_graph.motif_family, - table_row_counts=table_row_counts, - task_row_counts={task.task_id: task_row_counts}, - bundle_root=root, - generation_timestamp=generation_timestamp, - redacted_columns=sorted(redacted), relational_snapshot_safe=bundle_filter.relational_snapshot_safe, + generation_timestamp=generation_timestamp, ) - write_manifest(manifest, root) def write_metadata(self, bundle: WorldBundle, meta_dir: Path) -> None: """Write the lead-scoring hidden-truth files into *meta_dir*. diff --git a/leadforge/schemes/lifecycle/__init__.py b/leadforge/schemes/lifecycle/__init__.py index 31471bb..1e091b0 100644 --- a/leadforge/schemes/lifecycle/__init__.py +++ b/leadforge/schemes/lifecycle/__init__.py @@ -146,11 +146,7 @@ def write_bundle( from pathlib import Path from leadforge.exposure.filters import get_filter - from leadforge.exposure.modes import apply_exposure - from leadforge.render.manifests import build_manifest, write_manifest - from leadforge.render.relational_io import write_relational_tables - from leadforge.render.tasks import write_task_splits - from leadforge.schema.dictionaries import write_feature_dictionary + from leadforge.render.bundle import TaskExport, write_bundle_envelope from leadforge.schemes.lifecycle.artifacts import LifecycleArtifacts from leadforge.schemes.lifecycle.features import CUSTOMER_SNAPSHOT_FEATURES from leadforge.schemes.lifecycle.render.dataset_card import render_lifecycle_dataset_card @@ -178,16 +174,12 @@ def write_bundle( ) config = bundle.spec.config bundle_filter = get_filter(config.exposure_mode) - population = artifacts.population sim = artifacts.simulation_result - root = Path(path) - root.mkdir(parents=True, exist_ok=True) - # 1. Relational tables → tables/ - # student_public is projected snapshot-safe (event tables filtered to - # <= observation_date; subscriptions' stateful/terminal columns - # dropped). research_instructor keeps the full-horizon shape. + # Relational shape. student_public is projected snapshot-safe (event + # tables filtered to <= observation_date; subscriptions' stateful / + # terminal columns dropped); research_instructor keeps full horizon. dfs = to_dataframes(sim, population) structural_redactions: dict[str, object] | None = None if bundle_filter.relational_snapshot_safe: @@ -196,20 +188,15 @@ def write_bundle( "columns": {"subscriptions": sorted(LIFECYCLE_BANNED_SUBSCRIPTION_COLUMNS)}, "omitted_tables": [], } - table_row_counts = write_relational_tables(dfs, root / "tables") - - # 2. Regime snapshots → task directories. - # difficulty_params (None until LTV-Po resolves it) drives distortions. - # - # The early-pLTV (tenure-anchored) family is OMITTED from snapshot-safe - # public bundles: its forward window (start + early_tenure_weeks + Nd) - # precedes the relational cutoff (observation_date), so its targets are - # reconstructible by joining the public event tables (invoices between - # the early cutoff and observation_date *are* the early target window). - # One observation_date-anchored relational export cannot serve both - # regimes; the early family stays instructor-only. The calendar family - # is safe (its targets fall after observation_date, absent from the - # public relational tables). + table_counts = {name: len(df) for name, df in dfs.items()} + + # Regime snapshots → tasks. The early-pLTV (tenure-anchored) family is + # OMITTED from snapshot-safe public bundles: its forward window precedes + # the relational cutoff (observation_date), so its targets would be + # reconstructible by joining the public event tables. One + # observation_date-anchored relational export cannot serve both regimes; + # the early family stays instructor-only. The calendar family is safe + # (its targets fall after observation_date). snapshots = { CALENDAR_REGIME: build_customer_snapshot( population, sim, difficulty_params=config.difficulty_params, seed=config.seed @@ -223,48 +210,37 @@ def write_bundle( difficulty_params=config.difficulty_params, seed=config.seed, ) - # Each task is a standalone single-target split: drop every OTHER - # target column so a task's parquet cannot leak the answer's siblings - # (e.g. ltv_revenue_730d ⊇ ltv_revenue_90d). The deliberate + # Each task is a standalone single-target split: drop every OTHER target + # column so a task's parquet cannot leak the answer's siblings (e.g. + # ltv_revenue_730d ⊇ ltv_revenue_90d). The deliberate # mrr_change_full_period trap (leakage_risk but not a target) is kept. all_target_cols = {f.name for f in CUSTOMER_SNAPSHOT_FEATURES if f.is_target} - task_row_counts: dict[str, dict[str, int]] = {} - all_tasks = [] + tasks: list[TaskExport] = [] for regime, snapshot in snapshots.items(): for task in lifecycle_task_manifests(regime): other_targets = [ c for c in all_target_cols - {task.label_column} if c in snapshot.columns ] - task_df = snapshot.drop(columns=other_targets) - counts = write_task_splits(task_df, root / "tasks", seed=config.seed, task=task) - task_row_counts[task.task_id] = counts - all_tasks.append(task) - - # 3. Dataset card + feature dictionary - (root / "dataset_card.md").write_text( - render_lifecycle_dataset_card( - bundle.spec, - table_counts=table_row_counts, - tasks=tuple(all_tasks), - observation_date=population.observation_date, - ) - ) - write_feature_dictionary( - root / "feature_dictionary.csv", features=tuple(CUSTOMER_SNAPSHOT_FEATURES) - ) + tasks.append(TaskExport(manifest=task, frame=snapshot.drop(columns=other_targets))) - # 4. Exposure metadata (delegates hidden truth to write_metadata) - apply_exposure(bundle, root, config.exposure_mode) + dataset_card = render_lifecycle_dataset_card( + bundle.spec, + table_counts=table_counts, + tasks=tuple(t.manifest for t in tasks), + observation_date=population.observation_date, + ) - # 5. Manifest - manifest = build_manifest( - config=config, + write_bundle_envelope( + bundle, + Path(path), + relational=dfs, + tasks=tasks, + dataset_card=dataset_card, + feature_specs=CUSTOMER_SNAPSHOT_FEATURES, generation_scheme=self.name, motif_family=artifacts.motif_family, - table_row_counts=table_row_counts, - task_row_counts=task_row_counts, - bundle_root=root, - generation_timestamp=generation_timestamp, + relational_snapshot_safe=bundle_filter.relational_snapshot_safe, + structural_redactions=structural_redactions, extra_fields={ "observation_date": population.observation_date, # The actual exported target windows (source of truth), not @@ -272,10 +248,8 @@ def write_bundle( "forward_windows_days": list(FORWARD_WINDOWS_DAYS), "early_tenure_weeks": config.early_tenure_weeks, }, - relational_snapshot_safe=bundle_filter.relational_snapshot_safe, - structural_redactions=structural_redactions, + generation_timestamp=generation_timestamp, ) - write_manifest(manifest, root) def write_metadata(self, bundle: WorldBundle, meta_dir: Path) -> None: """Write the lifecycle hidden-truth files into *meta_dir*. diff --git a/tests/render/test_bundle_envelope.py b/tests/render/test_bundle_envelope.py new file mode 100644 index 0000000..cd4f894 --- /dev/null +++ b/tests/render/test_bundle_envelope.py @@ -0,0 +1,106 @@ +"""Tests for the shared bundle-writing envelope (LTV-Pn.4d). + +The byte-identity of both schemes' full bundles is exercised by their own +suites + the generator round-trip; these tests pin the envelope's own contract +(ordering, all artefacts written, manifest fields) directly on a minimal bundle. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pandas as pd + +from leadforge.core.models import GenerationConfig, WorldBundle, WorldSpec +from leadforge.render.bundle import TaskExport, write_bundle_envelope +from leadforge.schema.features import FeatureSpec +from leadforge.schema.tasks import SplitSpec, TaskManifest + +_TS = "2026-01-01T00:00:00+00:00" + + +def _spec() -> WorldSpec: + # student_public (the default) keeps this focused on envelope I/O: no + # metadata/ is written, so no scheme hidden-truth hook is exercised. + return WorldSpec( + config=GenerationConfig(seed=1, n_accounts=2, n_contacts=3, n_leads=4), + scheme="lead_scoring", + ) + + +def _features() -> tuple[FeatureSpec, ...]: + return ( + FeatureSpec(name="x", dtype="Int64", description="", category="lead_meta"), + FeatureSpec(name="y", dtype="Float64", description="", category="engagement"), + ) + + +def _task_frame() -> pd.DataFrame: + return pd.DataFrame({"x": list(range(20)), "y": [float(i) for i in range(20)]}) + + +def _write(tmp_path: Path, **overrides) -> Path: + # Default exposure_mode is student_public → relational_snapshot_safe path, + # so apply_exposure writes no metadata (no scheme hidden-truth hook needed). + bundle = WorldBundle(spec=_spec(), artifacts=None) + task = TaskManifest( + task_id="demo_task", + label_column="x", + label_window_days=90, + primary_table="t", + split=SplitSpec(0.5, 0.25, 0.25), + ) + kwargs = { + "relational": {"t": pd.DataFrame({"id": [1, 2, 3]})}, + "tasks": [TaskExport(manifest=task, frame=_task_frame())], + "dataset_card": "# card\n", + "feature_specs": _features(), + "generation_scheme": "lead_scoring", + "relational_snapshot_safe": True, # student_public default → no metadata hook + "generation_timestamp": _TS, + } + kwargs.update(overrides) + out = tmp_path / "b" + write_bundle_envelope(bundle, out, **kwargs) + return out + + +def test_envelope_writes_all_artifacts(tmp_path) -> None: + out = _write(tmp_path) + assert (out / "manifest.json").is_file() + assert (out / "dataset_card.md").read_text() == "# card\n" + assert (out / "feature_dictionary.csv").is_file() + assert (out / "tables" / "t.parquet").is_file() + for split in ("train", "valid", "test"): + assert (out / "tasks" / "demo_task" / f"{split}.parquet").is_file() + assert (out / "tasks" / "demo_task" / "task_manifest.json").is_file() + # student_public → no metadata + assert not (out / "metadata").exists() + + +def test_envelope_manifest_records_passthrough_fields(tmp_path) -> None: + out = _write( + tmp_path, + motif_family="fit_dominant", + extra_fields={"observation_date": "2026-06-01"}, + ) + m = json.loads((out / "manifest.json").read_text()) + assert m["generation_scheme"] == "lead_scoring" + assert m["motif_family"] == "fit_dominant" + assert m["observation_date"] == "2026-06-01" + assert m["tables"]["t"]["row_count"] == 3 + assert set(m["tasks"]["demo_task"]) >= {"train_rows", "valid_rows", "test_rows"} + # 20 rows split 0.5/0.25/0.25. + assert m["tasks"]["demo_task"]["train_rows"] == 10 + + +def test_envelope_writes_multiple_task_dirs(tmp_path) -> None: + t1 = TaskManifest("a", "x", 90, "t", SplitSpec(0.5, 0.25, 0.25)) + t2 = TaskManifest("b", "y", 90, "t", SplitSpec(0.5, 0.25, 0.25), task_type="regression") + out = _write( + tmp_path, + tasks=[TaskExport(t1, _task_frame()), TaskExport(t2, _task_frame())], + ) + dirs = {p.name for p in (out / "tasks").iterdir() if p.is_dir()} + assert dirs == {"a", "b"} From 641888c2ab669266c8abd23dc515a76513feab0a Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Mon, 15 Jun 2026 19:48:37 +0300 Subject: [PATCH 2/2] docs(lead_scoring): clarify the tasks wrapper is off the write path [LTV-Pn.4d] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review of the orchestrator extraction: lead-scoring's write_bundle now writes tasks through the shared bundle envelope, so the schemes.lead_scoring.render.tasks wrapper is no longer on the write-bundle path — its docstring still claimed it kept the default "so existing call sites are unchanged," which is now stale. The wrapper is not dead: it remains a tested convenience (its 7 test consumers rely on the CONVERTED_WITHIN_90_DAYS default), so removing it would only churn tests to pass an explicit task and reverse the deliberate Pn.3 layout. Docstring updated to state it's the scheme's default-task helper, off the write path since Pn.4d. No behaviour change. Co-Authored-By: Claude Opus 4.8 --- leadforge/schemes/lead_scoring/render/tasks.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/leadforge/schemes/lead_scoring/render/tasks.py b/leadforge/schemes/lead_scoring/render/tasks.py index a431189..3928428 100644 --- a/leadforge/schemes/lead_scoring/render/tasks.py +++ b/leadforge/schemes/lead_scoring/render/tasks.py @@ -2,8 +2,13 @@ The deterministic shuffle/split/write logic is scheme-agnostic and lives in :func:`leadforge.render.tasks.write_task_splits` (lifted there in LTV-Pn.3, -byte-identical for this scheme). This wrapper preserves the lead-scoring -default task so existing call sites are unchanged. +byte-identical for this scheme). This wrapper is a convenience that defaults +the task to :data:`CONVERTED_WITHIN_90_DAYS`. + +Since LTV-Pn.4d the lead-scoring ``write_bundle`` writes tasks through the +shared bundle envelope (:func:`leadforge.render.bundle.write_bundle_envelope`, +which calls the shared writer with an explicit task), so this wrapper is no +longer on the write-bundle path; it remains as the scheme's default-task helper. """ from __future__ import annotations