diff --git a/application/rebalance_service.py b/application/rebalance_service.py index 7e6b54e..4374cc6 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -7,6 +7,7 @@ from application.execution_service import execute_rebalance_cycle from application.runtime_dependencies import LongBridgeRebalanceConfig, LongBridgeRebalanceRuntime +from application.signal_snapshot import build_signal_snapshot from notifications.events import NotificationPublisher from notifications import renderers as notification_renderers from quant_platform_kit.common.notification_localization import ( @@ -217,6 +218,15 @@ def fetch_replanned_state(): safe_haven_cash_substitute_threshold_usd=config.safe_haven_cash_substitute_threshold_usd, ) execution = execution_result.execution + execution["signal_snapshot"] = build_signal_snapshot( + platform="longbridge", + strategy_profile=config.strategy_profile, + execution={ + **execution, + "latest_price_source": "longbridge_candlesticks", + }, + allocation=execution_result.allocation, + ) logs = list(execution_result.logs) skip_logs = list(execution_result.skip_logs) note_logs = list(execution_result.note_logs) @@ -249,3 +259,4 @@ def fetch_replanned_state(): extra_notification_lines=config.extra_notification_lines, ) ) + return execution_result diff --git a/application/runtime_composer.py b/application/runtime_composer.py index 08e04b6..d393ebe 100644 --- a/application/runtime_composer.py +++ b/application/runtime_composer.py @@ -182,6 +182,7 @@ def build_rebalance_config(self, *, strategy_plugin_signals=()) -> LongBridgeReb separator=self.separator, translator=self.translator, with_prefix=self.with_prefix, + strategy_profile=self.strategy_profile, strategy_display_name=self.strategy_display_name_localized, dry_run_only=self.dry_run_only, post_sell_refresh_attempts=self.order_poll_max_attempts, diff --git a/application/runtime_dependencies.py b/application/runtime_dependencies.py index d3bad10..4280d4f 100644 --- a/application/runtime_dependencies.py +++ b/application/runtime_dependencies.py @@ -16,6 +16,7 @@ class LongBridgeRebalanceConfig: separator: str translator: Callable[..., str] with_prefix: Callable[[str], str] + strategy_profile: str = "" strategy_display_name: str = "" dry_run_only: bool = False post_sell_refresh_attempts: int = 1 diff --git a/application/signal_snapshot.py b/application/signal_snapshot.py new file mode 100644 index 0000000..4e0463d --- /dev/null +++ b/application/signal_snapshot.py @@ -0,0 +1,166 @@ +"""Shared signal snapshot payload helpers for platform reports.""" + +from __future__ import annotations + +from collections.abc import Mapping +from datetime import date, datetime, timezone +from typing import Any + +SIGNAL_SNAPSHOT_SCHEMA_VERSION = "signal_snapshot.v1" + +_INDICATOR_FIELDS = ( + "benchmark_symbol", + "benchmark_price", + "long_trend_value", + "exit_line", + "active_risk_asset", + "allocation_mode", + "trend_symbol", + "trend_price", + "trend_ma", + "trend_ma20", + "trend_ma20_slope", + "trend_rsi14", + "trend_rsi14_dynamic_threshold", + "trend_rsi14_effective_threshold", + "trend_bb_upper", + "blend_gate_volatility_delever_metric", + "blend_gate_volatility_delever_triggered", +) + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def _json_safe(value: Any) -> Any: + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, date): + return value.isoformat() + if isinstance(value, Mapping): + return {str(key): _json_safe(item) for key, item in value.items()} + if isinstance(value, (list, tuple, set)): + return [_json_safe(item) for item in value] + return value + + +def _first_value(*values: Any) -> Any: + for value in values: + if value is not None and value != "": + return value + return None + + +def _merge_signal_sources(*sources: Mapping[str, Any] | None) -> dict[str, Any]: + merged: dict[str, Any] = {} + for source in sources: + if not isinstance(source, Mapping): + continue + annotations = source.get("execution_annotations") + if isinstance(annotations, Mapping): + merged.update(annotations) + merged.update(source) + return merged + + +def _normalized_numeric_mapping(value: Any) -> dict[str, float]: + if not isinstance(value, Mapping): + return {} + normalized: dict[str, float] = {} + for key, raw_value in value.items(): + symbol = str(key or "").strip().upper() + if not symbol: + continue + try: + normalized[symbol] = float(raw_value) + except (TypeError, ValueError): + continue + return normalized + + +def _target_payload( + *, + allocation: Mapping[str, Any] | None, + explicit_target_weights: Mapping[str, Any] | None, +) -> tuple[str | None, dict[str, float], dict[str, float]]: + allocation = allocation if isinstance(allocation, Mapping) else {} + target_mode = str(allocation.get("target_mode") or "").strip() or None + targets = _normalized_numeric_mapping(explicit_target_weights or allocation.get("targets")) + if target_mode == "value": + return target_mode, {}, targets + return target_mode, targets, {} + + +def build_signal_snapshot( + *, + platform: str, + strategy_profile: str | None = None, + generated_at: datetime | None = None, + diagnostics: Mapping[str, Any] | None = None, + execution: Mapping[str, Any] | None = None, + allocation: Mapping[str, Any] | None = None, + metadata: Mapping[str, Any] | None = None, + target_weights: Mapping[str, Any] | None = None, +) -> dict[str, Any]: + source = _merge_signal_sources(metadata, diagnostics, execution) + target_mode, normalized_weights, normalized_values = _target_payload( + allocation=allocation, + explicit_target_weights=target_weights, + ) + indicators = { + field: _json_safe(source[field]) + for field in _INDICATOR_FIELDS + if source.get(field) not in (None, "") + } + snapshot = { + "schema_version": SIGNAL_SNAPSHOT_SCHEMA_VERSION, + "platform": str(platform or "").strip(), + "strategy_profile": _first_value(strategy_profile, source.get("strategy_profile")), + "strategy_version": source.get("strategy_version"), + "generated_at": _json_safe(generated_at or _utcnow()), + "signal_as_of": _json_safe( + _first_value( + source.get("signal_as_of"), + source.get("signal_date"), + source.get("snapshot_as_of"), + source.get("trade_date"), + ) + ), + "market_date": _json_safe( + _first_value( + source.get("market_date"), + source.get("signal_date"), + source.get("snapshot_as_of"), + source.get("trade_date"), + ) + ), + "effective_date": _json_safe(source.get("effective_date")), + "latest_price_source": _first_value( + source.get("latest_price_source"), + source.get("price_source_mode"), + source.get("market_data_source"), + source.get("signal_source"), + ), + "quote_overlay_used": source.get("quote_overlay_used"), + "data_freshness_warning": _first_value( + source.get("data_freshness_warning"), + source.get("snapshot_price_fallback_used"), + ), + "signal": _first_value( + source.get("signal_display"), + source.get("signal_description"), + source.get("signal_message"), + ), + "status": _first_value( + source.get("status_display"), + source.get("status_description"), + source.get("market_status"), + source.get("canary_status"), + ), + "target_mode": target_mode, + "target_weights": normalized_weights, + "target_values": normalized_values, + "indicators": indicators, + } + return {key: _json_safe(value) for key, value in snapshot.items()} diff --git a/main.py b/main.py index 597f7d5..7bee189 100644 --- a/main.py +++ b/main.py @@ -309,13 +309,28 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali ) if not validation_only: publish_strategy_plugin_alerts(strategy_plugin_signals, report=report) - run_rebalance_cycle( + cycle_result = run_rebalance_cycle( runtime=composer.build_rebalance_runtime( silent_cycle_notifications=validation_only, ), config=composer.build_rebalance_config(), ) - finalize_runtime_report(report, status="ok") + signal_snapshot = {} + if cycle_result is not None: + execution = dict(getattr(cycle_result, "execution", {}) or {}) + signal_snapshot = dict(execution.get("signal_snapshot") or {}) + if signal_snapshot: + reporting_adapters.log_event( + log_context, + "strategy_signal_snapshot", + message="Strategy signal snapshot", + **signal_snapshot, + ) + finalize_runtime_report( + report, + status="ok", + diagnostics={"signal_snapshot": signal_snapshot} if signal_snapshot else None, + ) reporting_adapters.log_event( log_context, "strategy_cycle_completed", diff --git a/notifications/renderers.py b/notifications/renderers.py index def436e..147e013 100644 --- a/notifications/renderers.py +++ b/notifications/renderers.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import Mapping import re from notifications.events import RenderedNotification @@ -123,6 +124,42 @@ def _append_timing_lines(lines, *, execution, translator) -> None: lines.extend(_build_timing_audit_lines(execution, translator=translator)) +def _format_signal_snapshot_line(snapshot, *, translator) -> str: + if not isinstance(snapshot, Mapping): + return "" + market_date = str(snapshot.get("market_date") or snapshot.get("signal_as_of") or "").strip() + source = str(snapshot.get("latest_price_source") or "").strip() + overlay = snapshot.get("quote_overlay_used") + warning = snapshot.get("data_freshness_warning") + if not market_date and not source and overlay is None and warning in (None, "", False): + return "" + if _translator_uses_zh(translator): + overlay_text = "是" if overlay is True else "否" if overlay is False else "未知" + parts = [ + f"日期 {market_date or '未知'}", + f"数据源 {source or '未知'}", + f"报价覆盖 {overlay_text}", + ] + if warning not in (None, "", False): + parts.append(f"提示 {warning}") + return "🧾 信号快照: " + " | ".join(parts) + overlay_text = "yes" if overlay is True else "no" if overlay is False else "unknown" + parts = [ + f"date {market_date or 'unknown'}", + f"source {source or 'unknown'}", + f"quote overlay {overlay_text}", + ] + if warning not in (None, "", False): + parts.append(f"warning {warning}") + return "🧾 Signal snapshot: " + " | ".join(parts) + + +def _append_signal_snapshot_line(lines, *, execution, translator) -> None: + line = _format_signal_snapshot_line(execution.get("signal_snapshot"), translator=translator) + if line: + lines.append(line) + + def _append_status_lines(lines, *, execution, translator, signal_key): status_display = _localize_notification_text(execution.get("status_display"), translator=translator) if status_display: @@ -199,6 +236,7 @@ def render_rebalance_notification( _append_extra_notification_lines(detailed_lines, extra_notification_lines) _append_dashboard_block(detailed_lines, execution=execution, separator=separator) _append_timing_lines(detailed_lines, execution=execution, translator=translator) + _append_signal_snapshot_line(detailed_lines, execution=execution, translator=translator) _append_status_lines( detailed_lines, execution=execution, @@ -214,6 +252,7 @@ def render_rebalance_notification( _append_extra_notification_lines(compact_lines, extra_notification_lines) _append_dashboard_block(compact_lines, execution=execution, separator=separator) _append_timing_lines(compact_lines, execution=execution, translator=translator) + _append_signal_snapshot_line(compact_lines, execution=execution, translator=translator) _append_compact_status_lines( compact_lines, execution=execution, @@ -245,6 +284,7 @@ def render_heartbeat_notification( _append_extra_notification_lines(detailed_lines, extra_notification_lines) _append_dashboard_block(detailed_lines, execution=execution, separator=separator) _append_timing_lines(detailed_lines, execution=execution, translator=translator) + _append_signal_snapshot_line(detailed_lines, execution=execution, translator=translator) detailed_lines.append(separator) _append_status_lines( detailed_lines, @@ -279,6 +319,7 @@ def render_heartbeat_notification( _append_extra_notification_lines(compact_lines, extra_notification_lines) _append_dashboard_block(compact_lines, execution=execution, separator=separator) _append_timing_lines(compact_lines, execution=execution, translator=translator) + _append_signal_snapshot_line(compact_lines, execution=execution, translator=translator) _append_compact_status_lines( compact_lines, execution=execution,