Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class ExecutionCycleResult:
skip_logs: tuple[str, ...]
note_logs: tuple[str, ...]
action_done: bool
dry_run_orders: tuple[dict, ...] = ()
quote_snapshots: tuple[dict, ...] = ()


DEFAULT_SAFE_HAVEN_CASH_SUBSTITUTE_THRESHOLD_USD = 1000.0
Expand All @@ -257,6 +259,24 @@ def _noop_sleep(_seconds):
return None


def _coerce_order_quantity(value):
try:
return float(str(value).replace(",", "").strip())
except (TypeError, ValueError):
return value


def _serialize_quote_snapshot(snapshot) -> dict:
return {
"symbol": str(getattr(snapshot, "symbol", "") or "").strip().upper(),
"as_of": str(getattr(snapshot, "as_of", "") or ""),
"last_price": float(getattr(snapshot, "last_price", 0.0) or 0.0),
"bid_price": getattr(snapshot, "bid_price", None),
"ask_price": getattr(snapshot, "ask_price", None),
"currency": str(getattr(snapshot, "currency", "") or "").strip(),
}


def _safe_haven_cash_symbols(*, portfolio: dict, allocation: dict) -> tuple[str, ...]:
symbols: list[str] = []
for symbol in allocation.get("safe_haven_symbols", ()):
Expand Down Expand Up @@ -414,9 +434,12 @@ def _sell_order_quantity(
)


def safe_quote_last_price(symbol, *, market_data_port, notify_issue):
def safe_quote_last_price(symbol, *, market_data_port, notify_issue, quote_recorder=None):
try:
return float(market_data_port.get_quote(symbol).last_price)
snapshot = market_data_port.get_quote(symbol)
if quote_recorder is not None:
quote_recorder(snapshot)
return float(snapshot.last_price)
except Exception as exc:
notify_issue("Quote failed", f"Symbol: {symbol}\n{exc}")
return None
Expand Down Expand Up @@ -565,6 +588,8 @@ def execute_rebalance_cycle(
logs: list[str] = []
skip_logs: list[str] = []
note_logs: list[str] = []
dry_run_orders: list[dict] = []
quote_snapshots_by_symbol: dict[str, dict] = {}
small_account_cash_note_keys: set[str] = set()
action_done = False
sell_submitted = False
Expand All @@ -576,6 +601,12 @@ def execute_rebalance_cycle(
def market_symbol(symbol):
return _market_symbol(symbol, symbol_suffix=symbol_suffix)

def record_quote_snapshot(snapshot) -> None:
payload = _serialize_quote_snapshot(snapshot)
symbol = payload.get("symbol")
if symbol:
quote_snapshots_by_symbol[symbol] = payload

strategy_assets = tuple(allocation["strategy_symbols"])
market_values = dict(portfolio["market_values"])
quantities = dict(portfolio["quantities"])
Expand Down Expand Up @@ -678,6 +709,18 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
price_text = f"${price:.2f}" if price is not None else translator("order_type_market")
side_key = "side_buy" if str(side).lower() == "buy" else "side_sell"
order_type_key = "order_type_limit" if order_type == "limit" else "order_type_market"
order_payload = {
"symbol": str(symbol or "").strip().upper(),
"side": str(side or "").strip().lower(),
"quantity": _coerce_order_quantity(quantity),
"order_type": str(order_type or "").strip().lower(),
"status": "dry_run",
}
if price is not None:
order_payload["price"] = round(float(price), 4)
if order_type == "limit":
order_payload["limit_price"] = round(float(price), 4)
dry_run_orders.append(order_payload)
message = translator(
"dry_run_order",
side=translator(side_key),
Expand All @@ -697,6 +740,7 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
market_symbol(symbol),
market_data_port=market_data_port,
notify_issue=notify_issue,
quote_recorder=record_quote_snapshot,
)
if price is None:
continue
Expand Down Expand Up @@ -786,6 +830,7 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
market_symbol(cash_sweep_symbol),
market_data_port=market_data_port,
notify_issue=notify_issue,
quote_recorder=record_quote_snapshot,
)
if sweep_price is not None and sweep_price > 0.0:
funding_needs = []
Expand All @@ -794,6 +839,7 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
market_symbol(buy_symbol),
market_data_port=market_data_port,
notify_issue=notify_issue,
quote_recorder=record_quote_snapshot,
)
if buy_price is None:
continue
Expand Down Expand Up @@ -937,6 +983,7 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
market_symbol(symbol),
market_data_port=market_data_port,
notify_issue=notify_issue,
quote_recorder=record_quote_snapshot,
)
if price is None:
continue
Expand Down Expand Up @@ -1057,6 +1104,7 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
market_symbol(cash_sweep_symbol),
market_data_port=market_data_port,
notify_issue=notify_issue,
quote_recorder=record_quote_snapshot,
)
if cash_sweep_price is not None and cash_sweep_price > 0.0 and investable_cash > cash_sweep_price * 2:
substitution_threshold = max(
Expand Down Expand Up @@ -1110,4 +1158,6 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
skip_logs=tuple(skip_logs),
note_logs=tuple(note_logs),
action_done=action_done,
dry_run_orders=tuple(dry_run_orders),
quote_snapshots=tuple(quote_snapshots_by_symbol.values()),
)
12 changes: 9 additions & 3 deletions application/runtime_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def send_tg_message(self, message: str) -> None:
)
sender(message)

def build_notification_adapters(self):
def build_notification_adapters(self, *, delivery_events: list[dict[str, Any]] | None = None):
return self.notification_adapter_builder(
with_prefix=self.with_prefix,
send_message=self.send_tg_message,
Expand All @@ -109,6 +109,7 @@ def build_notification_adapters(self):
order_poll_max_attempts=self.order_poll_max_attempts,
sleeper=self.sleeper,
log_message=lambda message: self.printer(self.with_prefix(message), flush=True),
delivery_events=delivery_events,
)

def build_reporting_adapters(self):
Expand Down Expand Up @@ -152,8 +153,13 @@ def build_reporting_adapters(self):
printer=lambda line: self.printer(line, flush=True),
)

def build_rebalance_runtime(self, *, silent_cycle_notifications: bool = False) -> LongBridgeRebalanceRuntime:
notification_adapters = self.build_notification_adapters()
def build_rebalance_runtime(
self,
*,
silent_cycle_notifications: bool = False,
notification_delivery_events: list[dict[str, Any]] | None = None,
) -> LongBridgeRebalanceRuntime:
notification_adapters = self.build_notification_adapters(delivery_events=notification_delivery_events)
notifications = (
CallableNotificationPort(lambda _message: None)
if silent_cycle_notifications
Expand Down
26 changes: 22 additions & 4 deletions application/runtime_notification_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import hashlib
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
Expand All @@ -24,6 +25,7 @@ class LongBridgeNotificationAdapters:
notify_issue: Callable[[str, str], None]
post_submit_order: Callable[[Any, Any, Any], None]
cycle_publisher: NotificationPublisher
delivery_events: list[dict[str, Any]]

def publish_cycle_notification(self, *, detailed_text: str, compact_text: str) -> None:
self.cycle_publisher.publish(
Expand All @@ -44,18 +46,33 @@ def build_runtime_notification_adapters(
order_poll_max_attempts: int,
sleeper: Callable[[float], None],
log_message: Callable[[str], None] | None = None,
delivery_events: list[dict[str, Any]] | None = None,
) -> LongBridgeNotificationAdapters:
recorded_delivery_events = delivery_events if delivery_events is not None else []

def send_recorded_message(message: str) -> None:
send_message(message)
compact = str(message or "")
recorded_delivery_events.append(
{
"sink": "telegram",
"delivery_status": "sent",
Comment on lines +54 to +59

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Don't mark no-op Telegram sends as sent

When the default LongBridge sender is used with missing Telegram credentials (or when it swallows a post failure), send_message returns normally without delivering anything (notifications/telegram.py returns early for empty token/chat and catches exceptions). Because this wrapper unconditionally appends delivery_status: "sent" after any normal return, a dry-run report can claim validation_status: passed and include a sent delivery event even though no notification left the process; the delivery log should only record sent after the sender can confirm delivery, or should distinguish attempted/skipped/failed sends.

Useful? React with 👍 / 👎.

"compact_text_sha256": hashlib.sha256(compact.encode("utf-8")).hexdigest(),
"compact_text_length": len(compact),
}
)

cycle_publisher = NotificationPublisher(
log_message=log_message or (lambda message: print(with_prefix(message), flush=True)),
send_message=send_message,
send_message=send_recorded_message,
)
notify_issue = build_issue_notifier(
with_prefix_fn=with_prefix,
send_tg_message_fn=send_message,
send_tg_message_fn=send_recorded_message,
)
order_event_publisher = NotificationPublisher(
log_message=lambda _message: None,
send_message=send_message,
send_message=send_recorded_message,
)

def publish_order_event(event: OrderLifecycleEvent) -> None:
Expand All @@ -81,8 +98,9 @@ def post_submit_order(trade_context, order_intent, report) -> None:
)

return LongBridgeNotificationAdapters(
notification_port=CallableNotificationPort(send_message),
notification_port=CallableNotificationPort(send_recorded_message),
notify_issue=notify_issue,
post_submit_order=post_submit_order,
cycle_publisher=cycle_publisher,
delivery_events=recorded_delivery_events,
)
96 changes: 93 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,71 @@ def _split_env_list(value: str | None) -> tuple[str, ...]:
)


