-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdecision_mapper.py
More file actions
196 lines (172 loc) · 7.2 KB
/
decision_mapper.py
File metadata and controls
196 lines (172 loc) · 7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from quant_platform_kit.strategy_contracts import (
StrategyDecision,
build_allocation_intent,
build_allocation_payload,
resolve_decision_target_mode,
translate_decision_to_target_mode,
)
from strategy_registry import IBKR_PLATFORM, resolve_strategy_definition
_EMERGENCY_FLAGS = frozenset({"emergency", "hard_defense"})
_NO_EXECUTE_FLAGS = frozenset({"no_execute"})
def _resolve_allocation_order(strategy_profile: str) -> str:
canonical_profile = resolve_strategy_definition(
strategy_profile,
platform_id=IBKR_PLATFORM,
).profile
if canonical_profile == "soxl_soxx_trend_income":
return "risk_income_safe"
return "risk_safe_income"
def _normalize_to_weight_decision(
decision: StrategyDecision,
runtime_metadata: Mapping[str, Any],
) -> StrategyDecision:
return translate_decision_to_target_mode(
decision,
target_mode="weight",
total_equity=runtime_metadata.get("portfolio_total_equity"),
)
def _derive_target_weights(decision: StrategyDecision) -> dict[str, float]:
weights: dict[str, float] = {}
for position in decision.positions:
if position.target_weight is None:
raise ValueError(
"IBKR decision mapper only supports weight-based positions; "
f"position {position.symbol!r} is missing target_weight"
)
weights[position.symbol] = float(position.target_weight)
return weights
def _derive_managed_symbols(
decision: StrategyDecision,
runtime_metadata: Mapping[str, Any],
*,
allocation_payload: Mapping[str, Any] | None = None,
) -> tuple[str, ...]:
explicit = runtime_metadata.get("managed_symbols")
if explicit:
return tuple(str(symbol) for symbol in explicit)
allocation_symbols = allocation_payload.get("strategy_symbols") if allocation_payload else None
if allocation_symbols:
return tuple(str(symbol) for symbol in allocation_symbols)
return tuple(position.symbol for position in decision.positions)
def _derive_safe_haven_symbol(
decision: StrategyDecision,
runtime_metadata: Mapping[str, Any],
) -> str | None:
explicit = runtime_metadata.get("safe_haven_symbol")
if explicit:
return str(explicit)
for position in decision.positions:
if position.role == "safe_haven":
return position.symbol
return None
def _derive_signal_description(
decision: StrategyDecision,
runtime_metadata: Mapping[str, Any],
) -> str:
diagnostics = decision.diagnostics
candidates = (
diagnostics.get("signal_description"),
diagnostics.get("signal_display"),
diagnostics.get("signal_message"),
runtime_metadata.get("signal_description"),
runtime_metadata.get("signal_display"),
)
for candidate in candidates:
text = str(candidate or "").strip()
if text:
return text
return "decision_ready"
def _derive_status_description(
decision: StrategyDecision,
runtime_metadata: Mapping[str, Any],
) -> str:
diagnostics = decision.diagnostics
candidates = (
diagnostics.get("status_description"),
diagnostics.get("canary_status"),
diagnostics.get("market_status"),
runtime_metadata.get("status_description"),
)
for candidate in candidates:
text = str(candidate or "").strip()
if text:
return text
return _derive_signal_description(decision, runtime_metadata)
def _derive_execution_annotations(
diagnostics: Mapping[str, Any],
runtime_metadata: Mapping[str, Any],
) -> dict[str, Any]:
annotations: dict[str, Any] = {}
raw_runtime_annotations = runtime_metadata.get("execution_annotations")
if isinstance(raw_runtime_annotations, Mapping):
annotations.update(raw_runtime_annotations)
raw_diagnostic_annotations = diagnostics.get("execution_annotations")
if isinstance(raw_diagnostic_annotations, Mapping):
annotations.update(raw_diagnostic_annotations)
return annotations
def map_strategy_decision(
decision: StrategyDecision,
*,
strategy_profile: str,
runtime_metadata: Mapping[str, Any] | None = None,
) -> tuple[dict[str, float] | None, str, bool, str, dict[str, Any]]:
runtime_metadata = dict(runtime_metadata or {})
canonical_profile = resolve_strategy_definition(
strategy_profile,
platform_id=IBKR_PLATFORM,
).profile
diagnostics = dict(decision.diagnostics)
risk_flags = tuple(str(flag) for flag in decision.risk_flags)
no_execute = bool(_NO_EXECUTE_FLAGS & set(risk_flags))
target_mode = resolve_decision_target_mode(decision)
total_equity_value = runtime_metadata.get("portfolio_total_equity")
if not no_execute and target_mode == "value" and total_equity_value is not None:
total_equity = float(total_equity_value)
if total_equity <= 0.0:
no_execute = True
risk_flags = tuple(dict.fromkeys((*risk_flags, "no_execute")))
diagnostics.setdefault("execution_blocked_reason", "non_positive_total_equity")
diagnostics.setdefault("portfolio_total_equity", total_equity)
normalized_decision = decision if no_execute else _normalize_to_weight_decision(decision, runtime_metadata)
target_weights = None if no_execute else _derive_target_weights(normalized_decision)
allocation_payload = None
if not no_execute and normalized_decision.positions:
allocation_payload = build_allocation_payload(
build_allocation_intent(
normalized_decision,
strategy_profile=canonical_profile,
strategy_symbols_order=_resolve_allocation_order(canonical_profile),
)
)
signal_desc = _derive_signal_description(decision, runtime_metadata)
status_desc = _derive_status_description(decision, runtime_metadata)
is_emergency = bool(_EMERGENCY_FLAGS & set(risk_flags))
metadata: dict[str, Any] = {**runtime_metadata, **diagnostics}
metadata.setdefault("strategy_profile", canonical_profile)
metadata.setdefault("status_icon", "🐤")
metadata.setdefault(
"managed_symbols",
_derive_managed_symbols(normalized_decision, runtime_metadata, allocation_payload=allocation_payload),
)
safe_haven_symbol = _derive_safe_haven_symbol(normalized_decision, runtime_metadata)
if safe_haven_symbol:
metadata.setdefault("safe_haven_symbol", safe_haven_symbol)
metadata.setdefault("risk_flags", risk_flags)
metadata.setdefault("actionable", not no_execute)
if allocation_payload:
metadata.setdefault("allocation", allocation_payload)
execution_annotations = _derive_execution_annotations(diagnostics, runtime_metadata)
if execution_annotations:
metadata.setdefault("execution_annotations", execution_annotations)
dashboard_text = str(
execution_annotations.get("dashboard_text")
or diagnostics.get("dashboard")
or metadata.get("dashboard_text")
or ""
).strip()
if dashboard_text:
metadata.setdefault("dashboard_text", dashboard_text)
return target_weights, signal_desc, is_emergency, status_desc, metadata