Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/execution-report-heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ jobs:
RUNTIME_HEARTBEAT_NAME: LongBridgePlatform ${{ matrix.target.label }}
RUNTIME_HEARTBEAT_REPORT_PLATFORM: longbridge
RUNTIME_HEARTBEAT_ACCOUNT_SCOPE: ${{ vars.ACCOUNT_REGION }}
RUNTIME_HEARTBEAT_REQUIRED_SERVICES: ${{ vars.RUNTIME_HEARTBEAT_REQUIRED_SERVICES || vars.CLOUD_RUN_SERVICE }}
RUNTIME_HEARTBEAT_REQUIRED_SERVICES: ${{ vars.RUNTIME_HEARTBEAT_REQUIRED_SERVICES }}
RUNTIME_HEARTBEAT_GCS_URIS: ${{ vars.RUNTIME_HEARTBEAT_GCS_URIS || vars.EXECUTION_REPORT_GCS_URI }}
RUNTIME_HEARTBEAT_LOOKBACK_HOURS: ${{ inputs.lookback_hours || vars.RUNTIME_HEARTBEAT_LOOKBACK_HOURS || '36' }}
RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT || 'true' }}
RUNTIME_HEARTBEAT_ACCEPT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_ACCEPT_STATUSES }}
RUNTIME_HEARTBEAT_REJECT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_REJECT_STATUSES }}
RUNTIME_HEARTBEAT_SCHEDULER_AWARE: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_AWARE || 'true' }}
RUNTIME_HEARTBEAT_SCHEDULER_LOCATION: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_LOCATION || vars.CLOUD_RUN_REGION }}
RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH: ${{ vars.RUNTIME_HEARTBEAT_EXPECTED_DAY_OF_MONTH }}
RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE: ${{ vars.RUNTIME_HEARTBEAT_EXPECTED_TIMEZONE }}
CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }}
CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }}
CLOUD_RUN_SERVICES: ${{ vars.CLOUD_RUN_SERVICES }}
CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }}
GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }}
TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
steps:
Expand Down
40 changes: 40 additions & 0 deletions .github/workflows/sync-cloud-run-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jobs:
CLOUD_RUN_REGION: ${{ vars.CLOUD_RUN_REGION }}
CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }}
CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT: ${{ vars.CLOUD_RUN_ENV_SYNC_WAIT_FOR_COMMIT }}
CLOUD_SCHEDULER_LOCATION: ${{ vars.CLOUD_SCHEDULER_LOCATION }}
ACCOUNT_PREFIX: ${{ vars.ACCOUNT_PREFIX }}
TELEGRAM_TOKEN_SECRET_NAME: ${{ vars.TELEGRAM_TOKEN_SECRET_NAME }}
LONGPORT_APP_KEY_SECRET_NAME: ${{ vars.LONGPORT_APP_KEY_SECRET_NAME }}
Expand Down Expand Up @@ -941,6 +942,45 @@ jobs:

gcloud "${gcloud_args[@]}"

# Sync Cloud Scheduler timezone
#
# Runs only when the `config` step reports env_sync_enabled == 'true'.
# Resolves the scheduler location from CLOUD_SCHEDULER_LOCATION, falling back
# to CLOUD_RUN_REGION, and fails the step when both are empty.
# The target timezone is LONGBRIDGE_MARKET_TIMEZONE when set; otherwise it is
# Asia/Hong_Kong for LONGBRIDGE_MARKET == HK and America/New_York for anything
# else.
# For each of "<CLOUD_RUN_SERVICE>-scheduler", "-probe-scheduler" and
# "-precheck-scheduler", the job is described first; jobs that cannot be
# described are skipped with a log line, the rest get their --time-zone updated.
# NOTE(review): GCP_PROJECT_ID, LONGBRIDGE_MARKET and LONGBRIDGE_MARKET_TIMEZONE
# are presumably provided by the job-level env; that is not visible here — confirm.
- name: Sync Cloud Scheduler timezone
if: steps.config.outputs.env_sync_enabled == 'true'
run: |
set -euo pipefail

scheduler_location="${CLOUD_SCHEDULER_LOCATION:-${CLOUD_RUN_REGION}}"
if [ -z "${scheduler_location}" ]; then
echo "Cloud Scheduler timezone sync requires CLOUD_RUN_REGION or CLOUD_SCHEDULER_LOCATION." >&2
exit 1
fi

market_timezone="$(python - <<'PY'
import os

timezone = os.environ.get("LONGBRIDGE_MARKET_TIMEZONE", "").strip()
market = os.environ.get("LONGBRIDGE_MARKET", "").strip().upper()
if not timezone:
timezone = "Asia/Hong_Kong" if market == "HK" else "America/New_York"
print(timezone)
PY
)"

