diff --git a/docs/strategy_plugin_runtime_contract.md b/docs/strategy_plugin_runtime_contract.md index 75712d2..d85dc73 100644 --- a/docs/strategy_plugin_runtime_contract.md +++ b/docs/strategy_plugin_runtime_contract.md @@ -57,6 +57,23 @@ Do not put `mode` in the platform mount config. `expected_mode` may be used only as a fail-closed guard and should be `shadow` when present. Artifacts declaring `paper`, `advisory`, or `live` are rejected. +## Plugin Definitions + +The shared kit owns plugin compatibility through a registry-style +`StrategyPluginDefinition`. Platform repos should not hard-code which strategies +a plugin supports; they should call the shared parser/loader and let it reject +unsupported mounts or artifacts. + +The default registry currently defines: + +| Plugin | Supported strategies | Supported mode | Escalated alert channel | +| --- | --- | --- | --- | +| `crisis_response_shadow` | `tqqq_growth_income`, `soxl_soxx_trend_income` | `shadow` | `google_voice` | + +To expand a plugin later, update the shared definition or pass an explicit +definition registry into the parser/loader. This keeps future plugin eligibility +changes out of platform runtime code. + ## Runtime Loader Use `quant_platform_kit.common.strategy_plugins`: diff --git a/src/quant_platform_kit/common/__init__.py b/src/quant_platform_kit/common/__init__.py index f81982d..8ba3f38 100644 --- a/src/quant_platform_kit/common/__init__.py +++ b/src/quant_platform_kit/common/__init__.py @@ -39,11 +39,16 @@ resolve_runtime_target_from_env, ) from .strategy_plugins import ( + CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES, + DEFAULT_STRATEGY_PLUGIN_DEFINITIONS, + PLUGIN_CRISIS_RESPONSE_SHADOW, PLUGIN_MODE_SHADOW, + STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE, STRATEGY_PLUGIN_ALERT_ACTIONS, STRATEGY_PLUGIN_NON_ALERT_ROUTES, SUPPORTED_STRATEGY_PLUGIN_MODES, StrategyPluginAlertMessage, + StrategyPluginDefinition, StrategyPluginMountConfig, StrategyPluginSignal, build_strategy_plugin_alert_key, @@ -52,18 +57,23 @@ build_strategy_plugin_report_payload, load_configured_strategy_plugin_signals, load_strategy_plugin_signal, + normalize_strategy_plugin_definitions, normalize_strategy_plugin_mode, parse_strategy_plugin_mounts, should_alert_strategy_plugin_signal, translate_strategy_plugin_value, + validate_strategy_plugin_compatibility, validate_strategy_plugin_signal_payload, ) __all__ = [ "COMMON_ZH_NOTIFICATION_REPLACEMENTS", + "CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES", "DEFAULT_EXECUTION_BLOCKING_SKIP_REASONS", + "DEFAULT_STRATEGY_PLUGIN_DEFINITIONS", "DEFAULT_TERMINAL_FUNDING_BLOCK_SKIP_REASONS", "DEFAULT_TERMINAL_STRATEGY_RUN_STAGES", + "PLUGIN_CRISIS_RESPONSE_SHADOW", "PLUGIN_MODE_SHADOW", "STAGE_COMPLETED", "STAGE_DRY_RUN_COMPLETED", @@ -74,6 +84,7 @@ "STAGE_PARTIAL_SUBMITTED", "STAGE_RECONCILED", "STAGE_SUBMITTED", + "STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE", "STRATEGY_PLUGIN_ALERT_ACTIONS", "STRATEGY_PLUGIN_NON_ALERT_ROUTES", "SUPPORTED_STRATEGY_PLUGIN_MODES", @@ -93,6 +104,7 @@ "RuntimeAssembly", "build_runtime_assembly", "StrategyPluginAlertMessage", + "StrategyPluginDefinition", "StrategyPluginMountConfig", "StrategyPluginSignal", "build_strategy_plugin_alert_key", @@ -102,11 +114,13 @@ "build_runtime_target", "load_configured_strategy_plugin_signals", "load_strategy_plugin_signal", + "normalize_strategy_plugin_definitions", "normalize_strategy_plugin_mode", "parse_strategy_plugin_mounts", "resolve_runtime_target_from_env", "should_alert_strategy_plugin_signal", "translate_strategy_plugin_value", "translator_uses_zh", + "validate_strategy_plugin_compatibility", "validate_strategy_plugin_signal_payload", ] diff --git a/src/quant_platform_kit/common/strategy_plugins.py b/src/quant_platform_kit/common/strategy_plugins.py index 74ef8be..4913e20 100644 --- a/src/quant_platform_kit/common/strategy_plugins.py +++ b/src/quant_platform_kit/common/strategy_plugins.py @@ -10,11 +10,66 @@ from pathlib import Path from typing import Any, Callable +PLUGIN_CRISIS_RESPONSE_SHADOW = "crisis_response_shadow" PLUGIN_MODE_SHADOW = "shadow" +STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE = "google_voice" SUPPORTED_STRATEGY_PLUGIN_MODES = frozenset({PLUGIN_MODE_SHADOW}) DEFAULT_PLUGIN_ARTIFACT_CACHE_DIR = Path(tempfile.gettempdir()) / "quant_strategy_plugin_artifacts" STRATEGY_PLUGIN_NON_ALERT_ROUTES = frozenset({"no_action"}) STRATEGY_PLUGIN_ALERT_ACTIONS = frozenset({"defend", "blocked"}) +CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES = frozenset( + { + "tqqq_growth_income", + "soxl_soxx_trend_income", + } +) + + +@dataclass(frozen=True) +class StrategyPluginDefinition: + plugin: str + supported_strategies: frozenset[str] | None = None + supported_modes: frozenset[str] = field(default_factory=lambda: SUPPORTED_STRATEGY_PLUGIN_MODES) + alert_channels: tuple[str, ...] = () + + def normalized(self) -> "StrategyPluginDefinition": + plugin = _required_string(self.plugin, field_name="plugin") + supported_strategies = ( + frozenset(_required_string(strategy, field_name="supported_strategy") for strategy in self.supported_strategies) + if self.supported_strategies is not None + else None + ) + supported_modes = frozenset( + normalize_strategy_plugin_mode(mode, field_name="supported_mode") + for mode in self.supported_modes + ) + if not supported_modes: + raise ValueError(f"strategy plugin definition for {plugin} must include at least one supported mode") + alert_channels = tuple( + _required_string(channel, field_name="alert_channel") + for channel in self.alert_channels + ) + return StrategyPluginDefinition( + plugin=plugin, + supported_strategies=supported_strategies, + supported_modes=supported_modes, + alert_channels=alert_channels, + ) + + def supports_strategy(self, strategy: str) -> bool: + if self.supported_strategies is None: + return True + return strategy in self.supported_strategies + + +DEFAULT_STRATEGY_PLUGIN_DEFINITIONS: Mapping[str, StrategyPluginDefinition] = { + PLUGIN_CRISIS_RESPONSE_SHADOW: StrategyPluginDefinition( + plugin=PLUGIN_CRISIS_RESPONSE_SHADOW, + supported_strategies=CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES, + supported_modes=SUPPORTED_STRATEGY_PLUGIN_MODES, + alert_channels=(STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE,), + ) +} @dataclass(frozen=True) @@ -77,11 +132,66 @@ def normalize_strategy_plugin_mode(value: Any, *, field_name: str = "mode") -> s return mode +def normalize_strategy_plugin_definitions( + plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None, +) -> Mapping[str, StrategyPluginDefinition]: + raw_definitions = ( + DEFAULT_STRATEGY_PLUGIN_DEFINITIONS.values() + if plugin_definitions is None + else plugin_definitions.values() + if isinstance(plugin_definitions, Mapping) + else plugin_definitions + ) + definitions: dict[str, StrategyPluginDefinition] = {} + for definition in raw_definitions: + if not isinstance(definition, StrategyPluginDefinition): + raise TypeError("strategy plugin definitions must be StrategyPluginDefinition objects") + normalized = definition.normalized() + if normalized.plugin in definitions: + raise ValueError(f"duplicate strategy plugin definition: plugin={normalized.plugin}") + definitions[normalized.plugin] = normalized + return definitions + + +def validate_strategy_plugin_compatibility( + *, + strategy: str, + plugin: str, + mode: str | None = None, + plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None, + source: str = "plugin", +) -> None: + strategy_name = _required_string(strategy, field_name="strategy") + plugin_name = _required_string(plugin, field_name="plugin") + definitions = normalize_strategy_plugin_definitions(plugin_definitions) + definition = definitions.get(plugin_name) + if definition is None: + return + if not definition.supports_strategy(strategy_name): + allowed = ", ".join(sorted(definition.supported_strategies or ())) or "any" + raise ValueError( + f"strategy plugin {plugin_name} does not support strategy {strategy_name} " + f"in {source}; supported strategies: {allowed}" + ) + if mode is None: + return + mode_name = normalize_strategy_plugin_mode(mode, field_name="mode") + if mode_name not in definition.supported_modes: + allowed_modes = ", ".join(sorted(definition.supported_modes)) + raise ValueError( + f"strategy plugin {plugin_name} does not support mode {mode_name} " + f"in {source}; supported modes: {allowed_modes}" + ) + + def parse_strategy_plugin_mounts( raw_config: str | Sequence[Mapping[str, Any]] | Mapping[str, Any] | None, + *, + plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None, ) -> tuple[StrategyPluginMountConfig, ...]: if raw_config is None or raw_config == "": return () + definitions = normalize_strategy_plugin_definitions(plugin_definitions) payload: Any if isinstance(raw_config, str): payload = json.loads(raw_config) @@ -111,17 +221,25 @@ def parse_strategy_plugin_mounts( raise ValueError(f"duplicate strategy plugin mount: strategy={strategy} plugin={plugin}") seen.add(key) expected_mode = item.get("expected_mode") + normalized_expected_mode = ( + normalize_strategy_plugin_mode(expected_mode, field_name="expected_mode") + if expected_mode is not None + else None + ) + validate_strategy_plugin_compatibility( + strategy=strategy, + plugin=plugin, + mode=normalized_expected_mode, + plugin_definitions=definitions, + source="mount", + ) mounts.append( StrategyPluginMountConfig( strategy=strategy, plugin=plugin, signal_path=signal_path, enabled=_as_bool(item.get("enabled"), default=True), - expected_mode=( - normalize_strategy_plugin_mode(expected_mode, field_name="expected_mode") - if expected_mode is not None - else None - ), + expected_mode=normalized_expected_mode, ) ) return tuple(mounts) @@ -132,14 +250,23 @@ def load_configured_strategy_plugin_signals( *, strategy_profile: str | None = None, client_factory: Any = None, + plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None, ) -> tuple[StrategyPluginSignal, ...]: selected_strategy = _optional_string(strategy_profile) + definitions = normalize_strategy_plugin_definitions(plugin_definitions) signals: list[StrategyPluginSignal] = [] for mount in mounts: if not mount.enabled: continue if selected_strategy is not None and mount.strategy != selected_strategy: continue + validate_strategy_plugin_compatibility( + strategy=mount.strategy, + plugin=mount.plugin, + mode=mount.expected_mode, + plugin_definitions=definitions, + source="mount", + ) signals.append( load_strategy_plugin_signal( mount.signal_path, @@ -147,6 +274,7 @@ def load_configured_strategy_plugin_signals( expected_plugin=mount.plugin, expected_mode=mount.expected_mode, client_factory=client_factory, + plugin_definitions=definitions, ) ) return tuple(signals) @@ -159,6 +287,7 @@ def load_strategy_plugin_signal( expected_plugin: str | None = None, expected_mode: str | None = None, client_factory: Any = None, + plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None, ) -> StrategyPluginSignal: local_path, metadata = _materialize_artifact_path(reference, client_factory=client_factory) if not local_path.exists(): @@ -173,6 +302,7 @@ def load_strategy_plugin_signal( expected_mode=expected_mode, source_uri=metadata.get("source_uri"), local_path=str(local_path), + plugin_definitions=plugin_definitions, ) @@ -184,6 +314,7 @@ def validate_strategy_plugin_signal_payload( expected_mode: str | None = None, source_uri: str | None = None, local_path: str | None = None, + plugin_definitions: Mapping[str, StrategyPluginDefinition] | Sequence[StrategyPluginDefinition] | None = None, ) -> StrategyPluginSignal: strategy = _required_string(payload.get("strategy"), field_name="strategy") plugin = _required_string(payload.get("plugin"), field_name="plugin") @@ -210,6 +341,13 @@ def validate_strategy_plugin_signal_payload( "strategy plugin artifact mode mismatch: " f"expected {normalized_expected_mode}, got {effective_mode}" ) + validate_strategy_plugin_compatibility( + strategy=strategy, + plugin=plugin, + mode=effective_mode, + plugin_definitions=plugin_definitions, + source="artifact", + ) execution_controls = payload.get("execution_controls") or {} if not isinstance(execution_controls, Mapping): diff --git a/tests/test_strategy_plugins.py b/tests/test_strategy_plugins.py index 190820e..43a6fed 100644 --- a/tests/test_strategy_plugins.py +++ b/tests/test_strategy_plugins.py @@ -4,7 +4,12 @@ from pathlib import Path from quant_platform_kit.common.strategy_plugins import ( + CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES, + DEFAULT_STRATEGY_PLUGIN_DEFINITIONS, + PLUGIN_CRISIS_RESPONSE_SHADOW, PLUGIN_MODE_SHADOW, + STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE, + StrategyPluginDefinition, build_strategy_plugin_alert_messages, build_strategy_plugin_notification_lines, build_strategy_plugin_report_payload, @@ -12,6 +17,7 @@ load_strategy_plugin_signal, parse_strategy_plugin_mounts, should_alert_strategy_plugin_signal, + validate_strategy_plugin_compatibility, validate_strategy_plugin_signal_payload, ) @@ -78,6 +84,58 @@ def test_parse_strategy_plugin_mounts_rejects_platform_mode_selection(self): with self.assertRaisesRegex(ValueError, "must not set mode"): parse_strategy_plugin_mounts(raw) + def test_default_plugin_definition_limits_crisis_response_to_supported_strategies(self): + definition = DEFAULT_STRATEGY_PLUGIN_DEFINITIONS[PLUGIN_CRISIS_RESPONSE_SHADOW] + + self.assertEqual(definition.supported_strategies, CRISIS_RESPONSE_SHADOW_SUPPORTED_STRATEGIES) + self.assertEqual(definition.alert_channels, (STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE,)) + validate_strategy_plugin_compatibility( + strategy="tqqq_growth_income", + plugin=PLUGIN_CRISIS_RESPONSE_SHADOW, + mode=PLUGIN_MODE_SHADOW, + ) + validate_strategy_plugin_compatibility( + strategy="soxl_soxx_trend_income", + plugin=PLUGIN_CRISIS_RESPONSE_SHADOW, + mode=PLUGIN_MODE_SHADOW, + ) + + def test_parse_strategy_plugin_mounts_rejects_unsupported_crisis_response_strategy(self): + raw = [ + { + "strategy": "global_etf_rotation", + "plugin": PLUGIN_CRISIS_RESPONSE_SHADOW, + "signal_path": "gs://bucket/latest_signal.json", + } + ] + + with self.assertRaisesRegex( + ValueError, + "crisis_response_shadow does not support strategy global_etf_rotation", + ): + parse_strategy_plugin_mounts(raw) + + def test_plugin_definition_can_extend_future_strategy_support(self): + raw = [ + { + "strategy": "global_etf_rotation", + "plugin": PLUGIN_CRISIS_RESPONSE_SHADOW, + "signal_path": "gs://bucket/latest_signal.json", + } + ] + definitions = { + PLUGIN_CRISIS_RESPONSE_SHADOW: StrategyPluginDefinition( + plugin=PLUGIN_CRISIS_RESPONSE_SHADOW, + supported_strategies=frozenset({"global_etf_rotation"}), + supported_modes=frozenset({PLUGIN_MODE_SHADOW}), + alert_channels=(STRATEGY_PLUGIN_ALERT_CHANNEL_GOOGLE_VOICE,), + ) + } + + mounts = parse_strategy_plugin_mounts(raw, plugin_definitions=definitions) + + self.assertEqual(mounts[0].strategy, "global_etf_rotation") + def test_load_strategy_plugin_signal_validates_identity_and_mode(self): with tempfile.TemporaryDirectory() as tmp_dir: signal_path = Path(tmp_dir) / "latest_signal.json" @@ -124,7 +182,7 @@ def test_load_configured_strategy_plugin_signals_filters_strategy_and_disabled_m "signal_path": str(signal_path), }, { - "strategy": "soxl_growth_income", + "strategy": "soxl_soxx_trend_income", "plugin": "crisis_response_shadow", "signal_path": str(root / "missing.json"), }, @@ -155,6 +213,15 @@ def test_validate_strategy_plugin_signal_payload_rejects_non_shadow_expected_mod expected_mode="live", ) + def test_validate_strategy_plugin_signal_payload_rejects_unsupported_crisis_response_strategy(self): + with self.assertRaisesRegex( + ValueError, + "crisis_response_shadow does not support strategy global_etf_rotation", + ): + validate_strategy_plugin_signal_payload( + _signal_payload(strategy="global_etf_rotation"), + ) + def test_build_strategy_plugin_report_payload_uses_compact_summary(self): with tempfile.TemporaryDirectory() as tmp_dir: signal_path = Path(tmp_dir) / "latest_signal.json"