8 changes: 6 additions & 2 deletions .agent-plan.md
@@ -89,8 +89,12 @@ metadata; difficulty_params threaded; student_public refused until 4c) opened
 as **#126** (merged). `LTV-Pn.4c` (student_public snapshot-safety — public
 relational projection: event tables ≤ observation_date, subscriptions
 stateful/terminal columns dropped; manifest flags; CLAUDE.md clause;
-lead-scoring byte-identical) opened as **#127**. Next: `Pn.4d` (shared bundle
-orchestrator), `LTV-Po` (recipe; also recipe-driven difficulty resolution).
+lead-scoring byte-identical) opened as **#127** (merged). `LTV-Pn.4d` (shared
+bundle orchestrator — `render/bundle.py` `write_bundle_envelope`; both schemes
+delegate bundle I/O; carried cleanup #1 discharged; all four bundles
+byte-identical) opened as **#128** — **completes LTV-Pn.4**. Next: `LTV-Po`
+(the `b2b_saas_ltv_v1` recipe assets + end-to-end `Generator.from_recipe(...)`;
+also recipe-driven difficulty resolution + the narrative-consumption decision).
 Note: `validate_bundle` is lead-scoring-coupled — scheme-aware validation is
 `LTV-Pp`.

23 changes: 13 additions & 10 deletions docs/ltv/roadmap.md
@@ -46,7 +46,7 @@ protocol + registry, with the package physically reorganized into
 | `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | #113 (Ph) |
 | `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | #117 (Pj), #118 (Pk) |
 | `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | #119 (Pl), #120 (Pm) |
-| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3), #125 (Pn.4a), #126 (Pn.4b), #127 (Pn.4c) |
+| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn.1…4`, `LTV-Po` | #121 (Pn.1), #122 (Pn.2), #124 (Pn.3), #125 (Pn.4a), #126 (Pn.4b), #127 (Pn.4c), #128 (Pn.4d) |
 | `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | |
 | `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | |

@@ -368,11 +368,15 @@ methods, then public-safety, then the carried orchestrator cleanup:
 public early task) — flag for `LTV-Po`/design-doc update; tension noted
 against D8's "first-class early-pLTV".
 - Labels: `type: feature`, `layer: exposure`, `layer: render`, `layer: docs`
-- [ ] **`LTV-Pn.4d`** — `refactor: shared bundle orchestrator`. With both
-schemes' `write_bundle` in hand, lift the shared orchestrator (mkdir →
-relational → tasks → card → dict → exposure → manifest) with scheme render
-hooks out of the two implementations (carried cleanup #1). Both bundles
-byte-identical.
+- [x] **`LTV-Pn.4d`** — `refactor: shared bundle orchestrator` (**PR #128**).
+New `render/bundle.py` `write_bundle_envelope` runs the shared on-disk
+sequence (mkdir → relational → task splits → card → dict → exposure →
+manifest) given each scheme's already-computed content (final dfs, a
+`TaskExport(manifest, frame)` list, rendered card, visible features, manifest
+params). Both schemes' `write_bundle` now compute only their scheme-specific
+content and delegate the I/O; carried cleanup #1 discharged. **All four
+bundles (lead_scoring + lifecycle × instructor + public) verified
+byte-identical** via the full-bundle SHA-256 harness.
 - Labels: `type: refactor`, `layer: render`, `layer: api`
 - [ ] **`LTV-Po`** — `feat(recipes): b2b_saas_ltv_v1 recipe assets`. The three
 recipe YAMLs (`scheme: lifecycle`); register in the recipe registry;
@@ -427,10 +431,9 @@ The peer-schemes reorg deliberately defers a few cleanups to keep each M2 PR
 byte-identical and reviewable. They are tracked here and discharged in
 **`LTV-Pn.1`/`LTV-Pn.2`** (M6), where the manifest/exposure generalization makes them clean:

-1. **Shared render orchestration** — `LTV-Pe` left each scheme owning its full
-`write_bundle`; only `write_relational_tables` is shared. A shared bundle
-orchestrator with scheme render hooks lands in **`LTV-Pn.4`**, once the
-lifecycle `write_bundle` exists to reveal the real shared shape.
+1. ~~**Shared render orchestration**~~ — **Done** (`LTV-Pn.4d`):
+`render/bundle.py` `write_bundle_envelope` is the shared orchestrator; both
+schemes delegate their bundle I/O to it.
 2. ~~**`build_manifest` / `apply_exposure` are lead-scoring-coupled**~~ —
 **Done** (`build_manifest` in `LTV-Pn.1`; `apply_exposure` in `LTV-Pn.2` via
 the `write_metadata` scheme hook).
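
The roadmap entry for `LTV-Pn.4d` cites a full-bundle SHA-256 harness for the byte-identical claim, but the harness itself is not part of this diff. The following is a minimal sketch of how such a check could work, assuming it hashes every file's relative path and bytes in sorted order. `bundle_digest` and `write_demo_bundle` are hypothetical names used for illustration, not leadforge APIs.

```python
import hashlib
import tempfile
from pathlib import Path


def bundle_digest(root: Path) -> str:
    """Return one SHA-256 over every file under *root* (path + bytes)."""
    digest = hashlib.sha256()
    # Sort by POSIX-style relative path so the digest is order-independent
    # of how the filesystem happens to list directory entries.
    files = sorted(
        (p.relative_to(root).as_posix(), p) for p in root.rglob("*") if p.is_file()
    )
    for rel_path, path in files:
        digest.update(rel_path.encode("utf-8"))
        digest.update(b"\0")  # separator between path and contents
        digest.update(path.read_bytes())
        digest.update(b"\0")  # separator between files
    return digest.hexdigest()


def write_demo_bundle(root: Path, card: str) -> None:
    """Hypothetical stand-in for a scheme's write_bundle (two tiny files)."""
    (root / "tables").mkdir(parents=True, exist_ok=True)
    (root / "tables" / "leads.parquet").write_bytes(b"demo-bytes")
    (root / "dataset_card.md").write_text(card, encoding="utf-8")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as before, tempfile.TemporaryDirectory() as after:
        write_demo_bundle(Path(before), "card")
        write_demo_bundle(Path(after), "card")
        print(bundle_digest(Path(before)) == bundle_digest(Path(after)))
```

For a before/after refactor comparison, both runs would presumably need the same seed and an explicit `generation_timestamp`, since the manifest otherwise records the current time.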
116 changes: 116 additions & 0 deletions leadforge/render/bundle.py
@@ -0,0 +1,116 @@
+"""Shared bundle-writing envelope for all generation schemes (LTV-Pn.4d).
+
+Every scheme's ``write_bundle`` performs the same on-disk sequence — create the
+root, write the relational tables, split each task, write the dataset card and
+feature dictionary, apply exposure-mode metadata, and build + write the
+manifest. Only the *content* differs (which tables, which tasks, which card),
+and each scheme computes that content itself.
+
+:func:`write_bundle_envelope` is that shared sequence. A scheme computes its
+final, exposure-projected relational frames, its per-task ``(manifest, frame)``
+exports, its rendered dataset card, and its visible feature catalog, then hands
+them here. This keeps the I/O orchestration — and the file-ordering that the
+manifest's hashing depends on — in one place, with no scheme-specific logic.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+from leadforge.exposure.modes import apply_exposure
+from leadforge.render.manifests import build_manifest, write_manifest
+from leadforge.render.relational_io import write_relational_tables
+from leadforge.render.tasks import write_task_splits
+from leadforge.schema.dictionaries import write_feature_dictionary
+
+if TYPE_CHECKING:
+    from collections.abc import Collection, Sequence
+    from pathlib import Path
+
+    import pandas as pd
+
+    from leadforge.core.models import WorldBundle
+    from leadforge.schema.features import FeatureSpec
+    from leadforge.schema.tasks import TaskManifest
+
+__all__ = ["TaskExport", "write_bundle_envelope"]
+
+
+@dataclass(frozen=True)
+class TaskExport:
+    """One task's manifest plus the (already exposure-projected) snapshot frame
+    to split into train/valid/test."""
+
+    manifest: TaskManifest
+    frame: pd.DataFrame
+
+
+def write_bundle_envelope(
+    bundle: WorldBundle,
+    root: Path,
+    *,
+    relational: dict[str, pd.DataFrame],
+    tasks: Sequence[TaskExport],
+    dataset_card: str,
+    feature_specs: Sequence[FeatureSpec],
+    generation_scheme: str,
+    redacted: Collection[str] = frozenset(),
+    motif_family: str | None = None,
+    relational_snapshot_safe: bool = False,
+    structural_redactions: dict[str, Any] | None = None,
+    extra_fields: dict[str, Any] | None = None,
+    generation_timestamp: str | None = None,
+) -> None:
+    """Write *bundle* to *root* given the scheme's already-computed content.
+
+    Steps (order fixed — the manifest hashes the written files, so they must
+    exist first): relational tables → task splits → dataset card → feature
+    dictionary → exposure metadata → manifest.
+
+    Args:
+        bundle: The fully populated bundle (its ``spec.config`` supplies seed,
+            exposure mode, and the manifest's provenance fields).
+        root: Destination directory (created if absent).
+        relational: ``{table_name: DataFrame}`` already projected for the
+            exposure mode (snapshot-safe where required).
+        tasks: One :class:`TaskExport` per task directory to write.
+        dataset_card: Rendered ``dataset_card.md`` contents.
+        feature_specs: The visible feature catalog for ``feature_dictionary.csv``.
+        generation_scheme: Producing scheme name (recorded in the manifest).
+        redacted: Columns to drop from every written table/split (lead-scoring
+            feature redactions; empty for schemes without column redaction).
+        motif_family / relational_snapshot_safe / structural_redactions /
+            extra_fields: Passed through to :func:`build_manifest`.
+        generation_timestamp: ISO timestamp; defaults to now in the manifest.
+    """
+    config = bundle.spec.config
+    root.mkdir(parents=True, exist_ok=True)
+
+    table_row_counts = write_relational_tables(relational, root / "tables", redacted=redacted)
+
+    task_row_counts: dict[str, dict[str, int]] = {}
+    for export in tasks:
+        task_row_counts[export.manifest.task_id] = write_task_splits(
+            export.frame, root / "tasks", seed=config.seed, task=export.manifest
+        )
+
+    (root / "dataset_card.md").write_text(dataset_card)
+    write_feature_dictionary(root / "feature_dictionary.csv", features=tuple(feature_specs))
+
+    apply_exposure(bundle, root, config.exposure_mode)
+
+    manifest = build_manifest(
+        config=config,
+        generation_scheme=generation_scheme,
+        motif_family=motif_family,
+        table_row_counts=table_row_counts,
+        task_row_counts=task_row_counts,
+        bundle_root=root,
+        generation_timestamp=generation_timestamp,
+        redacted_columns=sorted(redacted),
+        relational_snapshot_safe=relational_snapshot_safe,
+        structural_redactions=structural_redactions,
+        extra_fields=extra_fields,
+    )
+    write_manifest(manifest, root)
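
The docstring of `write_bundle_envelope` fixes the step order because the manifest hashes files that must already be on disk. A stdlib-only sketch of that constraint follows. `write_envelope_sketch` is a hypothetical helper, and the JSON manifest layout is an assumption made for illustration, not the format `build_manifest` produces.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def write_envelope_sketch(root: Path, content: dict[str, bytes]) -> dict[str, str]:
    """Write every content file, then a manifest hashing what is on disk."""
    root.mkdir(parents=True, exist_ok=True)

    # Step 1: write all content files (stand-ins for tables, splits, card).
    for rel_path, payload in content.items():
        target = root / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload)

    # Step 2: hash the files that now exist. Doing this before step 1 would
    # record nothing (or stale files from an earlier run).
    file_hashes = {
        path.relative_to(root).as_posix(): hashlib.sha256(path.read_bytes()).hexdigest()
        for path in root.rglob("*")
        if path.is_file() and path.name != "manifest.json"
    }

    # Step 3: write the manifest last, so it never has to hash itself.
    ordered = dict(sorted(file_hashes.items()))
    (root / "manifest.json").write_text(
        json.dumps({"files": ordered}, indent=2), encoding="utf-8"
    )
    return ordered


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        hashes = write_envelope_sketch(
            Path(tmp) / "bundle",
            {"tables/leads.bin": b"abc", "dataset_card.md": b"# card"},
        )
        print(sorted(hashes))
```

Keeping this sequence in one function means a scheme cannot accidentally build the manifest before its last file is written, which is the property the shared envelope centralizes.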
81 changes: 25 additions & 56 deletions leadforge/schemes/lead_scoring/__init__.py
@@ -164,11 +164,8 @@ def write_bundle(
         from pathlib import Path

         from leadforge.exposure.filters import get_filter
-        from leadforge.exposure.modes import apply_exposure
         from leadforge.narrative.dataset_card import render_dataset_card
-        from leadforge.render.manifests import build_manifest, write_manifest
-        from leadforge.render.relational_io import write_relational_tables
-        from leadforge.schema.dictionaries import write_feature_dictionary
+        from leadforge.render.bundle import TaskExport, write_bundle_envelope
         from leadforge.schemes.lead_scoring.artifacts import LeadScoringArtifacts
         from leadforge.schemes.lead_scoring.features import (
             LEAD_SNAPSHOT_FEATURES,
@@ -179,7 +176,6 @@ def write_bundle(
             to_dataframes_snapshot_safe,
         )
         from leadforge.schemes.lead_scoring.render.snapshots import build_snapshot
-        from leadforge.schemes.lead_scoring.render.tasks import write_task_splits
         from leadforge.schemes.lead_scoring.tasks import task_manifest_for_config

         artifacts = bundle.artifacts
@@ -189,30 +185,18 @@ def write_bundle(
                 "Call Generator.generate() first."
             )

-        root = Path(path)
-        root.mkdir(parents=True, exist_ok=True)
-
         config = bundle.spec.config
         result = artifacts.simulation_result
         population = artifacts.population
         world_graph = artifacts.world_graph

-        # The redaction set comes from the canonical feature spec — the same
-        # source of truth the validator uses. It is applied uniformly to
-        # every published parquet file (relational tables AND task splits) so
-        # users doing feature engineering off the raw tables (per the
-        # README's "Option 3") cannot trivially reintroduce a redacted
-        # column by joining ``tables/leads.parquet`` to their feature set.
+        # The redaction set comes from the canonical feature spec — applied to
+        # every published parquet (relational tables AND task splits) so a user
+        # cannot reintroduce a redacted column by joining the raw tables.
         redacted = redacted_columns_for(config.exposure_mode)
         bundle_filter = get_filter(config.exposure_mode)

-        # ------------------------------------------------------------------
-        # 1. Relational tables → tables/
-        #
-        # The lead-scoring *shape* (9 tables; snapshot-safe projection for
-        # student_public) is decided here; the redaction-drop + parquet-write +
-        # row-count loop is the shared, scheme-agnostic envelope step.
-        # ------------------------------------------------------------------
+        # Relational shape (9 tables; snapshot-safe projection for public).
         dfs = to_dataframes(result, population)
         if bundle_filter.relational_snapshot_safe:
             if config.snapshot_day is None:
@@ -224,11 +208,11 @@ def write_bundle(
                     "pass it explicitly."
                 )
             dfs = to_dataframes_snapshot_safe(dfs, snapshot_day=config.snapshot_day)
-        table_row_counts = write_relational_tables(dfs, root / "tables", redacted=redacted)
+        # Row counts for the dataset card == write_relational_tables' counts
+        # (redaction drops columns, not rows).
+        table_counts = {name: len(df) for name, df in dfs.items()}

-        # ------------------------------------------------------------------
-        # 2. Snapshot + task splits → tasks/
-        # ------------------------------------------------------------------
+        # Lead snapshot + single task, redacted to the exposure mode.
         snapshot = build_snapshot(
             result,
             population,
@@ -242,43 +226,28 @@ def write_bundle(
         if drop_cols:
             snapshot = snapshot.drop(columns=drop_cols)
         visible_features = tuple(f for f in LEAD_SNAPSHOT_FEATURES if f.name not in redacted)
-
         task = task_manifest_for_config(config.primary_task, config.label_window_days)
-        task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task)
-
-        # ------------------------------------------------------------------
-        # 3. Dataset card and feature dictionary
-        # ------------------------------------------------------------------
-        (root / "dataset_card.md").write_text(
-            render_dataset_card(
-                bundle.spec,
-                task_manifest=task,
-                table_counts=table_row_counts,
-                features=visible_features,
-            )
+
+        dataset_card = render_dataset_card(
+            bundle.spec,
+            task_manifest=task,
+            table_counts=table_counts,
+            features=visible_features,
         )
-        write_feature_dictionary(root / "feature_dictionary.csv", features=visible_features)
-
-        # ------------------------------------------------------------------
-        # 4. Exposure metadata (research_instructor only)
-        # ------------------------------------------------------------------
-        apply_exposure(bundle, root, config.exposure_mode)
-
-        # ------------------------------------------------------------------
-        # 5. Manifest
-        # ------------------------------------------------------------------
-        manifest = build_manifest(
-            config=config,
+
+        write_bundle_envelope(
+            bundle,
+            Path(path),
+            relational=dfs,
+            tasks=[TaskExport(manifest=task, frame=snapshot)],
+            dataset_card=dataset_card,
+            feature_specs=visible_features,
             generation_scheme=self.name,
+            redacted=redacted,
             motif_family=world_graph.motif_family,
-            table_row_counts=table_row_counts,
-            task_row_counts={task.task_id: task_row_counts},
-            bundle_root=root,
-            generation_timestamp=generation_timestamp,
-            redacted_columns=sorted(redacted),
             relational_snapshot_safe=bundle_filter.relational_snapshot_safe,
+            generation_timestamp=generation_timestamp,
         )
-        write_manifest(manifest, root)

     def write_metadata(self, bundle: WorldBundle, meta_dir: Path) -> None:
         """Write the lead-scoring hidden-truth files into *meta_dir*.
9 changes: 7 additions & 2 deletions leadforge/schemes/lead_scoring/render/tasks.py
@@ -2,8 +2,13 @@

 The deterministic shuffle/split/write logic is scheme-agnostic and lives in
 :func:`leadforge.render.tasks.write_task_splits` (lifted there in LTV-Pn.3,
-byte-identical for this scheme). This wrapper preserves the lead-scoring
-default task so existing call sites are unchanged.
+byte-identical for this scheme). This wrapper is a convenience that defaults
+the task to :data:`CONVERTED_WITHIN_90_DAYS`.
+
+Since LTV-Pn.4d the lead-scoring ``write_bundle`` writes tasks through the
+shared bundle envelope (:func:`leadforge.render.bundle.write_bundle_envelope`,
+which calls the shared writer with an explicit task), so this wrapper is no
+longer on the write-bundle path; it remains as the scheme's default-task helper.
 """

 from __future__ import annotations
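
The docstring above describes a scheme-agnostic, seeded shuffle/split writer plus a thin wrapper that only supplies a default task. The sketch below illustrates that split of responsibilities under stated assumptions: it works on plain lists rather than DataFrames, the 70/15/15 fractions and the task id string are invented for the example, and `TaskSketch`, `split_rows`, and `split_rows_default` are hypothetical names, not the leadforge functions.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskSketch:
    """Hypothetical stand-in for a task manifest (only the id is modeled)."""

    task_id: str


# Assumed id for the scheme's default task; the real constant may differ.
DEFAULT_TASK = TaskSketch(task_id="converted_within_90_days")


def split_rows(rows: list[dict], *, seed: int, task: TaskSketch) -> dict[str, list[dict]]:
    """Shared step: seeded shuffle, then an (assumed) 70/15/15 cut per task."""
    order = list(range(len(rows)))
    random.Random(seed).shuffle(order)  # same seed gives the same order
    shuffled = [rows[i] for i in order]
    n_train = len(rows) * 70 // 100  # integer math avoids float rounding
    n_valid = len(rows) * 15 // 100
    return {
        f"{task.task_id}/train": shuffled[:n_train],
        f"{task.task_id}/valid": shuffled[n_train : n_train + n_valid],
        f"{task.task_id}/test": shuffled[n_train + n_valid :],
    }


def split_rows_default(rows: list[dict], *, seed: int, task: TaskSketch = DEFAULT_TASK):
    """Scheme-level convenience: identical logic, only the task is defaulted."""
    return split_rows(rows, seed=seed, task=task)
```

Because the wrapper adds no logic of its own, a caller that passes the task explicitly to the shared function gets the same result, which is why the bundle path can bypass the wrapper without changing output.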