Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from application.execution_service import execute_rebalance_cycle
from application.runtime_dependencies import LongBridgeRebalanceConfig, LongBridgeRebalanceRuntime
from application.signal_snapshot import build_signal_snapshot
from notifications.events import NotificationPublisher
from notifications import renderers as notification_renderers
from quant_platform_kit.common.notification_localization import (
Expand Down Expand Up @@ -217,6 +218,15 @@ def fetch_replanned_state():
safe_haven_cash_substitute_threshold_usd=config.safe_haven_cash_substitute_threshold_usd,
)
execution = execution_result.execution
execution["signal_snapshot"] = build_signal_snapshot(
platform="longbridge",
strategy_profile=config.strategy_profile,
execution={
**execution,
"latest_price_source": "longbridge_candlesticks",
},
allocation=execution_result.allocation,
)
logs = list(execution_result.logs)
skip_logs = list(execution_result.skip_logs)
note_logs = list(execution_result.note_logs)
Expand Down Expand Up @@ -249,3 +259,4 @@ def fetch_replanned_state():
extra_notification_lines=config.extra_notification_lines,
)
)
return execution_result
1 change: 1 addition & 0 deletions application/runtime_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def build_rebalance_config(self, *, strategy_plugin_signals=()) -> LongBridgeReb
separator=self.separator,
translator=self.translator,
with_prefix=self.with_prefix,
strategy_profile=self.strategy_profile,
strategy_display_name=self.strategy_display_name_localized,
dry_run_only=self.dry_run_only,
post_sell_refresh_attempts=self.order_poll_max_attempts,
Expand Down
1 change: 1 addition & 0 deletions application/runtime_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class LongBridgeRebalanceConfig:
separator: str
translator: Callable[..., str]
with_prefix: Callable[[str], str]
strategy_profile: str = ""
strategy_display_name: str = ""
dry_run_only: bool = False
post_sell_refresh_attempts: int = 1
Expand Down
166 changes: 166 additions & 0 deletions application/signal_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Shared signal snapshot payload helpers for platform reports."""

from __future__ import annotations

from collections.abc import Mapping
from datetime import date, datetime, timezone
from typing import Any

SIGNAL_SNAPSHOT_SCHEMA_VERSION = "signal_snapshot.v1"

_INDICATOR_FIELDS = (
"benchmark_symbol",
"benchmark_price",
"long_trend_value",
"exit_line",
"active_risk_asset",
"allocation_mode",
"trend_symbol",
"trend_price",
"trend_ma",
"trend_ma20",
"trend_ma20_slope",
"trend_rsi14",
"trend_rsi14_dynamic_threshold",
"trend_rsi14_effective_threshold",
"trend_bb_upper",
"blend_gate_volatility_delever_metric",
"blend_gate_volatility_delever_triggered",
)


def _utcnow() -> datetime:
return datetime.now(timezone.utc)


def _json_safe(value: Any) -> Any:
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return value.isoformat()
if isinstance(value, Mapping):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, (list, tuple, set)):
return [_json_safe(item) for item in value]
return value


def _first_value(*values: Any) -> Any:
for value in values:
if value is not None and value != "":
return value
return None


def _merge_signal_sources(*sources: Mapping[str, Any] | None) -> dict[str, Any]:
merged: dict[str, Any] = {}
for source in sources:
if not isinstance(source, Mapping):
continue
annotations = source.get("execution_annotations")
if isinstance(annotations, Mapping):
merged.update(annotations)
merged.update(source)
return merged


def _normalized_numeric_mapping(value: Any) -> dict[str, float]:
if not isinstance(value, Mapping):
return {}
normalized: dict[str, float] = {}
for key, raw_value in value.items():
symbol = str(key or "").strip().upper()
if not symbol:
continue
try:
normalized[symbol] = float(raw_value)
except (TypeError, ValueError):
continue
return normalized


def _target_payload(
*,
allocation: Mapping[str, Any] | None,
explicit_target_weights: Mapping[str, Any] | None,
) -> tuple[str | None, dict[str, float], dict[str, float]]:
allocation = allocation if isinstance(allocation, Mapping) else {}
target_mode = str(allocation.get("target_mode") or "").strip() or None
targets = _normalized_numeric_mapping(explicit_target_weights or allocation.get("targets"))
if target_mode == "value":
return target_mode, {}, targets
return target_mode, targets, {}


def build_signal_snapshot(
*,
platform: str,
strategy_profile: str | None = None,
generated_at: datetime | None = None,
diagnostics: Mapping[str, Any] | None = None,
execution: Mapping[str, Any] | None = None,
allocation: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
target_weights: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
source = _merge_signal_sources(metadata, diagnostics, execution)
target_mode, normalized_weights, normalized_values = _target_payload(
allocation=allocation,
explicit_target_weights=target_weights,
)
indicators = {
field: _json_safe(source[field])
for field in _INDICATOR_FIELDS
if source.get(field) not in (None, "")
}
snapshot = {
"schema_version": SIGNAL_SNAPSHOT_SCHEMA_VERSION,
"platform": str(platform or "").strip(),
"strategy_profile": _first_value(strategy_profile, source.get("strategy_profile")),
"strategy_version": source.get("strategy_version"),
"generated_at": _json_safe(generated_at or _utcnow()),
"signal_as_of": _json_safe(
_first_value(
source.get("signal_as_of"),
source.get("signal_date"),
source.get("snapshot_as_of"),
source.get("trade_date"),
)
),
"market_date": _json_safe(
_first_value(
source.get("market_date"),
source.get("signal_date"),
source.get("snapshot_as_of"),
source.get("trade_date"),
)
),
"effective_date": _json_safe(source.get("effective_date")),
"latest_price_source": _first_value(
source.get("latest_price_source"),
source.get("price_source_mode"),
source.get("market_data_source"),
source.get("signal_source"),
),
"quote_overlay_used": source.get("quote_overlay_used"),
"data_freshness_warning": _first_value(
source.get("data_freshness_warning"),
source.get("snapshot_price_fallback_used"),
),
"signal": _first_value(
source.get("signal_display"),
source.get("signal_description"),
source.get("signal_message"),
),
"status": _first_value(
source.get("status_display"),
source.get("status_description"),
source.get("market_status"),
source.get("canary_status"),
),
"target_mode": target_mode,
"target_weights": normalized_weights,
"target_values": normalized_values,
"indicators": indicators,
}
return {key: _json_safe(value) for key, value in snapshot.items()}
19 changes: 17 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,28 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
)
if not validation_only:
publish_strategy_plugin_alerts(strategy_plugin_signals, report=report)
run_rebalance_cycle(
cycle_result = run_rebalance_cycle(
runtime=composer.build_rebalance_runtime(
silent_cycle_notifications=validation_only,
),
config=composer.build_rebalance_config(),
)
finalize_runtime_report(report, status="ok")
signal_snapshot = {}
if cycle_result is not None:
execution = dict(getattr(cycle_result, "execution", {}) or {})
signal_snapshot = dict(execution.get("signal_snapshot") or {})
if signal_snapshot:
reporting_adapters.log_event(
log_context,
"strategy_signal_snapshot",
message="Strategy signal snapshot",
**signal_snapshot,
)
finalize_runtime_report(
report,
status="ok",
diagnostics={"signal_snapshot": signal_snapshot} if signal_snapshot else None,
)
reporting_adapters.log_event(
log_context,
"strategy_cycle_completed",
Expand Down
41 changes: 41 additions & 0 deletions notifications/renderers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from collections.abc import Mapping
import re

from notifications.events import RenderedNotification
Expand Down Expand Up @@ -123,6 +124,42 @@ def _append_timing_lines(lines, *, execution, translator) -> None:
lines.extend(_build_timing_audit_lines(execution, translator=translator))


def _format_signal_snapshot_line(snapshot, *, translator) -> str:
if not isinstance(snapshot, Mapping):
return ""
market_date = str(snapshot.get("market_date") or snapshot.get("signal_as_of") or "").strip()
source = str(snapshot.get("latest_price_source") or "").strip()
overlay = snapshot.get("quote_overlay_used")
warning = snapshot.get("data_freshness_warning")
if not market_date and not source and overlay is None and warning in (None, "", False):
return ""
if _translator_uses_zh(translator):
overlay_text = "是" if overlay is True else "否" if overlay is False else "未知"
parts = [
f"日期 {market_date or '未知'}",
f"数据源 {source or '未知'}",
f"报价覆盖 {overlay_text}",
]
if warning not in (None, "", False):
parts.append(f"提示 {warning}")
return "🧾 信号快照: " + " | ".join(parts)
overlay_text = "yes" if overlay is True else "no" if overlay is False else "unknown"
parts = [
f"date {market_date or 'unknown'}",
f"source {source or 'unknown'}",
f"quote overlay {overlay_text}",
]
if warning not in (None, "", False):
parts.append(f"warning {warning}")
return "🧾 Signal snapshot: " + " | ".join(parts)


def _append_signal_snapshot_line(lines, *, execution, translator) -> None:
line = _format_signal_snapshot_line(execution.get("signal_snapshot"), translator=translator)
if line:
lines.append(line)


def _append_status_lines(lines, *, execution, translator, signal_key):
status_display = _localize_notification_text(execution.get("status_display"), translator=translator)
if status_display:
Expand Down Expand Up @@ -199,6 +236,7 @@ def render_rebalance_notification(
_append_extra_notification_lines(detailed_lines, extra_notification_lines)
_append_dashboard_block(detailed_lines, execution=execution, separator=separator)
_append_timing_lines(detailed_lines, execution=execution, translator=translator)
_append_signal_snapshot_line(detailed_lines, execution=execution, translator=translator)
_append_status_lines(
detailed_lines,
execution=execution,
Expand All @@ -214,6 +252,7 @@ def render_rebalance_notification(
_append_extra_notification_lines(compact_lines, extra_notification_lines)
_append_dashboard_block(compact_lines, execution=execution, separator=separator)
_append_timing_lines(compact_lines, execution=execution, translator=translator)
_append_signal_snapshot_line(compact_lines, execution=execution, translator=translator)
_append_compact_status_lines(
compact_lines,
execution=execution,
Expand Down Expand Up @@ -245,6 +284,7 @@ def render_heartbeat_notification(
_append_extra_notification_lines(detailed_lines, extra_notification_lines)
_append_dashboard_block(detailed_lines, execution=execution, separator=separator)
_append_timing_lines(detailed_lines, execution=execution, translator=translator)
_append_signal_snapshot_line(detailed_lines, execution=execution, translator=translator)
detailed_lines.append(separator)
_append_status_lines(
detailed_lines,
Expand Down Expand Up @@ -279,6 +319,7 @@ def render_heartbeat_notification(
_append_extra_notification_lines(compact_lines, extra_notification_lines)
_append_dashboard_block(compact_lines, execution=execution, separator=separator)
_append_timing_lines(compact_lines, execution=execution, translator=translator)
_append_signal_snapshot_line(compact_lines, execution=execution, translator=translator)
_append_compact_status_lines(
compact_lines,
execution=execution,
Expand Down