Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/strategy_plugin_runtime_contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ Do not put `mode` in the platform mount config. `expected_mode` may be used only
as a fail-closed guard and should be `shadow` when present. Artifacts declaring
`paper`, `advisory`, or `live` are rejected.

## Plugin Definitions

The shared kit owns plugin compatibility through a registry-style
`StrategyPluginDefinition`. Platform repos should not hard-code which strategies
a plugin supports; they should call the shared parser/loader and let it reject
unsupported mounts or artifacts.

The default registry currently defines:

| Plugin | Supported strategies | Supported mode | Escalated alert channel |
| --- | --- | --- | --- |
| `crisis_response_shadow` | `tqqq_growth_income`, `soxl_soxx_trend_income` | `shadow` | `google_voice` |

To expand a plugin later, update the shared definition or pass an explicit
definition registry into the parser/loader. This keeps future plugin eligibility
changes out of platform runtime code.

## Runtime Loader

Use `quant_platform_kit.common.strategy_plugins`:
Expand Down
14 changes: 14 additions & 0 deletions src/quant_platform_kit/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@
resolve_runtime_target_from_env,
)
from .strategy_plugins import (
CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES,
DEFAULT_STRATEGY_PLUGIN_DEFINITIONS,
PLUGIN_CRISIS_RESPONSE_SHADOW,
PLUGIN_MODE_SHADOW,
STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE,
STRATEGY_PLUGIN_ALERT_ACTIONS,
STRATEGY_PLUGIN_NON_ALERT_ROUTES,
SUPPORTED_STRATEGY_PLUGIN_MODES,
StrategyPluginAlertMessage,
StrategyPluginDefinition,
StrategyPluginMountConfig,
StrategyPluginSignal,
build_strategy_plugin_alert_key,
Expand All @@ -52,18 +57,23 @@
build_strategy_plugin_report_payload,
load_configured_strategy_plugin_signals,
load_strategy_plugin_signal,
normalize_strategy_plugin_definitions,
normalize_strategy_plugin_mode,
parse_strategy_plugin_mounts,
should_alert_strategy_plugin_signal,
translate_strategy_plugin_value,
validate_strategy_plugin_compatibility,
validate_strategy_plugin_signal_payload,
)

__all__ = [
"COMMON_ZH_NOTIFICATION_REPLACEMENTS",
"CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES",
"DEFAULT_EXECUTION_BLOCKING_SKIP_REASONS",
"DEFAULT_STRATEGY_PLUGIN_DEFINITIONS",
"DEFAULT_TERMINAL_FUNDING_BLOCK_SKIP_REASONS",
"DEFAULT_TERMINAL_STRATEGY_RUN_STAGES",
"PLUGIN_CRISIS_RESPONSE_SHADOW",
"PLUGIN_MODE_SHADOW",
"STAGE_COMPLETED",
"STAGE_DRY_RUN_COMPLETED",
Expand All @@ -74,6 +84,7 @@
"STAGE_PARTIAL_SUBMITTED",
"STAGE_RECONCILED",
"STAGE_SUBMITTED",
"STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE",
"STRATEGY_PLUGIN_ALERT_ACTIONS",
"STRATEGY_PLUGIN_NON_ALERT_ROUTES",
"SUPPORTED_STRATEGY_PLUGIN_MODES",
Expand All @@ -93,6 +104,7 @@
"RuntimeAssembly",
"build_runtime_assembly",
"StrategyPluginAlertMessage",
"StrategyPluginDefinition",
"StrategyPluginMountConfig",
"StrategyPluginSignal",
"build_strategy_plugin_alert_key",
Expand All @@ -102,11 +114,13 @@
"build_runtime_target",
"load_configured_strategy_plugin_signals",
"load_strategy_plugin_signal",
"normalize_strategy_plugin_definitions",
"normalize_strategy_plugin_mode",
"parse_strategy_plugin_mounts",
"resolve_runtime_target_from_env",
"should_alert_strategy_plugin_signal",
"translate_strategy_plugin_value",
"translator_uses_zh",
"validate_strategy_plugin_compatibility",
"validate_strategy_plugin_signal_payload",
]
148 changes: 143 additions & 5 deletions src/quant_platform_kit/common/strategy_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,66 @@
from pathlib import Path
from typing import Any, Callable

