diff --git a/.github/workflows/execution-report-heartbeat.yml b/.github/workflows/execution-report-heartbeat.yml index e1d6808..6bab5ea 100644 --- a/.github/workflows/execution-report-heartbeat.yml +++ b/.github/workflows/execution-report-heartbeat.yml @@ -46,15 +46,20 @@ jobs: RUNTIME_HEARTBEAT_NAME: LongBridgePlatform ${{ matrix.target.label }} RUNTIME_HEARTBEAT_REPORT_PLATFORM: longbridge RUNTIME_HEARTBEAT_ACCOUNT_SCOPE: ${{ vars.ACCOUNT_REGION }} - RUNTIME_HEARTBEAT_REQUIRED_SERVICES: ${{ vars.RUNTIME_HEARTBEAT_REQUIRED_SERVICES || vars.CLOUD_RUN_SERVICE }} + RUNTIME_HEARTBEAT_REQUIRED_SERVICES: ${{ vars.RUNTIME_HEARTBEAT_REQUIRED_SERVICES }} RUNTIME_HEARTBEAT_GCS_URIS: ${{ vars.RUNTIME_HEARTBEAT_GCS_URIS || vars.EXECUTION_REPORT_GCS_URI }} RUNTIME_HEARTBEAT_LOOKBACK_HOURS: ${{ inputs.lookback_hours || vars.RUNTIME_HEARTBEAT_LOOKBACK_HOURS || '36' }} RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT || 'true' }} RUNTIME_HEARTBEAT_ACCEPT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_ACCEPT_STATUSES }} RUNTIME_HEARTBEAT_REJECT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_REJECT_STATUSES }} + RUNTIME_HEARTBEAT_SCHEDULER_AWARE: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_AWARE || 'true' }} + RUNTIME_HEARTBEAT_SCHEDULER_LOCATION: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_LOCATION || vars.CLOUD_RUN_REGION }} RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH: ${{ vars.RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH }} RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE: ${{ vars.RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE }} + CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }} CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} + CLOUD_RUN_SERVICES: ${{ vars.CLOUD_RUN_SERVICES }} + CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }} GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }} TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }} steps: diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index 
ec414cc..7ad02de 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -95,6 +95,7 @@ jobs: CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }} CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT: ${{ vars.CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT }} + CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }} ACCOUNT_PREFIX: ${{ vars.ACCOUNT_PREFIX }} TELEGRAM_TOKEN_SECRET_NAME: ${{ vars.TELEGRAM_TOKEN_SECRET_NAME }} LONGPORT_APP_KEY_SECRET_NAME: ${{ vars.LONGPORT_APP_KEY_SECRET_NAME }} @@ -941,6 +942,45 @@ jobs: gcloud "${gcloud_args[@]}" + - name: Sync Cloud Scheduler timezone + if: steps.config.outputs.env_sync_enabled == 'true' + run: | + set -euo pipefail + + scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}" + if [ -z "${scheduler_location}" ]; then + echo "Cloud Scheduler timezone sync requires CLOUD_RUN_REGION or CLOUD_SCHEDULER_LOCATION." >&2 + exit 1 + fi + + market_timezone="$(python - <<'PY' + import os + + timezone = os.environ.get("LONGBRIDGE_MARKET_TIMEZONE", "").strip() + market = os.environ.get("LONGBRIDGE_MARKET", "").strip().upper() + if not timezone: + timezone = "Asia/Hong_Kong" if market == "HK" else "America/New_York" + print(timezone) + PY + )" + + for suffix in scheduler probe-scheduler precheck-scheduler; do + job_name="${CLOUD_RUN_SERVICE}-${suffix}" + if ! gcloud scheduler jobs describe "${job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" >/dev/null 2>&1; then + echo "Cloud Scheduler job ${job_name} was not found in ${scheduler_location}; skipping timezone sync." + continue + fi + + echo "Updating Cloud Scheduler job ${job_name} timezone to ${market_timezone}." 
+ gcloud scheduler jobs update http "${job_name}" \ + --project="${GCP_PROJECT_ID}" \ + --location="${scheduler_location}" \ + --time-zone="${market_timezone}" \ + --quiet + done + - name: Prune old Cloud Run revisions if: steps.config.outputs.enabled == 'true' run: | diff --git a/scripts/execution_report_heartbeat.py b/scripts/execution_report_heartbeat.py index 749ca39..09202b7 100644 --- a/scripts/execution_report_heartbeat.py +++ b/scripts/execution_report_heartbeat.py @@ -145,10 +145,13 @@ def _base_report_uris() -> list[str]: return unique -def _load_required_services() -> list[str]: +def _load_required_service_candidates() -> tuple[list[str], bool]: + explicit_services = _split_values(os.environ.get("RUNTIME_HEARTBEAT_REQUIRED_SERVICES")) + if explicit_services: + return _unique_values(explicit_services), True + services = [] for name in ( - "RUNTIME_HEARTBEAT_REQUIRED_SERVICES", "CLOUD_RUN_SERVICES", "CLOUD_RUN_SERVICE", ): @@ -179,15 +182,239 @@ def _load_required_services() -> list[str]: except json.JSONDecodeError: pass + return _unique_values(services), False + + +def _load_required_services( + *, + project: str | None = None, + since: dt.datetime | None = None, + now: dt.datetime | None = None, +) -> list[str]: + services, _skip_reason, _scheduler_checked = _resolve_required_services( + project=project, + since=since, + now=now, + ) + return services + + +def _resolve_required_services( + *, + project: str | None = None, + since: dt.datetime | None = None, + now: dt.datetime | None = None, +) -> tuple[list[str], str | None, bool]: + services, explicit = _load_required_service_candidates() + if explicit or not services: + return services, None, False + if not _env_bool("RUNTIME_HEARTBEAT_SCHEDULER_AWARE", True): + return services, None, False + if since is None or now is None: + return services, None, False + try: + due_services = _filter_scheduler_due_services( + services, + project=project, + since=since, + now=now, + ) + except RuntimeError as exc: + 
print( + f"Unable to resolve Cloud Scheduler-backed heartbeat services: {exc}; " + "falling back to all configured services.", + file=sys.stderr, + ) + return services, None, False + if not due_services: + return ( + [], + "no configured Cloud Scheduler main job was due in the heartbeat lookback window", + True, + ) + return due_services, None, True + + +def _unique_values(values: list[str]) -> list[str]: seen = set() unique = [] - for service in services: - if service not in seen: - seen.add(service) - unique.append(service) + for value in values: + if value not in seen: + seen.add(value) + unique.append(value) return unique +def _scheduler_location() -> str: + return ( + os.environ.get("RUNTIME_HEARTBEAT_SCHEDULER_LOCATION") + or os.environ.get("CLOUD_RUN_REGION") + or "us-central1" + ) + + +def _list_scheduler_jobs(*, project: str | None) -> list[dict[str, Any]]: + command = [ + "gcloud", + "scheduler", + "jobs", + "list", + "--location", + _scheduler_location(), + "--format=json", + ] + if project: + command.extend(["--project", project]) + result = _run_gcloud(command) + if result.returncode != 0: + detail = (result.stderr or result.stdout or "").strip() + raise RuntimeError(detail or "gcloud scheduler jobs list failed") + if not result.stdout.strip(): + return [] + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"gcloud scheduler jobs list returned invalid JSON: {exc}") from exc + return payload if isinstance(payload, list) else [] + + +def _scheduler_job_targets_strategy_run(job: dict[str, Any], service: str) -> bool: + if str(job.get("state") or "").strip().upper() not in {"", "ENABLED"}: + return False + uri = str((job.get("httpTarget") or {}).get("uri") or "").strip() + if not uri: + return False + parsed = urllib.parse.urlparse(uri) + path = parsed.path or "/" + if path != "/": + return False + service_text = str(service or "").strip().lower() + return bool(service_text and service_text in 
parsed.netloc.lower()) + + +def _cron_token_value(token: str, *, names: dict[str, int] | None = None) -> int: + normalized = token.strip().lower() + if names and normalized in names: + return names[normalized] + return int(normalized) + + +def _cron_field_values( + field: str, + *, + minimum: int, + maximum: int, + names: dict[str, int] | None = None, +) -> set[int] | None: + text = str(field or "").strip().lower() + if text in {"", "*"}: + return None + values: set[int] = set() + for raw_part in text.split(","): + part = raw_part.strip() + if not part: + continue + base, raw_step = part, "1" + if "/" in part: + base, raw_step = part.split("/", 1) + step = max(1, int(raw_step)) + if base == "*": + start, end = minimum, maximum + elif "-" in base: + raw_start, raw_end = base.split("-", 1) + start = _cron_token_value(raw_start, names=names) + end = _cron_token_value(raw_end, names=names) + else: + start = end = _cron_token_value(base, names=names) + for value in range(start, end + 1, step): + if minimum <= value <= maximum: + values.add(value) + elif maximum == 6 and value == 7: + values.add(0) + return values + + +def _cron_matches(schedule: str, value: dt.datetime) -> bool: + fields = str(schedule or "").split() + if len(fields) != 5: + return False + minute, hour, day_of_month, month, day_of_week = fields + dow_names = { + "sun": 0, + "mon": 1, + "tue": 2, + "wed": 3, + "thu": 4, + "fri": 5, + "sat": 6, + } + minute_values = _cron_field_values(minute, minimum=0, maximum=59) + hour_values = _cron_field_values(hour, minimum=0, maximum=23) + dom_values = _cron_field_values(day_of_month, minimum=1, maximum=31) + month_values = _cron_field_values(month, minimum=1, maximum=12) + dow_values = _cron_field_values(day_of_week, minimum=0, maximum=6, names=dow_names) + if minute_values is not None and value.minute not in minute_values: + return False + if hour_values is not None and value.hour not in hour_values: + return False + if month_values is not None and value.month 
not in month_values: + return False + + dom_matches = dom_values is None or value.day in dom_values + cron_weekday = value.isoweekday() % 7 + dow_matches = dow_values is None or cron_weekday in dow_values + if dom_values is not None and dow_values is not None: + return dom_matches or dow_matches + return dom_matches and dow_matches + + +def _scheduler_job_due_between( + job: dict[str, Any], + *, + since: dt.datetime, + now: dt.datetime, +) -> bool: + schedule = str(job.get("schedule") or "").strip() + if not schedule: + return False + try: + timezone = ZoneInfo(str(job.get("timeZone") or "UTC")) + except Exception: # noqa: BLE001 + timezone = dt.timezone.utc + + since_utc = since.astimezone(dt.timezone.utc) + now_utc = now.astimezone(dt.timezone.utc) + cursor = since_utc.replace(second=0, microsecond=0) + if cursor < since_utc: + cursor += dt.timedelta(minutes=1) + while cursor <= now_utc: + if _cron_matches(schedule, cursor.astimezone(timezone)): + return True + cursor += dt.timedelta(minutes=1) + return False + + +def _filter_scheduler_due_services( + services: list[str], + *, + project: str | None, + since: dt.datetime, + now: dt.datetime, +) -> list[str]: + jobs = _list_scheduler_jobs(project=project) + due_services = [] + for service in services: + service_jobs = [ + job for job in jobs if _scheduler_job_targets_strategy_run(job, service) + ] + if not service_jobs or any( + _scheduler_job_due_between(job, since=since, now=now) + for job in service_jobs + ): + due_services.append(service) + return due_services + + def _report_globs(since: dt.datetime, now: dt.datetime) -> list[str]: explicit = _split_values(os.environ.get("RUNTIME_HEARTBEAT_GCS_GLOBS")) if explicit: @@ -380,21 +607,29 @@ def main(now: dt.datetime | None = None) -> int: lookback_hours = float(os.environ.get("RUNTIME_HEARTBEAT_LOOKBACK_HOURS") or "36") max_reports = int(os.environ.get("RUNTIME_HEARTBEAT_MAX_REPORTS_TO_READ") or "20") fail_workflow = 
def _service_targets_json(*targets):
    """Serialize targets into the CLOUD_RUN_SERVICE_TARGETS_JSON payload shape."""
    return json.dumps({"targets": list(targets)})


