From fe52d1dfa0b13f24674cf7b68eab2949b92e4a5d Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Mon, 1 Jun 2026 23:15:35 +0800 Subject: [PATCH] Add monthly audit evidence details --- docs/operator_runbook.md | 1 + scripts/run_monthly_review_briefing.py | 248 ++++++++++++++++++++++++- tests/test_monthly_review_briefing.py | 29 +++ 3 files changed, 276 insertions(+), 2 deletions(-) diff --git a/docs/operator_runbook.md b/docs/operator_runbook.md index 5134221..835ae52 100644 --- a/docs/operator_runbook.md +++ b/docs/operator_runbook.md @@ -76,6 +76,7 @@ Operator-facing summary entrypoints: - `scripts/run_monthly_report_bundle.py` for the standard monthly report bundle used by Actions artifacts and AI review handoff - `scripts/write_release_heartbeat.py` for the lightweight logs-branch heartbeat record - Monthly live-pool ordering uses a deterministic tie-break: `final_score`, then `confidence`, then `liquidity_stability`, then `avg_quote_vol_180`, then `symbol` +- Monthly review evidence includes the latest official/challenger track composition, overlap against the official pool, next-candidate boundary, and the deterministic tie-break order. Boundary rules: diff --git a/scripts/run_monthly_review_briefing.py b/scripts/run_monthly_review_briefing.py index 32ab293..c7e6bb4 100644 --- a/scripts/run_monthly_review_briefing.py +++ b/scripts/run_monthly_review_briefing.py @@ -55,6 +55,13 @@ def load_optional_track_summary(path: Path) -> list[dict[str, str]]: return load_track_summary(path) +def load_optional_csv(path: Path) -> list[dict[str, str]]: + if not path.exists(): + return [] + with path.open("r", encoding="utf-8", newline="") as handle: + return list(csv.DictReader(handle)) + + def _safe_int(value: Any, default: int = 0) -> int: try: return int(value) @@ -62,6 +69,26 @@ def _safe_int(value: Any, default: int = 0) -> int: return default +def _safe_float(value: Any) -> float | None: + try: + number = float(value) + except Exception: + return None + if number != number: + return None + return number + + +def _coerce_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + return str(value).strip().lower() in {"1", "true", "yes", "y"} + + +def _display_float(value: float | None) -> str: + return f"{value:.6f}" if value is not None else "n/a" + + def resolve_as_of_date( summary: dict[str, Any], release_status_summary: dict[str, Any], @@ -82,6 +109,7 @@ def build_review_inputs(output_dir: Path | str) -> dict[str, Any]: live_pool_path = root / "live_pool.json" manifest_path = root / "release_manifest.json" track_summary_path = root / "shadow_candidate_tracks" / "track_summary.csv" + latest_ranking_path = root / "latest_ranking.csv" return { "summary": load_optional_json(summary_path), @@ -89,12 +117,14 @@ def build_review_inputs(output_dir: Path | str) -> dict[str, Any]: "live_pool": load_json(live_pool_path), "manifest": load_json(manifest_path), "track_rows": load_optional_track_summary(track_summary_path), + "latest_ranking_rows": load_optional_csv(latest_ranking_path), "paths": { "monthly_shadow_build_summary": str(summary_path), "release_status_summary": str(release_status_summary_path), "live_pool": str(live_pool_path), "release_manifest": str(manifest_path), "track_summary": str(track_summary_path), + "latest_ranking": str(latest_ranking_path), }, "availability": { "monthly_shadow_build_summary": summary_path.exists(), @@ -160,6 +190,148 @@ def derive_warnings(inputs: dict[str, Any]) -> list[str]: return warnings +def _resolve_release_index_path(output_dir: Path, track_summary_path: Path, path_value: str) -> Path | None: + raw_path = Path(path_value) + candidates: list[Path] = [] + if raw_path.is_absolute(): + candidates.append(raw_path) + else: + candidates.extend( + [ + output_dir / raw_path, + track_summary_path.parent / raw_path, + PROJECT_ROOT / raw_path, + ] + ) + path_text = str(raw_path) + if path_text.startswith("data/output/"): + candidates.insert(0, output_dir / path_text.removeprefix("data/output/")) + + for candidate in candidates: + if candidate.exists(): + return candidate + return None + + +def _latest_track_release( + output_dir: Path, + track_summary_path: Path, + track_row: dict[str, str], + as_of_date: str, + official_symbols: list[str], +) -> dict[str, Any]: + index_path = _resolve_release_index_path(output_dir, track_summary_path, track_row.get("release_index_path", "")) + preview = { + "available": False, + "release_index_path": str(index_path) if index_path is not None else str(track_row.get("release_index_path", "")), + "as_of_date": "", + "version": "", + "symbols": [], + "overlap_with_official": 0, + "only_in_track": [], + "only_in_official": list(official_symbols), + "pool_stability": None, + "pool_churn": None, + } + if index_path is None: + return preview + + rows = load_optional_csv(index_path) + if not rows: + return preview + + matching = [row for row in rows if str(row.get("as_of_date", "")).strip() == as_of_date] + release_row = matching[-1] if matching else rows[-1] + symbols = [symbol for symbol in str(release_row.get("symbols", "")).split("|") if symbol] + official_set = set(official_symbols) + symbol_set = set(symbols) + + preview.update( + { + "available": True, + "as_of_date": str(release_row.get("as_of_date", "")), + "version": str(release_row.get("version", "")), + "symbols": symbols, + "overlap_with_official": len(symbol_set & official_set), + "only_in_track": [symbol for symbol in symbols if symbol not in official_set], + "only_in_official": [symbol for symbol in official_symbols if symbol not in symbol_set], + "pool_stability": _safe_float(release_row.get("pool_stability")), + "pool_churn": _safe_float(release_row.get("pool_churn")), + } + ) + return preview + + +def build_track_release_previews(inputs: dict[str, Any], as_of_date: str, official_symbols: list[str]) -> dict[str, Any]: + root = Path(inputs["paths"]["latest_ranking"]).parent + track_summary_path = Path(inputs["paths"]["track_summary"]) + track_map = {row.get("track_id", ""): row for row in inputs["track_rows"]} + previews: dict[str, Any] = {} + for track_id in ("official_baseline", "challenger_topk_60"): + row = track_map.get(track_id) + if row is None: + previews[track_id] = {"available": False, "symbols": []} + continue + previews[track_id] = _latest_track_release(root, track_summary_path, row, as_of_date, official_symbols) + return previews + + +def _ranking_row_summary(row: dict[str, str] | None) -> dict[str, Any] | None: + if row is None: + return None + return { + "rank": _safe_int(row.get("current_rank")), + "symbol": str(row.get("symbol", "")), + "final_score": _safe_float(row.get("final_score")), + "confidence": _safe_float(row.get("confidence")), + "liquidity_stability": _safe_float(row.get("liquidity_stability")), + "avg_quote_vol_180": _safe_float(row.get("avg_quote_vol_180")), + "selected_flag": _coerce_bool(row.get("selected_flag")), + } + + +def build_selection_boundary(ranking_rows: list[dict[str, str]], pool_size: int) -> dict[str, Any]: + tie_break_order = ["final_score", "confidence", "liquidity_stability", "avg_quote_vol_180", "symbol"] + if not ranking_rows: + return { + "available": False, + "tie_break_order": tie_break_order, + "selected_cutoff": None, + "next_candidate": None, + "score_gap_to_next": None, + } + + def sort_key(row: dict[str, str]) -> tuple[float, float, float, float, float, str]: + rank_value = _safe_float(row.get("current_rank")) + if rank_value is not None and rank_value > 0: + return (0.0, rank_value, 0.0, 0.0, 0.0, str(row.get("symbol", "")).upper()) + return ( + 1.0, + -(_safe_float(row.get("final_score")) or float("-inf")), + -(_safe_float(row.get("confidence")) or float("-inf")), + -(_safe_float(row.get("liquidity_stability")) or float("-inf")), + -(_safe_float(row.get("avg_quote_vol_180")) or float("-inf")), + str(row.get("symbol", "")).upper(), + ) + + ordered = sorted(ranking_rows, key=sort_key) + selected_rows = [row for row in ordered if _coerce_bool(row.get("selected_flag"))] + cutoff_index = min(max(len(selected_rows), pool_size), len(ordered)) + selected_cutoff = ordered[cutoff_index - 1] if cutoff_index else None + next_candidate = ordered[cutoff_index] if cutoff_index < len(ordered) else None + cutoff_score = _safe_float(selected_cutoff.get("final_score")) if selected_cutoff is not None else None + next_score = _safe_float(next_candidate.get("final_score")) if next_candidate is not None else None + score_gap = cutoff_score - next_score if cutoff_score is not None and next_score is not None else None + + return { + "available": True, + "tie_break_order": tie_break_order, + "selected_cutoff": _ranking_row_summary(selected_cutoff), + "next_candidate": _ranking_row_summary(next_candidate), + "score_gap_to_next": score_gap, + } + + def require_shadow_outputs(inputs: dict[str, Any]) -> None: availability = inputs["availability"] missing_items: list[str] = [] @@ -200,6 +372,10 @@ def build_review_payload(inputs: dict[str, Any]) -> dict[str, Any]: release_official = release_status_summary.get("official_release", {}) shadow_available = bool(track_rows) as_of_date = resolve_as_of_date(summary, release_status_summary, live_pool) + official_symbols = list(release_official.get("symbols", live_pool.get("symbols", []))) + pool_size = _safe_int( + official_baseline.get("pool_size", release_official.get("pool_size", live_pool.get("pool_size", 0))) + ) return { "generated_at_utc": datetime.now(timezone.utc).isoformat(), @@ -209,8 +385,8 @@ def build_review_payload(inputs: dict[str, Any]) -> dict[str, Any]: "profile": str(official_baseline.get("profile", official_track.get("profile_name", "baseline_blended_rank"))), "version": str(official_baseline.get("version", release_official.get("version", live_pool.get("version", "")))), "mode": str(official_baseline.get("mode", release_official.get("mode", live_pool.get("mode", "")))), - "pool_size": _safe_int(official_baseline.get("pool_size", release_official.get("pool_size", live_pool.get("pool_size", 0)))), - "symbols": list(release_official.get("symbols", live_pool.get("symbols", []))), + "pool_size": pool_size, + "symbols": official_symbols, "source_project": str(release_official.get("source_project", live_pool.get("source_project", ""))), }, "publish": { @@ -239,6 +415,8 @@ def build_review_payload(inputs: dict[str, Any]) -> dict[str, Any]: "release_index_path": str(challenger_track.get("release_index_path", "")), }, }, + "track_release_previews": build_track_release_previews(inputs, as_of_date, official_symbols), + "selection_boundary": build_selection_boundary(inputs["latest_ranking_rows"], pool_size), "shadow_analysis_available": shadow_available, "warnings": warnings, "operator_checklist": [ @@ -256,6 +434,8 @@ def render_review_markdown(payload: dict[str, Any]) -> str: official = payload["official_baseline"] publish = payload["publish"] tracks = payload["tracks"] + previews = payload["track_release_previews"] + boundary = payload["selection_boundary"] official_track_line = ( f"releases={tracks['official_baseline']['release_count']} first={tracks['official_baseline']['first_as_of_date']} " f"last={tracks['official_baseline']['last_as_of_date']} status={tracks['official_baseline']['candidate_status']}" @@ -271,6 +451,11 @@ def render_review_markdown(payload: dict[str, Any]) -> str: warning_lines = "\n".join(f"- {item}" for item in payload["warnings"]) if payload["warnings"] else "- none" checklist_lines = "\n".join(f"{idx}. {item}" for idx, item in enumerate(payload["operator_checklist"], start=1)) symbols = ", ".join(official["symbols"]) if official["symbols"] else "n/a" + comparison_lines = "\n".join( + render_track_preview_line(track_id, preview) + for track_id, preview in previews.items() + ) + boundary_lines = render_boundary_lines(boundary) return f"""# Monthly Review @@ -299,6 +484,14 @@ def render_review_markdown(payload: dict[str, Any]) -> str: - official_baseline: {official_track_line} - challenger_topk_60: {challenger_track_line} +## Track release comparison + +{comparison_lines} + +## Selection boundary + +{boundary_lines} + ## Warnings {warning_lines} @@ -309,9 +502,58 @@ def render_review_markdown(payload: dict[str, Any]) -> str: """ +def render_track_preview_line(track_id: str, preview: dict[str, Any]) -> str: + if not preview.get("available"): + return f"- {track_id}: release preview unavailable" + symbols = ", ".join(preview.get("symbols", [])) or "n/a" + only_in_track = ", ".join(preview.get("only_in_track", [])) or "none" + only_in_official = ", ".join(preview.get("only_in_official", [])) or "none" + return ( + f"- {track_id}: as_of={preview['as_of_date']} symbols={symbols} " + f"overlap_with_official={preview['overlap_with_official']} " + f"only_in_track={only_in_track} only_in_official={only_in_official} " + f"stability={_display_float(preview.get('pool_stability'))} churn={_display_float(preview.get('pool_churn'))}" + ) + + +def render_boundary_lines(boundary: dict[str, Any]) -> str: + tie_break = " > ".join(boundary["tie_break_order"]) + if not boundary.get("available"): + return f"- ranking preview unavailable\n- deterministic tie-break order: {tie_break}" + + cutoff = boundary.get("selected_cutoff") + next_candidate = boundary.get("next_candidate") + cutoff_line = ( + f"rank={cutoff['rank']} symbol={cutoff['symbol']} score={_display_float(cutoff['final_score'])}" + if cutoff + else "n/a" + ) + next_line = ( + f"rank={next_candidate['rank']} symbol={next_candidate['symbol']} score={_display_float(next_candidate['final_score'])}" + if next_candidate + else "n/a" + ) + return "\n".join( + [ + f"- selected cutoff: {cutoff_line}", + f"- next candidate: {next_line}", + f"- score gap to next: {_display_float(boundary.get('score_gap_to_next'))}", + f"- deterministic tie-break order: {tie_break}", + ] + ) + + def render_review_prompt(payload: dict[str, Any]) -> str: questions = "\n".join(f"{idx}. {item}" for idx, item in enumerate(payload["review_questions"], start=1)) warnings = "\n".join(f"- {item}" for item in payload["warnings"]) if payload["warnings"] else "- none" + challenger = payload["track_release_previews"].get("challenger_topk_60", {}) + challenger_symbols = ", ".join(challenger.get("symbols", [])) if challenger.get("available") else "n/a" + next_candidate = payload["selection_boundary"].get("next_candidate") + next_candidate_text = ( + f"{next_candidate['symbol']} score={_display_float(next_candidate['final_score'])}" + if next_candidate + else "n/a" + ) return f"""Monthly release review prompt Context: @@ -326,6 +568,8 @@ def render_review_prompt(payload: dict[str, Any]) -> str: - official version: {payload['official_baseline']['version']} - official mode: {payload['official_baseline']['mode']} - official symbols: {", ".join(payload['official_baseline']['symbols']) or 'n/a'} +- challenger_topk_60 symbols: {challenger_symbols} +- next candidate after selected pool: {next_candidate_text} Warnings: {warnings} diff --git a/tests/test_monthly_review_briefing.py b/tests/test_monthly_review_briefing.py index 1dceea3..e51115c 100644 --- a/tests/test_monthly_review_briefing.py +++ b/tests/test_monthly_review_briefing.py @@ -85,6 +85,28 @@ def write_fixture_files( "official_baseline,baseline_blended_rank,blended_rank_pct,official_baseline,official_reference,64,2020-12-31,2026-03-13,official/release_index.csv\n" f"challenger_topk_60,challenger_topk_60,future_topk_label_60,shadow_candidate,shadow_candidate,64,2020-12-31,{challenger_last_as_of_date},challenger/release_index.csv\n" ) + (shadow_dir / "official").mkdir() + (shadow_dir / "official" / "release_index.csv").write_text( + "version,as_of_date,pool_size,symbols,pool_stability,pool_churn\n" + "2026-03-13-core_major,2026-03-13,5,TRXUSDT|ETHUSDT|BCHUSDT|NEARUSDT|SOLUSDT,0.8,0.2\n", + encoding="utf-8", + ) + (shadow_dir / "challenger").mkdir() + (shadow_dir / "challenger" / "release_index.csv").write_text( + "version,as_of_date,pool_size,symbols,pool_stability,pool_churn\n" + "2026-03-13-core_major,2026-03-13,5,TRXUSDT|ETHUSDT|BCHUSDT|XRPUSDT|DOGEUSDT,0.6,0.4\n", + encoding="utf-8", + ) + (output_dir / "latest_ranking.csv").write_text( + "as_of_date,symbol,final_score,confidence,liquidity_stability,avg_quote_vol_180,selected_flag,current_rank\n" + "2026-03-13,TRXUSDT,0.90,0.70,0.90,1000,true,1\n" + "2026-03-13,ETHUSDT,0.80,0.60,0.80,900,true,2\n" + "2026-03-13,BCHUSDT,0.70,0.50,0.70,800,true,3\n" + "2026-03-13,NEARUSDT,0.60,0.40,0.60,700,true,4\n" + "2026-03-13,SOLUSDT,0.50,0.30,0.50,600,true,5\n" + "2026-03-13,XRPUSDT,0.49,0.20,0.40,500,false,6\n", + encoding="utf-8", + ) return output_dir def test_build_review_payload_reports_ok_when_outputs_align(self) -> None: @@ -101,6 +123,13 @@ def test_build_review_payload_reports_ok_when_outputs_align(self) -> None: self.assertEqual(payload["status"], "ok") self.assertEqual(payload["official_baseline"]["pool_size"], 5) self.assertEqual(payload["tracks"]["challenger_topk_60"]["release_count"], 64) + self.assertEqual( + payload["track_release_previews"]["challenger_topk_60"]["symbols"], + ["TRXUSDT", "ETHUSDT", "BCHUSDT", "XRPUSDT", "DOGEUSDT"], + ) + self.assertEqual(payload["track_release_previews"]["challenger_topk_60"]["overlap_with_official"], 3) + self.assertEqual(payload["selection_boundary"]["next_candidate"]["symbol"], "XRPUSDT") + self.assertAlmostEqual(payload["selection_boundary"]["score_gap_to_next"], 0.01) self.assertEqual(payload["warnings"], []) def test_build_review_payload_warns_when_track_dates_do_not_align(self) -> None: