Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion application/runtime_strategy_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,34 @@
)


def _get_direct_market_history_profiles() -> frozenset[str]:
try:
from hk_equity_strategies import get_direct_market_history_profiles
except (ImportError, AttributeError): # pragma: no cover - compatibility fallback
return frozenset()
return frozenset(
str(profile).strip().lower()
for profile in get_direct_market_history_profiles()
)


def _requires_materialized_market_history(strategy_profile: str) -> bool:
return str(strategy_profile or "").strip().lower() in _get_direct_market_history_profiles()


def _loaded_history_to_rows(history):
if (
hasattr(history, "items")
and not hasattr(history, "columns")
and not isinstance(history, Mapping)
):
return [
{"date": date_value, "close": close_value}
for date_value, close_value in history.items()
]
return history


@dataclass(frozen=True)
class LongBridgeRuntimeStrategyAdapters:
strategy_runtime: Any
Expand Down Expand Up @@ -80,8 +108,13 @@ def calculate_strategy_indicators(self, quote_context):
if "market_history" in available_inputs or "benchmark_history" in available_inputs or "qqq_history" in available_inputs:
market_data_port = self.broker_adapters.build_market_data_port(quote_context)
if "market_history" in available_inputs:
market_history = (
self._build_materialized_market_history(market_data_port)
if _requires_materialized_market_history(self.strategy_profile)
else self.broker_adapters.build_market_history_loader(market_data_port)
)
market_inputs = {
"market_history": self.broker_adapters.build_market_history_loader(market_data_port),
"market_history": market_history,
}
if "benchmark_history" in available_inputs:
market_inputs["benchmark_history"] = self.broker_adapters.build_price_history(
Expand All @@ -99,6 +132,29 @@ def calculate_strategy_indicators(self, quote_context):
trend_ma_window = int(self.strategy_runtime_config.get("trend_ma_window", 150))
return self.calculate_rotation_indicators_fn(quote_context, trend_window=trend_ma_window)

def _market_history_symbols(self) -> tuple[str, ...]:
raw_symbols = (
self.strategy_runtime_config.get("universe_symbols")
or getattr(self.broker_adapters, "strategy_symbols", ())
or getattr(self.strategy_runtime, "managed_symbols", ())
)
if isinstance(raw_symbols, str):
raw_symbols = raw_symbols.replace(";", ",").split(",")
return tuple(
dict.fromkeys(
str(symbol).strip()
for symbol in raw_symbols
if str(symbol).strip()
)
)

def _build_materialized_market_history(self, market_data_port):
load_market_history = self.broker_adapters.build_market_history_loader(market_data_port)
return {
symbol: _loaded_history_to_rows(load_market_history(None, symbol))
for symbol in self._market_history_symbols()
}

def resolve_rebalance_plan(self, *, indicators, snapshot=None, account_state=None):
available_inputs = set(self.available_inputs)
resolved_snapshot = snapshot
Expand Down
61 changes: 60 additions & 1 deletion tests/test_runtime_strategy_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
from pathlib import Path
from types import SimpleNamespace

import pandas as pd


ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
HK_STRATEGIES_SRC = ROOT.parent / "HkEquityStrategies" / "src"
if str(HK_STRATEGIES_SRC) not in sys.path:
sys.path.insert(0, str(HK_STRATEGIES_SRC))

from application.runtime_strategy_adapters import build_runtime_strategy_adapters

Expand Down Expand Up @@ -35,7 +40,9 @@ def build_price_history(self, market_data_port, symbol):
signal_text_fn=lambda icon: f"signal:{icon}",
translator=lambda key, **_kwargs: key,
broker_adapters=FakeBrokerAdapters(),
calculate_rotation_indicators_fn=lambda *_args, **_kwargs: (_ for _ in ()).throw(AssertionError("unexpected fallback")),
calculate_rotation_indicators_fn=lambda *_args, **_kwargs: (
_ for _ in ()
).throw(AssertionError("unexpected fallback")),
build_strategy_evaluation_inputs_fn=lambda **_kwargs: {},
map_strategy_decision_to_plan_fn=lambda *_args, **_kwargs: {},
)
Expand All @@ -52,6 +59,58 @@ def build_price_history(self, market_data_port, symbol):
}


def test_runtime_strategy_adapters_materialize_hk_direct_market_history():
observed = {}

class FakeBrokerAdapters:
strategy_symbols = ("02800", "02834")

def build_market_data_port(self, quote_context):
observed["market_data_port_context"] = quote_context
return "market-data-port"

def build_market_history_loader(self, market_data_port):
observed["market_history_loader_port"] = market_data_port

def load_market_history(_broker_client, symbol):
observed.setdefault("history_calls", []).append(symbol)
return pd.Series(
[10.0, 11.0],
index=pd.to_datetime(["2026-05-29", "2026-06-01"], utc=True),
dtype=float,
)

return load_market_history

adapters = build_runtime_strategy_adapters(
strategy_runtime=SimpleNamespace(evaluate=lambda **_kwargs: None),
strategy_profile="hk_listed_global_etf_rotation",
strategy_runtime_config={"universe_symbols": ("02800", "02834")},
available_inputs=("market_history",),
benchmark_symbol="QQQ",
signal_text_fn=lambda icon: f"signal:{icon}",
translator=lambda key, **_kwargs: key,
broker_adapters=FakeBrokerAdapters(),
calculate_rotation_indicators_fn=lambda *_args, **_kwargs: (
_ for _ in ()
).throw(AssertionError("unexpected fallback")),
build_strategy_evaluation_inputs_fn=lambda **_kwargs: {},
map_strategy_decision_to_plan_fn=lambda *_args, **_kwargs: {},
)

result = adapters.calculate_strategy_indicators("quote-context")

assert observed["market_data_port_context"] == "quote-context"
assert observed["market_history_loader_port"] == "market-data-port"
assert observed["history_calls"] == ["02800", "02834"]
assert sorted(result["market_history"]) == ["02800", "02834"]
assert result["market_history"]["02800"][0]["date"] == pd.Timestamp(
"2026-05-29",
tz="UTC",
)
assert result["market_history"]["02800"][0]["close"] == 10.0


def test_runtime_strategy_adapters_fall_back_to_rotation_indicators():
observed = {}

Expand Down