for suffix in scheduler probe-scheduler precheck-scheduler; do
job_name="${CLOUD_RUN_SERVICE}-${suffix}"
if ! gcloud scheduler jobs describe "${job_name}" \
--project="${GCP_PROJECT_ID}" \
--location="${scheduler_location}" >/dev/null 2>&1; then
echo "Cloud Scheduler job ${job_name} was not found in ${scheduler_location}; skipping timezone sync."
continue
fi

echo "Updating Cloud Scheduler job ${job_name} timezone to ${market_timezone}."
gcloud scheduler jobs update http "${job_name}" \
--project="${GCP_PROJECT_ID}" \
--location="${scheduler_location}" \
--time-zone="${market_timezone}" \
--quiet
done

- name: Prune old Cloud Run revisions
if: steps.config.outputs.enabled == 'true'
run: |
Expand Down
263 changes: 249 additions & 14 deletions scripts/execution_report_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,13 @@ def _base_report_uris() -> list[str]:
return unique


def _load_required_services() -> list[str]:
def _load_required_service_candidates() -> tuple[list[str], bool]:
explicit_services = _split_values(os.environ.get("RUNTIME_HEARTBEAT_REQUIRED_SERVICES"))
if explicit_services:
return _unique_values(explicit_services), True

services = []
for name in (
"RUNTIME_HEARTBEAT_REQUIRED_SERVICES",
"CLOUD_RUN_SERVICES",
"CLOUD_RUN_SERVICE",
):
Expand Down Expand Up @@ -179,15 +182,239 @@ def _load_required_services() -> list[str]:
except json.JSONDecodeError:
pass

return _unique_values(services), False


def _load_required_services(
    *,
    project: str | None = None,
    since: dt.datetime | None = None,
    now: dt.datetime | None = None,
) -> list[str]:
    """Return just the service names chosen by ``_resolve_required_services``.

    The skip reason and the "scheduler was consulted" flag that the resolver
    also returns are discarded here; all keyword arguments are forwarded as-is.
    """
    resolved = _resolve_required_services(project=project, since=since, now=now)
    # The resolver returns (services, skip_reason, scheduler_checked).
    return resolved[0]


def _resolve_required_services(
    *,
    project: str | None = None,
    since: dt.datetime | None = None,
    now: dt.datetime | None = None,
) -> tuple[list[str], str | None, bool]:
    """Decide which services the heartbeat must find reports for.

    Args:
        project: GCP project forwarded to the Cloud Scheduler lookup.
        since: Start of the heartbeat lookback window.
        now: End of the heartbeat lookback window.

    Returns:
        A ``(services, skip_reason, scheduler_checked)`` tuple:

        * ``services`` - the service names to check.
        * ``skip_reason`` - a message when the scheduler was consulted and no
          service was due in the window, otherwise ``None``.
        * ``scheduler_checked`` - ``True`` only when the list was actually
          filtered through Cloud Scheduler.

    The scheduler is not consulted when the candidate list is explicit or
    empty, when ``RUNTIME_HEARTBEAT_SCHEDULER_AWARE`` is off, or when the
    window bounds are missing. Any failure while consulting it falls back to
    the full candidate list instead of aborting the heartbeat.
    """
    services, explicit = _load_required_service_candidates()
    if explicit or not services:
        return services, None, False
    if not _env_bool("RUNTIME_HEARTBEAT_SCHEDULER_AWARE", True):
        return services, None, False
    if since is None or now is None:
        return services, None, False
    try:
        due_services = _filter_scheduler_due_services(
            services,
            project=project,
            since=since,
            now=now,
        )
    except (RuntimeError, ValueError) as exc:
        # RuntimeError: the gcloud listing failed or returned invalid JSON.
        # ValueError: a job's cron schedule contains a token the local parser
        # cannot convert to an integer. Both are "could not tell", so check
        # every configured service rather than crash or silently skip.
        print(
            f"Unable to resolve Cloud Scheduler-backed heartbeat services: {exc}; "
            "falling back to all configured services.",
            file=sys.stderr,
        )
        return services, None, False
    if not due_services:
        return (
            [],
            "no configured Cloud Scheduler main job was due in the heartbeat lookback window",
            True,
        )
    return due_services, None, True


def _unique_values(values: list[str]) -> list[str]:
    """Return ``values`` without duplicates, keeping first-seen order.

    Args:
        values: Items to de-duplicate; each must be hashable.

    Returns:
        A new list holding the first occurrence of every distinct value.
    """
    # dicts preserve insertion order, so fromkeys() is a stable de-duplication.
    return list(dict.fromkeys(values))