PLUGIN_CRISIS_RESPONSE_SHADOW = "crisis_response_shadow"
PLUGIN_MODE_SHADOW = "shadow"
STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE = "google_voice"
SUPPORTED_STRATEGY_PLUGIN_MODES = frozenset({PLUGIN_MODE_SHADOW})
DEFAULT_PLUGIN_ARTIFACT_CACHE_DIR = Path(tempfile.gettempdir()) / "quant_strategy_plugin_artifacts"
STRATEGY_PLUGIN_NON_ALERT_ROUTES = frozenset({"no_action"})
STRATEGY_PLUGIN_ALERT_ACTIONS = frozenset({"defend", "blocked"})
CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES = frozenset(
{
"tqqq_growth_income",
"soxl_soxx_trend_income",
}
)


@dataclass(frozen=True)
class StrategyPluginDefinition:
plugin: str
supported_strategies: frozenset[str] | None = None
supported_modes: frozenset[str] = field(default_factory=lambda: SUPPORTED_STRATEGY_PLUGIN_MODES)
alert_channels: tuple[str, ...] = ()

def normalized(self) -> "StrategyPluginDefinition":
plugin = _required_string(self.plugin, field_name="plugin")
supported_strategies = (
frozenset(_required_string(strategy, field_name="supported_strategy") for strategy in self.supported_strategies)
if self.supported_strategies is not None
else None
)
supported_modes = frozenset(
normalize_strategy_plugin_mode(mode, field_name="supported_mode")
for mode in self.supported_modes
)
if not supported_modes:
raise ValueError(f"strategy plugin definition for {plugin} must include at least one supported mode")
alert_channels = tuple(
_required_string(channel, field_name="alert_channel")
for channel in self.alert_channels
)
return StrategyPluginDefinition(
plugin=plugin,
supported_strategies=supported_strategies,
supported_modes=supported_modes,
alert_channels=alert_channels,
)

def supports_strategy(self, strategy: str) -> bool:
if self.supported_strategies is None:
return True
return strategy in self.supported_strategies


DEFAULT_STRATEGY_PLUGIN_DEFINITIONS: Mapping[str, StrategyPluginDefinition] = {
PLUGIN_CRISIS_RESPONSE_SHADOW: StrategyPluginDefinition(
plugin=PLUGIN_CRISIS_RESPONSE_SHADOW,
supported_strategies=CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES,
supported_modes=SUPPORTED_STRATEGY_PLUGIN_MODES,
alert_channels=(STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE,),
)
}


@dataclass(frozen=True)
Expand Down Expand Up @@ -77,11 +132,66 @@ def normalize_strategy_plugin_mode(value: Any, *, field_name: str = "mode") -> s
return mode


def normalize_strategy_plugin_definitions(
plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None,
) -> Mapping[str, StrategyPluginDefinition]:
raw_definitions = (
DEFAULT_STRATEGY_PLUGIN_DEFINITIONS.values()
if plugin_definitions is None
else plugin_definitions.values()
if isinstance(plugin_definitions, Mapping)
else plugin_definitions
)
definitions: dict[str, StrategyPluginDefinition] = {}
for definition in raw_definitions:
if not isinstance(definition, StrategyPluginDefinition):
raise TypeError("strategy plugin definitions must be StrategyPluginDefinition objects")
normalized = definition.normalized()
if normalized.plugin in definitions:
raise ValueError(f"duplicate strategy plugin definition: plugin={normalized.plugin}")
definitions[normalized.plugin] = normalized
return definitions


def validate_strategy_plugin_compatibility(
*,
strategy: str,
plugin: str,
mode: str | None = None,
plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None,
source: str = "plugin",
) -> None:
strategy_name = _required_string(strategy, field_name="strategy")
plugin_name = _required_string(plugin, field_name="plugin")
definitions = normalize_strategy_plugin_definitions(plugin_definitions)
definition = definitions.get(plugin_name)
if definition is None:
return
if not definition.supports_strategy(strategy_name):
allowed = ", ".join(sorted(definition.supported_strategies or ())) or "any"
raise ValueError(
f"strategy plugin {plugin_name} does not support strategy {strategy_name} "
f"in {source}; supported strategies: {allowed}"
)
if mode is None:
return
mode_name = normalize_strategy_plugin_mode(mode, field_name="mode")
if mode_name not in definition.supported_modes:
allowed_modes = ", ".join(sorted(definition.supported_modes))
raise ValueError(
f"strategy plugin {plugin_name} does not support mode {mode_name} "
f"in {source}; supported modes: {allowed_modes}"
)