def _summarize_cycle_result_for_report(cycle_result, *, dry_run: bool) -> dict:
if cycle_result is None:
return {
"action_done": False,
"order_events_count": 0,
"orders_previewed_count": 0,
"orders_skipped_count": 0,
"notes_count": 0,
"dry_run_order_preview_available": False,
}
logs = tuple(getattr(cycle_result, "logs", ()) or ())
skip_logs = tuple(getattr(cycle_result, "skip_logs", ()) or ())
note_logs = tuple(getattr(cycle_result, "note_logs", ()) or ())
dry_run_orders = tuple(getattr(cycle_result, "dry_run_orders", ()) or ())
quote_snapshots = tuple(getattr(cycle_result, "quote_snapshots", ()) or ())
order_events_count = len(logs)
orders_previewed_count = len(dry_run_orders) if dry_run_orders else (order_events_count if dry_run else 0)
summary = {
"action_done": bool(getattr(cycle_result, "action_done", False)),
"order_events_count": order_events_count,
"orders_previewed_count": orders_previewed_count,
"orders_skipped_count": len(skip_logs),
"notes_count": len(note_logs),
"dry_run_order_preview_available": bool(dry_run and orders_previewed_count > 0),
}
if dry_run_orders:
summary["orders_previewed"] = [dict(order) for order in dry_run_orders]
if quote_snapshots:
summary["quote_snapshot"] = {
"quotes": [dict(snapshot) for snapshot in quote_snapshots],
}
return summary


def _build_notification_delivery_log_for_report(
*,
platform: str,
strategy_profile: str,
run_id: str,
dry_run: bool,
orders_previewed_count: int,
delivery_events: list[dict],
) -> dict:
events = [dict(event) for event in delivery_events if dict(event).get("delivery_status") == "sent"]
if not dry_run or orders_previewed_count <= 0 or not events:
return {}
return {
"notification_schema_version": "hk_live_enablement_notification.v1",
"notification_event_type": "hk_snapshot_live_enablement_dry_run",
"notification_correlation_id": str(run_id or ""),
"locales": ["en", "zh-Hans"],
"profile": str(strategy_profile or ""),
"platform": str(platform or ""),
"validation_status": "passed",
"orders_previewed": int(orders_previewed_count),
"delivery_events": events,
"notification_contains_profile": True,
"notification_contains_platform": True,
"notification_contains_validation_status": True,
"notification_contains_order_preview_summary": True,
"notification_redacts_sensitive_fields": True,
"redaction_policy": "raw notification text is not persisted; only sha256 and length are recorded",
}


signal_text = build_signal_text(t)
strategy_display_name = build_strategy_display_name(t)(
STRATEGY_PROFILE,
Expand Down Expand Up @@ -421,16 +486,40 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
)
if not validation_only:
publish_strategy_plugin_alerts(strategy_plugin_signals, report=report)
cycle_result = run_rebalance_cycle(
runtime=composer.build_rebalance_runtime(
notification_delivery_events: list[dict] = []
try:
rebalance_runtime = composer.build_rebalance_runtime(
silent_cycle_notifications=validation_only,
notification_delivery_events=notification_delivery_events,
)
except TypeError as exc:
if "notification_delivery_events" not in str(exc):
raise
rebalance_runtime = composer.build_rebalance_runtime(
silent_cycle_notifications=validation_only,
),
)
cycle_result = run_rebalance_cycle(
runtime=rebalance_runtime,
config=composer.build_rebalance_config(strategy_plugin_signals=strategy_plugin_signals),
)
signal_snapshot = {}
if cycle_result is not None:
execution = dict(getattr(cycle_result, "execution", {}) or {})
signal_snapshot = dict(execution.get("signal_snapshot") or {})
execution_summary = _summarize_cycle_result_for_report(
cycle_result,
dry_run=bool(report.get("dry_run")),
)
notification_delivery_log = _build_notification_delivery_log_for_report(
platform="longbridge",
strategy_profile=STRATEGY_PROFILE,
run_id=str(report.get("run_id") or ""),
dry_run=bool(report.get("dry_run")),
orders_previewed_count=int(execution_summary.get("orders_previewed_count") or 0),
delivery_events=notification_delivery_events,
)
if notification_delivery_log:
execution_summary["notification_delivery_log"] = notification_delivery_log
if signal_snapshot:
reporting_adapters.log_event(
log_context,
Expand All @@ -441,6 +530,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
finalize_runtime_report(
report,
status="ok",
summary=execution_summary,
diagnostics={"signal_snapshot": signal_snapshot} if signal_snapshot else None,
)
reporting_adapters.log_event(
Expand Down
5 changes: 5 additions & 0 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ def estimate_max_purchase_quantity(_trade_context, symbol, **_kwargs):
self.assertIn("02800.HK", joined_logs)
self.assertIn("03033.HK", joined_logs)
self.assertNotIn(".US", joined_logs)
self.assertGreaterEqual(len(result.dry_run_orders), 1)
self.assertTrue(all(order["status"] == "dry_run" for order in result.dry_run_orders))
self.assertTrue(all(order["symbol"].endswith(".HK") for order in result.dry_run_orders))
self.assertTrue(result.quote_snapshots)
self.assertTrue(all(snapshot["symbol"].endswith(".HK") for snapshot in result.quote_snapshots))

def test_run_strategy_prefers_portfolio_port_runtime_path(self):
sent_messages = []
Expand Down
Loading