def _scheduler_location() -> str:
    """Return the Cloud Scheduler location to query.

    The first non-empty value of ``RUNTIME_HEARTBEAT_SCHEDULER_LOCATION`` and
    ``CLOUD_RUN_REGION`` wins; ``"us-central1"`` is used when neither is set.
    """
    for env_name in ("RUNTIME_HEARTBEAT_SCHEDULER_LOCATION", "CLOUD_RUN_REGION"):
        configured = os.environ.get(env_name)
        if configured:
            return configured
    return "us-central1"


def _list_scheduler_jobs(*, project: str | None) -> list[dict[str, Any]]:
    """List Cloud Scheduler jobs in the configured location via ``gcloud``.

    Args:
        project: Optional GCP project; appended as ``--project`` when truthy.

    Returns:
        The decoded JSON list of jobs, or an empty list when gcloud printed
        nothing or the decoded payload is not a list.

    Raises:
        RuntimeError: If gcloud exits non-zero or its output is not valid JSON.
    """
    argv = [
        "gcloud",
        "scheduler",
        "jobs",
        "list",
        "--location",
        _scheduler_location(),
        "--format=json",
    ]
    if project:
        argv += ["--project", project]

    # NOTE(review): _run_gcloud is assumed to return a CompletedProcess-like
    # object with returncode/stdout/stderr — confirm against its definition.
    completed = _run_gcloud(argv)
    if completed.returncode != 0:
        message = (completed.stderr or completed.stdout or "").strip()
        raise RuntimeError(message or "gcloud scheduler jobs list failed")

    raw_output = completed.stdout
    if not raw_output.strip():
        return []
    try:
        jobs = json.loads(raw_output)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"gcloud scheduler jobs list returned invalid JSON: {exc}") from exc
    if isinstance(jobs, list):
        return jobs
    return []


def _scheduler_job_targets_strategy_run(job: dict[str, Any], service: str) -> bool:
    """Tell whether ``job`` is an active job hitting the root URL of ``service``.

    A job qualifies when its ``state`` is missing/empty or ``ENABLED``, its
    ``httpTarget.uri`` has an empty or ``/`` path, and the lower-cased service
    name occurs inside the URI's network location.
    """
    state = str(job.get("state") or "").strip().upper()
    if state and state != "ENABLED":
        return False

    http_target = job.get("httpTarget") or {}
    target_uri = str(http_target.get("uri") or "").strip()
    if not target_uri:
        return False

    url = urllib.parse.urlparse(target_uri)
    # Only the root path counts; jobs that call any other path are ignored.
    if (url.path or "/") != "/":
        return False

    needle = str(service or "").strip().lower()
    if not needle:
        return False
    return needle in url.netloc.lower()


def _cron_token_value(token: str, *, names: dict[str, int] | None = None) -> int:
normalized = token.strip().lower()
if names and normalized in names:
return names[normalized]
return int(normalized)


def _cron_field_values(
field: str,
*,
minimum: int,
maximum: int,
names: dict[str, int] | None = None,
) -> set[int] | None:
text = str(field or "").strip().lower()
if text in {"", "*"}:
return None
values: set[int] = set()
for raw_part in text.split(","):
part = raw_part.strip()
if not part:
continue
base, raw_step = part, "1"
if "/" in part:
base, raw_step = part.split("/", 1)
step = max(1, int(raw_step))
if base == "*":
start, end = minimum, maximum
elif "-" in base:
raw_start, raw_end = base.split("-", 1)
start = _cron_token_value(raw_start, names=names)
end = _cron_token_value(raw_end, names=names)
else:
start = end = _cron_token_value(base, names=names)
for value in range(start, end + 1, step):
if minimum <= value <= maximum:
values.add(value)
elif maximum == 6 and value == 7:
values.add(0)
return values


def _cron_matches(schedule: str, value: dt.datetime) -> bool:
fields = str(schedule or "").split()
if len(fields) != 5:
return False
minute, hour, day_of_month, month, day_of_week = fields
dow_names = {
"sun": 0,
"mon": 1,
"tue": 2,
"wed": 3,
"thu": 4,
"fri": 5,
"sat": 6,
}
minute_values = _cron_field_values(minute, minimum=0, maximum=59)
hour_values = _cron_field_values(hour, minimum=0, maximum=23)
dom_values = _cron_field_values(day_of_month, minimum=1, maximum=31)
month_values = _cron_field_values(month, minimum=1, maximum=12)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle Cloud Scheduler month names

Cloud Scheduler accepts month names (JAN through DEC) in the month field per the Google Cloud cron-format docs, but this parser only supplies a names table for day-of-week. When a valid scheduler job for one of these services uses a schedule such as 45 15 * JAN *, _cron_token_value() tries int('jan'), raising ValueError; that exception is not caught by _resolve_required_services, so the heartbeat workflow exits instead of checking or safely skipping the service.