def _scheduler_http_job(schedule, uri):
    """Build an enabled America/New_York scheduler job as returned by the job listing."""
    return {
        "state": "ENABLED",
        "schedule": schedule,
        "timeZone": "America/New_York",
        "httpTarget": {"uri": uri},
    }


def _utc_datetime(year, month, day, hour, minute):
    """Shorthand for a timezone-aware UTC datetime."""
    return dt.datetime(year, month, day, hour, minute, tzinfo=dt.timezone.utc)


def test_explicit_required_services_override_target_derived_services(monkeypatch):
    """An explicit service list wins over services derived from targets JSON."""
    _clear_runtime_env(monkeypatch)
    monkeypatch.setenv("RUNTIME_HEARTBEAT_REQUIRED_SERVICES", "svc-daily-a,svc-daily-b")
    monkeypatch.setenv(
        "CLOUD_RUN_SERVICE_TARGETS_JSON",
        _service_targets_json({"service": "svc-daily-a"}, {"service": "svc-monthly"}),
    )

    resolved = heartbeat._load_required_services()

    assert resolved == ["svc-daily-a", "svc-daily-b"]


def test_required_services_fall_back_to_cloud_run_targets(monkeypatch):
    """Without an explicit list, services come from targets JSON, de-duplicated."""
    _clear_runtime_env(monkeypatch)
    monkeypatch.setenv(
        "CLOUD_RUN_SERVICE_TARGETS_JSON",
        _service_targets_json(
            {"service": "svc-a"},
            {"runtime_target": {"service_name": "svc-b"}},
            {"service": "svc-a"},
        ),
    )

    resolved = heartbeat._load_required_services()

    assert resolved == ["svc-a", "svc-b"]


