Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 110 additions & 8 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from flask import Flask

import google.auth
import requests
from application.runtime_broker_adapters import build_runtime_broker_adapters
from application.runtime_composer import build_runtime_composer
from application.rebalance_service import run_strategy as run_rebalance_cycle
Expand Down Expand Up @@ -110,6 +111,14 @@ def t(key, **kwargs):
return build_translator(NOTIFY_LANG)(key, **kwargs)


def _split_env_list(value: str | None) -> tuple[str, ...]:
return tuple(
item.strip()
for item in str(value or "").replace(";", ",").split(",")
if item.strip()
)


signal_text = build_signal_text(t)
strategy_display_name = build_strategy_display_name(t)(
STRATEGY_PROFILE,
Expand Down Expand Up @@ -235,6 +244,79 @@ def build_strategy_plugin_alert_messages(signals):
return STRATEGY_ADAPTERS.build_strategy_plugin_alert_messages(signals)


def _runtime_error_notification_targets() -> tuple[tuple[str, str], ...]:
targets: list[tuple[str, str]] = []
if TG_TOKEN and TG_CHAT_ID:
targets.append((TG_TOKEN, TG_CHAT_ID))
crisis_token = os.getenv("CRISIS_ALERT_TELEGRAM_BOT_TOKEN")
for chat_id in _split_env_list(os.getenv("CRISIS_ALERT_TELEGRAM_CHAT_IDS")):
if crisis_token and chat_id:
targets.append((crisis_token, chat_id))

seen: set[tuple[str, str]] = set()
unique_targets: list[tuple[str, str]] = []
for target in targets:
if target in seen:
continue
seen.add(target)
unique_targets.append(target)
return tuple(unique_targets)


def _runtime_error_notification_message(exc: Exception, *, route_label: str) -> str:
error_text = f"{type(exc).__name__}: {exc}"
if len(error_text) > 1200:
error_text = error_text[:1197] + "..."
return "\n".join(
(
"LongBridge strategy run failed",
f"service: {os.getenv('K_SERVICE') or SECRET_NAME or 'longbridge-platform'}",
f"revision: {os.getenv('K_REVISION') or '<unknown>'}",
f"route: {route_label}",
f"strategy: {STRATEGY_PROFILE}",
f"account_scope: {ACCOUNT_REGION}",
f"error: {error_text}",
)
)


def _notify_runtime_error(exc: Exception, *, route_label: str) -> bool:
targets = _runtime_error_notification_targets()
if not targets:
print("LongBridge runtime error notification skipped: no Telegram target configured.", flush=True)
return False
message = _runtime_error_notification_message(exc, route_label=route_label)
for token, chat_id in targets:
try:
requests.post(
f"https://api.telegram.org/bot{token}/sendMessage",
json={"chat_id": chat_id, "text": message},
timeout=10,
)
except Exception as send_exc:
print(f"LongBridge runtime error Telegram send failed: {send_exc}", flush=True)
return True


def _handle_route_runtime_error(exc: Exception, *, route_label: str):
print(f"LongBridge route failed before strategy-cycle handling: {type(exc).__name__}: {exc}", flush=True)
traceback.print_exc()
_notify_runtime_error(exc, route_label=route_label)
return "Error", 500


def _route_with_runtime_error_fallback(handler, *, success_body: str, route_label: str, **kwargs):
try:
result = handler(**kwargs)
except Exception as exc:
return _handle_route_runtime_error(exc, route_label=route_label)
if result is False:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep handled strategy failures acknowledged

When run_strategy() catches a strategy_cycle exception it now returns False, and this branch converts that handled failure into an HTTP 500 for the Cloud Scheduler-backed /, /backfill, and /precheck routes. In scheduler jobs with retries enabled, a broker/strategy error that may have occurred after submitting part of an order cycle is now treated as an unacknowledged request and can rerun the same cycle; previously these handled failures were notified via publish_cycle_notification but the route still acknowledged the trigger. Reserve the 500/runtime fallback for exceptions that escape before the strategy-cycle handler runs.

Useful? React with 👍 / 👎.

return "Error", 500
if isinstance(result, tuple):
return result
return success_body, 200


def build_strategy_plugin_alert_state_settings():
return StrategyPluginAlertStateSettings.from_env(
gcp_project_id=PROJECT_ID,
Expand Down Expand Up @@ -321,7 +403,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
},
)
print(composer.with_prefix("Outside market hours; skip."), flush=True)
return
return True
if force_run and not market_open:
reporting_adapters.log_event(
log_context,
Expand Down Expand Up @@ -366,6 +448,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
"strategy_cycle_completed",
message="Strategy execution completed",
)
return True