Useful? React with 👍 / 👎.

dow_values = _cron_field_values(day_of_week, minimum=0, maximum=6, names=dow_names)
if minute_values is not None and value.minute not in minute_values:
return False
if hour_values is not None and value.hour not in hour_values:
return False
if month_values is not None and value.month not in month_values:
return False

dom_matches = dom_values is None or value.day in dom_values
cron_weekday = value.isoweekday() % 7
dow_matches = dow_values is None or cron_weekday in dow_values
if dom_values is not None and dow_values is not None:
return dom_matches or dow_matches
return dom_matches and dow_matches


def _scheduler_job_due_between(
    job: dict[str, Any],
    *,
    since: dt.datetime,
    now: dt.datetime,
) -> bool:
    """Tell whether ``job``'s cron schedule fires anywhere in ``[since, now]``.

    Every whole minute inside the window is converted to the job's
    ``timeZone`` (UTC when missing or unloadable) and tested against the
    job's ``schedule``. A job without a schedule is never due.
    """
    schedule = str(job.get("schedule") or "").strip()
    if not schedule:
        return False
    try:
        job_zone = ZoneInfo(str(job.get("timeZone") or "UTC"))
    except Exception:  # noqa: BLE001
        job_zone = dt.timezone.utc

    one_minute = dt.timedelta(minutes=1)
    window_start = since.astimezone(dt.timezone.utc)
    window_end = now.astimezone(dt.timezone.utc)

    # Round the start up to the next whole minute so no tick precedes `since`.
    first_tick = window_start.replace(second=0, microsecond=0)
    if first_tick < window_start:
        first_tick += one_minute
    if first_tick > window_end:
        return False

    tick_count = (window_end - first_tick) // one_minute + 1
    return any(
        _cron_matches(schedule, (first_tick + index * one_minute).astimezone(job_zone))
        for index in range(tick_count)
    )


def _filter_scheduler_due_services(
    services: list[str],
    *,
    project: str | None,
    since: dt.datetime,
    now: dt.datetime,
) -> list[str]:
    """Keep the services whose scheduler jobs were due in ``[since, now]``.

    A service with no matching scheduler job is kept, since nothing says it
    was not due. A service with matching jobs is kept only when at least one
    of them was due in the window. Input order is preserved.
    """
    all_jobs = _list_scheduler_jobs(project=project)
    selected: list[str] = []
    for service in services:
        matching = [
            candidate
            for candidate in all_jobs
            if _scheduler_job_targets_strategy_run(candidate, service)
        ]
        if matching:
            was_due = any(
                _scheduler_job_due_between(candidate, since=since, now=now)
                for candidate in matching
            )
            if not was_due:
                continue
        selected.append(service)
    return selected


def _report_globs(since: dt.datetime, now: dt.datetime) -> list[str]:
explicit = _split_values(os.environ.get("RUNTIME_HEARTBEAT_GCS_GLOBS"))
if explicit:
Expand Down Expand Up @@ -380,21 +607,29 @@ def main(now: dt.datetime | None = None) -> int:
lookback_hours = float(os.environ.get("RUNTIME_HEARTBEAT_LOOKBACK_HOURS") or "36")
max_reports = int(os.environ.get("RUNTIME_HEARTBEAT_MAX_REPORTS_TO_READ") or "20")
fail_workflow = _env_bool("RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT", True)
required_services = _load_required_services()

now = now or dt.datetime.now(dt.timezone.utc)
if now.tzinfo is None:
now = now.replace(tzinfo=dt.timezone.utc)
now = now.astimezone(dt.timezone.utc)
try:
expected_window = _expected_report_window_status(now)
except ValueError as exc:
raise SystemExit(str(exc)) from exc
if expected_window and not expected_window[0]:
print(f"Execution report heartbeat skipped for {name}: {expected_window[1]}")
since = now - dt.timedelta(hours=lookback_hours)
required_services, scheduler_skip_reason, scheduler_checked = _resolve_required_services(
project=project,
since=since,
now=now,
)
if scheduler_skip_reason:
print(f"Execution report heartbeat skipped for {name}: {scheduler_skip_reason}")
return 0
if not scheduler_checked:
try:
expected_window = _expected_report_window_status(now)
except ValueError as exc:
raise SystemExit(str(exc)) from exc
if expected_window and not expected_window[0]:
print(f"Execution report heartbeat skipped for {name}: {expected_window[1]}")
return 0

since = now - dt.timedelta(hours=lookback_hours)
globs = _report_globs(since, now)
if not globs:
raise SystemExit("No heartbeat GCS report URI configured")
Expand Down
Loading