def test_scheduler_aware_required_services_only_include_due_main_schedulers(monkeypatch):
    """Only services whose root-path scheduler job was due stay required."""
    _clear_runtime_env(monkeypatch)
    monkeypatch.setenv(
        "CLOUD_RUN_SERVICE_TARGETS_JSON",
        _service_targets_json({"service": "svc-daily"}, {"service": "svc-monthly"}),
    )
    scheduler_jobs = [
        _scheduler_http_job("45 15 * * 1-5", "https://svc-daily.example.run.app/"),
        _scheduler_http_job("45 15 26 * *", "https://svc-monthly.example.run.app/"),
        _scheduler_http_job("35 9,15 25-30 * *", "https://svc-monthly.example.run.app/probe"),
    ]
    monkeypatch.setattr(heartbeat, "_list_scheduler_jobs", lambda **_kwargs: scheduler_jobs)

    required = heartbeat._load_required_services(
        project="project-1",
        since=_utc_datetime(2026, 6, 5, 0, 0),
        now=_utc_datetime(2026, 6, 6, 2, 0),
    )

    assert required == ["svc-daily"]


def test_scheduler_aware_required_services_include_monthly_service_when_due(monkeypatch):
    """A monthly service is required when its run falls inside the window."""
    _clear_runtime_env(monkeypatch)
    monkeypatch.setenv(
        "CLOUD_RUN_SERVICE_TARGETS_JSON",
        _service_targets_json({"service": "svc-monthly"}),
    )
    scheduler_jobs = [
        _scheduler_http_job("45 15 26 * *", "https://svc-monthly.example.run.app/"),
    ]
    monkeypatch.setattr(heartbeat, "_list_scheduler_jobs", lambda **_kwargs: scheduler_jobs)

    required = heartbeat._load_required_services(
        project="project-1",
        since=_utc_datetime(2026, 6, 26, 19, 0),
        now=_utc_datetime(2026, 6, 26, 20, 0),
    )

    assert required == ["svc-monthly"]


