From 202c18cb4649ef14b033594002d29da30fb6093b Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 2 Jun 2026 18:13:09 +0800 Subject: [PATCH] Add Cloud Run runtime guard alerts --- .github/workflows/runtime-guard.yml | 80 +++++++ README.md | 56 +++++ scripts/cloud_run_runtime_guard.py | 309 ++++++++++++++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 .github/workflows/runtime-guard.yml create mode 100644 scripts/cloud_run_runtime_guard.py diff --git a/.github/workflows/runtime-guard.yml b/.github/workflows/runtime-guard.yml new file mode 100644 index 0000000..61559c2 --- /dev/null +++ b/.github/workflows/runtime-guard.yml @@ -0,0 +1,80 @@ +name: Runtime Guard + +on: + workflow_dispatch: + inputs: + lookback_minutes: + description: "Cloud Logging lookback window in minutes." + required: false + type: string + default: "180" + require_success: + description: "Alert if no successful Cloud Run request exists in the lookback window." + required: false + type: choice + default: "false" + options: + - "false" + - "true" + fail_workflow_on_alert: + description: "Fail this workflow when an alert is emitted." + required: false + type: choice + default: "true" + options: + - "true" + - "false" + schedule: + - cron: "29,59 * * * *" + +env: + GCP_PROJECT_ID: longbridgequant + GCP_WORKLOAD_IDENTITY_PROVIDER: projects/252919773759/locations/global/workloadIdentityPools/github-actions/providers/github-main + GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT: longbridge-platform-deploy@longbridgequant.iam.gserviceaccount.com + +jobs: + guard: + name: Check ${{ matrix.target.label }} Cloud Run runtime + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + target: + - label: PAPER + environment: longbridge-paper + - label: HK + environment: longbridge-hk + - label: SG + environment: longbridge-sg + permissions: + contents: read + id-token: write + environment: ${{ matrix.target.environment }} + env: + RUNTIME_GUARD_NAME: LongBridgePlatform ${{ matrix.target.label }} + RUNTIME_GUARD_CLOUD_RUN_SERVICES: ${{ vars.RUNTIME_GUARD_CLOUD_RUN_SERVICES }} + RUNTIME_GUARD_LOOKBACK_MINUTES: ${{ inputs.lookback_minutes || vars.RUNTIME_GUARD_LOOKBACK_MINUTES || '180' }} + RUNTIME_GUARD_REQUIRE_SUCCESS: ${{ inputs.require_success || vars.RUNTIME_GUARD_REQUIRE_SUCCESS || 'false' }} + RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT || 'true' }} + RUNTIME_GUARD_SCHEDULER_JOB_PATTERN: ${{ vars.RUNTIME_GUARD_SCHEDULER_JOB_PATTERN || vars.CLOUD_RUN_SERVICE }} + CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} + GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }} + CRISIS_ALERT_TELEGRAM_CHAT_IDS: ${{ vars.CRISIS_ALERT_TELEGRAM_CHAT_IDS }} + CRISIS_ALERT_TELEGRAM_API_BASE_URL: ${{ vars.CRISIS_ALERT_TELEGRAM_API_BASE_URL }} + TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }} + CRISIS_ALERT_TELEGRAM_BOT_TOKEN: ${{ secrets.CRISIS_ALERT_TELEGRAM_BOT_TOKEN }} + steps: + - name: Checkout repository + uses: actions/checkout@v6 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v3 + with: + workload_identity_provider: ${{ env.GCP_WORKLOAD_IDENTITY_PROVIDER }} + service_account: ${{ env.GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT }} + + - name: Set up gcloud + uses: google-github-actions/setup-gcloud@v3 + + - name: Check Cloud Scheduler and Cloud Run logs + run: python scripts/cloud_run_runtime_guard.py diff --git a/README.md b/README.md index 6b79ff6..1e60b4b 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,38 @@ Important: - Here "shared" only means **shared inside this repository** between the `paper`, `HK`, and `SG` Cloud Run services. The Telegram token can still be shared, but LongPort app credentials should live in Secret Manager and be referenced by per-environment secret-name variables; they are not meant to be a global secret set reused by unrelated quant repos. - If you want one cross-project shared layer across multiple quant repos, keep it small: `GLOBAL_TELEGRAM_CHAT_ID`, `NOTIFY_LANG`, `CRISIS_ALERT_CHANNELS`, and shared crisis alert settings under `CRISIS_ALERT_EMAIL_*`/`CRISIS_ALERT_PUSH_*` are reasonable when the same alert policy applies; account credentials, deployment keys, and alert secrets are not. +### Runtime guard alerting + +`.github/workflows/runtime-guard.yml` is a second notification layer for failures +outside the LongBridge Flask handler. It runs once per GitHub Environment +(`longbridge-paper`, `longbridge-hk`, and `longbridge-sg`), reads Cloud Logging +for recent Cloud Scheduler errors and Cloud Run request/runtime failures, then +sends Telegram directly through `CRISIS_ALERT_TELEGRAM_BOT_TOKEN` + +`CRISIS_ALERT_TELEGRAM_CHAT_IDS` or the fallback `TELEGRAM_TOKEN` + +`GLOBAL_TELEGRAM_CHAT_ID`. + +The guard does not invoke Cloud Run trading routes. It is meant to catch cases +where Scheduler cannot reach the service, OIDC/IAM/audience is wrong, Cloud Run +returns 4xx/5xx, or the container fails before app-level Telegram fallback code +can run. + +Required setup: + +- keep each Environment's `CLOUD_RUN_SERVICE` set, or set + `RUNTIME_GUARD_CLOUD_RUN_SERVICES` +- grant the GitHub deploy service account `roles/logging.viewer` on + `longbridgequant` +- keep Telegram chat/token variables or secrets configured in GitHub +- optionally set `RUNTIME_GUARD_SCHEDULER_JOB_PATTERN` per Environment; by + default the workflow filters Scheduler logs by that Environment's + `CLOUD_RUN_SERVICE` + +The scheduled guard runs every 30 minutes. For a missed-run heartbeat, set +`RUNTIME_GUARD_REQUIRE_SUCCESS=true` and choose +`RUNTIME_GUARD_LOOKBACK_MINUTES` so the window covers the expected Scheduler run +for that Environment. The default leaves the heartbeat check off to avoid false +alerts outside active market windows. + ### Deployment unit and naming - `QuantPlatformKit` is only a shared dependency; Cloud Run still deploys `LongBridgePlatform` itself. @@ -379,6 +411,30 @@ Secret Manager 中需存在 `LONGPORT_SECRET_NAME` 指定的密钥(默认: `lo - 这里的“共享”只是指 **同一个仓库里的 paper / HK / SG 服务共享**。Telegram token 可以继续共用,但 LongPort app 凭据建议放到 Secret Manager,并通过各自 Environment 里的 secret-name 变量引用,不建议把它们当成所有 quant 共用的全局 secrets。 - 如果你真的要在多个 quant 仓库之间保留一层全局共享,建议只保留 `GLOBAL_TELEGRAM_CHAT_ID`、`NOTIFY_LANG`、`CRISIS_ALERT_CHANNELS`,以及同一套危机告警策略下的 `CRISIS_ALERT_EMAIL_*`/`CRISIS_ALERT_PUSH_*` 这种低耦合配置。账户凭据、部署 key 和告警 secret 不要做成全局共享。 +### Runtime Guard 告警 + +`.github/workflows/runtime-guard.yml` 是 LongBridge Flask handler 之外的第二层通知。它按 +GitHub Environment 分别运行一次(`longbridge-paper`、`longbridge-hk`、`longbridge-sg`), +只读取 Cloud Logging 中最近的 Cloud Scheduler 错误和 Cloud Run 请求/运行失败,然后直接通过 +`CRISIS_ALERT_TELEGRAM_BOT_TOKEN` + `CRISIS_ALERT_TELEGRAM_CHAT_IDS` 或 fallback 的 +`TELEGRAM_TOKEN` + `GLOBAL_TELEGRAM_CHAT_ID` 发 Telegram。 + +这个 guard 不会调用 Cloud Run 的交易路由,主要覆盖 Scheduler 没打到服务、 +OIDC/IAM/audience 配错、Cloud Run 返回 4xx/5xx、或容器在 app-level Telegram fallback +执行前就失败的情况。 + +需要的配置: + +- 每个 Environment 保持 `CLOUD_RUN_SERVICE` 正确,或设置 `RUNTIME_GUARD_CLOUD_RUN_SERVICES` +- GitHub deploy service account 需要 `longbridgequant` 项目级 `roles/logging.viewer` +- GitHub 中继续配置 Telegram chat/token 变量或 secrets +- 可选按 Environment 设置 `RUNTIME_GUARD_SCHEDULER_JOB_PATTERN`;默认会按该 Environment 的 + `CLOUD_RUN_SERVICE` 过滤 Scheduler 日志 + +默认计划每 30 分钟检查一次。若要做 missed-run 心跳,按 Environment 设置 +`RUNTIME_GUARD_REQUIRE_SUCCESS=true`,并把 `RUNTIME_GUARD_LOOKBACK_MINUTES` 设成覆盖该环境预期 +Scheduler 运行时间的窗口。默认不强制心跳,避免非交易窗口误报。 + ### 部署单元和命名建议 - `QuantPlatformKit` 只是共享依赖,不单独部署;Cloud Run 继续只部署 `LongBridgePlatform`。 diff --git a/scripts/cloud_run_runtime_guard.py b/scripts/cloud_run_runtime_guard.py new file mode 100644 index 0000000..561a808 --- /dev/null +++ b/scripts/cloud_run_runtime_guard.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 +"""Check Cloud Scheduler and Cloud Run logs, then notify Telegram on failures.""" + +from __future__ import annotations + +import datetime as dt +import json +import os +import re +import subprocess +import sys +import urllib.parse +import urllib.request +from typing import Any + + +ERROR_SEVERITIES = {"ERROR", "CRITICAL", "ALERT", "EMERGENCY"} +FAILURE_WORDS = ( + "DEADLINE_EXCEEDED", + "INTERNAL_ERROR", + "PERMISSION_DENIED", + "UNAUTHENTICATED", + "URL_ERROR", + "URL_UNREACHABLE", +) + + +def _split_values(raw: str | None) -> list[str]: + if not raw: + return [] + return [part.strip() for part in re.split(r"[,;\n]+", raw) if part.strip()] + + +def _env_bool(name: str, default: bool = False) -> bool: + value = (os.environ.get(name) or "").strip().lower() + if not value: + return default + return value in {"1", "true", "yes", "y", "on"} + + +def _load_services() -> list[str]: + services = [] + for name in ( + "RUNTIME_GUARD_CLOUD_RUN_SERVICES", + "CLOUD_RUN_SERVICES", + "CLOUD_RUN_SERVICE", + ): + services.extend(_split_values(os.environ.get(name))) + + raw_targets = (os.environ.get("CLOUD_RUN_SERVICE_TARGETS_JSON") or "").strip() + if raw_targets: + try: + payload = json.loads(raw_targets) + targets = payload.get("targets") if isinstance(payload, dict) else payload + if isinstance(targets, list): + for target in targets: + if not isinstance(target, dict): + continue + runtime_target = target.get("runtime_target") or target.get( + "runtime_target_json" + ) + if isinstance(runtime_target, str): + try: + runtime_target = json.loads(runtime_target) + except json.JSONDecodeError: + runtime_target = {} + for key in ("service", "service_name", "cloud_run_service"): + value = target.get(key) or ( + runtime_target.get(key) + if isinstance(runtime_target, dict) + else None + ) + if value: + services.extend(_split_values(str(value))) + break + except json.JSONDecodeError as exc: + raise RuntimeError(f"CLOUD_RUN_SERVICE_TARGETS_JSON is invalid: {exc}") from exc + + seen = set() + unique = [] + for service in services: + if service not in seen: + seen.add(service) + unique.append(service) + return unique + + +def _run_gcloud_logging(project: str, log_filter: str, limit: int) -> list[dict[str, Any]]: + command = [ + "gcloud", + "logging", + "read", + log_filter, + "--project", + project, + "--format=json", + f"--limit={limit}", + ] + result = subprocess.run(command, text=True, capture_output=True, check=False) + if result.returncode != 0: + detail = (result.stderr or result.stdout or "").strip() + raise RuntimeError(detail or "gcloud logging read failed") + if not result.stdout.strip(): + return [] + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"gcloud returned invalid JSON: {exc}") from exc + return payload if isinstance(payload, list) else [] + + +def _status(entry: dict[str, Any]) -> int | None: + value = (entry.get("httpRequest") or {}).get("status") + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _entry_text(entry: dict[str, Any]) -> str: + chunks = [] + for key in ("textPayload", "message"): + value = entry.get(key) + if value: + chunks.append(str(value)) + for key in ("jsonPayload", "protoPayload"): + value = entry.get(key) + if value: + chunks.append(json.dumps(value, sort_keys=True)) + return " ".join(chunks) + + +def _is_failure(entry: dict[str, Any]) -> bool: + severity = str(entry.get("severity") or "").upper() + status = _status(entry) + text = _entry_text(entry).upper() + return ( + severity in ERROR_SEVERITIES + or (status is not None and status >= 400) + or any(word in text for word in FAILURE_WORDS) + ) + + +def _is_success(entry: dict[str, Any]) -> bool: + status = _status(entry) + return status is not None and 200 <= status < 400 + + +def _labels(entry: dict[str, Any]) -> dict[str, Any]: + resource = entry.get("resource") or {} + labels = resource.get("labels") or {} + return labels if isinstance(labels, dict) else {} + + +def _summarize(entry: dict[str, Any]) -> str: + labels = _labels(entry) + target = labels.get("service_name") or labels.get("job_id") or labels.get("job_name") + timestamp = str(entry.get("timestamp") or "-") + severity = str(entry.get("severity") or "-") + status = _status(entry) + status_text = f" status={status}" if status is not None else "" + text = re.sub(r"\s+", " ", _entry_text(entry)).strip() + if len(text) > 180: + text = text[:177] + "..." + suffix = f" {text}" if text else "" + return f"- {timestamp} {target or ''} severity={severity}{status_text}{suffix}" + + +def _send_telegram(message: str) -> bool: + targets: list[tuple[str, str]] = [] + crisis_token = os.environ.get("CRISIS_ALERT_TELEGRAM_BOT_TOKEN") + for chat_id in _split_values(os.environ.get("CRISIS_ALERT_TELEGRAM_CHAT_IDS")): + if crisis_token: + targets.append((crisis_token, chat_id)) + + token = os.environ.get("TELEGRAM_TOKEN") or os.environ.get("TG_TOKEN") + for chat_id in _split_values(os.environ.get("GLOBAL_TELEGRAM_CHAT_ID")): + if token: + targets.append((token, chat_id)) + + unique_targets = list(dict.fromkeys(targets)) + if not unique_targets: + print("No Telegram token/chat configured; unable to send runtime guard alert.", file=sys.stderr) + return False + + ok = True + base_url = os.environ.get("CRISIS_ALERT_TELEGRAM_API_BASE_URL") or "https://api.telegram.org" + for token_value, chat_id in unique_targets: + body = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode() + request = urllib.request.Request( + f"{base_url.rstrip('/')}/bot{token_value}/sendMessage", + data=body, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + if response.status >= 400: + ok = False + print(f"Telegram returned HTTP {response.status}", file=sys.stderr) + except Exception as exc: # noqa: BLE001 + ok = False + print(f"Telegram send failed: {exc}", file=sys.stderr) + return ok + + +def main() -> int: + project = ( + os.environ.get("RUNTIME_GUARD_GCP_PROJECT_ID") + or os.environ.get("GCP_PROJECT_ID") + or os.environ.get("GOOGLE_CLOUD_PROJECT") + ) + if not project: + raise SystemExit("GCP_PROJECT_ID or GOOGLE_CLOUD_PROJECT is required") + + name = os.environ.get("RUNTIME_GUARD_NAME") or os.environ.get("GITHUB_REPOSITORY") or "Cloud Run" + lookback_minutes = int(os.environ.get("RUNTIME_GUARD_LOOKBACK_MINUTES") or "180") + limit = int(os.environ.get("RUNTIME_GUARD_LOG_LIMIT") or "200") + require_success = _env_bool("RUNTIME_GUARD_REQUIRE_SUCCESS", False) + fail_workflow = _env_bool("RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT", True) + check_scheduler = _env_bool("RUNTIME_GUARD_CHECK_SCHEDULER", True) + scheduler_pattern = os.environ.get("RUNTIME_GUARD_SCHEDULER_JOB_PATTERN") or "" + + since = ( + dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=lookback_minutes) + ).replace(microsecond=0) + since_text = since.isoformat().replace("+00:00", "Z") + + issues: list[str] = [] + details: list[str] = [] + success_count = 0 + + try: + services = _load_services() + except RuntimeError as exc: + services = [] + issues.append(f"service configuration error: {exc}") + + for service in services: + log_filter = ( + 'resource.type="cloud_run_revision" ' + f'AND resource.labels.service_name="{service}" ' + f'AND timestamp >= "{since_text}"' + ) + try: + entries = _run_gcloud_logging(project, log_filter, limit) + except RuntimeError as exc: + issues.append(f"Cloud Run log query failed for {service}: {exc}") + continue + failures = [entry for entry in entries if _is_failure(entry)] + success_count += sum(1 for entry in entries if _is_success(entry)) + if failures: + issues.append(f"{len(failures)} Cloud Run failure log(s) for {service}") + details.extend(_summarize(entry) for entry in failures[:5]) + + if services and require_success and success_count == 0: + issues.append( + f"no successful Cloud Run request found for {', '.join(services)} in the last {lookback_minutes} minutes" + ) + + if check_scheduler: + log_filter = f'resource.type="cloud_scheduler_job" AND timestamp >= "{since_text}"' + try: + entries = _run_gcloud_logging(project, log_filter, limit) + if scheduler_pattern: + regex = re.compile(scheduler_pattern) + entries = [ + entry + for entry in entries + if regex.search(str(_labels(entry).get("job_id") or _labels(entry).get("job_name") or "")) + ] + failures = [entry for entry in entries if _is_failure(entry)] + if failures: + issues.append(f"{len(failures)} Cloud Scheduler failure log(s)") + details.extend(_summarize(entry) for entry in failures[:5]) + except RuntimeError as exc: + issues.append(f"Cloud Scheduler log query failed: {exc}") + + if not issues: + service_text = ", ".join(services) if services else "" + print( + f"Runtime guard OK for {name}: services={service_text}, lookback={lookback_minutes}m, successes={success_count}" + ) + return 0 + + run_url = "" + if os.environ.get("GITHUB_SERVER_URL") and os.environ.get("GITHUB_REPOSITORY") and os.environ.get("GITHUB_RUN_ID"): + run_url = ( + f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}" + f"/actions/runs/{os.environ['GITHUB_RUN_ID']}" + ) + message_lines = [ + f"[Runtime Guard] {name}", + f"Project: {project}", + f"Lookback: {lookback_minutes} minutes", + "Issues:", + *[f"- {issue}" for issue in issues], + ] + if details: + message_lines.extend(["Details:", *details[:10]]) + if run_url: + message_lines.append(f"Workflow: {run_url}") + message = "\n".join(message_lines) + print(message) + _send_telegram(message[:3900]) + return 1 if fail_workflow else 0 + + +if __name__ == "__main__": + raise SystemExit(main())