Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operator_runbook.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Operator-facing summary entrypoints:
- `scripts/run_monthly_report_bundle.py` for the standard monthly report bundle used by Actions artifacts and AI review handoff
- `scripts/write_release_heartbeat.py` for the lightweight logs-branch heartbeat record
- Monthly live-pool ordering uses a deterministic tie-break: `final_score`, then `confidence`, then `liquidity_stability`, then `avg_quote_vol_180`, then `symbol`
- Monthly review evidence includes the latest official/challenger track composition, overlap against the official pool, next-candidate boundary, and the deterministic tie-break order.

Boundary rules:

Expand Down
248 changes: 246 additions & 2 deletions scripts/run_monthly_review_briefing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,40 @@ def load_optional_track_summary(path: Path) -> list[dict[str, str]]:
return load_track_summary(path)


def load_optional_csv(path: Path) -> list[dict[str, str]]:
if not path.exists():
return []
with path.open("r", encoding="utf-8", newline="") as handle:
return list(csv.DictReader(handle))


def _safe_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except Exception:
return default


def _safe_float(value: Any) -> float | None:
try:
number = float(value)
except Exception:
return None
if number != number:
return None
return number


def _coerce_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "y"}


def _display_float(value: float | None) -> str:
return f"{value:.6f}" if value is not None else "n/a"