def test_main_skips_when_no_scheduler_main_job_is_due(monkeypatch, capsys):
    """main() exits 0 without touching GCS when no main job was due."""
    _clear_runtime_env(monkeypatch)
    environment = {
        "GCP_PROJECT_ID": "longbridgequant",
        "RUNTIME_HEARTBEAT_NAME": "Monthly runtime",
        "RUNTIME_HEARTBEAT_REPORT_PLATFORM": "longbridge",
        "CLOUD_RUN_SERVICE": "longbridge-monthly-service",
        "RUNTIME_HEARTBEAT_GCS_URIS": "gs://bucket/execution-reports",
    }
    for key, value in environment.items():
        monkeypatch.setenv(key, value)
    scheduler_jobs = [
        _scheduler_http_job(
            "45 15 1-7 * *",
            "https://longbridge-monthly-service.example.run.app/",
        ),
    ]
    monkeypatch.setattr(heartbeat, "_list_scheduler_jobs", lambda **_kwargs: scheduler_jobs)
    monkeypatch.setattr(
        heartbeat,
        "_list_gcs_objects",
        lambda *_args, **_kwargs: pytest.fail("GCS should not be queried when no scheduler job is due"),
    )

    exit_code = heartbeat.main(now=_utc_datetime(2026, 6, 10, 1, 35))

    assert exit_code == 0
    captured = capsys.readouterr().out
    assert "Execution report heartbeat skipped for Monthly runtime" in captured
    assert "no configured Cloud Scheduler main job was due" in captured
