Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/execution-report-heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ jobs:
RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT || 'true' }}
RUNTIME_HEARTBEAT_ACCEPT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_ACCEPT_STATUSES }}
RUNTIME_HEARTBEAT_REJECT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_REJECT_STATUSES }}
RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH: ${{ vars.RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH }}
RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE: ${{ vars.RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE }}
CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }}
GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }}
TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
Expand Down
70 changes: 68 additions & 2 deletions scripts/execution_report_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import urllib.parse
import urllib.request
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


DEFAULT_ACCEPT_STATUSES = {"ok", "skipped", "success", "completed", "no_action"}
Expand Down Expand Up @@ -42,6 +43,60 @@ def _env_bool(name: str, default: bool = False) -> bool:
return value in {"1", "true", "yes", "y", "on"}


def _parse_day_of_month_selector(raw: str) -> set[int]:
days: set[int] = set()
for part in _split_values(raw):
if part == "*":
return set(range(1, 32))
match = re.fullmatch(r"(\d{1,2})(?:-(\d{1,2}))?", part)
if not match:
raise ValueError(f"Invalid RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH value: {part!r}")
start = int(match.group(1))
end = int(match.group(2) or start)
if start < 1 or end > 31 or start > end:
raise ValueError(f"Invalid day-of-month range: {part!r}")
days.update(range(start, end + 1))
return days


def _expected_report_window_status(now: dt.datetime) -> tuple[bool, str] | None:
raw_days = (
os.environ.get("RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH")
or os.environ.get("RUNTIME_HEARTBEAT_EXPECTED_MONTH_DAYS")
or ""
).strip()
if not raw_days:
return None

timezone_name = (
os.environ.get("RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE")
or os.environ.get("RUNTIME_HEARTBEAT_TIMEZONE")
or "UTC"
).strip()
try:
timezone = ZoneInfo(timezone_name)
except ZoneInfoNotFoundError as exc:
raise ValueError(f"Invalid RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE value: {timezone_name!r}") from exc

days = _parse_day_of_month_selector(raw_days)
if not days:
return None

if now.tzinfo is None:
now = now.replace(tzinfo=dt.timezone.utc)
local_now = now.astimezone(timezone)
local_date = local_now.date().isoformat()
if local_now.day not in days:
return (
False,
f"outside expected day-of-month window {raw_days} in {timezone_name}; local_date={local_date}",
)
return (
True,
f"inside expected day-of-month window {raw_days} in {timezone_name}; local_date={local_date}",
)


def _parse_timestamp(value: Any) -> dt.datetime | None:
if not value:
return None
Expand Down Expand Up @@ -315,7 +370,7 @@ def _send_telegram(message: str) -> bool:
return ok


def main() -> int:
def main(now: dt.datetime | None = None) -> int:
project = (
os.environ.get("RUNTIME_HEARTBEAT_GCP_PROJECT_ID")
or os.environ.get("GCP_PROJECT_ID")
Expand All @@ -327,7 +382,18 @@ def main() -> int:
fail_workflow = _env_bool("RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT", True)
required_services = _load_required_services()

now = dt.datetime.now(dt.timezone.utc)
now = now or dt.datetime.now(dt.timezone.utc)
if now.tzinfo is None:
now = now.replace(tzinfo=dt.timezone.utc)
now = now.astimezone(dt.timezone.utc)
try:
expected_window = _expected_report_window_status(now)
except ValueError as exc:
raise SystemExit(str(exc)) from exc
if expected_window and not expected_window[0]:
print(f"Execution report heartbeat skipped for {name}: {expected_window[1]}")
return 0

since = now - dt.timedelta(hours=lookback_hours)
globs = _report_globs(since, now)
if not globs:
Expand Down
97 changes: 97 additions & 0 deletions tests/test_execution_report_heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import datetime as dt
import os
import sys
from pathlib import Path

import pytest


ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))