def resolve_as_of_date(
summary: dict[str, Any],
release_status_summary: dict[str, Any],
Expand All @@ -82,19 +109,22 @@ def build_review_inputs(output_dir: Path | str) -> dict[str, Any]:
live_pool_path = root / "live_pool.json"
manifest_path = root / "release_manifest.json"
track_summary_path = root / "shadow_candidate_tracks" / "track_summary.csv"
latest_ranking_path = root / "latest_ranking.csv"

return {
"summary": load_optional_json(summary_path),
"release_status_summary": load_optional_json(release_status_summary_path),
"live_pool": load_json(live_pool_path),
"manifest": load_json(manifest_path),
"track_rows": load_optional_track_summary(track_summary_path),
"latest_ranking_rows": load_optional_csv(latest_ranking_path),
"paths": {
"monthly_shadow_build_summary": str(summary_path),
"release_status_summary": str(release_status_summary_path),
"live_pool": str(live_pool_path),
"release_manifest": str(manifest_path),
"track_summary": str(track_summary_path),
"latest_ranking": str(latest_ranking_path),
},
"availability": {
"monthly_shadow_build_summary": summary_path.exists(),
Expand Down Expand Up @@ -160,6 +190,148 @@ def derive_warnings(inputs: dict[str, Any]) -> list[str]:
return warnings


def _resolve_release_index_path(output_dir: Path, track_summary_path: Path, path_value: str) -> Path | None:
raw_path = Path(path_value)
candidates: list[Path] = []
if raw_path.is_absolute():
candidates.append(raw_path)
else:
candidates.extend(
[
output_dir / raw_path,
track_summary_path.parent / raw_path,
PROJECT_ROOT / raw_path,
]
)
path_text = str(raw_path)
if path_text.startswith("data/output/"):
candidates.insert(0, output_dir / path_text.removeprefix("data/output/"))

for candidate in candidates:
if candidate.exists():
return candidate
return None


def _latest_track_release(
output_dir: Path,
track_summary_path: Path,
track_row: dict[str, str],
as_of_date: str,
official_symbols: list[str],
) -> dict[str, Any]:
index_path = _resolve_release_index_path(output_dir, track_summary_path, track_row.get("release_index_path", ""))
preview = {
"available": False,
"release_index_path": str(index_path) if index_path is not None else str(track_row.get("release_index_path", "")),
"as_of_date": "",
"version": "",
"symbols": [],
"overlap_with_official": 0,
"only_in_track": [],
"only_in_official": list(official_symbols),
"pool_stability": None,
"pool_churn": None,
}
if index_path is None:
return preview

rows = load_optional_csv(index_path)
if not rows:
return preview

matching = [row for row in rows if str(row.get("as_of_date", "")).strip() == as_of_date]
release_row = matching[-1] if matching else rows[-1]
symbols = [symbol for symbol in str(release_row.get("symbols", "")).split("|") if symbol]
official_set = set(official_symbols)
symbol_set = set(symbols)

preview.update(
{
"available": True,
"as_of_date": str(release_row.get("as_of_date", "")),
"version": str(release_row.get("version", "")),
"symbols": symbols,
"overlap_with_official": len(symbol_set & official_set),
"only_in_track": [symbol for symbol in symbols if symbol not in official_set],
"only_in_official": [symbol for symbol in official_symbols if symbol not in symbol_set],
"pool_stability": _safe_float(release_row.get("pool_stability")),
"pool_churn": _safe_float(release_row.get("pool_churn")),
}
)
return preview


def build_track_release_previews(inputs: dict[str, Any], as_of_date: str, official_symbols: list[str]) -> dict[str, Any]:
root = Path(inputs["paths"]["latest_ranking"]).parent
track_summary_path = Path(inputs["paths"]["track_summary"])
track_map = {row.get("track_id", ""): row for row in inputs["track_rows"]}
previews: dict[str, Any] = {}
for track_id in ("official_baseline", "challenger_topk_60"):
row = track_map.get(track_id)
if row is None:
previews[track_id] = {"available": False, "symbols": []}
continue
previews[track_id] = _latest_track_release(root, track_summary_path, row, as_of_date, official_symbols)
return previews


def _ranking_row_summary(row: dict[str, str] | None) -> dict[str, Any] | None:
if row is None:
return None
return {
"rank": _safe_int(row.get("current_rank")),
"symbol": str(row.get("symbol", "")),
"final_score": _safe_float(row.get("final_score")),
"confidence": _safe_float(row.get("confidence")),
"liquidity_stability": _safe_float(row.get("liquidity_stability")),
"avg_quote_vol_180": _safe_float(row.get("avg_quote_vol_180")),
"selected_flag": _coerce_bool(row.get("selected_flag")),
}


def build_selection_boundary(ranking_rows: list[dict[str, str]], pool_size: int) -> dict[str, Any]:
tie_break_order = ["final_score", "confidence", "liquidity_stability", "avg_quote_vol_180", "symbol"]
if not ranking_rows:
return {
"available": False,
"tie_break_order": tie_break_order,
"selected_cutoff": None,
"next_candidate": None,
"score_gap_to_next": None,
}

def sort_key(row: dict[str, str]) -> tuple[float, float, float, float, float, str]:
rank_value = _safe_float(row.get("current_rank"))
if rank_value is not None and rank_value > 0:
return (0.0, rank_value, 0.0, 0.0, 0.0, str(row.get("symbol", "")).upper())
return (
1.0,
-(_safe_float(row.get("final_score")) or float("-inf")),
-(_safe_float(row.get("confidence")) or float("-inf")),
-(_safe_float(row.get("liquidity_stability")) or float("-inf")),
-(_safe_float(row.get("avg_quote_vol_180")) or float("-inf")),
str(row.get("symbol", "")).upper(),
)

ordered = sorted(ranking_rows, key=sort_key)
selected_rows = [row for row in ordered if _coerce_bool(row.get("selected_flag"))]
cutoff_index = min(max(len(selected_rows), pool_size), len(ordered))
selected_cutoff = ordered[cutoff_index - 1] if cutoff_index else None
next_candidate = ordered[cutoff_index] if cutoff_index < len(ordered) else None
cutoff_score = _safe_float(selected_cutoff.get("final_score")) if selected_cutoff is not None else None
next_score = _safe_float(next_candidate.get("final_score")) if next_candidate is not None else None
score_gap = cutoff_score - next_score if cutoff_score is not None and next_score is not None else None

return {
"available": True,
"tie_break_order": tie_break_order,
"selected_cutoff": _ranking_row_summary(selected_cutoff),
"next_candidate": _ranking_row_summary(next_candidate),
"score_gap_to_next": score_gap,
}


def require_shadow_outputs(inputs: dict[str, Any]) -> None:
availability = inputs["availability"]
missing_items: list[str] = []
Expand Down Expand Up @@ -200,6 +372,10 @@ def build_review_payload(inputs: dict[str, Any]) -> dict[str, Any]:
release_official = release_status_summary.get("official_release", {})
shadow_available = bool(track_rows)
as_of_date = resolve_as_of_date(summary, release_status_summary, live_pool)
official_symbols = list(release_official.get("symbols", live_pool.get("symbols", [])))
pool_size = _safe_int(
official_baseline.get("pool_size", release_official.get("pool_size", live_pool.get("pool_size", 0)))
)

return {
"generated_at_utc": datetime.now(timezone.utc).isoformat(),
Expand All @@ -209,8 +385,8 @@ def build_review_payload(inputs: dict[str, Any]) -> dict[str, Any]:
"profile": str(official_baseline.get("profile", official_track.get("profile_name", "baseline_blended_rank"))),
"version": str(official_baseline.get("version", release_official.get("version", live_pool.get("version", "")))),
"mode": str(official_baseline.get("mode", release_official.get("mode", live_pool.get("mode", "")))),
"pool_size": _safe_int(official_baseline.get("pool_size", release_official.get("pool_size", live_pool.get("pool_size", 0)))),
"symbols": list(release_official.get("symbols", live_pool.get("symbols", []))),
"pool_size": pool_size,
"symbols": official_symbols,
"source_project": str(release_official.get("source_project", live_pool.get("source_project", ""))),
},
"publish": {
Expand Down Expand Up @@ -239,6 +415,8 @@ def build_review_payload(inputs: dict[str, Any]) -> dict[str, Any]:
"release_index_path": str(challenger_track.get("release_index_path", "")),
},
},
"track_release_previews": build_track_release_previews(inputs, as_of_date, official_symbols),
"selection_boundary": build_selection_boundary(inputs["latest_ranking_rows"], pool_size),
"shadow_analysis_available": shadow_available,
"warnings": warnings,
"operator_checklist": [
Expand All @@ -256,6 +434,8 @@ def render_review_markdown(payload: dict[str, Any]) -> str:
official = payload["official_baseline"]
publish = payload["publish"]
tracks = payload["tracks"]
previews = payload["track_release_previews"]
boundary = payload["selection_boundary"]
official_track_line = (
f"releases={tracks['official_baseline']['release_count']} first={tracks['official_baseline']['first_as_of_date']} "
f"last={tracks['official_baseline']['last_as_of_date']} status={tracks['official_baseline']['candidate_status']}"
Expand All @@ -271,6 +451,11 @@ def render_review_markdown(payload: dict[str, Any]) -> str:
warning_lines = "\n".join(f"- {item}" for item in payload["warnings"]) if payload["warnings"] else "- none"
checklist_lines = "\n".join(f"{idx}. {item}" for idx, item in enumerate(payload["operator_checklist"], start=1))
symbols = ", ".join(official["symbols"]) if official["symbols"] else "n/a"
comparison_lines = "\n".join(
render_track_preview_line(track_id, preview)
for track_id, preview in previews.items()
)
boundary_lines = render_boundary_lines(boundary)

return f"""# Monthly Review

Expand Down Expand Up @@ -299,6 +484,14 @@ def render_review_markdown(payload: dict[str, Any]) -> str:
- official_baseline: {official_track_line}
- challenger_topk_60: {challenger_track_line}

## Track release comparison

{comparison_lines}

## Selection boundary

{boundary_lines}

## Warnings

{warning_lines}
Expand All @@ -309,9 +502,58 @@ def render_review_markdown(payload: dict[str, Any]) -> str:
"""


def render_track_preview_line(track_id: str, preview: dict[str, Any]) -> str:
if not preview.get("available"):
return f"- {track_id}: release preview unavailable"
symbols = ", ".join(preview.get("symbols", [])) or "n/a"
only_in_track = ", ".join(preview.get("only_in_track", [])) or "none"
only_in_official = ", ".join(preview.get("only_in_official", [])) or "none"
return (
f"- {track_id}: as_of={preview['as_of_date']} symbols={symbols} "
f"overlap_with_official={preview['overlap_with_official']} "
f"only_in_track={only_in_track} only_in_official={only_in_official} "
f"stability={_display_float(preview.get('pool_stability'))} churn={_display_float(preview.get('pool_churn'))}"
)


def render_boundary_lines(boundary: dict[str, Any]) -> str:
tie_break = " > ".join(boundary["tie_break_order"])
if not boundary.get("available"):
return f"- ranking preview unavailable\n- deterministic tie-break order: {tie_break}"

cutoff = boundary.get("selected_cutoff")
next_candidate = boundary.get("next_candidate")
cutoff_line = (
f"rank={cutoff['rank']} symbol={cutoff['symbol']} score={_display_float(cutoff['final_score'])}"
if cutoff
else "n/a"
)
next_line = (
f"rank={next_candidate['rank']} symbol={next_candidate['symbol']} score={_display_float(next_candidate['final_score'])}"
if next_candidate
else "n/a"
)
return "\n".join(
[
f"- selected cutoff: {cutoff_line}",
f"- next candidate: {next_line}",
f"- score gap to next: {_display_float(boundary.get('score_gap_to_next'))}",
f"- deterministic tie-break order: {tie_break}",
]
)


def render_review_prompt(payload: dict[str, Any]) -> str:
questions = "\n".join(f"{idx}. {item}" for idx, item in enumerate(payload["review_questions"], start=1))
warnings = "\n".join(f"- {item}" for item in payload["warnings"]) if payload["warnings"] else "- none"
challenger = payload["track_release_previews"].get("challenger_topk_60", {})
challenger_symbols = ", ".join(challenger.get("symbols", [])) if challenger.get("available") else "n/a"
next_candidate = payload["selection_boundary"].get("next_candidate")
next_candidate_text = (
f"{next_candidate['symbol']} score={_display_float(next_candidate['final_score'])}"
if next_candidate
else "n/a"
)
return f"""Monthly release review prompt

Context:
Expand All @@ -326,6 +568,8 @@ def render_review_prompt(payload: dict[str, Any]) -> str:
- official version: {payload['official_baseline']['version']}
- official mode: {payload['official_baseline']['mode']}
- official symbols: {", ".join(payload['official_baseline']['symbols']) or 'n/a'}
- challenger_topk_60 symbols: {challenger_symbols}
- next candidate after selected pool: {next_candidate_text}

Warnings:
{warnings}
Expand Down
29 changes: 29 additions & 0 deletions tests/test_monthly_review_briefing.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,28 @@ def write_fixture_files(
"official_baseline,baseline_blended_rank,blended_rank_pct,official_baseline,official_reference,64,2020-12-31,2026-03-13,official/release_index.csv\n"
f"challenger_topk_60,challenger_topk_60,future_topk_label_60,shadow_candidate,shadow_candidate,64,2020-12-31,{challenger_last_as_of_date},challenger/release_index.csv\n"
)
(shadow_dir / "official").mkdir()
(shadow_dir / "official" / "release_index.csv").write_text(
"version,as_of_date,pool_size,symbols,pool_stability,pool_churn\n"
"2026-03-13-core_major,2026-03-13,5,TRXUSDT|ETHUSDT|BCHUSDT|NEARUSDT|SOLUSDT,0.8,0.2\n",
encoding="utf-8",
)
(shadow_dir / "challenger").mkdir()
(shadow_dir / "challenger" / "release_index.csv").write_text(
"version,as_of_date,pool_size,symbols,pool_stability,pool_churn\n"
"2026-03-13-core_major,2026-03-13,5,TRXUSDT|ETHUSDT|BCHUSDT|XRPUSDT|DOGEUSDT,0.6,0.4\n",
encoding="utf-8",
)
(output_dir / "latest_ranking.csv").write_text(
"as_of_date,symbol,final_score,confidence,liquidity_stability,avg_quote_vol_180,selected_flag,current_rank\n"
"2026-03-13,TRXUSDT,0.90,0.70,0.90,1000,true,1\n"
"2026-03-13,ETHUSDT,0.80,0.60,0.80,900,true,2\n"
"2026-03-13,BCHUSDT,0.70,0.50,0.70,800,true,3\n"
"2026-03-13,NEARUSDT,0.60,0.40,0.60,700,true,4\n"
"2026-03-13,SOLUSDT,0.50,0.30,0.50,600,true,5\n"
"2026-03-13,XRPUSDT,0.49,0.20,0.40,500,false,6\n",
encoding="utf-8",
)
return output_dir

def test_build_review_payload_reports_ok_when_outputs_align(self) -> None:
Expand All @@ -101,6 +123,13 @@ def test_build_review_payload_reports_ok_when_outputs_align(self) -> None:
self.assertEqual(payload["status"], "ok")
self.assertEqual(payload["official_baseline"]["pool_size"], 5)
self.assertEqual(payload["tracks"]["challenger_topk_60"]["release_count"], 64)
self.assertEqual(
payload["track_release_previews"]["challenger_topk_60"]["symbols"],
["TRXUSDT", "ETHUSDT", "BCHUSDT", "XRPUSDT", "DOGEUSDT"],
)
self.assertEqual(payload["track_release_previews"]["challenger_topk_60"]["overlap_with_official"], 3)
self.assertEqual(payload["selection_boundary"]["next_candidate"]["symbol"], "XRPUSDT")
self.assertAlmostEqual(payload["selection_boundary"]["score_gap_to_next"], 0.01)
self.assertEqual(payload["warnings"], [])

def test_build_review_payload_warns_when_track_dates_do_not_align(self) -> None:
Expand Down