except Exception as exc:
append_runtime_report_error(
Expand All @@ -388,6 +471,7 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
detailed_text=f"Strategy error:\n{err}",
compact_text=f"{t('error_title')}\n{err}",
)
return False
finally:
try:
report_path = reporting_adapters.persist_execution_report(report)
Expand Down Expand Up @@ -487,28 +571,46 @@ def run_probe(*, response_body: str = "Probe OK"):
@app.route("/", methods=["POST", "GET"])
def handle_trigger():
"""Entrypoint for Cloud Run / scheduler: run strategy and return 200."""
run_strategy()
return "OK", 200
return _route_with_runtime_error_fallback(
run_strategy,
success_body="OK",
route_label="POST /",
)


@app.route("/backfill", methods=["POST", "GET"])
def handle_backfill():
"""Manual backfill entrypoint for verification-only execution."""
run_strategy(force_run=True, validation_only=True)
return "OK", 200
return _route_with_runtime_error_fallback(
run_strategy,
force_run=True,
validation_only=True,
success_body="OK",
route_label="POST /backfill",
)


@app.route("/precheck", methods=["POST", "GET"])
def handle_precheck():
"""Pre-market / post-open verification entrypoint for dry-run only execution."""
run_strategy(force_run=True, validation_only=True, validation_label="precheck")
return "Precheck OK", 200
return _route_with_runtime_error_fallback(
run_strategy,
force_run=True,
validation_only=True,
validation_label="precheck",
success_body="Precheck OK",
route_label="POST /precheck",
)


@app.route("/probe", methods=["POST", "GET"])
def handle_probe():
"""Post-open broker/account health probe; notify only on failure."""
return run_probe()
return _route_with_runtime_error_fallback(
run_probe,
success_body="Probe OK",
route_label="POST /probe",
)


if __name__ == "__main__":
Expand Down
36 changes: 36 additions & 0 deletions tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,42 @@ def fake_run_strategy():
self.assertEqual(body, "OK",)
self.assertTrue(observed["called"])

def test_handle_trigger_returns_500_when_strategy_reports_failure(self):
module = load_module()
module.run_strategy = lambda: False

with module.app.test_request_context("/", method="POST"):
body, status = module.handle_trigger()

self.assertEqual(status, 500)
self.assertEqual(body, "Error")

def test_handle_trigger_runtime_error_fallback_sends_telegram(self):
module = load_module()
observed = {"payloads": []}

class FakeResponse:
status_code = 200

def fake_post(_url, *, json, timeout):
observed["payloads"].append((json, timeout))
return FakeResponse()

module.TG_TOKEN = "token-1"
module.TG_CHAT_ID = "chat-1"
module.requests.post = fake_post
module.run_strategy = lambda: (_ for _ in ()).throw(RuntimeError("boom"))

with module.app.test_request_context("/", method="POST"):
body, status = module.handle_trigger()

self.assertEqual(status, 500)
self.assertEqual(body, "Error")
self.assertEqual(len(observed["payloads"]), 1)
self.assertEqual(observed["payloads"][0][0]["chat_id"], "chat-1")
self.assertIn("LongBridge strategy run failed", observed["payloads"][0][0]["text"])
self.assertIn("RuntimeError: boom", observed["payloads"][0][0]["text"])

def test_handle_trigger_allows_get(self):
module = load_module()
observed = {"called": False}
Expand Down