from scripts import execution_report_heartbeat as heartbeat # noqa: E402


def _clear_runtime_env(monkeypatch):
for name in list(os.environ):
if name.startswith("RUNTIME_HEARTBEAT_") or name in {
"CLOUD_RUN_SERVICE",
"CLOUD_RUN_SERVICES",
"CLOUD_RUN_SERVICE_TARGETS_JSON",
"EXECUTION_REPORT_GCS_URI",
"FIRSTRADE_GCS_STATE_BUCKET",
"FIRSTRADE_STATE_PREFIX",
"GCP_PROJECT_ID",
"GOOGLE_CLOUD_PROJECT",
}:
monkeypatch.delenv(name, raising=False)


def test_main_skips_outside_expected_day_of_month_window(monkeypatch, capsys):
_clear_runtime_env(monkeypatch)
monkeypatch.setenv("RUNTIME_HEARTBEAT_NAME", "Monthly runtime")
monkeypatch.setenv("RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH", "1-7")
monkeypatch.setenv("RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE", "America/New_York")

monkeypatch.setattr(
heartbeat,
"_list_gcs_objects",
lambda *_args, **_kwargs: pytest.fail("GCS should not be queried outside the expected window"),
)

result = heartbeat.main(now=dt.datetime(2026, 6, 10, 1, 35, tzinfo=dt.timezone.utc))

assert result == 0
output = capsys.readouterr().out
assert "Execution report heartbeat skipped for Monthly runtime" in output
assert "outside expected day-of-month window 1-7 in America/New_York" in output
assert "local_date=2026-06-09" in output


def test_main_checks_reports_inside_expected_day_of_month_window(monkeypatch, capsys):
_clear_runtime_env(monkeypatch)
monkeypatch.setenv("GCP_PROJECT_ID", "longbridgequant")
monkeypatch.setenv("RUNTIME_HEARTBEAT_NAME", "Monthly runtime")
monkeypatch.setenv("RUNTIME_HEARTBEAT_REPORT_PLATFORM", "longbridge")
monkeypatch.setenv("RUNTIME_HEARTBEAT_ACCOUNT_SCOPE", "MONTHLY")
monkeypatch.setenv("RUNTIME_HEARTBEAT_REQUIRED_SERVICES", "longbridge-monthly-service")
monkeypatch.setenv("RUNTIME_HEARTBEAT_GCS_URIS", "gs://bucket/execution-reports")
monkeypatch.setenv("RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH", "1-7")
monkeypatch.setenv("RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE", "America/New_York")

observed = {}

def fake_list_gcs_objects(gcs_glob, *, project):
observed["gcs_glob"] = gcs_glob
observed["project"] = project
return [
{
"url": "gs://bucket/execution-reports/longbridge/profile/MONTHLY/2026-06/report.json",
"metadata": {"updated": "2026-06-04T23:20:00Z"},
}
]

def fake_cat_gcs_json(uri, *, project):
observed["cat_uri"] = uri
observed["cat_project"] = project
return {
"platform": "longbridge",
"account_scope": "MONTHLY",
"service_name": "longbridge-monthly-service",
"status": "ok",
}

monkeypatch.setattr(heartbeat, "_list_gcs_objects", fake_list_gcs_objects)
monkeypatch.setattr(heartbeat, "_cat_gcs_json", fake_cat_gcs_json)

result = heartbeat.main(now=dt.datetime(2026, 6, 4, 23, 30, tzinfo=dt.timezone.utc))

assert result == 0
assert observed["gcs_glob"] == "gs://bucket/execution-reports/longbridge/**/2026-06/*.json"
assert observed["project"] == "longbridgequant"
assert observed["cat_project"] == "longbridgequant"
assert observed["cat_uri"].endswith("/report.json")
output = capsys.readouterr().out
assert "Execution report heartbeat OK for Monthly runtime" in output
assert "longbridge-monthly-service@2026-06-04T23:20:00+00:00" in output