From 5141311041d322552a97170b12a9cfb3c6c0113f Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 26 May 2026 03:58:16 +0800 Subject: [PATCH] Add strategy plugin Telegram alert channel --- README.md | 4 +- README.zh-CN.md | 4 +- pyproject.toml | 2 +- src/quant_platform_kit/common/__init__.py | 2 + .../common/strategy_plugins.py | 2 + .../notifications/__init__.py | 16 + .../notifications/strategy_plugin_alerts.py | 50 ++- .../notifications/strategy_plugin_telegram.py | 408 ++++++++++++++++++ .../notifications/telegram.py | 141 +++++- .../test_strategy_plugin_alert_dispatcher.py | 61 ++- ..._strategy_plugin_telegram_notifications.py | 170 ++++++++ tests/test_strategy_plugins.py | 2 + 12 files changed, 831 insertions(+), 31 deletions(-) create mode 100644 src/quant_platform_kit/notifications/strategy_plugin_telegram.py create mode 100644 tests/test_strategy_plugin_telegram_notifications.py diff --git a/README.md b/README.md index b320520..67c1638 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ It contains: - narrow ports for market data, portfolio snapshots, order execution, notifications, and state - reusable broker adapter utilities - strategy loading, strategy-plugin, and alert-message contracts -- optional strategy-plugin alert channels for email, SMS, and push providers +- optional strategy-plugin alert channels for email, SMS, push, and Telegram providers - synthetic-data tests for public behavior It does not contain private runtime wiring or generated strategy outputs. @@ -49,7 +49,7 @@ Strategy plugins are sidecar artifacts that platform repositories may read when Generated plugin artifacts and platform-specific notification routing stay with the producing pipeline or consuming platform repository. Tests in this repository use synthetic price history and synthetic payloads only. -Plugin alert delivery is provider-neutral at the platform boundary. Platform repositories pass runtime settings into `publish_strategy_plugin_alerts`; this repository handles configured `email`, `sms`, and `push` channels without coupling plugin logic to a broker platform. +Plugin alert delivery is provider-neutral at the platform boundary. Platform repositories pass runtime settings into `publish_strategy_plugin_alerts`; this repository handles configured `email`, `sms`, `push`, and `telegram` channels without coupling plugin logic to a broker platform. ## Package Layout diff --git a/README.zh-CN.md b/README.zh-CN.md index 6ec7579..aae02a7 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -14,7 +14,7 @@ - 市场数据、持仓快照、订单执行、通知、状态存储等窄接口 - 可复用的券商适配工具 - 策略加载、策略插件、告警消息契约 -- 可选的策略插件 email、SMS 和 push 告警通道 +- 可选的策略插件 email、SMS、push 和 Telegram 告警通道 - 使用合成数据的公开测试 它不包含私有运行时接线和生成的策略输出。 @@ -49,7 +49,7 @@ QuantPlatformKit 生成的插件 artifact 和平台专属通知路由由生成它的 pipeline 或消费它的平台仓库管理。这个仓库的测试只使用合成价格历史和合成 payload。 -插件告警发送在平台边界保持 provider-neutral。平台仓库只把 runtime settings 传入 `publish_strategy_plugin_alerts`;这个仓库负责按配置发送 `email`、`sms` 和 `push`,不让插件逻辑耦合某个券商平台。 +插件告警发送在平台边界保持 provider-neutral。平台仓库只把 runtime settings 传入 `publish_strategy_plugin_alerts`;这个仓库负责按配置发送 `email`、`sms`、`push` 和 `telegram`,不让插件逻辑耦合某个券商平台。 ## 目录结构 diff --git a/pyproject.toml b/pyproject.toml index e0eadb0..f5ba67b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "quant-platform-kit" -version = "0.7.29" +version = "0.7.30" description = "Shared broker adapters, domain models, execution ports, and notification utilities for QuantStrategyLab strategies." readme = "README.md" requires-python = ">=3.9" diff --git a/src/quant_platform_kit/common/__init__.py b/src/quant_platform_kit/common/__init__.py index a27819f..0ac16fd 100644 --- a/src/quant_platform_kit/common/__init__.py +++ b/src/quant_platform_kit/common/__init__.py @@ -46,6 +46,7 @@ STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, + STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM, STRATEGY_PLUGIN_ALERT_ACTIONS, STRATEGY_PLUGIN_NON_ALERT_ROUTES, SUPPORTED_STRATEGY_PLUGIN_MODES, @@ -89,6 +90,7 @@ "STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL", "STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH", "STRATEGY_PLUGIN_ALERT_CHANNEL_SMS", + "STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM", "STRATEGY_PLUGIN_ALERT_ACTIONS", "STRATEGY_PLUGIN_NON_ALERT_ROUTES", "SUPPORTED_STRATEGY_PLUGIN_MODES", diff --git a/src/quant_platform_kit/common/strategy_plugins.py b/src/quant_platform_kit/common/strategy_plugins.py index 8aa5c90..d9372b1 100644 --- a/src/quant_platform_kit/common/strategy_plugins.py +++ b/src/quant_platform_kit/common/strategy_plugins.py @@ -15,6 +15,7 @@ STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL = "email" STRATEGY_PLUGIN_ALERT_CHANNEL_SMS = "sms" STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH = "push" +STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM = "telegram" SUPPORTED_STRATEGY_PLUGIN_MODES = frozenset({PLUGIN_MODE_SHADOW}) DEFAULT_PLUGIN_ARTIFACT_CACHE_DIR = Path(tempfile.gettempdir()) / "quant_strategy_plugin_artifacts" STRATEGY_PLUGIN_NON_ALERT_ROUTES = frozenset({"no_action"}) @@ -73,6 +74,7 @@ def supports_strategy(self, strategy: str) -> bool: STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, + STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM, ), ) } diff --git a/src/quant_platform_kit/notifications/__init__.py b/src/quant_platform_kit/notifications/__init__.py index a10a06b..f3046b9 100644 --- a/src/quant_platform_kit/notifications/__init__.py +++ b/src/quant_platform_kit/notifications/__init__.py @@ -4,6 +4,7 @@ from .events import NotificationPublisher, RenderedNotification, publish_rendered_notification from .push import parse_push_recipients, send_ntfy_push, send_pushover_push, send_strategy_plugin_push from .sms import normalize_sms_recipient, parse_sms_recipients, send_twilio_sms +from .telegram import parse_telegram_chat_ids, send_strategy_plugin_telegram, send_telegram_message from .strategy_plugin_alerts import ( StrategyPluginAlertChannelStores, StrategyPluginAlertPublishResult, @@ -32,6 +33,13 @@ StrategyPluginPushSettings, publish_strategy_plugin_push_alerts, ) +from .strategy_plugin_telegram import ( + StrategyPluginTelegramAlertDelivery, + StrategyPluginTelegramAlertMarkerStore, + StrategyPluginTelegramAlertPublishResult, + StrategyPluginTelegramSettings, + publish_strategy_plugin_telegram_alerts, +) __all__ = [ "NotificationPublisher", @@ -47,6 +55,10 @@ "StrategyPluginPushAlertMarkerStore", "StrategyPluginPushAlertPublishResult", "StrategyPluginPushSettings", + "StrategyPluginTelegramAlertDelivery", + "StrategyPluginTelegramAlertMarkerStore", + "StrategyPluginTelegramAlertPublishResult", + "StrategyPluginTelegramSettings", "StrategyPluginSmsAlertDelivery", "StrategyPluginSmsAlertMarkerStore", "StrategyPluginSmsAlertPublishResult", @@ -56,14 +68,18 @@ "parse_email_recipients", "parse_push_recipients", "parse_sms_recipients", + "parse_telegram_chat_ids", "publish_rendered_notification", "publish_strategy_plugin_alerts", "publish_strategy_plugin_email_alerts", "publish_strategy_plugin_push_alerts", "publish_strategy_plugin_sms_alerts", + "publish_strategy_plugin_telegram_alerts", "send_ntfy_push", "send_pushover_push", "send_smtp_email", "send_strategy_plugin_push", + "send_strategy_plugin_telegram", + "send_telegram_message", "send_twilio_sms", ] diff --git a/src/quant_platform_kit/notifications/strategy_plugin_alerts.py b/src/quant_platform_kit/notifications/strategy_plugin_alerts.py index e22a538..cf0a4b8 100644 --- a/src/quant_platform_kit/notifications/strategy_plugin_alerts.py +++ b/src/quant_platform_kit/notifications/strategy_plugin_alerts.py @@ -11,6 +11,7 @@ from .email import send_smtp_email from .push import send_strategy_plugin_push from .sms import send_twilio_sms +from .telegram import send_strategy_plugin_telegram from .strategy_plugin_email import ( StrategyPluginEmailAlertMarkerStore, StrategyPluginEmailAlertPublishResult, @@ -30,12 +31,19 @@ StrategyPluginPushSettings, publish_strategy_plugin_push_alerts, ) +from .strategy_plugin_telegram import ( + StrategyPluginTelegramAlertMarkerStore, + StrategyPluginTelegramAlertPublishResult, + StrategyPluginTelegramSettings, + publish_strategy_plugin_telegram_alerts, +) _DEFAULT_ALERT_STATE_DIR = "/tmp/quant_strategy_plugin_alerts" _CHANNEL_EMAIL = "email" _CHANNEL_SMS = "sms" _CHANNEL_PUSH = "push" -_SUPPORTED_CHANNELS = frozenset({_CHANNEL_EMAIL, _CHANNEL_SMS, _CHANNEL_PUSH}) +_CHANNEL_TELEGRAM = "telegram" +_SUPPORTED_CHANNELS = frozenset({_CHANNEL_EMAIL, _CHANNEL_SMS, _CHANNEL_PUSH, _CHANNEL_TELEGRAM}) @dataclass(frozen=True) @@ -45,6 +53,7 @@ class StrategyPluginAlertChannelStores: email: StrategyPluginEmailAlertMarkerStore | object | None = None sms: StrategyPluginSmsAlertMarkerStore | object | None = None push: StrategyPluginPushAlertMarkerStore | object | None = None + telegram: StrategyPluginTelegramAlertMarkerStore | object | None = None @classmethod def from_mapping( @@ -57,6 +66,7 @@ def from_mapping( email=value.get(_CHANNEL_EMAIL), sms=value.get(_CHANNEL_SMS), push=value.get(_CHANNEL_PUSH), + telegram=value.get(_CHANNEL_TELEGRAM), ) @@ -107,6 +117,12 @@ def build_channel_stores(self) -> StrategyPluginAlertChannelStores: gcp_project_id=self.gcp_project_id, client_factory=self.client_factory, ), + telegram=StrategyPluginTelegramAlertMarkerStore( + local_dir=self.local_dir, + gcs_prefix_uri=self.gcs_prefix_uri, + gcp_project_id=self.gcp_project_id, + client_factory=self.client_factory, + ), ) @@ -117,6 +133,7 @@ class StrategyPluginAlertPublishResult: email_result: StrategyPluginEmailAlertPublishResult | None = None sms_result: StrategyPluginSmsAlertPublishResult | None = None push_result: StrategyPluginPushAlertPublishResult | None = None + telegram_result: StrategyPluginTelegramAlertPublishResult | None = None @property def attempted_count(self) -> int: @@ -147,6 +164,8 @@ def to_report_fields(self) -> dict[str, Any]: fields.update(self.sms_result.to_report_fields()) if self.push_result is not None: fields.update(self.push_result.to_report_fields()) + if self.telegram_result is not None: + fields.update(self.telegram_result.to_report_fields()) return fields def to_summary_fields(self) -> dict[str, int]: @@ -159,6 +178,8 @@ def to_summary_fields(self) -> dict[str, int]: fields["strategy_plugin_alert_sms_sent_count"] = self.sms_result.sent_count if self.push_result is not None: fields["strategy_plugin_alert_push_sent_count"] = self.push_result.sent_count + if self.telegram_result is not None: + fields["strategy_plugin_alert_telegram_sent_count"] = self.telegram_result.sent_count return fields def attach_to_report(self, report: dict[str, Any]) -> None: @@ -170,12 +191,18 @@ def _results( ) -> tuple[ StrategyPluginEmailAlertPublishResult | StrategyPluginSmsAlertPublishResult - | StrategyPluginPushAlertPublishResult, + | StrategyPluginPushAlertPublishResult + | StrategyPluginTelegramAlertPublishResult, ..., ]: return tuple( result - for result in (self.email_result, self.sms_result, self.push_result) + for result in ( + self.email_result, + self.sms_result, + self.push_result, + self.telegram_result, + ) if result is not None ) @@ -187,6 +214,7 @@ def publish_strategy_plugin_alerts( StrategyPluginEmailSettings | StrategyPluginSmsSettings | StrategyPluginPushSettings + | StrategyPluginTelegramSettings | object ), translator: Callable[..., str] | None = None, @@ -198,6 +226,7 @@ def publish_strategy_plugin_alerts( send_email_notification: Callable[..., bool] = send_smtp_email, send_sms_notification: Callable[..., bool] = send_twilio_sms, send_push_notification: Callable[..., bool] = send_strategy_plugin_push, + send_telegram_notification: Callable[..., bool] = send_strategy_plugin_telegram, log_message: Callable[..., Any] = print, ) -> StrategyPluginAlertPublishResult: """Publish strategy plugin alerts through the configured notification channels.""" @@ -207,6 +236,7 @@ def publish_strategy_plugin_alerts( email_result = None sms_result = None push_result = None + telegram_result = None if _CHANNEL_EMAIL in selected_channels: email_result = publish_strategy_plugin_email_alerts( signals, @@ -240,10 +270,22 @@ def publish_strategy_plugin_alerts( send_notification=send_push_notification, log_message=log_message, ) + if _CHANNEL_TELEGRAM in selected_channels: + telegram_result = publish_strategy_plugin_telegram_alerts( + signals, + telegram_settings=notification_settings, + translator=translator, + strategy_label=strategy_label, + context_label=context_label, + alert_store=stores.telegram, + send_notification=send_telegram_notification, + log_message=log_message, + ) return StrategyPluginAlertPublishResult( email_result=email_result, sms_result=sms_result, push_result=push_result, + telegram_result=telegram_result, ) @@ -287,7 +329,7 @@ def _resolve_channels( if raw_channels is None: raw_channels = _get_value(notification_settings, "crisis_alert_channels", None) if raw_channels in (None, "", (), []): - raw_channels = (_CHANNEL_EMAIL, _CHANNEL_SMS, _CHANNEL_PUSH) + raw_channels = (_CHANNEL_EMAIL, _CHANNEL_SMS, _CHANNEL_PUSH, _CHANNEL_TELEGRAM) return _normalize_channels(raw_channels) diff --git a/src/quant_platform_kit/notifications/strategy_plugin_telegram.py b/src/quant_platform_kit/notifications/strategy_plugin_telegram.py new file mode 100644 index 0000000..67559c3 --- /dev/null +++ b/src/quant_platform_kit/notifications/strategy_plugin_telegram.py @@ -0,0 +1,408 @@ +"""Telegram notification helpers for strategy plugin alerts.""" + +from __future__ import annotations + +import json +import tempfile +from collections.abc import Callable, Mapping, Sequence +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from quant_platform_kit.common.strategy_plugins import ( + StrategyPluginAlertMessage, + build_strategy_plugin_alert_messages, +) + +from .telegram import ( + DEFAULT_TELEGRAM_BOT_API_BASE_URL, + parse_telegram_chat_ids, + send_strategy_plugin_telegram, +) + +_DEFAULT_TELEGRAM_BODY_MAX_CHARS = 3900 + + +@dataclass(frozen=True) +class StrategyPluginTelegramSettings: + chat_ids: tuple[str, ...] = () + bot_token: str | None = field(default=None, repr=False) + api_base_url: str = DEFAULT_TELEGRAM_BOT_API_BASE_URL + parse_mode: str | None = None + disable_web_page_preview: bool = True + body_max_chars: int = _DEFAULT_TELEGRAM_BODY_MAX_CHARS + timeout: float = 10.0 + + @classmethod + def from_object(cls, value: object) -> "StrategyPluginTelegramSettings": + if isinstance(value, cls): + return value + return cls( + chat_ids=tuple( + parse_telegram_chat_ids(_get_value(value, "crisis_alert_telegram_chat_ids", ())) + ), + bot_token=_first_non_empty(_get_value(value, "crisis_alert_telegram_bot_token")), + api_base_url=( + _first_non_empty(_get_value(value, "crisis_alert_telegram_api_base_url")) + or DEFAULT_TELEGRAM_BOT_API_BASE_URL + ), + parse_mode=_first_non_empty(_get_value(value, "crisis_alert_telegram_parse_mode")), + disable_web_page_preview=_coerce_bool( + _get_value(value, "crisis_alert_telegram_disable_web_page_preview"), + default=True, + ), + body_max_chars=_coerce_int( + _get_value(value, "crisis_alert_telegram_body_max_chars"), + _DEFAULT_TELEGRAM_BODY_MAX_CHARS, + ), + ) + + def missing_fields(self) -> tuple[str, ...]: + missing: list[str] = [] + if not parse_telegram_chat_ids(self.chat_ids): + missing.append("CRISIS_ALERT_TELEGRAM_CHAT_IDS") + if not str(self.bot_token or "").strip(): + missing.append("CRISIS_ALERT_TELEGRAM_BOT_TOKEN") + return tuple(missing) + + @property + def is_configured(self) -> bool: + return not self.missing_fields() + + +@dataclass(frozen=True) +class StrategyPluginTelegramAlertDelivery: + alert_key: str + subject: str + status: str + reason: str | None = None + error: str | None = None + metadata: Mapping[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + payload = { + "alert_key": self.alert_key, + "subject": self.subject, + "status": self.status, + "reason": self.reason, + "error": self.error, + **dict(self.metadata or {}), + } + return {key: value for key, value in payload.items() if value not in (None, "", (), [])} + + +@dataclass(frozen=True) +class StrategyPluginTelegramAlertPublishResult: + deliveries: tuple[StrategyPluginTelegramAlertDelivery, ...] = () + + @property + def attempted_count(self) -> int: + return len(self.deliveries) + + @property + def sent_count(self) -> int: + return sum(1 for delivery in self.deliveries if delivery.status == "sent") + + @property + def skipped_count(self) -> int: + return sum(1 for delivery in self.deliveries if delivery.status == "skipped") + + @property + def failed_count(self) -> int: + return sum(1 for delivery in self.deliveries if delivery.status == "failed") + + def to_report_fields(self, *, prefix: str = "strategy_plugin_alert_telegram") -> dict[str, Any]: + return { + f"{prefix}_attempted_count": self.attempted_count, + f"{prefix}_sent_count": self.sent_count, + f"{prefix}_skipped_count": self.skipped_count, + f"{prefix}_failed_count": self.failed_count, + f"{prefix}_deliveries": [delivery.to_dict() for delivery in self.deliveries], + } + + +@dataclass(frozen=True) +class StrategyPluginTelegramAlertMarkerStore: + local_dir: str | Path | None = None + gcs_prefix_uri: str | None = None + gcp_project_id: str | None = None + namespace: str = "strategy_plugin_telegram_alerts" + client_factory: Any = None + + def has_alert(self, alert_key: str) -> bool: + if self.gcs_prefix_uri and self._gcs_blob(alert_key, namespace=self.namespace).exists(): + return True + if self.local_dir and self._local_path(alert_key, namespace=self.namespace).exists(): + return True + return False + + def record_alert( + self, + alert_key: str, + *, + metadata: Mapping[str, Any] | None = None, + ) -> None: + payload = { + "schema_version": "strategy_plugin_telegram_alert_marker.v1", + "alert_key": str(alert_key), + "recorded_at": datetime.now(timezone.utc).isoformat(), + "metadata": dict(metadata or {}), + } + encoded = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True) + if self.gcs_prefix_uri: + self._gcs_blob(alert_key, namespace=self.namespace).upload_from_string( + encoded, + content_type="application/json", + ) + return + if self.local_dir: + path = self._local_path(alert_key, namespace=self.namespace) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(encoded, encoding="utf-8") + + def _local_path(self, alert_key: str, *, namespace: str) -> Path: + root = Path(self.local_dir or tempfile.gettempdir()).expanduser() + return root / namespace / f"{_clean_relative_key(alert_key)}.json" + + def _gcs_blob(self, alert_key: str, *, namespace: str): + bucket_name, prefix = _parse_gcs_uri(str(self.gcs_prefix_uri or "")) + object_name = "/".join( + part.strip("/") + for part in (prefix, namespace, f"{_clean_relative_key(alert_key)}.json") + if part and part.strip("/") + ) + if self.client_factory is None: + try: + from google.cloud import storage # type: ignore + except ImportError as exc: + raise RuntimeError("google-cloud-storage is required for GCS alert markers") from exc + client_factory = storage.Client + else: + client_factory = self.client_factory + client = client_factory(project=self.gcp_project_id) if self.gcp_project_id else client_factory() + return client.bucket(bucket_name).blob(object_name) + + +def publish_strategy_plugin_telegram_alerts( + signals: Sequence[object], + *, + telegram_settings: StrategyPluginTelegramSettings | object, + translator: Callable[..., str] | None = None, + strategy_label: str | None = None, + context_label: str | None = None, + alert_store: StrategyPluginTelegramAlertMarkerStore | object | None = None, + send_notification: Callable[..., bool] = send_strategy_plugin_telegram, + log_message: Callable[..., Any] = print, +) -> StrategyPluginTelegramAlertPublishResult: + settings = StrategyPluginTelegramSettings.from_object(telegram_settings) + messages = build_strategy_plugin_alert_messages( + signals, + translator=translator, + strategy_label=strategy_label, + context_label=context_label, + alert_namespace="strategy_plugin_telegram_alert", + ) + deliveries: list[StrategyPluginTelegramAlertDelivery] = [] + missing_fields = settings.missing_fields() + if missing_fields: + for message in messages: + deliveries.append( + _delivery( + message, + status="skipped", + reason="missing_telegram_config", + error=",".join(missing_fields), + ) + ) + result = StrategyPluginTelegramAlertPublishResult(tuple(deliveries)) + _log_publish_result(result, log_message=log_message) + return result + + for message in messages: + alert_key = message.alert_key or _fallback_alert_key(message) + try: + duplicate = _store_has_alert(alert_store, alert_key) + store_error = None + except Exception as exc: + duplicate = False + store_error = f"alert_store_check_failed:{type(exc).__name__}: {exc}" + if duplicate: + deliveries.append(_delivery(message, status="skipped", reason="duplicate_alert")) + continue + sent, send_error = _send_message(send_notification, message, settings) + if not sent: + deliveries.append(_delivery(message, status="failed", reason="send_failed", error=send_error)) + continue + record_error = _store_record_error(alert_store, alert_key, message) + combined_error = "; ".join(error for error in (store_error, record_error) if error) + deliveries.append(_delivery(message, status="sent", error=combined_error or None)) + result = StrategyPluginTelegramAlertPublishResult(tuple(deliveries)) + _log_publish_result(result, log_message=log_message) + return result + + +def _delivery( + message: StrategyPluginAlertMessage, + *, + status: str, + reason: str | None = None, + error: str | None = None, +) -> StrategyPluginTelegramAlertDelivery: + return StrategyPluginTelegramAlertDelivery( + alert_key=message.alert_key or _fallback_alert_key(message), + subject=message.subject, + status=status, + reason=reason, + error=error, + metadata=message.metadata, + ) + + +def _send_message( + send_notification: Callable[..., bool], + message: StrategyPluginAlertMessage, + settings: StrategyPluginTelegramSettings, +) -> tuple[bool, str | None]: + try: + sent = send_notification( + title=message.subject, + body=_build_telegram_body(message, max_chars=settings.body_max_chars), + chat_ids=settings.chat_ids, + bot_token=settings.bot_token, + api_base_url=settings.api_base_url, + parse_mode=settings.parse_mode, + disable_web_page_preview=settings.disable_web_page_preview, + timeout=settings.timeout, + ) + except Exception as exc: + return False, f"{type(exc).__name__}: {exc}" + return bool(sent), None + + +def _build_telegram_body(message: StrategyPluginAlertMessage, *, max_chars: int) -> str: + body = str(message.body or "").strip() or str(message.subject or "").strip() + limit = max(80, min(4096, int(max_chars or _DEFAULT_TELEGRAM_BODY_MAX_CHARS))) + if len(body) <= limit: + return body + return body[: max(0, limit - 3)].rstrip() + "..." + + +def _store_has_alert(alert_store: object | None, alert_key: str) -> bool: + if alert_store is None: + return False + checker = getattr(alert_store, "has_alert", None) + if checker is None: + return False + return bool(checker(alert_key)) + + +def _store_record_error( + alert_store: object | None, + alert_key: str, + message: StrategyPluginAlertMessage, +) -> str | None: + if alert_store is None: + return None + recorder = getattr(alert_store, "record_alert", None) + if recorder is None: + return None + try: + recorder( + alert_key, + metadata={ + "subject": message.subject, + **dict(message.metadata or {}), + }, + ) + except Exception as exc: + return f"alert_store_record_failed:{type(exc).__name__}: {exc}" + return None + + +def _log_publish_result( + result: StrategyPluginTelegramAlertPublishResult, + *, + log_message: Callable[..., Any], +) -> None: + if result.attempted_count <= 0: + return + _call_log_message( + log_message, + ( + "strategy_plugin_alert_telegram_result " + f"attempted={result.attempted_count} " + f"sent={result.sent_count} " + f"skipped={result.skipped_count} " + f"failed={result.failed_count}" + ), + ) + + +def _call_log_message(log_message: Callable[..., Any], text: str) -> None: + try: + log_message(text, flush=True) + except TypeError: + log_message(text) + + +def _get_value(value: object, name: str, default: Any = None) -> Any: + if isinstance(value, Mapping): + return value.get(name, default) + return getattr(value, name, default) + + +def _first_non_empty(*values: Any) -> str | None: + for value in values: + text = str(value or "").strip() + if text: + return text + return None + + +def _coerce_bool(value: Any, *, default: bool) -> bool: + if value in (None, ""): + return default + text = str(value).strip().lower() + if text in {"1", "true", "yes", "y", "on"}: + return True + if text in {"0", "false", "no", "n", "off"}: + return False + return default + + +def _coerce_int(value: Any, default: int) -> int: + text = str(value or "").strip() + if not text: + return default + try: + return int(text) + except (TypeError, ValueError): + return default + + +def _fallback_alert_key(message: StrategyPluginAlertMessage) -> str: + return "strategy_plugin_telegram_alert/" + _clean_relative_key(message.subject or "unknown") + + +def _clean_relative_key(value: str) -> str: + parts = [] + for raw_part in str(value or "").replace("\\", "/").split("/"): + cleaned = "".join( + char if char.isalnum() or char in {"-", "_", "."} else "-" + for char in raw_part.strip() + ).strip("-._") + if cleaned: + parts.append(cleaned[:100]) + return "/".join(parts) or "unknown" + + +def _parse_gcs_uri(uri: str) -> tuple[str, str]: + raw_uri = str(uri or "").strip() + if not raw_uri.startswith("gs://"): + raise ValueError(f"gcs uri must start with gs://, got: {uri!r}") + remainder = raw_uri[5:] + bucket_name, _, object_prefix = remainder.partition("/") + if not bucket_name: + raise ValueError(f"gcs uri must include a bucket name, got: {uri!r}") + return bucket_name, object_prefix.strip("/") diff --git a/src/quant_platform_kit/notifications/telegram.py b/src/quant_platform_kit/notifications/telegram.py index be2fe3b..0a5b4d9 100644 --- a/src/quant_platform_kit/notifications/telegram.py +++ b/src/quant_platform_kit/notifications/telegram.py @@ -1,25 +1,138 @@ +"""Telegram Bot API notification helpers.""" + from __future__ import annotations import json import urllib.parse import urllib.request +from collections.abc import Sequence +from typing import Any + + +DEFAULT_TELEGRAM_BOT_API_BASE_URL = "https://api.telegram.org" + + +def parse_telegram_chat_ids(raw_value: str | Sequence[str] | None) -> tuple[str, ...]: + if raw_value is None: + return () + if isinstance(raw_value, str): + values = raw_value.replace(";", ",").replace("\n", ",").split(",") + else: + values = raw_value + chat_ids = [] + seen = set() + for value in values: + chat_id = str(value or "").strip() + if not chat_id or chat_id in seen: + continue + chat_ids.append(chat_id) + seen.add(chat_id) + return tuple(chat_ids) + + +def send_strategy_plugin_telegram( + *, + title: str, + body: str, + chat_ids: Sequence[str], + bot_token: str | None, + api_base_url: str | None = None, + parse_mode: str | None = None, + disable_web_page_preview: bool = True, + timeout: float = 10.0, + opener: Any = None, + printer=print, +) -> bool: + message = _build_message_text(title=title, body=body) + return send_telegram_message( + text=message, + chat_ids=chat_ids, + bot_token=bot_token, + api_base_url=api_base_url, + parse_mode=parse_mode, + disable_web_page_preview=disable_web_page_preview, + timeout=timeout, + opener=opener, + printer=printer, + ) -def send_telegram_message(token: str, chat_id: str, message: str, *, timeout: int = 15) -> None: - if not token.strip(): +def send_telegram_message( + bot_token: str | None = None, + chat_ids: str | Sequence[str] | None = None, + text: str | None = None, + *, + api_base_url: str | None = None, + parse_mode: str | None = None, + disable_web_page_preview: bool = True, + timeout: float = 10.0, + opener: Any = None, + printer=print, +) -> bool: + resolved_chat_ids = parse_telegram_chat_ids(chat_ids) + token = str(bot_token or "").strip() + message = str(text or "").strip() + if not token: raise ValueError("token must not be empty.") - if not chat_id.strip(): + if not resolved_chat_ids: raise ValueError("chat_id must not be empty.") - if not message.strip(): + if not message: raise ValueError("message must not be empty.") - body = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode("utf-8") - request = urllib.request.Request( - url=f"https://api.telegram.org/bot{token}/sendMessage", - data=body, - method="POST", - ) - with urllib.request.urlopen(request, timeout=timeout) as response: - payload = json.loads(response.read().decode("utf-8")) - if not payload.get("ok"): - raise RuntimeError(f"telegram api returned not ok: {payload}") + request_opener = opener or urllib.request.urlopen + endpoint = _telegram_send_message_endpoint(api_base_url, token) + all_sent = True + for chat_id in resolved_chat_ids: + payload: dict[str, object] = { + "chat_id": chat_id, + "text": message, + "disable_web_page_preview": bool(disable_web_page_preview), + } + text_parse_mode = str(parse_mode or "").strip() + if text_parse_mode: + payload["parse_mode"] = text_parse_mode + request = urllib.request.Request( + endpoint, + data=json.dumps(payload, ensure_ascii=False).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + if not _request_succeeded(request_opener, request, timeout, printer, chat_id): + all_sent = False + return all_sent + + +def _request_succeeded( + request_opener: Any, + request: urllib.request.Request, + timeout: float, + printer, + chat_id: str, +) -> bool: + try: + with request_opener(request, timeout=timeout) as response: + status = getattr(response, "status", None) + if status is None: + status = response.getcode() + status = int(status) + except Exception as exc: + printer(f"Telegram send failed for {chat_id}: {exc}", flush=True) + return False + if status < 200 or status >= 300: + printer(f"Telegram send failed for {chat_id}: HTTP {status}", flush=True) + return False + return True + + +def _telegram_send_message_endpoint(api_base_url: str | None, bot_token: str) -> str: + base_url = str(api_base_url or DEFAULT_TELEGRAM_BOT_API_BASE_URL).rstrip("/") + encoded_token = urllib.parse.quote(str(bot_token), safe=":") + return f"{base_url}/bot{encoded_token}/sendMessage" + + +def _build_message_text(*, title: str, body: str) -> str: + text_title = str(title or "").strip() + text_body = str(body or "").strip() + if text_title and text_body: + return f"{text_title}\n\n{text_body}" + return text_body or text_title diff --git a/tests/test_strategy_plugin_alert_dispatcher.py b/tests/test_strategy_plugin_alert_dispatcher.py index 263c955..9736e67 100644 --- a/tests/test_strategy_plugin_alert_dispatcher.py +++ b/tests/test_strategy_plugin_alert_dispatcher.py @@ -9,6 +9,7 @@ from quant_platform_kit.notifications.strategy_plugin_email import StrategyPluginEmailSettings from quant_platform_kit.notifications.strategy_plugin_push import StrategyPluginPushSettings from quant_platform_kit.notifications.strategy_plugin_sms import StrategyPluginSmsSettings +from quant_platform_kit.notifications.strategy_plugin_telegram import StrategyPluginTelegramSettings def _alert_signal(): @@ -34,6 +35,8 @@ class _NotificationSettings: crisis_alert_push_provider = "ntfy" crisis_alert_push_recipients = "risk-topic" crisis_alert_push_priority = "5" + crisis_alert_telegram_chat_ids = "123456" + crisis_alert_telegram_bot_token = "bot-token" class StrategyPluginAlertDispatcherTests(unittest.TestCase): @@ -41,6 +44,7 @@ def test_publish_strategy_plugin_alerts_dispatches_enabled_channels(self): emails = [] sms_messages = [] push_messages = [] + telegram_messages = [] with tempfile.TemporaryDirectory() as tmp_dir: result = publish_strategy_plugin_alerts( @@ -52,20 +56,24 @@ def test_publish_strategy_plugin_alerts_dispatches_enabled_channels(self): send_email_notification=lambda **kwargs: emails.append(kwargs) or True, send_sms_notification=lambda **kwargs: sms_messages.append(kwargs) or True, send_push_notification=lambda **kwargs: push_messages.append(kwargs) or True, + send_telegram_notification=lambda **kwargs: telegram_messages.append(kwargs) or True, log_message=lambda *_args, **_kwargs: None, ) - self.assertEqual(result.sent_count, 3) + self.assertEqual(result.sent_count, 4) self.assertEqual(result.failed_count, 0) self.assertIsNotNone(result.email_result) self.assertIsNotNone(result.sms_result) self.assertIsNotNone(result.push_result) + self.assertIsNotNone(result.telegram_result) self.assertEqual(result.email_result.sent_count, 1) self.assertEqual(result.sms_result.sent_count, 1) self.assertEqual(result.push_result.sent_count, 1) + self.assertEqual(result.telegram_result.sent_count, 1) self.assertEqual(emails[0]["recipients"], ("risk@example.com",)) self.assertEqual(sms_messages[0]["recipients"], ("+15165480265",)) self.assertEqual(push_messages[0]["recipients"], ("risk-topic",)) + self.assertEqual(telegram_messages[0]["chat_ids"], ("123456",)) def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(self): settings = _NotificationSettings() @@ -81,6 +89,7 @@ def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(sel send_email_notification=lambda **_kwargs: True, send_sms_notification=lambda **_kwargs: True, send_push_notification=lambda **_kwargs: True, + send_telegram_notification=lambda **_kwargs: True, log_message=lambda *_args, **_kwargs: None, ) second = publish_strategy_plugin_alerts( @@ -92,18 +101,21 @@ def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(sel send_email_notification=lambda **_kwargs: True, send_sms_notification=lambda **_kwargs: True, send_push_notification=lambda **_kwargs: True, + send_telegram_notification=lambda **_kwargs: True, log_message=lambda *_args, **_kwargs: None, ) - self.assertEqual(first.sent_count, 3) + self.assertEqual(first.sent_count, 4) self.assertEqual(second.sent_count, 0) - self.assertEqual(second.skipped_count, 3) + self.assertEqual(second.skipped_count, 4) self.assertIsNotNone(second.email_result) self.assertEqual(second.email_result.deliveries[0].reason, "duplicate_alert") self.assertIsNotNone(second.sms_result) self.assertEqual(second.sms_result.deliveries[0].reason, "duplicate_alert") self.assertIsNotNone(second.push_result) self.assertEqual(second.push_result.deliveries[0].reason, "duplicate_alert") + self.assertIsNotNone(second.telegram_result) + self.assertEqual(second.telegram_result.deliveries[0].reason, "duplicate_alert") def test_publish_strategy_plugin_alerts_can_target_one_channel(self): sms_messages = [] @@ -123,6 +135,7 @@ def test_publish_strategy_plugin_alerts_can_target_one_channel(self): state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), send_email_notification=lambda **_kwargs: self.fail("email should not run"), send_sms_notification=lambda **kwargs: sms_messages.append(kwargs) or True, + send_telegram_notification=lambda **_kwargs: self.fail("telegram should not run"), log_message=lambda *_args, **_kwargs: None, ) @@ -155,14 +168,43 @@ def test_publish_strategy_plugin_alerts_can_target_push_channel(self): self.assertIsNone(result.email_result) self.assertIsNone(result.sms_result) self.assertIsNotNone(result.push_result) + self.assertIsNone(result.telegram_result) self.assertEqual(result.sent_count, 1) self.assertEqual(push_messages[0]["provider"], "ntfy") + def test_publish_strategy_plugin_alerts_can_target_telegram_channel(self): + telegram_messages = [] + + with tempfile.TemporaryDirectory() as tmp_dir: + result = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=StrategyPluginTelegramSettings( + chat_ids=("123456",), + bot_token="bot-token", + ), + channels=("telegram",), + strategy_label="TQQQ", + context_label="ibkr / live-slot-a", + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + send_email_notification=lambda **_kwargs: self.fail("email should not run"), + send_sms_notification=lambda **_kwargs: self.fail("sms should not run"), + send_push_notification=lambda **_kwargs: self.fail("push should not run"), + send_telegram_notification=lambda **kwargs: telegram_messages.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertIsNone(result.email_result) + self.assertIsNone(result.sms_result) + self.assertIsNone(result.push_result) + self.assertIsNotNone(result.telegram_result) + self.assertEqual(result.sent_count, 1) + self.assertEqual(telegram_messages[0]["chat_ids"], ("123456",)) + def test_publish_strategy_plugin_alerts_reads_channels_from_settings(self): settings = _NotificationSettings() - settings.crisis_alert_channels = "email,push" + settings.crisis_alert_channels = "email,telegram" emails = [] - push_messages = [] + telegram_messages = [] with tempfile.TemporaryDirectory() as tmp_dir: result = publish_strategy_plugin_alerts( @@ -173,16 +215,18 @@ def test_publish_strategy_plugin_alerts_reads_channels_from_settings(self): state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), send_email_notification=lambda **kwargs: emails.append(kwargs) or True, send_sms_notification=lambda **_kwargs: self.fail("sms should not run"), - send_push_notification=lambda **kwargs: push_messages.append(kwargs) or True, + send_push_notification=lambda **_kwargs: self.fail("push should not run"), + send_telegram_notification=lambda **kwargs: telegram_messages.append(kwargs) or True, log_message=lambda *_args, **_kwargs: None, ) self.assertEqual(result.sent_count, 2) self.assertIsNotNone(result.email_result) self.assertIsNone(result.sms_result) - self.assertIsNotNone(result.push_result) + self.assertIsNone(result.push_result) + self.assertIsNotNone(result.telegram_result) self.assertEqual(len(emails), 1) - self.assertEqual(len(push_messages), 1) + self.assertEqual(len(telegram_messages), 1) def test_publish_strategy_plugin_alerts_attach_to_report(self): with tempfile.TemporaryDirectory() as tmp_dir: @@ -209,6 +253,7 @@ def test_publish_strategy_plugin_alerts_attach_to_report(self): self.assertEqual(report["diagnostics"]["strategy_plugin_alert_sent_count"], 1) self.assertEqual(report["diagnostics"]["strategy_plugin_alert_email_sent_count"], 1) self.assertNotIn("strategy_plugin_alert_sms_sent_count", report["summary"]) + self.assertNotIn("strategy_plugin_alert_telegram_sent_count", report["summary"]) def test_publish_strategy_plugin_alerts_rejects_unknown_channel(self): with tempfile.TemporaryDirectory() as tmp_dir: diff --git a/tests/test_strategy_plugin_telegram_notifications.py b/tests/test_strategy_plugin_telegram_notifications.py new file mode 100644 index 0000000..47260d7 --- /dev/null +++ b/tests/test_strategy_plugin_telegram_notifications.py @@ -0,0 +1,170 @@ +import json +import tempfile +import unittest +from types import SimpleNamespace + +from quant_platform_kit.notifications.strategy_plugin_telegram import ( + StrategyPluginTelegramSettings, + publish_strategy_plugin_telegram_alerts, +) +from quant_platform_kit.notifications.telegram import ( + parse_telegram_chat_ids, + send_strategy_plugin_telegram, + send_telegram_message, +) + + +def _alert_signal(): + return SimpleNamespace( + strategy="tqqq_growth_income", + plugin="crisis_response_shadow", + effective_mode="shadow", + as_of="2026-05-24", + canonical_route="true_crisis", + suggested_action="defend", + would_trade_if_enabled=True, + ) + + +class _FakeResponse: + status = 200 + + def __enter__(self): + return self + + def __exit__(self, *_args): + return False + + +class StrategyPluginTelegramNotificationTests(unittest.TestCase): + def test_parse_telegram_chat_ids_accepts_common_separators(self): + self.assertEqual( + parse_telegram_chat_ids("123; -456\n@risk_channel,123"), + ("123", "-456", "@risk_channel"), + ) + + def test_send_telegram_message_posts_json_for_each_chat(self): + requests = [] + + def opener(request, timeout): + requests.append((request, timeout)) + return _FakeResponse() + + sent = send_telegram_message( + text="危机通知", + chat_ids=("123", "@risk_channel"), + bot_token="123456:ABC", + api_base_url="https://telegram.example.test", + timeout=3.0, + opener=opener, + ) + + self.assertTrue(sent) + self.assertEqual(len(requests), 2) + first_request, timeout = requests[0] + self.assertEqual(timeout, 3.0) + self.assertEqual( + first_request.full_url, + "https://telegram.example.test/bot123456:ABC/sendMessage", + ) + payload = json.loads(first_request.data.decode("utf-8")) + self.assertEqual(payload["chat_id"], "123") + self.assertEqual(payload["text"], "危机通知") + self.assertTrue(payload["disable_web_page_preview"]) + + def test_send_strategy_plugin_telegram_combines_title_and_body(self): + requests = [] + + def opener(request, timeout): + requests.append(request) + return _FakeResponse() + + sent = send_strategy_plugin_telegram( + title="标题", + body="正文", + chat_ids=("123",), + bot_token="token", + api_base_url="https://telegram.example.test", + opener=opener, + ) + + self.assertTrue(sent) + payload = json.loads(requests[0].data.decode("utf-8")) + self.assertEqual(payload["text"], "标题\n\n正文") + + def test_strategy_plugin_telegram_settings_from_object(self): + settings = StrategyPluginTelegramSettings.from_object( + SimpleNamespace( + crisis_alert_telegram_chat_ids="123; @risk", + crisis_alert_telegram_bot_token="bot-token", + crisis_alert_telegram_api_base_url="https://telegram.example.test", + crisis_alert_telegram_parse_mode="HTML", + crisis_alert_telegram_disable_web_page_preview="false", + crisis_alert_telegram_body_max_chars="500", + ) + ) + + self.assertEqual(settings.chat_ids, ("123", "@risk")) + self.assertEqual(settings.bot_token, "bot-token") + self.assertEqual(settings.api_base_url, "https://telegram.example.test") + self.assertEqual(settings.parse_mode, "HTML") + self.assertFalse(settings.disable_web_page_preview) + self.assertEqual(settings.body_max_chars, 500) + self.assertEqual(settings.missing_fields(), ()) + + def test_publish_strategy_plugin_telegram_alerts_sends_and_dedupes(self): + calls = [] + + with tempfile.TemporaryDirectory() as tmp_dir: + settings = StrategyPluginTelegramSettings( + chat_ids=("123",), + bot_token="bot-token", + ) + first = publish_strategy_plugin_telegram_alerts( + [_alert_signal()], + telegram_settings=settings, + strategy_label="TQQQ", + context_label="ibkr / live-slot-a", + alert_store=SimpleNamespace( + has_alert=lambda _key: False, + record_alert=lambda _key, **_kwargs: None, + ), + send_notification=lambda **kwargs: calls.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + second = publish_strategy_plugin_telegram_alerts( + [_alert_signal()], + telegram_settings=settings, + strategy_label="TQQQ", + context_label="ibkr / live-slot-a", + alert_store=SimpleNamespace( + has_alert=lambda _key: True, + record_alert=lambda _key, **_kwargs: self.fail("duplicate should not record"), + ), + send_notification=lambda **_kwargs: self.fail("duplicate should not send"), + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertEqual(first.sent_count, 1) + self.assertEqual(second.skipped_count, 1) + self.assertEqual(second.deliveries[0].reason, "duplicate_alert") + self.assertEqual(calls[0]["chat_ids"], ("123",)) + self.assertIn("ibkr / live-slot-a", calls[0]["body"]) + self.assertTrue(tmp_dir) + + def test_publish_strategy_plugin_telegram_alerts_skips_when_missing_config(self): + result = publish_strategy_plugin_telegram_alerts( + [_alert_signal()], + telegram_settings=SimpleNamespace(), + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertEqual(result.sent_count, 0) + self.assertEqual(result.skipped_count, 1) + self.assertEqual(result.deliveries[0].reason, "missing_telegram_config") + self.assertIn("CRISIS_ALERT_TELEGRAM_CHAT_IDS", result.deliveries[0].error) + self.assertIn("CRISIS_ALERT_TELEGRAM_BOT_TOKEN", result.deliveries[0].error) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_strategy_plugins.py b/tests/test_strategy_plugins.py index 9c8bee1..3aa9363 100644 --- a/tests/test_strategy_plugins.py +++ b/tests/test_strategy_plugins.py @@ -11,6 +11,7 @@ STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, + STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM, StrategyPluginDefinition, build_strategy_plugin_alert_messages, build_strategy_plugin_notification_lines, @@ -96,6 +97,7 @@ def test_default_plugin_definition_limits_crisis_response_to_supported_strategie STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, + STRATEGY_PLUGIN_ALERT_CHANNEL_TELEGRAM, ), ) validate_strategy_plugin_compatibility(