From b960bff1535480ef027c5c4893587250a178df03 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Mon, 25 May 2026 23:44:51 +0800 Subject: [PATCH] Add push channel for strategy plugin alerts --- README.md | 5 +- README.zh-CN.md | 5 +- pyproject.toml | 2 +- src/quant_platform_kit/common/__init__.py | 4 + .../common/strategy_plugins.py | 8 +- .../notifications/__init__.py | 17 + src/quant_platform_kit/notifications/push.py | 227 ++++++++++ .../notifications/strategy_plugin_alerts.py | 87 +++- .../notifications/strategy_plugin_push.py | 420 ++++++++++++++++++ .../test_strategy_plugin_alert_dispatcher.py | 72 ++- ...test_strategy_plugin_push_notifications.py | 235 ++++++++++ tests/test_strategy_plugins.py | 11 +- 12 files changed, 1077 insertions(+), 16 deletions(-) create mode 100644 src/quant_platform_kit/notifications/push.py create mode 100644 src/quant_platform_kit/notifications/strategy_plugin_push.py create mode 100644 tests/test_strategy_plugin_push_notifications.py diff --git a/README.md b/README.md index 7470477..b320520 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ It contains: - narrow ports for market data, portfolio snapshots, order execution, notifications, and state - reusable broker adapter utilities - strategy loading, strategy-plugin, and alert-message contracts +- optional strategy-plugin alert channels for email, SMS, and push providers - synthetic-data tests for public behavior It does not contain private runtime wiring or generated strategy outputs. @@ -44,10 +45,12 @@ Strategy code should not branch on a broker platform, and platform code should n ## Strategy Plugins -Strategy plugins are sidecar artifacts that platform repositories may read when a strategy profile opts in. This repository defines the public plugin contract, compatibility checks, alert-message building, and duplicate-suppression helpers. +Strategy plugins are sidecar artifacts that platform repositories may read when a strategy profile opts in. This repository defines the public plugin contract, compatibility checks, alert-message building, optional alert delivery helpers, and duplicate-suppression helpers. Generated plugin artifacts and platform-specific notification routing stay with the producing pipeline or consuming platform repository. Tests in this repository use synthetic price history and synthetic payloads only. +Plugin alert delivery is provider-neutral at the platform boundary. Platform repositories pass runtime settings into `publish_strategy_plugin_alerts`; this repository handles configured `email`, `sms`, and `push` channels without coupling plugin logic to a broker platform. + ## Package Layout ```text diff --git a/README.zh-CN.md b/README.zh-CN.md index f993b33..6ec7579 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -14,6 +14,7 @@ - 市场数据、持仓快照、订单执行、通知、状态存储等窄接口 - 可复用的券商适配工具 - 策略加载、策略插件、告警消息契约 +- 可选的策略插件 email、SMS 和 push 告警通道 - 使用合成数据的公开测试 它不包含私有运行时接线和生成的策略输出。 @@ -44,10 +45,12 @@ QuantPlatformKit ## 策略插件 -策略插件是平台仓库按需读取的 sidecar artifact。这个仓库只定义公开插件契约、兼容性校验、告警消息构造和重复告警抑制 helper。 +策略插件是平台仓库按需读取的 sidecar artifact。这个仓库只定义公开插件契约、兼容性校验、告警消息构造、可选告警发送 helper 和重复告警抑制 helper。 生成的插件 artifact 和平台专属通知路由由生成它的 pipeline 或消费它的平台仓库管理。这个仓库的测试只使用合成价格历史和合成 payload。 +插件告警发送在平台边界保持 provider-neutral。平台仓库只把 runtime settings 传入 `publish_strategy_plugin_alerts`;这个仓库负责按配置发送 `email`、`sms` 和 `push`,不让插件逻辑耦合某个券商平台。 + ## 目录结构 ```text diff --git a/pyproject.toml b/pyproject.toml index dd9b678..e0eadb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "quant-platform-kit" -version = "0.7.28" +version = "0.7.29" description = "Shared broker adapters, domain models, execution ports, and notification utilities for QuantStrategyLab strategies." readme = "README.md" requires-python = ">=3.9" diff --git a/src/quant_platform_kit/common/__init__.py b/src/quant_platform_kit/common/__init__.py index 6bdcf9c..a27819f 100644 --- a/src/quant_platform_kit/common/__init__.py +++ b/src/quant_platform_kit/common/__init__.py @@ -44,6 +44,8 @@ PLUGIN_CRISIS_RESPONSE_SHADOW, PLUGIN_MODE_SHADOW, STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, + STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, + STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, STRATEGY_PLUGIN_ALERT_ACTIONS, STRATEGY_PLUGIN_NON_ALERT_ROUTES, SUPPORTED_STRATEGY_PLUGIN_MODES, @@ -85,6 +87,8 @@ "STAGE_RECONCILED", "STAGE_SUBMITTED", "STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL", + "STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH", + "STRATEGY_PLUGIN_ALERT_CHANNEL_SMS", "STRATEGY_PLUGIN_ALERT_ACTIONS", "STRATEGY_PLUGIN_NON_ALERT_ROUTES", "SUPPORTED_STRATEGY_PLUGIN_MODES", diff --git a/src/quant_platform_kit/common/strategy_plugins.py b/src/quant_platform_kit/common/strategy_plugins.py index 8d05d02..8aa5c90 100644 --- a/src/quant_platform_kit/common/strategy_plugins.py +++ b/src/quant_platform_kit/common/strategy_plugins.py @@ -13,6 +13,8 @@ PLUGIN_CRISIS_RESPONSE_SHADOW = "crisis_response_shadow" PLUGIN_MODE_SHADOW = "shadow" STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL = "email" +STRATEGY_PLUGIN_ALERT_CHANNEL_SMS = "sms" +STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH = "push" SUPPORTED_STRATEGY_PLUGIN_MODES = frozenset({PLUGIN_MODE_SHADOW}) DEFAULT_PLUGIN_ARTIFACT_CACHE_DIR = Path(tempfile.gettempdir()) / "quant_strategy_plugin_artifacts" STRATEGY_PLUGIN_NON_ALERT_ROUTES = frozenset({"no_action"}) @@ -67,7 +69,11 @@ def supports_strategy(self, strategy: str) -> bool: plugin=PLUGIN_CRISIS_RESPONSE_SHADOW, supported_strategies=CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES, supported_modes=SUPPORTED_STRATEGY_PLUGIN_MODES, - alert_channels=(STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL,), + alert_channels=( + STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, + STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, + STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, + ), ) } diff --git a/src/quant_platform_kit/notifications/__init__.py b/src/quant_platform_kit/notifications/__init__.py index 73b02fd..a10a06b 100644 --- a/src/quant_platform_kit/notifications/__init__.py +++ b/src/quant_platform_kit/notifications/__init__.py @@ -2,6 +2,7 @@ from .email import parse_email_recipients, send_smtp_email from .events import NotificationPublisher, RenderedNotification, publish_rendered_notification +from .push import parse_push_recipients, send_ntfy_push, send_pushover_push, send_strategy_plugin_push from .sms import normalize_sms_recipient, parse_sms_recipients, send_twilio_sms from .strategy_plugin_alerts import ( StrategyPluginAlertChannelStores, @@ -24,6 +25,13 @@ StrategyPluginSmsSettings, publish_strategy_plugin_sms_alerts, ) +from .strategy_plugin_push import ( + StrategyPluginPushAlertDelivery, + StrategyPluginPushAlertMarkerStore, + StrategyPluginPushAlertPublishResult, + StrategyPluginPushSettings, + publish_strategy_plugin_push_alerts, +) __all__ = [ "NotificationPublisher", @@ -35,6 +43,10 @@ "StrategyPluginEmailAlertMarkerStore", "StrategyPluginEmailAlertPublishResult", "StrategyPluginEmailSettings", + "StrategyPluginPushAlertDelivery", + "StrategyPluginPushAlertMarkerStore", + "StrategyPluginPushAlertPublishResult", + "StrategyPluginPushSettings", "StrategyPluginSmsAlertDelivery", "StrategyPluginSmsAlertMarkerStore", "StrategyPluginSmsAlertPublishResult", @@ -42,11 +54,16 @@ "build_strategy_plugin_alert_context_label", "normalize_sms_recipient", "parse_email_recipients", + "parse_push_recipients", "parse_sms_recipients", "publish_rendered_notification", "publish_strategy_plugin_alerts", "publish_strategy_plugin_email_alerts", + "publish_strategy_plugin_push_alerts", "publish_strategy_plugin_sms_alerts", + "send_ntfy_push", + "send_pushover_push", "send_smtp_email", + "send_strategy_plugin_push", "send_twilio_sms", ] diff --git a/src/quant_platform_kit/notifications/push.py b/src/quant_platform_kit/notifications/push.py new file mode 100644 index 0000000..01bb4c9 --- /dev/null +++ b/src/quant_platform_kit/notifications/push.py @@ -0,0 +1,227 @@ +"""Mobile push notification helpers.""" + +from __future__ import annotations + +import urllib.parse +import urllib.request +from collections.abc import Sequence +from email.header import Header +from typing import Any + + +PUSH_PROVIDER_NTFY = "ntfy" +PUSH_PROVIDER_PUSHOVER = "pushover" +DEFAULT_NTFY_API_BASE_URL = "https://ntfy.sh" +DEFAULT_PUSHOVER_API_BASE_URL = "https://api.pushover.net" + + +def parse_push_recipients(raw_value: str | Sequence[str] | None) -> tuple[str, ...]: + if raw_value is None: + return () + if isinstance(raw_value, str): + values = raw_value.replace(";", ",").replace("\n", ",").split(",") + else: + values = raw_value + recipients = [] + seen = set() + for value in values: + recipient = str(value or "").strip() + if not recipient or recipient in seen: + continue + recipients.append(recipient) + seen.add(recipient) + return tuple(recipients) + + +def send_strategy_plugin_push( + *, + provider: str, + title: str, + body: str, + recipients: Sequence[str], + app_token: str | None = None, + access_token: str | None = None, + api_base_url: str | None = None, + device: str | None = None, + priority: str | int | None = None, + tags: str | None = None, + timeout: float = 10.0, + opener: Any = None, + printer=print, +) -> bool: + normalized_provider = str(provider or "").strip().lower() + if normalized_provider == PUSH_PROVIDER_PUSHOVER: + return send_pushover_push( + title=title, + body=body, + recipients=recipients, + app_token=app_token, + api_base_url=api_base_url or DEFAULT_PUSHOVER_API_BASE_URL, + device=device, + priority=priority, + timeout=timeout, + opener=opener, + printer=printer, + ) + if normalized_provider == PUSH_PROVIDER_NTFY: + return send_ntfy_push( + title=title, + body=body, + recipients=recipients, + access_token=access_token, + api_base_url=api_base_url or DEFAULT_NTFY_API_BASE_URL, + priority=priority, + tags=tags, + timeout=timeout, + opener=opener, + printer=printer, + ) + printer(f"Push send failed: unsupported provider {provider!r}", flush=True) + return False + + +def send_pushover_push( + *, + title: str, + body: str, + recipients: Sequence[str], + app_token: str | None, + api_base_url: str = DEFAULT_PUSHOVER_API_BASE_URL, + device: str | None = None, + priority: str | int | None = None, + timeout: float = 10.0, + opener: Any = None, + printer=print, +) -> bool: + resolved_recipients = parse_push_recipients(recipients) + token = str(app_token or "").strip() + message = str(body or "").strip() + if not resolved_recipients or not token or not message: + return False + + request_opener = opener or urllib.request.urlopen + endpoint = _pushover_messages_endpoint(api_base_url) + all_sent = True + for recipient in resolved_recipients: + payload = { + "token": token, + "user": recipient, + "message": message, + } + text_title = str(title or "").strip() + if text_title: + payload["title"] = text_title + text_device = str(device or "").strip() + if text_device: + payload["device"] = text_device + text_priority = str(priority or "").strip() + if text_priority: + payload["priority"] = text_priority + data = urllib.parse.urlencode(payload).encode("utf-8") + request = urllib.request.Request( + endpoint, + data=data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + method="POST", + ) + if not _request_succeeded(request_opener, request, timeout, printer, recipient): + all_sent = False + return all_sent + + +def send_ntfy_push( + *, + title: str, + body: str, + recipients: Sequence[str], + access_token: str | None = None, + api_base_url: str = DEFAULT_NTFY_API_BASE_URL, + priority: str | int | None = None, + tags: str | None = None, + timeout: float = 10.0, + opener: Any = None, + printer=print, +) -> bool: + resolved_recipients = parse_push_recipients(recipients) + message = str(body or "").strip() + if not resolved_recipients or not message: + return False + + request_opener = opener or urllib.request.urlopen + token = str(access_token or "").strip() + all_sent = True + for recipient in resolved_recipients: + headers = { + "Content-Type": "text/plain; charset=utf-8", + } + text_title = str(title or "").strip() + if text_title: + headers["Title"] = _encode_http_header(text_title) + text_priority = str(priority or "").strip() + if text_priority: + headers["Priority"] = text_priority + text_tags = str(tags or "").strip() + if text_tags: + headers["Tags"] = _encode_http_header(text_tags) + if token: + headers["Authorization"] = f"Bearer {token}" + request = urllib.request.Request( + _ntfy_topic_endpoint(api_base_url, recipient), + data=message.encode("utf-8"), + headers=headers, + method="POST", + ) + if not _request_succeeded(request_opener, request, timeout, printer, recipient): + all_sent = False + return all_sent + + +def _request_succeeded( + request_opener: Any, + request: urllib.request.Request, + timeout: float, + printer, + recipient: str, +) -> bool: + try: + with request_opener(request, timeout=timeout) as response: + status = getattr(response, "status", None) + if status is None: + status = response.getcode() + status = int(status) + except Exception as exc: + printer(f"Push send failed for {recipient}: {exc}", flush=True) + return False + if status < 200 or status >= 300: + printer(f"Push send failed for {recipient}: HTTP {status}", flush=True) + return False + return True + + +def _pushover_messages_endpoint(api_base_url: str) -> str: + base_url = str(api_base_url or DEFAULT_PUSHOVER_API_BASE_URL).rstrip("/") + if base_url.endswith("/1/messages.json"): + return base_url + return f"{base_url}/1/messages.json" + + +def _ntfy_topic_endpoint(api_base_url: str, recipient: str) -> str: + target = str(recipient or "").strip() + if target.startswith(("https://", "http://")): + return target + base_url = str(api_base_url or DEFAULT_NTFY_API_BASE_URL).rstrip("/") + path = "/".join( + urllib.parse.quote(part.strip(), safe="") + for part in target.strip("/").split("/") + if part.strip() + ) + return f"{base_url}/{path}" + + +def _encode_http_header(value: str) -> str: + text = str(value or "") + try: + text.encode("latin-1") + except UnicodeEncodeError: + return Header(text, "utf-8").encode() + return text diff --git a/src/quant_platform_kit/notifications/strategy_plugin_alerts.py b/src/quant_platform_kit/notifications/strategy_plugin_alerts.py index 3d02d64..e22a538 100644 --- a/src/quant_platform_kit/notifications/strategy_plugin_alerts.py +++ b/src/quant_platform_kit/notifications/strategy_plugin_alerts.py @@ -9,6 +9,7 @@ from typing import Any from .email import send_smtp_email +from .push import send_strategy_plugin_push from .sms import send_twilio_sms from .strategy_plugin_email import ( StrategyPluginEmailAlertMarkerStore, @@ -23,11 +24,18 @@ StrategyPluginSmsSettings, publish_strategy_plugin_sms_alerts, ) +from .strategy_plugin_push import ( + StrategyPluginPushAlertMarkerStore, + StrategyPluginPushAlertPublishResult, + StrategyPluginPushSettings, + publish_strategy_plugin_push_alerts, +) _DEFAULT_ALERT_STATE_DIR = "/tmp/quant_strategy_plugin_alerts" _CHANNEL_EMAIL = "email" _CHANNEL_SMS = "sms" -_SUPPORTED_CHANNELS = frozenset({_CHANNEL_EMAIL, _CHANNEL_SMS}) +_CHANNEL_PUSH = "push" +_SUPPORTED_CHANNELS = frozenset({_CHANNEL_EMAIL, _CHANNEL_SMS, _CHANNEL_PUSH}) @dataclass(frozen=True) @@ -36,6 +44,7 @@ class StrategyPluginAlertChannelStores: email: StrategyPluginEmailAlertMarkerStore | object | None = None sms: StrategyPluginSmsAlertMarkerStore | object | None = None + push: StrategyPluginPushAlertMarkerStore | object | None = None @classmethod def from_mapping( @@ -44,7 +53,11 @@ def from_mapping( ) -> "StrategyPluginAlertChannelStores": if value is None: return cls() - return cls(email=value.get(_CHANNEL_EMAIL), sms=value.get(_CHANNEL_SMS)) + return cls( + email=value.get(_CHANNEL_EMAIL), + sms=value.get(_CHANNEL_SMS), + push=value.get(_CHANNEL_PUSH), + ) @dataclass(frozen=True) @@ -88,6 +101,12 @@ def build_channel_stores(self) -> StrategyPluginAlertChannelStores: gcp_project_id=self.gcp_project_id, client_factory=self.client_factory, ), + push=StrategyPluginPushAlertMarkerStore( + local_dir=self.local_dir, + gcs_prefix_uri=self.gcs_prefix_uri, + gcp_project_id=self.gcp_project_id, + client_factory=self.client_factory, + ), ) @@ -97,6 +116,7 @@ class StrategyPluginAlertPublishResult: email_result: StrategyPluginEmailAlertPublishResult | None = None sms_result: StrategyPluginSmsAlertPublishResult | None = None + push_result: StrategyPluginPushAlertPublishResult | None = None @property def attempted_count(self) -> int: @@ -125,6 +145,8 @@ def to_report_fields(self) -> dict[str, Any]: fields.update(self.email_result.to_report_fields()) if self.sms_result is not None: fields.update(self.sms_result.to_report_fields()) + if self.push_result is not None: + fields.update(self.push_result.to_report_fields()) return fields def to_summary_fields(self) -> dict[str, int]: @@ -135,6 +157,8 @@ def to_summary_fields(self) -> dict[str, int]: fields["strategy_plugin_alert_email_sent_count"] = self.email_result.sent_count if self.sms_result is not None: fields["strategy_plugin_alert_sms_sent_count"] = self.sms_result.sent_count + if self.push_result is not None: + fields["strategy_plugin_alert_push_sent_count"] = self.push_result.sent_count return fields def attach_to_report(self, report: dict[str, Any]) -> None: @@ -143,10 +167,15 @@ def attach_to_report(self, report: dict[str, Any]) -> None: def _results( self, - ) -> tuple[StrategyPluginEmailAlertPublishResult | StrategyPluginSmsAlertPublishResult, ...]: + ) -> tuple[ + StrategyPluginEmailAlertPublishResult + | StrategyPluginSmsAlertPublishResult + | StrategyPluginPushAlertPublishResult, + ..., + ]: return tuple( result - for result in (self.email_result, self.sms_result) + for result in (self.email_result, self.sms_result, self.push_result) if result is not None ) @@ -154,23 +183,30 @@ def _results( def publish_strategy_plugin_alerts( signals: Sequence[object], *, - notification_settings: StrategyPluginEmailSettings | StrategyPluginSmsSettings | object, + notification_settings: ( + StrategyPluginEmailSettings + | StrategyPluginSmsSettings + | StrategyPluginPushSettings + | object + ), translator: Callable[..., str] | None = None, strategy_label: str | None = None, context_label: str | None = None, - channels: Sequence[str] | str = (_CHANNEL_EMAIL, _CHANNEL_SMS), + channels: Sequence[str] | str | None = None, state_settings: StrategyPluginAlertStateSettings | None = None, alert_stores: StrategyPluginAlertChannelStores | Mapping[str, object | None] | None = None, send_email_notification: Callable[..., bool] = send_smtp_email, send_sms_notification: Callable[..., bool] = send_twilio_sms, + send_push_notification: Callable[..., bool] = send_strategy_plugin_push, log_message: Callable[..., Any] = print, ) -> StrategyPluginAlertPublishResult: """Publish strategy plugin alerts through the configured notification channels.""" - selected_channels = _normalize_channels(channels) + selected_channels = _resolve_channels(channels, notification_settings=notification_settings) stores = _resolve_alert_stores(alert_stores=alert_stores, state_settings=state_settings) email_result = None sms_result = None + push_result = None if _CHANNEL_EMAIL in selected_channels: email_result = publish_strategy_plugin_email_alerts( signals, @@ -193,9 +229,21 @@ def publish_strategy_plugin_alerts( send_notification=send_sms_notification, log_message=log_message, ) + if _CHANNEL_PUSH in selected_channels: + push_result = publish_strategy_plugin_push_alerts( + signals, + push_settings=notification_settings, + translator=translator, + strategy_label=strategy_label, + context_label=context_label, + alert_store=stores.push, + send_notification=send_push_notification, + log_message=log_message, + ) return StrategyPluginAlertPublishResult( email_result=email_result, sms_result=sms_result, + push_result=push_result, ) @@ -212,7 +260,11 @@ def _resolve_alert_stores( def _normalize_channels(channels: Sequence[str] | str) -> tuple[str, ...]: - raw_channels = (channels,) if isinstance(channels, str) else tuple(channels) + raw_channels = ( + channels.replace(";", ",").replace("\n", ",").split(",") + if isinstance(channels, str) + else tuple(channels) + ) normalized: list[str] = [] for channel in raw_channels: name = str(channel or "").strip().lower() @@ -226,6 +278,25 @@ def _normalize_channels(channels: Sequence[str] | str) -> tuple[str, ...]: return tuple(normalized) +def _resolve_channels( + channels: Sequence[str] | str | None, + *, + notification_settings: object, +) -> tuple[str, ...]: + raw_channels = channels + if raw_channels is None: + raw_channels = _get_value(notification_settings, "crisis_alert_channels", None) + if raw_channels in (None, "", (), []): + raw_channels = (_CHANNEL_EMAIL, _CHANNEL_SMS, _CHANNEL_PUSH) + return _normalize_channels(raw_channels) + + +def _get_value(value: object, name: str, default: Any = None) -> Any: + if isinstance(value, Mapping): + return value.get(name, default) + return getattr(value, name, default) + + __all__ = [ "StrategyPluginAlertChannelStores", "StrategyPluginAlertPublishResult", diff --git a/src/quant_platform_kit/notifications/strategy_plugin_push.py b/src/quant_platform_kit/notifications/strategy_plugin_push.py new file mode 100644 index 0000000..eac408d --- /dev/null +++ b/src/quant_platform_kit/notifications/strategy_plugin_push.py @@ -0,0 +1,420 @@ +"""Push notification helpers for strategy plugin alerts.""" + +from __future__ import annotations + +import json +import tempfile +from collections.abc import Callable, Mapping, Sequence +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from quant_platform_kit.common.strategy_plugins import ( + StrategyPluginAlertMessage, + build_strategy_plugin_alert_messages, +) + +from .push import ( + DEFAULT_NTFY_API_BASE_URL, + DEFAULT_PUSHOVER_API_BASE_URL, + PUSH_PROVIDER_NTFY, + PUSH_PROVIDER_PUSHOVER, + parse_push_recipients, + send_strategy_plugin_push, +) + +_DEFAULT_PUSH_PROVIDER = PUSH_PROVIDER_PUSHOVER +_SUPPORTED_PUSH_PROVIDERS = frozenset({PUSH_PROVIDER_PUSHOVER, PUSH_PROVIDER_NTFY}) +_DEFAULT_PUSH_BODY_MAX_CHARS = 1800 + + +@dataclass(frozen=True) +class StrategyPluginPushSettings: + recipients: tuple[str, ...] = () + provider: str = _DEFAULT_PUSH_PROVIDER + app_token: str | None = field(default=None, repr=False) + access_token: str | None = field(default=None, repr=False) + api_base_url: str | None = None + device: str | None = None + priority: str | None = None + tags: str | None = None + body_max_chars: int = _DEFAULT_PUSH_BODY_MAX_CHARS + timeout: float = 10.0 + + @classmethod + def from_object(cls, value: object) -> "StrategyPluginPushSettings": + if isinstance(value, cls): + return value + provider = ( + _first_non_empty(_get_value(value, "crisis_alert_push_provider")) + or _DEFAULT_PUSH_PROVIDER + ).lower() + return cls( + recipients=tuple( + parse_push_recipients(_get_value(value, "crisis_alert_push_recipients", ())) + ), + provider=provider, + app_token=_first_non_empty(_get_value(value, "crisis_alert_push_app_token")), + access_token=_first_non_empty(_get_value(value, "crisis_alert_push_access_token")), + api_base_url=( + _first_non_empty(_get_value(value, "crisis_alert_push_api_base_url")) + or _default_api_base_url(provider) + ), + device=_first_non_empty(_get_value(value, "crisis_alert_push_device")), + priority=_first_non_empty(_get_value(value, "crisis_alert_push_priority")), + tags=_first_non_empty(_get_value(value, "crisis_alert_push_tags")), + body_max_chars=_coerce_int( + _get_value(value, "crisis_alert_push_body_max_chars"), + _DEFAULT_PUSH_BODY_MAX_CHARS, + ), + ) + + def missing_fields(self) -> tuple[str, ...]: + missing: list[str] = [] + if self.provider not in _SUPPORTED_PUSH_PROVIDERS: + missing.append("CRISIS_ALERT_PUSH_PROVIDER=pushover or ntfy") + if not parse_push_recipients(self.recipients): + missing.append("CRISIS_ALERT_PUSH_RECIPIENTS") + if self.provider == PUSH_PROVIDER_PUSHOVER and not str(self.app_token or "").strip(): + missing.append("CRISIS_ALERT_PUSH_APP_TOKEN") + return tuple(missing) + + @property + def is_configured(self) -> bool: + return not self.missing_fields() + + +@dataclass(frozen=True) +class StrategyPluginPushAlertDelivery: + alert_key: str + subject: str + status: str + reason: str | None = None + error: str | None = None + metadata: Mapping[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + payload = { + "alert_key": self.alert_key, + "subject": self.subject, + "status": self.status, + "reason": self.reason, + "error": self.error, + **dict(self.metadata or {}), + } + return {key: value for key, value in payload.items() if value not in (None, "", (), [])} + + +@dataclass(frozen=True) +class StrategyPluginPushAlertPublishResult: + deliveries: tuple[StrategyPluginPushAlertDelivery, ...] = () + + @property + def attempted_count(self) -> int: + return len(self.deliveries) + + @property + def sent_count(self) -> int: + return sum(1 for delivery in self.deliveries if delivery.status == "sent") + + @property + def skipped_count(self) -> int: + return sum(1 for delivery in self.deliveries if delivery.status == "skipped") + + @property + def failed_count(self) -> int: + return sum(1 for delivery in self.deliveries if delivery.status == "failed") + + def to_report_fields(self, *, prefix: str = "strategy_plugin_alert_push") -> dict[str, Any]: + return { + f"{prefix}_attempted_count": self.attempted_count, + f"{prefix}_sent_count": self.sent_count, + f"{prefix}_skipped_count": self.skipped_count, + f"{prefix}_failed_count": self.failed_count, + f"{prefix}_deliveries": [delivery.to_dict() for delivery in self.deliveries], + } + + +@dataclass(frozen=True) +class StrategyPluginPushAlertMarkerStore: + local_dir: str | Path | None = None + gcs_prefix_uri: str | None = None + gcp_project_id: str | None = None + namespace: str = "strategy_plugin_push_alerts" + client_factory: Any = None + + def has_alert(self, alert_key: str) -> bool: + if self.gcs_prefix_uri and self._gcs_blob(alert_key, namespace=self.namespace).exists(): + return True + if self.local_dir and self._local_path(alert_key, namespace=self.namespace).exists(): + return True + return False + + def record_alert( + self, + alert_key: str, + *, + metadata: Mapping[str, Any] | None = None, + ) -> None: + payload = { + "schema_version": "strategy_plugin_push_alert_marker.v1", + "alert_key": str(alert_key), + "recorded_at": datetime.now(timezone.utc).isoformat(), + "metadata": dict(metadata or {}), + } + encoded = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True) + if self.gcs_prefix_uri: + self._gcs_blob(alert_key, namespace=self.namespace).upload_from_string( + encoded, + content_type="application/json", + ) + return + if self.local_dir: + path = self._local_path(alert_key, namespace=self.namespace) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(encoded, encoding="utf-8") + + def _local_path(self, alert_key: str, *, namespace: str) -> Path: + root = Path(self.local_dir or tempfile.gettempdir()).expanduser() + return root / namespace / f"{_clean_relative_key(alert_key)}.json" + + def _gcs_blob(self, alert_key: str, *, namespace: str): + bucket_name, prefix = _parse_gcs_uri(str(self.gcs_prefix_uri or "")) + object_name = "/".join( + part.strip("/") + for part in (prefix, namespace, f"{_clean_relative_key(alert_key)}.json") + if part and part.strip("/") + ) + if self.client_factory is None: + try: + from google.cloud import storage # type: ignore + except ImportError as exc: + raise RuntimeError("google-cloud-storage is required for GCS alert markers") from exc + client_factory = storage.Client + else: + client_factory = self.client_factory + client = client_factory(project=self.gcp_project_id) if self.gcp_project_id else client_factory() + return client.bucket(bucket_name).blob(object_name) + + +def publish_strategy_plugin_push_alerts( + signals: Sequence[object], + *, + push_settings: StrategyPluginPushSettings | object, + translator: Callable[..., str] | None = None, + strategy_label: str | None = None, + context_label: str | None = None, + alert_store: StrategyPluginPushAlertMarkerStore | object | None = None, + send_notification: Callable[..., bool] = send_strategy_plugin_push, + log_message: Callable[..., Any] = print, +) -> StrategyPluginPushAlertPublishResult: + settings = StrategyPluginPushSettings.from_object(push_settings) + messages = build_strategy_plugin_alert_messages( + signals, + translator=translator, + strategy_label=strategy_label, + context_label=context_label, + alert_namespace="strategy_plugin_push_alert", + ) + deliveries: list[StrategyPluginPushAlertDelivery] = [] + missing_fields = settings.missing_fields() + if missing_fields: + for message in messages: + deliveries.append( + _delivery( + message, + status="skipped", + reason="missing_push_config", + error=",".join(missing_fields), + ) + ) + result = StrategyPluginPushAlertPublishResult(tuple(deliveries)) + _log_publish_result(result, log_message=log_message) + return result + + for message in messages: + alert_key = message.alert_key or _fallback_alert_key(message) + try: + duplicate = _store_has_alert(alert_store, alert_key) + store_error = None + except Exception as exc: + duplicate = False + store_error = f"alert_store_check_failed:{type(exc).__name__}: {exc}" + if duplicate: + deliveries.append(_delivery(message, status="skipped", reason="duplicate_alert")) + continue + sent, send_error = _send_message(send_notification, message, settings) + if not sent: + deliveries.append(_delivery(message, status="failed", reason="send_failed", error=send_error)) + continue + record_error = _store_record_error(alert_store, alert_key, message) + combined_error = "; ".join(error for error in (store_error, record_error) if error) + deliveries.append(_delivery(message, status="sent", error=combined_error or None)) + result = StrategyPluginPushAlertPublishResult(tuple(deliveries)) + _log_publish_result(result, log_message=log_message) + return result + + +def _delivery( + message: StrategyPluginAlertMessage, + *, + status: str, + reason: str | None = None, + error: str | None = None, +) -> StrategyPluginPushAlertDelivery: + return StrategyPluginPushAlertDelivery( + alert_key=message.alert_key or _fallback_alert_key(message), + subject=message.subject, + status=status, + reason=reason, + error=error, + metadata=message.metadata, + ) + + +def _send_message( + send_notification: Callable[..., bool], + message: StrategyPluginAlertMessage, + settings: StrategyPluginPushSettings, +) -> tuple[bool, str | None]: + try: + sent = send_notification( + provider=settings.provider, + title=message.subject, + body=_build_push_body(message, max_chars=settings.body_max_chars), + recipients=settings.recipients, + app_token=settings.app_token, + access_token=settings.access_token, + api_base_url=settings.api_base_url, + device=settings.device, + priority=settings.priority, + tags=settings.tags, + timeout=settings.timeout, + ) + except Exception as exc: + return False, f"{type(exc).__name__}: {exc}" + return bool(sent), None + + +def _build_push_body(message: StrategyPluginAlertMessage, *, max_chars: int) -> str: + body = str(message.body or "").strip() or str(message.subject or "").strip() + limit = max(80, int(max_chars or _DEFAULT_PUSH_BODY_MAX_CHARS)) + if len(body) <= limit: + return body + return body[: max(0, limit - 3)].rstrip() + "..." + + +def _store_has_alert(alert_store: object | None, alert_key: str) -> bool: + if alert_store is None: + return False + checker = getattr(alert_store, "has_alert", None) + if checker is None: + return False + return bool(checker(alert_key)) + + +def _store_record_error( + alert_store: object | None, + alert_key: str, + message: StrategyPluginAlertMessage, +) -> str | None: + if alert_store is None: + return None + recorder = getattr(alert_store, "record_alert", None) + if recorder is None: + return None + try: + recorder( + alert_key, + metadata={ + "subject": message.subject, + **dict(message.metadata or {}), + }, + ) + except Exception as exc: + return f"alert_store_record_failed:{type(exc).__name__}: {exc}" + return None + + +def _log_publish_result( + result: StrategyPluginPushAlertPublishResult, + *, + log_message: Callable[..., Any], +) -> None: + if result.attempted_count <= 0: + return + _call_log_message( + log_message, + ( + "strategy_plugin_alert_push_result " + f"attempted={result.attempted_count} " + f"sent={result.sent_count} " + f"skipped={result.skipped_count} " + f"failed={result.failed_count}" + ), + ) + + +def _call_log_message(log_message: Callable[..., Any], text: str) -> None: + try: + log_message(text, flush=True) + except TypeError: + log_message(text) + + +def _get_value(value: object, name: str, default: Any = None) -> Any: + if isinstance(value, Mapping): + return value.get(name, default) + return getattr(value, name, default) + + +def _first_non_empty(*values: Any) -> str | None: + for value in values: + text = str(value or "").strip() + if text: + return text + return None + + +def _coerce_int(value: Any, default: int) -> int: + text = str(value or "").strip() + if not text: + return default + try: + return int(text) + except (TypeError, ValueError): + return default + + +def _default_api_base_url(provider: str) -> str: + if provider == PUSH_PROVIDER_NTFY: + return DEFAULT_NTFY_API_BASE_URL + return DEFAULT_PUSHOVER_API_BASE_URL + + +def _fallback_alert_key(message: StrategyPluginAlertMessage) -> str: + return "strategy_plugin_push_alert/" + _clean_relative_key(message.subject or "unknown") + + +def _clean_relative_key(value: str) -> str: + parts = [] + for raw_part in str(value or "").replace("\\", "/").split("/"): + cleaned = "".join( + char if char.isalnum() or char in {"-", "_", "."} else "-" + for char in raw_part.strip() + ).strip("-._") + if cleaned: + parts.append(cleaned[:100]) + return "/".join(parts) or "unknown" + + +def _parse_gcs_uri(uri: str) -> tuple[str, str]: + raw_uri = str(uri or "").strip() + if not raw_uri.startswith("gs://"): + raise ValueError(f"gcs uri must start with gs://, got: {uri!r}") + remainder = raw_uri[5:] + bucket_name, _, object_prefix = remainder.partition("/") + if not bucket_name: + raise ValueError(f"gcs uri must include a bucket name, got: {uri!r}") + return bucket_name, object_prefix.strip("/") diff --git a/tests/test_strategy_plugin_alert_dispatcher.py b/tests/test_strategy_plugin_alert_dispatcher.py index ae0bdc4..263c955 100644 --- a/tests/test_strategy_plugin_alert_dispatcher.py +++ b/tests/test_strategy_plugin_alert_dispatcher.py @@ -7,6 +7,7 @@ publish_strategy_plugin_alerts, ) from quant_platform_kit.notifications.strategy_plugin_email import StrategyPluginEmailSettings +from quant_platform_kit.notifications.strategy_plugin_push import StrategyPluginPushSettings from quant_platform_kit.notifications.strategy_plugin_sms import StrategyPluginSmsSettings @@ -30,12 +31,16 @@ class _NotificationSettings: crisis_alert_sms_account_id = "AC123" crisis_alert_sms_auth_token = "secret" crisis_alert_sms_sender = "+15551234567" + crisis_alert_push_provider = "ntfy" + crisis_alert_push_recipients = "risk-topic" + crisis_alert_push_priority = "5" class StrategyPluginAlertDispatcherTests(unittest.TestCase): def test_publish_strategy_plugin_alerts_dispatches_enabled_channels(self): emails = [] sms_messages = [] + push_messages = [] with tempfile.TemporaryDirectory() as tmp_dir: result = publish_strategy_plugin_alerts( @@ -46,17 +51,21 @@ def test_publish_strategy_plugin_alerts_dispatches_enabled_channels(self): state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), send_email_notification=lambda **kwargs: emails.append(kwargs) or True, send_sms_notification=lambda **kwargs: sms_messages.append(kwargs) or True, + send_push_notification=lambda **kwargs: push_messages.append(kwargs) or True, log_message=lambda *_args, **_kwargs: None, ) - self.assertEqual(result.sent_count, 2) + self.assertEqual(result.sent_count, 3) self.assertEqual(result.failed_count, 0) self.assertIsNotNone(result.email_result) self.assertIsNotNone(result.sms_result) + self.assertIsNotNone(result.push_result) self.assertEqual(result.email_result.sent_count, 1) self.assertEqual(result.sms_result.sent_count, 1) + self.assertEqual(result.push_result.sent_count, 1) self.assertEqual(emails[0]["recipients"], ("risk@example.com",)) self.assertEqual(sms_messages[0]["recipients"], ("+15165480265",)) + self.assertEqual(push_messages[0]["recipients"], ("risk-topic",)) def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(self): settings = _NotificationSettings() @@ -71,6 +80,7 @@ def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(sel state_settings=state_settings, send_email_notification=lambda **_kwargs: True, send_sms_notification=lambda **_kwargs: True, + send_push_notification=lambda **_kwargs: True, log_message=lambda *_args, **_kwargs: None, ) second = publish_strategy_plugin_alerts( @@ -81,16 +91,19 @@ def test_publish_strategy_plugin_alerts_records_channel_dedupe_independently(sel state_settings=state_settings, send_email_notification=lambda **_kwargs: True, send_sms_notification=lambda **_kwargs: True, + send_push_notification=lambda **_kwargs: True, log_message=lambda *_args, **_kwargs: None, ) - self.assertEqual(first.sent_count, 2) + self.assertEqual(first.sent_count, 3) self.assertEqual(second.sent_count, 0) - self.assertEqual(second.skipped_count, 2) + self.assertEqual(second.skipped_count, 3) self.assertIsNotNone(second.email_result) self.assertEqual(second.email_result.deliveries[0].reason, "duplicate_alert") self.assertIsNotNone(second.sms_result) self.assertEqual(second.sms_result.deliveries[0].reason, "duplicate_alert") + self.assertIsNotNone(second.push_result) + self.assertEqual(second.push_result.deliveries[0].reason, "duplicate_alert") def test_publish_strategy_plugin_alerts_can_target_one_channel(self): sms_messages = [] @@ -118,6 +131,59 @@ def test_publish_strategy_plugin_alerts_can_target_one_channel(self): self.assertEqual(result.sent_count, 1) self.assertTrue(sms_messages) + def test_publish_strategy_plugin_alerts_can_target_push_channel(self): + push_messages = [] + + with tempfile.TemporaryDirectory() as tmp_dir: + result = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=StrategyPluginPushSettings( + provider="ntfy", + recipients=("risk-topic",), + priority="5", + ), + channels=("push",), + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + send_email_notification=lambda **_kwargs: self.fail("email should not run"), + send_sms_notification=lambda **_kwargs: self.fail("sms should not run"), + send_push_notification=lambda **kwargs: push_messages.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertIsNone(result.email_result) + self.assertIsNone(result.sms_result) + self.assertIsNotNone(result.push_result) + self.assertEqual(result.sent_count, 1) + self.assertEqual(push_messages[0]["provider"], "ntfy") + + def test_publish_strategy_plugin_alerts_reads_channels_from_settings(self): + settings = _NotificationSettings() + settings.crisis_alert_channels = "email,push" + emails = [] + push_messages = [] + + with tempfile.TemporaryDirectory() as tmp_dir: + result = publish_strategy_plugin_alerts( + [_alert_signal()], + notification_settings=settings, + strategy_label="TQQQ", + context_label="schwab / tqqq", + state_settings=StrategyPluginAlertStateSettings(local_dir=tmp_dir), + send_email_notification=lambda **kwargs: emails.append(kwargs) or True, + send_sms_notification=lambda **_kwargs: self.fail("sms should not run"), + send_push_notification=lambda **kwargs: push_messages.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + self.assertEqual(result.sent_count, 2) + self.assertIsNotNone(result.email_result) + self.assertIsNone(result.sms_result) + self.assertIsNotNone(result.push_result) + self.assertEqual(len(emails), 1) + self.assertEqual(len(push_messages), 1) + def test_publish_strategy_plugin_alerts_attach_to_report(self): with tempfile.TemporaryDirectory() as tmp_dir: result = publish_strategy_plugin_alerts( diff --git a/tests/test_strategy_plugin_push_notifications.py b/tests/test_strategy_plugin_push_notifications.py new file mode 100644 index 0000000..5f0483c --- /dev/null +++ b/tests/test_strategy_plugin_push_notifications.py @@ -0,0 +1,235 @@ +import urllib.parse +from types import SimpleNamespace + +from quant_platform_kit.notifications.push import ( + parse_push_recipients, + send_ntfy_push, + send_pushover_push, +) +from quant_platform_kit.notifications.strategy_plugin_push import ( + StrategyPluginPushAlertMarkerStore, + StrategyPluginPushSettings, + publish_strategy_plugin_push_alerts, +) + + +def test_parse_push_recipients_splits_and_deduplicates(): + assert parse_push_recipients("topic-a; topic-b,topic-a\nhttps://ntfy.example/risk") == ( + "topic-a", + "topic-b", + "https://ntfy.example/risk", + ) + + +def test_send_pushover_push_uses_configured_http_request(): + observed = {} + + class FakeResponse: + status = 200 + + def __enter__(self): + return self + + def __exit__(self, *_args): + return None + + def fake_open(request, timeout): + observed["url"] = request.full_url + observed["timeout"] = timeout + observed["headers"] = dict(request.header_items()) + observed["body"] = urllib.parse.parse_qs(request.data.decode("utf-8")) + return FakeResponse() + + assert send_pushover_push( + title="Crisis alert", + body="危机插件告警", + recipients=("user-key",), + app_token="app-token", + api_base_url="https://pushover.example.test", + device="iphone", + priority=1, + timeout=3.0, + opener=fake_open, + printer=lambda *_args, **_kwargs: None, + ) + + assert observed["url"] == "https://pushover.example.test/1/messages.json" + assert observed["timeout"] == 3.0 + assert observed["headers"]["Content-type"] == "application/x-www-form-urlencoded" + assert observed["body"] == { + "token": ["app-token"], + "user": ["user-key"], + "title": ["Crisis alert"], + "message": ["危机插件告警"], + "device": ["iphone"], + "priority": ["1"], + } + + +def test_send_ntfy_push_uses_configured_http_request_and_encodes_chinese_title(): + observed = {} + + class FakeResponse: + status = 200 + + def __enter__(self): + return self + + def __exit__(self, *_args): + return None + + def fake_open(request, timeout): + observed["url"] = request.full_url + observed["timeout"] = timeout + observed["headers"] = dict(request.header_items()) + observed["body"] = request.data.decode("utf-8") + return FakeResponse() + + assert send_ntfy_push( + title="危机插件告警", + body="TQQQ 防守", + recipients=("risk/topic",), + access_token="access-token", + api_base_url="https://ntfy.example.test", + priority=5, + tags="warning", + timeout=4.0, + opener=fake_open, + printer=lambda *_args, **_kwargs: None, + ) + + assert observed["url"] == "https://ntfy.example.test/risk/topic" + assert observed["timeout"] == 4.0 + assert observed["headers"]["Content-type"] == "text/plain; charset=utf-8" + assert observed["headers"]["Title"].startswith("=?utf-8?") + assert observed["headers"]["Priority"] == "5" + assert observed["headers"]["Tags"] == "warning" + assert observed["headers"]["Authorization"] == "Bearer access-token" + assert observed["body"] == "TQQQ 防守" + + +def _alert_signal(): + return SimpleNamespace( + strategy="tqqq_growth_income", + plugin="crisis_response_shadow", + effective_mode="shadow", + as_of="2026-05-24", + canonical_route="true_crisis", + suggested_action="defend", + would_trade_if_enabled=True, + ) + + +def test_publish_strategy_plugin_push_alerts_skips_missing_config(): + observed = [] + + result = publish_strategy_plugin_push_alerts( + [_alert_signal()], + push_settings=StrategyPluginPushSettings(), + strategy_label="TQQQ", + context_label="ibkr / paper / tqqq", + send_notification=lambda **_kwargs: observed.append(_kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + assert result.sent_count == 0 + assert result.skipped_count == 1 + assert result.deliveries[0].reason == "missing_push_config" + assert "CRISIS_ALERT_PUSH_RECIPIENTS" in result.deliveries[0].error + assert "CRISIS_ALERT_PUSH_APP_TOKEN" in result.deliveries[0].error + assert observed == [] + + +def test_publish_strategy_plugin_push_alerts_sends_and_records_marker(tmp_path): + observed = [] + store = StrategyPluginPushAlertMarkerStore(local_dir=tmp_path) + + result = publish_strategy_plugin_push_alerts( + [_alert_signal()], + push_settings=StrategyPluginPushSettings( + provider="ntfy", + recipients=("risk-topic",), + priority="5", + ), + strategy_label="TQQQ", + context_label="ibkr / paper / tqqq", + alert_store=store, + send_notification=lambda **kwargs: observed.append(kwargs) or True, + log_message=lambda *_args, **_kwargs: None, + ) + + assert result.sent_count == 1 + assert result.failed_count == 0 + assert result.deliveries[0].alert_key + assert observed[0]["provider"] == "ntfy" + assert observed[0]["recipients"] == ("risk-topic",) + assert observed[0]["priority"] == "5" + assert "Strategy plugin alert" in observed[0]["title"] + assert store.has_alert(result.deliveries[0].alert_key) + + +def test_publish_strategy_plugin_push_alerts_skips_duplicate_marker(tmp_path): + store = StrategyPluginPushAlertMarkerStore(local_dir=tmp_path) + settings = StrategyPluginPushSettings(provider="ntfy", recipients=("risk-topic",)) + first = publish_strategy_plugin_push_alerts( + [_alert_signal()], + push_settings=settings, + strategy_label="TQQQ", + context_label="ibkr / paper / tqqq", + alert_store=store, + send_notification=lambda **_kwargs: True, + log_message=lambda *_args, **_kwargs: None, + ) + + second = publish_strategy_plugin_push_alerts( + [_alert_signal()], + push_settings=settings, + strategy_label="TQQQ", + context_label="ibkr / paper / tqqq", + alert_store=store, + send_notification=lambda **_kwargs: True, + log_message=lambda *_args, **_kwargs: None, + ) + + assert first.sent_count == 1 + assert second.sent_count == 0 + assert second.skipped_count == 1 + assert second.deliveries[0].reason == "duplicate_alert" + + +def test_push_settings_reads_pushover_and_ntfy_config_from_object(): + pushover = StrategyPluginPushSettings.from_object( + SimpleNamespace( + crisis_alert_push_recipients="user-key", + crisis_alert_push_provider="pushover", + crisis_alert_push_app_token="app-token", + crisis_alert_push_device="iphone", + crisis_alert_push_priority="1", + ) + ) + ntfy = StrategyPluginPushSettings.from_object( + SimpleNamespace( + crisis_alert_push_recipients="risk-topic", + crisis_alert_push_provider="ntfy", + crisis_alert_push_access_token="access-token", + crisis_alert_push_api_base_url="https://ntfy.example.test", + crisis_alert_push_priority="5", + crisis_alert_push_tags="warning", + crisis_alert_push_body_max_chars="300", + ) + ) + + assert pushover.recipients == ("user-key",) + assert pushover.provider == "pushover" + assert pushover.app_token == "app-token" + assert pushover.device == "iphone" + assert pushover.priority == "1" + assert pushover.missing_fields() == () + assert ntfy.recipients == ("risk-topic",) + assert ntfy.provider == "ntfy" + assert ntfy.access_token == "access-token" + assert ntfy.api_base_url == "https://ntfy.example.test" + assert ntfy.priority == "5" + assert ntfy.tags == "warning" + assert ntfy.body_max_chars == 300 + assert ntfy.missing_fields() == () diff --git a/tests/test_strategy_plugins.py b/tests/test_strategy_plugins.py index 2400ccd..9c8bee1 100644 --- a/tests/test_strategy_plugins.py +++ b/tests/test_strategy_plugins.py @@ -9,6 +9,8 @@ PLUGIN_CRISIS_RESPONSE_SHADOW, PLUGIN_MODE_SHADOW, STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, + STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, + STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, StrategyPluginDefinition, build_strategy_plugin_alert_messages, build_strategy_plugin_notification_lines, @@ -88,7 +90,14 @@ def test_default_plugin_definition_limits_crisis_response_to_supported_strategie definition = DEFAULT_STRATEGY_PLUGIN_DEFINITIONS[PLUGIN_CRISIS_RESPONSE_SHADOW] self.assertEqual(definition.supported_strategies, CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES) - self.assertEqual(definition.alert_channels, (STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL,)) + self.assertEqual( + definition.alert_channels, + ( + STRATEGY_PLUGIN_ALERT_CHANNEL_EMAIL, + STRATEGY_PLUGIN_ALERT_CHANNEL_SMS, + STRATEGY_PLUGIN_ALERT_CHANNEL_PUSH, + ), + ) validate_strategy_plugin_compatibility( strategy="tqqq_growth_income", plugin=PLUGIN_CRISIS_RESPONSE_SHADOW,