Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions application/rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from collections.abc import Mapping
from datetime import datetime

from notifications.events import NotificationPublisher, RenderedNotification

_ZH_REASON_REPLACEMENTS = (
("feature snapshot guard blocked execution", "特征快照校验阻止执行"),
("feature snapshot required", "需要特征快照"),
Expand Down Expand Up @@ -303,6 +305,10 @@ def run_strategy(
sleeper=_noop_sleep,
):
print(with_prefix(f"[{datetime.now()}] Starting strategy..."), flush=True)
notification_publisher = NotificationPublisher(
log_message=lambda message: print(with_prefix(message), flush=True),
send_message=send_tg_message,
)

token = refresh_token_if_needed(
fetch_token_from_secret(project_id, secret_name),
Expand Down Expand Up @@ -636,8 +642,12 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
)
compact_lines.extend([separator, translator("order_logs_title"), formatted_logs])
compact_tg_message = "\n".join(compact_lines)
print(with_prefix(detailed_tg_message), flush=True)
send_tg_message(compact_tg_message)
notification_publisher.publish(
RenderedNotification(
detailed_text=detailed_tg_message,
compact_text=compact_tg_message,
)
)
else:
no_trade_lines = [
translator("heartbeat_title"),
Expand Down Expand Up @@ -703,8 +713,12 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
compact_no_trade_lines.extend([separator, translator("notes_title")])
compact_no_trade_lines.extend(f" - {log}" for log in note_logs)
compact_no_trade_message = "\n".join(compact_no_trade_lines)
print(with_prefix(no_trade_message), flush=True)
send_tg_message(compact_no_trade_message)
notification_publisher.publish(
RenderedNotification(
detailed_text=no_trade_message,
compact_text=compact_no_trade_message,
)
)


def safe_quote_last_price(quote_context, symbol, *, fetch_last_price, notify_issue):
Expand Down
21 changes: 19 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
send_order_status_message as notifications_send_order_status_message,
submit_order_with_alert as notifications_submit_order_with_alert,
)
from notifications.events import NotificationPublisher, RenderedNotification
from notifications.telegram import (
build_issue_notifier,
build_prefixer,
Expand Down Expand Up @@ -131,6 +132,20 @@ def with_prefix(message: str) -> str:
def send_tg_message(message):
return build_sender(TG_TOKEN, TG_CHAT_ID, with_prefix_fn=with_prefix)(message)


def publish_notification(*, detailed_text, compact_text):
publisher = NotificationPublisher(
log_message=lambda message: print(with_prefix(message), flush=True),
send_message=send_tg_message,
)
publisher.publish(
RenderedNotification(
detailed_text=detailed_text,
compact_text=compact_text,
)
)


def notify_issue(title, detail):
return build_issue_notifier(with_prefix_fn=with_prefix, send_tg_message_fn=send_tg_message)(title, detail)

Expand Down Expand Up @@ -447,8 +462,10 @@ def run_strategy():
error_message=str(exc),
)
err = traceback.format_exc()
print(with_prefix(f"Strategy error:\n{err}"), flush=True)
send_tg_message(f"{t('error_title')}\n{err}")
publish_notification(
detailed_text=f"Strategy error:\n{err}",
compact_text=f"{t('error_title')}\n{err}",
)
finally:
try:
report_path = persist_execution_report(report)
Expand Down
44 changes: 44 additions & 0 deletions notifications/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Notification event envelope and delivery helpers."""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass(frozen=True)
class RenderedNotification:
"""Rendered notification payload split by sink."""

detailed_text: str
compact_text: str


@dataclass(frozen=True)
class NotificationPublisher:
"""Publish rendered notifications to the configured sinks."""

log_message: Callable[[str], None]
send_message: Callable[[str], None]

def publish(self, notification: RenderedNotification) -> None:
publish_rendered_notification(
notification,
log_message=self.log_message,
send_message=self.send_message,
)


def publish_rendered_notification(
notification: RenderedNotification,
*,
log_message: Callable[[str], None],
send_message: Callable[[str], None],
) -> None:
"""Write the detailed log copy and send the compact user notification."""
detailed = str(notification.detailed_text or "").strip()
compact = str(notification.compact_text or "").strip()
if detailed:
log_message(detailed)
if compact:
send_message(compact)
15 changes: 13 additions & 2 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import requests

from notifications.events import NotificationPublisher, RenderedNotification


SIGNAL_ICONS = {
"hold": "💎",
Expand Down Expand Up @@ -223,9 +225,18 @@ def send_tg_message(message):


def build_issue_notifier(*, with_prefix_fn, send_tg_message_fn):
publisher = NotificationPublisher(
log_message=lambda message: print(with_prefix_fn(message), flush=True),
send_message=send_tg_message_fn,
)

def notify_issue(title, detail):
message = f"{title}\n{detail}"
print(with_prefix_fn(message), flush=True)
send_tg_message_fn(message)
publisher.publish(
RenderedNotification(
detailed_text=message,
compact_text=message,
)
)

return notify_issue
60 changes: 60 additions & 0 deletions tests/test_notification_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import unittest

from notifications.events import (
NotificationPublisher,
RenderedNotification,
publish_rendered_notification,
)


class NotificationEventsTests(unittest.TestCase):
def test_publish_rendered_notification_splits_log_and_send_sinks(self):
logged = []
sent = []

publish_rendered_notification(
RenderedNotification(
detailed_text="detailed copy",
compact_text="compact copy",
),
log_message=logged.append,
send_message=sent.append,
)

self.assertEqual(logged, ["detailed copy"])
self.assertEqual(sent, ["compact copy"])

def test_publish_rendered_notification_skips_empty_sinks(self):
logged = []
sent = []

publish_rendered_notification(
RenderedNotification(detailed_text=" ", compact_text=""),
log_message=logged.append,
send_message=sent.append,
)

self.assertEqual(logged, [])
self.assertEqual(sent, [])

def test_notification_publisher_uses_configured_sinks(self):
logged = []
sent = []
publisher = NotificationPublisher(
log_message=logged.append,
send_message=sent.append,
)

publisher.publish(
RenderedNotification(
detailed_text="detailed copy",
compact_text="compact copy",
)
)

self.assertEqual(logged, ["detailed copy"])
self.assertEqual(sent, ["compact copy"])


if __name__ == "__main__":
unittest.main()