From 5534d79f2d11845bdbf7f6d8d1f500caae720399 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 7 Apr 2026 09:34:45 +0800 Subject: [PATCH] feat: add structured runtime events for request handling --- main.py | 53 +++++++++++- runtime_logging.py | 153 +++++++++++++++++++++++++++++++++ tests/test_request_handling.py | 17 ++++ 3 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 runtime_logging.py diff --git a/main.py b/main.py index 0507b70..3252ea1 100644 --- a/main.py +++ b/main.py @@ -29,6 +29,7 @@ build_sender, build_translator, ) +from runtime_logging import RuntimeLogContext, build_run_id, emit_runtime_log from quant_platform_kit.longbridge import ( build_contexts, calculate_rotation_indicators, @@ -80,6 +81,16 @@ def get_project_id(): TOKEN_REFRESH_THRESHOLD_DAYS = 30 SEPARATOR = "━━━━━━━━━━━━━━━━━━" +RUNTIME_LOG_CONTEXT = RuntimeLogContext( + platform="longbridge", + deploy_target="cloud_run", + service_name=SERVICE_NAME or os.getenv("K_SERVICE", "longbridge-platform"), + strategy_profile=STRATEGY_PROFILE, + account_scope=ACCOUNT_REGION, + account_region=ACCOUNT_REGION, + project_id=PROJECT_ID, + extra_fields={"account_prefix": ACCOUNT_PREFIX}, +) def t(key, **kwargs): return build_translator(NOTIFY_LANG)(key, **kwargs) @@ -94,6 +105,15 @@ def notify_issue(title, detail): return build_issue_notifier(with_prefix_fn=with_prefix, send_tg_message_fn=send_tg_message)(title, detail) +def log_runtime_event(log_context, event, **fields): + return emit_runtime_log( + log_context, + event, + printer=lambda line: print(line, flush=True), + **fields, + ) + + def is_filled_status(status): return notifications_is_filled_status(status) @@ -207,14 +227,32 @@ def resolve_rebalance_plan(*, indicators, account_state): def run_strategy(): + log_context = RUNTIME_LOG_CONTEXT.with_run(build_run_id()) try: + log_runtime_event( + log_context, + "strategy_cycle_started", + message="Starting strategy execution", + ) print(with_prefix(f"[{datetime.now()}] Starting strategy..."), flush=True) market_open = is_market_open_now() if isinstance(market_open, tuple): market_open, error = market_open + log_runtime_event( + log_context, + "market_hours_check_failed", + message="Market hours check failed", + severity="WARNING", + error_message=str(error), + ) print(with_prefix(f"Market hours check failed: {error}"), flush=True) if not market_open: + log_runtime_event( + log_context, + "outside_market_hours", + message="Outside market hours; skip execution", + ) print(with_prefix("Outside market hours; skip."), flush=True) return run_rebalance_cycle( @@ -238,8 +276,21 @@ def run_strategy(): estimate_max_purchase_quantity=estimate_max_purchase_quantity, submit_order_with_alert=submit_order_with_alert, ) + log_runtime_event( + log_context, + "strategy_cycle_completed", + message="Strategy execution completed", + ) - except Exception: + except Exception as exc: + log_runtime_event( + log_context, + "strategy_cycle_failed", + message="Strategy execution failed", + severity="ERROR", + error_type=type(exc).__name__, + error_message=str(exc), + ) err = traceback.format_exc() print(with_prefix(f"Strategy error:\n{err}"), flush=True) send_tg_message(f"{t('error_title')}\n{err}") diff --git a/runtime_logging.py b/runtime_logging.py new file mode 100644 index 0000000..dcf871f --- /dev/null +++ b/runtime_logging.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass, field, replace +from datetime import datetime, timezone +from typing import Any, Callable, Mapping + + +LogPrinter = Callable[..., Any] + + +def build_run_id(now: datetime | None = None) -> str: + current = now.astimezone(timezone.utc) if now is not None else datetime.now(timezone.utc) + return current.strftime("%Y%m%dT%H%M%SZ") + + +def extract_cloud_trace(project_id: str | None, header_value: str | None) -> str | None: + if not project_id or not header_value: + return None + trace_id = str(header_value).split("/", 1)[0].strip() + if not trace_id: + return None + return f"projects/{project_id}/traces/{trace_id}" + + +@dataclass(frozen=True) +class RuntimeLogContext: + platform: str + deploy_target: str + service_name: str + strategy_profile: str + run_id: str = "" + account_scope: str | None = None + account_group: str | None = None + account_region: str | None = None + project_id: str | None = None + instance_name: str | None = None + trace: str | None = None + extra_fields: Mapping[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + for field_name in ("platform", "deploy_target", "service_name", "strategy_profile"): + if not str(getattr(self, field_name, "") or "").strip(): + raise ValueError(f"{field_name} must not be empty") + + def with_run( + self, + run_id: str | None = None, + *, + trace: str | None = None, + extra_fields: Mapping[str, Any] | None = None, + ) -> "RuntimeLogContext": + merged_extra = dict(self.extra_fields) + if extra_fields: + merged_extra.update(dict(extra_fields)) + return replace( + self, + run_id=str(run_id or self.run_id or ""), + trace=self.trace if trace is None else trace, + extra_fields=merged_extra, + ) + + +def emit_runtime_log( + context: RuntimeLogContext, + event: str, + *, + message: str | None = None, + severity: str = "INFO", + printer: LogPrinter = print, + now: datetime | None = None, + **fields: Any, +) -> dict[str, Any]: + payload: dict[str, Any] = { + "timestamp": _format_timestamp(now), + "severity": str(severity or "INFO").upper(), + "event": str(event), + "message": str(message or event), + "platform": context.platform, + "deploy_target": context.deploy_target, + "service_name": context.service_name, + "strategy_profile": context.strategy_profile, + "run_id": context.run_id or None, + "account_scope": context.account_scope, + "account_group": context.account_group, + "account_region": context.account_region, + "project_id": context.project_id, + "instance_name": context.instance_name, + } + payload.update(_normalize_mapping(context.extra_fields)) + payload.update(_normalize_mapping(fields)) + if context.trace: + payload["logging.googleapis.com/trace"] = context.trace + + cleaned_payload = _drop_empty(payload) + encoded = json.dumps(cleaned_payload, ensure_ascii=False, sort_keys=True, default=_json_default) + _write_log_line(printer, encoded) + return cleaned_payload + + + +def _format_timestamp(now: datetime | None) -> str: + current = now.astimezone(timezone.utc) if now is not None else datetime.now(timezone.utc) + return current.isoformat().replace("+00:00", "Z") + + + +def _normalize_mapping(mapping: Mapping[str, Any] | None) -> dict[str, Any]: + if not mapping: + return {} + return {str(key): _normalize_value(value) for key, value in mapping.items()} + + + +def _normalize_value(value: Any) -> Any: + if isinstance(value, datetime): + return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + if isinstance(value, Mapping): + return _drop_empty({str(key): _normalize_value(item) for key, item in value.items()}) + if isinstance(value, tuple): + return [_normalize_value(item) for item in value] + if isinstance(value, list): + return [_normalize_value(item) for item in value] + return value + + + +def _drop_empty(payload: Mapping[str, Any]) -> dict[str, Any]: + cleaned: dict[str, Any] = {} + for key, value in payload.items(): + if value is None: + continue + if isinstance(value, str) and not value.strip(): + continue + if isinstance(value, (list, tuple, dict)) and len(value) == 0: + continue + cleaned[str(key)] = value + return cleaned + + + +def _json_default(value: Any) -> Any: + if isinstance(value, datetime): + return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + return str(value) + + + +def _write_log_line(printer: LogPrinter, line: str) -> None: + try: + printer(line, flush=True) + except TypeError: + printer(line) diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index dfb92bb..2049a78 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -174,6 +174,23 @@ def fake_run_strategy(): self.assertEqual(body, "OK") self.assertTrue(observed["called"]) + def test_run_strategy_emits_structured_runtime_events(self): + module = load_module() + observed = [] + + module.build_run_id = lambda: "run-001" + module.emit_runtime_log = lambda context, event, **fields: observed.append((context.run_id, event, fields)) + module.is_market_open_now = lambda: True + module.run_rebalance_cycle = lambda **_kwargs: None + + module.run_strategy() + + self.assertEqual( + [event for _run_id, event, _fields in observed], + ["strategy_cycle_started", "strategy_cycle_completed"], + ) + self.assertTrue(all(run_id == "run-001" for run_id, _event, _fields in observed)) + if __name__ == "__main__": unittest.main()