def parse_strategy_plugin_mounts(
raw_config: str | Sequence[Mapping[str, Any]] | Mapping[str, Any] | None,
*,
plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None,
) -> tuple[StrategyPluginMountConfig, ...]:
if raw_config is None or raw_config == "":
return ()
definitions = normalize_strategy_plugin_definitions(plugin_definitions)
payload: Any
if isinstance(raw_config, str):
payload = json.loads(raw_config)
Expand Down Expand Up @@ -111,17 +221,25 @@ def parse_strategy_plugin_mounts(
raise ValueError(f"duplicate strategy plugin mount: strategy={strategy} plugin={plugin}")
seen.add(key)
expected_mode = item.get("expected_mode")
normalized_expected_mode = (
normalize_strategy_plugin_mode(expected_mode, field_name="expected_mode")
if expected_mode is not None
else None
)
validate_strategy_plugin_compatibility(
strategy=strategy,
plugin=plugin,
mode=normalized_expected_mode,
plugin_definitions=definitions,
source="mount",
)
mounts.append(
StrategyPluginMountConfig(
strategy=strategy,
plugin=plugin,
signal_path=signal_path,
enabled=_as_bool(item.get("enabled"), default=True),
expected_mode=(
normalize_strategy_plugin_mode(expected_mode, field_name="expected_mode")
if expected_mode is not None
else None
),
expected_mode=normalized_expected_mode,
)
)
return tuple(mounts)
Expand All @@ -132,21 +250,31 @@ def load_configured_strategy_plugin_signals(
*,
strategy_profile: str | None = None,
client_factory: Any = None,
plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None,
) -> tuple[StrategyPluginSignal, ...]:
selected_strategy = _optional_string(strategy_profile)
definitions = normalize_strategy_plugin_definitions(plugin_definitions)
signals: list[StrategyPluginSignal] = []
for mount in mounts:
if not mount.enabled:
continue
if selected_strategy is not None and mount.strategy != selected_strategy:
continue
validate_strategy_plugin_compatibility(
strategy=mount.strategy,
plugin=mount.plugin,
mode=mount.expected_mode,
plugin_definitions=definitions,
source="mount",
)
signals.append(
load_strategy_plugin_signal(
mount.signal_path,
expected_strategy=mount.strategy,
expected_plugin=mount.plugin,
expected_mode=mount.expected_mode,
client_factory=client_factory,
plugin_definitions=definitions,
)
)
return tuple(signals)
Expand All @@ -159,6 +287,7 @@ def load_strategy_plugin_signal(
expected_plugin: str | None = None,
expected_mode: str | None = None,
client_factory: Any = None,
plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None,
) -> StrategyPluginSignal:
local_path, metadata = _materialize_artifact_path(reference, client_factory=client_factory)
if not local_path.exists():
Expand All @@ -173,6 +302,7 @@ def load_strategy_plugin_signal(
expected_mode=expected_mode,
source_uri=metadata.get("source_uri"),
local_path=str(local_path),
plugin_definitions=plugin_definitions,
)


Expand All @@ -184,6 +314,7 @@ def validate_strategy_plugin_signal_payload(
expected_mode: str | None = None,
source_uri: str | None = None,
local_path: str | None = None,
plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None,
) -> StrategyPluginSignal:
strategy = _required_string(payload.get("strategy"), field_name="strategy")
plugin = _required_string(payload.get("plugin"), field_name="plugin")
Expand All @@ -210,6 +341,13 @@ def validate_strategy_plugin_signal_payload(
"strategy plugin artifact mode mismatch: "
f"expected {normalized_expected_mode}, got {effective_mode}"
)
validate_strategy_plugin_compatibility(
strategy=strategy,
plugin=plugin,
mode=effective_mode,
plugin_definitions=plugin_definitions,
source="artifact",
)

execution_controls = payload.get("execution_controls") or {}
if not isinstance(execution_controls, Mapping):
Expand Down
Loading