@@ -22,6 +22,7 @@ grep -Fq 'echo "LONGBRIDGE_DRY_RUN_ONLY=true"' "$workflow_file" grep -Fq 'echo "LONGBRIDGE_MARKET=HK"' "$workflow_file" grep -Fq 'echo "LONGBRIDGE_SYMBOL_SUFFIX=.HK"' "$workflow_file" grep -Fq 'CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT: ${{ vars.CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT }}' "$workflow_file" +grep -Fq 'CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }}' "$workflow_file" grep -Fq 'Skipping Cloud Run commit wait because CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT is disabled.' "$workflow_file" grep -Fq 'permissions:' "$workflow_file" grep -Fq 'id-token: write' "$workflow_file" @@ -211,6 +212,14 @@ grep -Fq 'join_by_delimiter()' "$workflow_file" grep -Fq 'gcloud_args+=(--remove-secrets "$(IFS=,; echo "${remove_secret_vars[*]}")")' "$workflow_file" grep -Fq 'gcloud_args+=(--update-secrets "$(IFS=,; echo "${secret_pairs[*]}")")' "$workflow_file" grep -Fq -- '--update-env-vars "^|^$(join_by_delimiter "|" "${env_pairs[@]}")"' "$workflow_file" +grep -Fq 'Sync Cloud Scheduler timezone' "$workflow_file" +grep -Fq 'scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}"' "$workflow_file" +grep -Fq 'timezone = os.environ.get("LONGBRIDGE_MARKET_TIMEZONE", "").strip()' "$workflow_file" +grep -Fq 'timezone = "Asia/Hong_Kong" if market == "HK" else "America/New_York"' "$workflow_file" +grep -Fq 'for suffix in scheduler probe-scheduler precheck-scheduler; do' "$workflow_file" +grep -Fq 'gcloud scheduler jobs describe "${job_name}"' "$workflow_file" +grep -Fq 'gcloud scheduler jobs update http "${job_name}"' "$workflow_file" +grep -Fq -- '--time-zone="${market_timezone}"' "$workflow_file" if grep -Fq 'SERVICE_NAME: ${{ vars.SERVICE_NAME }}' "$workflow_file"; then echo "unexpected SERVICE_NAME env wiring still present" >&2