Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
build_sender,
build_translator,
)
from runtime_logging import RuntimeLogContext, build_run_id, emit_runtime_log
from quant_platform_kit.longbridge import (
build_contexts,
calculate_rotation_indicators,
Expand Down Expand Up @@ -80,6 +81,16 @@ def get_project_id():
TOKEN_REFRESH_THRESHOLD_DAYS = 30

SEPARATOR = "━━━━━━━━━━━━━━━━━━"
RUNTIME_LOG_CONTEXT = RuntimeLogContext(
platform="longbridge",
deploy_target="cloud_run",
service_name=SERVICE_NAME or os.getenv("K_SERVICE", "longbridge-platform"),
strategy_profile=STRATEGY_PROFILE,
account_scope=ACCOUNT_REGION,
account_region=ACCOUNT_REGION,
project_id=PROJECT_ID,
extra_fields={"account_prefix": ACCOUNT_PREFIX},
)

def t(key, **kwargs):
return build_translator(NOTIFY_LANG)(key, **kwargs)
Expand All @@ -94,6 +105,15 @@ def notify_issue(title, detail):
return build_issue_notifier(with_prefix_fn=with_prefix, send_tg_message_fn=send_tg_message)(title, detail)


def log_runtime_event(log_context, event, **fields):
return emit_runtime_log(
log_context,
event,
printer=lambda line: print(line, flush=True),
**fields,
)


def is_filled_status(status):
return notifications_is_filled_status(status)

Expand Down Expand Up @@ -207,14 +227,32 @@ def resolve_rebalance_plan(*, indicators, account_state):


def run_strategy():
log_context = RUNTIME_LOG_CONTEXT.with_run(build_run_id())
try:
log_runtime_event(
log_context,
"strategy_cycle_started",
message="Starting strategy execution",
)
print(with_prefix(f"[{datetime.now()}] Starting strategy..."), flush=True)

market_open = is_market_open_now()
if isinstance(market_open, tuple):
market_open, error = market_open
log_runtime_event(
log_context,
"market_hours_check_failed",
message="Market hours check failed",
severity="WARNING",
error_message=str(error),
)
print(with_prefix(f"Market hours check failed: {error}"), flush=True)
if not market_open:
log_runtime_event(
log_context,
"outside_market_hours",
message="Outside market hours; skip execution",
)
print(with_prefix("Outside market hours; skip."), flush=True)
return
run_rebalance_cycle(
Expand All @@ -238,8 +276,21 @@ def run_strategy():
estimate_max_purchase_quantity=estimate_max_purchase_quantity,
submit_order_with_alert=submit_order_with_alert,
)
log_runtime_event(
log_context,
"strategy_cycle_completed",
message="Strategy execution completed",
)

except Exception:
except Exception as exc:
log_runtime_event(
log_context,
"strategy_cycle_failed",
message="Strategy execution failed",
severity="ERROR",
error_type=type(exc).__name__,
error_message=str(exc),
)
err = traceback.format_exc()
print(with_prefix(f"Strategy error:\n{err}"), flush=True)
send_tg_message(f"{t('error_title')}\n{err}")
Expand Down
153 changes: 153 additions & 0 deletions runtime_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from __future__ import annotations

import json
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Any, Callable, Mapping


LogPrinter = Callable[..., Any]


def build_run_id(now: datetime | None = None) -> str:
current = now.astimezone(timezone.utc) if now is not None else datetime.now(timezone.utc)
return current.strftime("%Y%m%dT%H%M%SZ")


def extract_cloud_trace(project_id: str | None, header_value: str | None) -> str | None:
if not project_id or not header_value:
return None
trace_id = str(header_value).split("/", 1)[0].strip()
if not trace_id:
return None
return f"projects/{project_id}/traces/{trace_id}"


@dataclass(frozen=True)
class RuntimeLogContext:
platform: str
deploy_target: str
service_name: str
strategy_profile: str
run_id: str = ""
account_scope: str | None = None
account_group: str | None = None
account_region: str | None = None
project_id: str | None = None
instance_name: str | None = None
trace: str | None = None
extra_fields: Mapping[str, Any] = field(default_factory=dict)

def __post_init__(self) -> None:
for field_name in ("platform", "deploy_target", "service_name", "strategy_profile"):
if not str(getattr(self, field_name, "") or "").strip():
raise ValueError(f"{field_name} must not be empty")

def with_run(
self,
run_id: str | None = None,
*,
trace: str | None = None,
extra_fields: Mapping[str, Any] | None = None,
) -> "RuntimeLogContext":
merged_extra = dict(self.extra_fields)
if extra_fields:
merged_extra.update(dict(extra_fields))
return replace(
self,
run_id=str(run_id or self.run_id or ""),
trace=self.trace if trace is None else trace,
extra_fields=merged_extra,
)


def emit_runtime_log(
context: RuntimeLogContext,
event: str,
*,
message: str | None = None,
severity: str = "INFO",
printer: LogPrinter = print,
now: datetime | None = None,
**fields: Any,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"timestamp": _format_timestamp(now),
"severity": str(severity or "INFO").upper(),
"event": str(event),
"message": str(message or event),
"platform": context.platform,
"deploy_target": context.deploy_target,
"service_name": context.service_name,
"strategy_profile": context.strategy_profile,
"run_id": context.run_id or None,
"account_scope": context.account_scope,
"account_group": context.account_group,
"account_region": context.account_region,
"project_id": context.project_id,
"instance_name": context.instance_name,
}
payload.update(_normalize_mapping(context.extra_fields))
payload.update(_normalize_mapping(fields))
if context.trace:
payload["logging.googleapis.com/trace"] = context.trace

cleaned_payload = _drop_empty(payload)
encoded = json.dumps(cleaned_payload, ensure_ascii=False, sort_keys=True, default=_json_default)
_write_log_line(printer, encoded)
return cleaned_payload



def _format_timestamp(now: datetime | None) -> str:
current = now.astimezone(timezone.utc) if now is not None else datetime.now(timezone.utc)
return current.isoformat().replace("+00:00", "Z")



def _normalize_mapping(mapping: Mapping[str, Any] | None) -> dict[str, Any]:
if not mapping:
return {}
return {str(key): _normalize_value(value) for key, value in mapping.items()}



def _normalize_value(value: Any) -> Any:
if isinstance(value, datetime):
return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
if isinstance(value, Mapping):
return _drop_empty({str(key): _normalize_value(item) for key, item in value.items()})
if isinstance(value, tuple):
return [_normalize_value(item) for item in value]
if isinstance(value, list):
return [_normalize_value(item) for item in value]
return value



def _drop_empty(payload: Mapping[str, Any]) -> dict[str, Any]:
cleaned: dict[str, Any] = {}
for key, value in payload.items():
if value is None:
continue
if isinstance(value, str) and not value.strip():
continue
if isinstance(value, (list, tuple, dict)) and len(value) == 0:
continue
cleaned[str(key)] = value
return cleaned



def _json_default(value: Any) -> Any:
if isinstance(value, datetime):
return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
return str(value)



def _write_log_line(printer: LogPrinter, line: str) -> None:
try:
printer(line, flush=True)
except TypeError:
printer(line)
17 changes: 17 additions & 0 deletions tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,23 @@ def fake_run_strategy():
self.assertEqual(body, "OK")
self.assertTrue(observed["called"])

def test_run_strategy_emits_structured_runtime_events(self):
module = load_module()
observed = []

module.build_run_id = lambda: "run-001"
module.emit_runtime_log = lambda context, event, **fields: observed.append((context.run_id, event, fields))
module.is_market_open_now = lambda: True
module.run_rebalance_cycle = lambda **_kwargs: None

module.run_strategy()

self.assertEqual(
[event for _run_id, event, _fields in observed],
["strategy_cycle_started", "strategy_cycle_completed"],
)
self.assertTrue(all(run_id == "run-001" for run_id, _event, _fields in observed))


if __name__ == "__main__":
unittest.main()