diff --git a/main.py b/main.py index 83bf60a..b46acc0 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,7 @@ from flask import Flask import google.auth +import requests from application.runtime_broker_adapters import build_runtime_broker_adapters from application.runtime_composer import build_runtime_composer from application.rebalance_service import run_strategy as run_rebalance_cycle @@ -110,6 +111,14 @@ def t(key, **kwargs): return build_translator(NOTIFY_LANG)(key, **kwargs) +def _split_env_list(value: str | None) -> tuple[str, ...]: + return tuple( + item.strip() + for item in str(value or "").replace(";", ",").split(",") + if item.strip() + ) + + signal_text = build_signal_text(t) strategy_display_name = build_strategy_display_name(t)( STRATEGY_PROFILE, @@ -235,6 +244,79 @@ def build_strategy_plugin_alert_messages(signals): return STRATEGY_ADAPTERS.build_strategy_plugin_alert_messages(signals) +def _runtime_error_notification_targets() -> tuple[tuple[str, str], ...]: + targets: list[tuple[str, str]] = [] + if TG_TOKEN and TG_CHAT_ID: + targets.append((TG_TOKEN, TG_CHAT_ID)) + crisis_token = os.getenv("CRISIS_ALERT_TELEGRAM_BOT_TOKEN") + for chat_id in _split_env_list(os.getenv("CRISIS_ALERT_TELEGRAM_CHAT_IDS")): + if crisis_token and chat_id: + targets.append((crisis_token, chat_id)) + + seen: set[tuple[str, str]] = set() + unique_targets: list[tuple[str, str]] = [] + for target in targets: + if target in seen: + continue + seen.add(target) + unique_targets.append(target) + return tuple(unique_targets) + + +def _runtime_error_notification_message(exc: Exception, *, route_label: str) -> str: + error_text = f"{type(exc).__name__}: {exc}" + if len(error_text) > 1200: + error_text = error_text[:1197] + "..." + return "\n".join( + ( + "LongBridge strategy run failed", + f"service: {os.getenv('K_SERVICE') or SECRET_NAME or 'longbridge-platform'}", + f"revision: {os.getenv('K_REVISION') or ''}", + f"route: {route_label}", + f"strategy: {STRATEGY_PROFILE}", + f"account_scope: {ACCOUNT_REGION}", + f"error: {error_text}", + ) + ) + + +def _notify_runtime_error(exc: Exception, *, route_label: str) -> bool: + targets = _runtime_error_notification_targets() + if not targets: + print("LongBridge runtime error notification skipped: no Telegram target configured.", flush=True) + return False + message = _runtime_error_notification_message(exc, route_label=route_label) + for token, chat_id in targets: + try: + requests.post( + f"https://api.telegram.org/bot{token}/sendMessage", + json={"chat_id": chat_id, "text": message}, + timeout=10, + ) + except Exception as send_exc: + print(f"LongBridge runtime error Telegram send failed: {send_exc}", flush=True) + return True + + +def _handle_route_runtime_error(exc: Exception, *, route_label: str): + print(f"LongBridge route failed before strategy-cycle handling: {type(exc).__name__}: {exc}", flush=True) + traceback.print_exc() + _notify_runtime_error(exc, route_label=route_label) + return "Error", 500 + + +def _route_with_runtime_error_fallback(handler, *, success_body: str, route_label: str, **kwargs): + try: + result = handler(**kwargs) + except Exception as exc: + return _handle_route_runtime_error(exc, route_label=route_label) + if result is False: + return "Error", 500 + if isinstance(result, tuple): + return result + return success_body, 200 + + def build_strategy_plugin_alert_state_settings(): return StrategyPluginAlertStateSettings.from_env( gcp_project_id=PROJECT_ID, @@ -321,7 +403,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali }, ) print(composer.with_prefix("Outside market hours; skip."), flush=True) - return + return True if force_run and not market_open: reporting_adapters.log_event( log_context, @@ -366,6 +448,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali "strategy_cycle_completed", message="Strategy execution completed", ) + return True except Exception as exc: append_runtime_report_error( @@ -388,6 +471,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali detailed_text=f"Strategy error:\n{err}", compact_text=f"{t('error_title')}\n{err}", ) + return False finally: try: report_path = reporting_adapters.persist_execution_report(report) @@ -487,28 +571,46 @@ def run_probe(*, response_body: str = "Probe OK"): @app.route("/", methods=["POST", "GET"]) def handle_trigger(): """Entrypoint for Cloud Run / scheduler: run strategy and return 200.""" - run_strategy() - return "OK", 200 + return _route_with_runtime_error_fallback( + run_strategy, + success_body="OK", + route_label="POST /", + ) @app.route("/backfill", methods=["POST", "GET"]) def handle_backfill(): """Manual backfill entrypoint for verification-only execution.""" - run_strategy(force_run=True, validation_only=True) - return "OK", 200 + return _route_with_runtime_error_fallback( + run_strategy, + force_run=True, + validation_only=True, + success_body="OK", + route_label="POST /backfill", + ) @app.route("/precheck", methods=["POST", "GET"]) def handle_precheck(): """Pre-market / post-open verification entrypoint for dry-run only execution.""" - run_strategy(force_run=True, validation_only=True, validation_label="precheck") - return "Precheck OK", 200 + return _route_with_runtime_error_fallback( + run_strategy, + force_run=True, + validation_only=True, + validation_label="precheck", + success_body="Precheck OK", + route_label="POST /precheck", + ) @app.route("/probe", methods=["POST", "GET"]) def handle_probe(): """Post-open broker/account health probe; notify only on failure.""" - return run_probe() + return _route_with_runtime_error_fallback( + run_probe, + success_body="Probe OK", + route_label="POST /probe", + ) if __name__ == "__main__": diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index ea02913..1b64931 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -216,6 +216,42 @@ def fake_run_strategy(): self.assertEqual(body, "OK",) self.assertTrue(observed["called"]) + def test_handle_trigger_returns_500_when_strategy_reports_failure(self): + module = load_module() + module.run_strategy = lambda: False + + with module.app.test_request_context("/", method="POST"): + body, status = module.handle_trigger() + + self.assertEqual(status, 500) + self.assertEqual(body, "Error") + + def test_handle_trigger_runtime_error_fallback_sends_telegram(self): + module = load_module() + observed = {"payloads": []} + + class FakeResponse: + status_code = 200 + + def fake_post(_url, *, json, timeout): + observed["payloads"].append((json, timeout)) + return FakeResponse() + + module.TG_TOKEN = "token-1" + module.TG_CHAT_ID = "chat-1" + module.requests.post = fake_post + module.run_strategy = lambda: (_ for _ in ()).throw(RuntimeError("boom")) + + with module.app.test_request_context("/", method="POST"): + body, status = module.handle_trigger() + + self.assertEqual(status, 500) + self.assertEqual(body, "Error") + self.assertEqual(len(observed["payloads"]), 1) + self.assertEqual(observed["payloads"][0][0]["chat_id"], "chat-1") + self.assertIn("LongBridge strategy run failed", observed["payloads"][0][0]["text"]) + self.assertIn("RuntimeError: boom", observed["payloads"][0][0]["text"]) + def test_handle_trigger_allows_get(self): module = load_module() observed = {"called": False}