From 550b853234ce0036ddf932024f1eaae21bc9bc25 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 26 May 2026 16:32:57 +0800 Subject: [PATCH] Add QuantConnect connector framework --- README.md | 4 + README.zh-CN.md | 4 + docs/quantconnect.md | 92 +++++ pyproject.toml | 2 +- src/quant_platform_kit/__init__.py | 2 +- .../quantconnect/__init__.py | 31 ++ src/quant_platform_kit/quantconnect/client.py | 167 +++++++++ src/quant_platform_kit/quantconnect/models.py | 323 ++++++++++++++++++ tests/test_quantconnect.py | 190 +++++++++++ 9 files changed, 813 insertions(+), 2 deletions(-) create mode 100644 docs/quantconnect.md create mode 100644 src/quant_platform_kit/quantconnect/__init__.py create mode 100644 src/quant_platform_kit/quantconnect/client.py create mode 100644 src/quant_platform_kit/quantconnect/models.py create mode 100644 tests/test_quantconnect.py diff --git a/README.md b/README.md index 67c1638..77f4cc0 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ It contains: - common domain models and runtime target helpers - narrow ports for market data, portfolio snapshots, order execution, notifications, and state - reusable broker adapter utilities +- QuantConnect Cloud deployment helpers for hybrid hosted/self-hosted runtimes - strategy loading, strategy-plugin, and alert-message contracts - optional strategy-plugin alert channels for email, SMS, push, and Telegram providers - synthetic-data tests for public behavior @@ -60,10 +61,13 @@ src/quant_platform_kit/ binance/ schwab/ longbridge/ + quantconnect/ notifications/ tests/ ``` +See [docs/quantconnect.md](./docs/quantconnect.md) for the public QuantConnect connector contract and placeholder-only examples. + ## Development Run the public test suite: diff --git a/README.zh-CN.md b/README.zh-CN.md index aae02a7..f865016 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -13,6 +13,7 @@ - 通用领域模型和运行目标 helper - 市场数据、持仓快照、订单执行、通知、状态存储等窄接口 - 可复用的券商适配工具 +- 面向混合托管/自托管运行时的 QuantConnect Cloud 部署 helper - 策略加载、策略插件、告警消息契约 - 可选的策略插件 email、SMS、push 和 Telegram 告警通道 - 使用合成数据的公开测试 @@ -60,10 +61,13 @@ src/quant_platform_kit/ binance/ schwab/ longbridge/ + quantconnect/ notifications/ tests/ ``` +公开的 QuantConnect 连接器契约和仅含占位符的示例见 [docs/quantconnect.md](./docs/quantconnect.md)。 + ## 开发 运行公开测试: diff --git a/docs/quantconnect.md b/docs/quantconnect.md new file mode 100644 index 0000000..9c97d98 --- /dev/null +++ b/docs/quantconnect.md @@ -0,0 +1,92 @@ +# QuantConnect Connector + +`quant_platform_kit.quantconnect` contains small, dependency-free helpers for platform repositories that deploy a strategy to QuantConnect Cloud while keeping account wiring and secrets outside this public repository. + +The connector supports: + +- QuantConnect REST API authentication headers from a user id and API token +- live algorithm management calls for authenticate, create, read, list, stop, and liquidate +- QuantConnect live deployment payloads +- Interactive Brokers brokerage payloads for QuantConnect Cloud +- redacted payload helpers for logs and notifications + +It intentionally does not contain production account ids, usernames, passwords, node ids, or project ids. Private platform configuration should provide those values from Secret Manager, GitHub Actions secrets, or another deployment secret store. + +## Example + +```python +from quant_platform_kit.quantconnect import ( + InteractiveBrokersBrokerageSettings, + QuantConnectCredentials, + QuantConnectLiveConnector, + QuantConnectLiveDeployment, + QuantConnectRestClient, +) + +credentials = QuantConnectCredentials.from_env(env) +brokerage = InteractiveBrokersBrokerageSettings.from_env(env) + +deployment = QuantConnectLiveDeployment( + project_id=12345678, + compile_id="compile-id-from-quantconnect", + node_id="LN-node-id-from-quantconnect", + brokerage=brokerage, + data_providers={ + "InteractiveBrokersBrokerage": brokerage, + }, + parameters={ + "strategy_profile": "example_strategy", + "runtime_target": "quantconnect-cloud-slot-a", + }, +) + +client = QuantConnectRestClient(credentials=credentials) +connector = QuantConnectLiveConnector(client) + +# Use deployment.redacted_payload() for logs. Do not log deployment.to_payload(). +result = connector.deploy(deployment) +``` + +The default environment variable names are: + +```text +QUANTCONNECT_USER_ID +QUANTCONNECT_API_TOKEN +QUANTCONNECT_ORGANIZATION_ID + +QUANTCONNECT_IB_USER_NAME +QUANTCONNECT_IB_ACCOUNT +QUANTCONNECT_IB_PASSWORD +QUANTCONNECT_IB_WEEKLY_RESTART_UTC_TIME +QUANTCONNECT_IB_FINANCIAL_ADVISORS_GROUP_FILTER +``` + +For a hybrid deployment, keep the target routing in a private platform repository or deployment secret: + +```json +{ + "self_hosted": [ + { + "strategy_profile": "tqqq_growth_income", + "platform": "interactive_brokers", + "account_selector": ["U00000000"] + } + ], + "quantconnect_cloud": [ + { + "strategy_profile": "example_strategy", + "project_id": 12345678, + "node_id": "LN-placeholder", + "brokerage_secret": "qc-ibkr-slot-b" + } + ] +} +``` + +The public example above uses placeholders only. Real account mappings and brokerage credentials must stay in private runtime configuration. + +## References + +- QuantConnect Cloud API live management: +- QuantConnect create live algorithm API: +- Lean CLI cloud live deploy: diff --git a/pyproject.toml b/pyproject.toml index f0c517d..9321ffd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "quant-platform-kit" -version = "0.7.32" +version = "0.7.33" description = "Shared broker adapters, domain models, execution ports, and notification utilities for QuantStrategyLab strategies." readme = "README.md" requires-python = ">=3.9" diff --git a/src/quant_platform_kit/__init__.py b/src/quant_platform_kit/__init__.py index aaaf6a4..02a4eb5 100644 --- a/src/quant_platform_kit/__init__.py +++ b/src/quant_platform_kit/__init__.py @@ -4,7 +4,7 @@ used by older strategy repositories. """ -__version__ = "0.7.21" +__version__ = "0.7.33" from .common.models import ( ExecutionReport, diff --git a/src/quant_platform_kit/quantconnect/__init__.py b/src/quant_platform_kit/quantconnect/__init__.py new file mode 100644 index 0000000..6510d2d --- /dev/null +++ b/src/quant_platform_kit/quantconnect/__init__.py @@ -0,0 +1,31 @@ +"""QuantConnect cloud deployment helpers.""" + +from .client import ( + DEFAULT_QUANTCONNECT_API_BASE_URL, + QuantConnectApiError, + QuantConnectLiveConnector, + QuantConnectRestClient, +) +from .models import ( + BrokerageHolding, + CashAmount, + InteractiveBrokersBrokerageSettings, + QuantConnectCredentials, + QuantConnectLiveDeployment, + QuantConnectPaperBrokerageSettings, + redact_sensitive_payload, +) + +__all__ = [ + "DEFAULT_QUANTCONNECT_API_BASE_URL", + "BrokerageHolding", + "CashAmount", + "InteractiveBrokersBrokerageSettings", + "QuantConnectApiError", + "QuantConnectCredentials", + "QuantConnectLiveConnector", + "QuantConnectLiveDeployment", + "QuantConnectPaperBrokerageSettings", + "QuantConnectRestClient", + "redact_sensitive_payload", +] diff --git a/src/quant_platform_kit/quantconnect/client.py b/src/quant_platform_kit/quantconnect/client.py new file mode 100644 index 0000000..82a84a1 --- /dev/null +++ b/src/quant_platform_kit/quantconnect/client.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import json +import time +import urllib.error +import urllib.request +from collections.abc import Mapping +from dataclasses import dataclass, field +from typing import Any, Callable + +from .models import QuantConnectCredentials, QuantConnectLiveDeployment, redact_sensitive_payload + + +DEFAULT_QUANTCONNECT_API_BASE_URL = "https://www.quantconnect.com/api/v2" + + +class QuantConnectApiError(RuntimeError): + def __init__( + self, + message: str, + *, + status_code: int | None = None, + payload: Mapping[str, Any] | None = None, + ) -> None: + super().__init__(message) + self.status_code = status_code + self.payload = dict(payload or {}) + + +@dataclass(frozen=True) +class QuantConnectRestClient: + credentials: QuantConnectCredentials + api_base_url: str = DEFAULT_QUANTCONNECT_API_BASE_URL + timeout: float = 15.0 + opener: Any = None + clock: Callable[[], float] = field(default=time.time, repr=False) + + def authenticate(self) -> dict[str, Any]: + return self.post_json("/authenticate", {}) + + def create_live_algorithm(self, deployment: QuantConnectLiveDeployment | Mapping[str, Any]) -> dict[str, Any]: + payload = deployment.to_payload() if hasattr(deployment, "to_payload") else dict(deployment) + return self.post_json("/live/create", payload) + + def read_live_algorithm(self, *, project_id: int, deploy_id: str) -> dict[str, Any]: + return self.post_json( + "/live/read", + { + "projectId": int(project_id), + "deployId": str(deploy_id).strip(), + }, + ) + + def list_live_algorithms( + self, + *, + project_id: int | None = None, + status: str | None = None, + ) -> dict[str, Any]: + payload: dict[str, Any] = {} + if project_id is not None: + payload["projectId"] = int(project_id) + text_status = str(status or "").strip() + if text_status: + payload["status"] = text_status + return self.post_json("/live/list", payload) + + def stop_live_algorithm(self, *, project_id: int) -> dict[str, Any]: + return self.post_json("/live/update/stop", {"projectId": int(project_id)}) + + def liquidate_live_algorithm(self, *, project_id: int) -> dict[str, Any]: + return self.post_json("/live/update/liquidate", {"projectId": int(project_id)}) + + def post_json(self, path: str, payload: Mapping[str, Any] | None = None) -> dict[str, Any]: + request_payload = dict(payload or {}) + request = urllib.request.Request( + self._endpoint(path), + data=json.dumps(request_payload, ensure_ascii=False).encode("utf-8"), + headers={ + **self.credentials.build_auth_headers(clock=self.clock), + "Content-Type": "application/json", + }, + method="POST", + ) + + try: + with self._opener()(request, timeout=self.timeout) as response: + status_code = _response_status(response) + raw_body = response.read() + except urllib.error.HTTPError as exc: + status_code = int(exc.code) + raw_body = exc.read() + parsed_error = _parse_response_body(raw_body) + raise QuantConnectApiError( + f"QuantConnect API request failed with HTTP {status_code}", + status_code=status_code, + payload=redact_sensitive_payload(parsed_error), + ) from exc + + result = _parse_response_body(raw_body) + if status_code < 200 or status_code >= 300: + raise QuantConnectApiError( + f"QuantConnect API request failed with HTTP {status_code}", + status_code=status_code, + payload=redact_sensitive_payload(result), + ) + if result.get("success") is False: + errors = result.get("errors") + message = "QuantConnect API request failed" + if errors: + message = f"{message}: {errors}" + raise QuantConnectApiError( + message, + status_code=status_code, + payload=redact_sensitive_payload(result), + ) + return result + + def _endpoint(self, path: str) -> str: + base_url = str(self.api_base_url or DEFAULT_QUANTCONNECT_API_BASE_URL).rstrip("/") + endpoint_path = str(path or "").strip().lstrip("/") + if not endpoint_path: + raise ValueError("path must not be empty.") + return f"{base_url}/{endpoint_path}" + + def _opener(self) -> Any: + return self.opener or urllib.request.urlopen + + +@dataclass(frozen=True) +class QuantConnectLiveConnector: + client: QuantConnectRestClient + + def deploy(self, deployment: QuantConnectLiveDeployment) -> dict[str, Any]: + return self.client.create_live_algorithm(deployment) + + def running_deployments(self, *, project_id: int | None = None) -> tuple[dict[str, Any], ...]: + result = self.client.list_live_algorithms(project_id=project_id, status="Running") + live = result.get("live") or () + if not isinstance(live, list): + return () + return tuple(dict(item) for item in live if isinstance(item, Mapping)) + + def stop_project(self, *, project_id: int, liquidate: bool = False) -> dict[str, Any]: + if liquidate: + return self.client.liquidate_live_algorithm(project_id=project_id) + return self.client.stop_live_algorithm(project_id=project_id) + + +def _response_status(response: Any) -> int: + status = getattr(response, "status", None) + if status is None: + status = response.getcode() + return int(status) + + +def _parse_response_body(raw_body: bytes | str | None) -> dict[str, Any]: + if raw_body is None or raw_body == b"" or raw_body == "": + return {} + if isinstance(raw_body, bytes): + body_text = raw_body.decode("utf-8") + else: + body_text = raw_body + parsed = json.loads(body_text) + if not isinstance(parsed, dict): + raise QuantConnectApiError("QuantConnect API response must decode to an object.") + return parsed diff --git a/src/quant_platform_kit/quantconnect/models.py b/src/quant_platform_kit/quantconnect/models.py new file mode 100644 index 0000000..604e476 --- /dev/null +++ b/src/quant_platform_kit/quantconnect/models.py @@ -0,0 +1,323 @@ +from __future__ import annotations + +import base64 +import hashlib +import json +import time +from collections.abc import Mapping +from dataclasses import dataclass, field +from typing import Any, Callable + + +_SENSITIVE_KEY_PARTS = ("password", "token", "secret", "key") + + +def _required_text(value: object, field_name: str) -> str: + text = str(value or "").strip() + if not text: + raise ValueError(f"{field_name} must not be empty.") + return text + + +def _optional_text(value: object) -> str | None: + if value is None: + return None + text = str(value).strip() + return text or None + + +def _lookup(mapping: Mapping[str, object], *keys: str) -> object | None: + for key in keys: + value = mapping.get(key) + if value is not None and str(value).strip() != "": + return value + return None + + +def _payload_from(value: Any) -> dict[str, Any]: + if hasattr(value, "to_payload"): + payload = value.to_payload() + else: + payload = value + if not isinstance(payload, Mapping): + raise TypeError("QuantConnect payload values must be mappings or expose to_payload().") + return dict(payload) + + +def redact_sensitive_payload(payload: Mapping[str, Any]) -> dict[str, Any]: + """Return a copy with common credential fields replaced by a redaction marker.""" + + redacted: dict[str, Any] = {} + for key, value in payload.items(): + key_text = str(key) + lowered_key = key_text.lower() + if any(part in lowered_key for part in _SENSITIVE_KEY_PARTS): + redacted[key_text] = "***" + continue + if isinstance(value, Mapping): + redacted[key_text] = redact_sensitive_payload(value) + elif isinstance(value, list): + redacted[key_text] = [ + redact_sensitive_payload(item) if isinstance(item, Mapping) else item + for item in value + ] + else: + redacted[key_text] = value + return redacted + + +@dataclass(frozen=True) +class QuantConnectCredentials: + user_id: str + api_token: str = field(repr=False) + organization_id: str | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "user_id", _required_text(self.user_id, "user_id")) + object.__setattr__(self, "api_token", _required_text(self.api_token, "api_token")) + object.__setattr__( + self, + "organization_id", + _optional_text(self.organization_id), + ) + + @classmethod + def from_env( + cls, + env: Mapping[str, str | None], + *, + prefix: str = "QUANTCONNECT", + ) -> "QuantConnectCredentials": + normalized_prefix = _required_text(prefix, "prefix").upper() + return cls.from_mapping( + env, + user_id_key=f"{normalized_prefix}_USER_ID", + api_token_key=f"{normalized_prefix}_API_TOKEN", + organization_id_key=f"{normalized_prefix}_ORGANIZATION_ID", + ) + + @classmethod + def from_mapping( + cls, + values: Mapping[str, object], + *, + user_id_key: str = "user_id", + api_token_key: str = "api_token", + organization_id_key: str = "organization_id", + ) -> "QuantConnectCredentials": + user_id = _lookup(values, user_id_key, "userId", "user_id") + api_token = _lookup(values, api_token_key, "apiToken", "api_token") + organization_id = _lookup(values, organization_id_key, "organizationId", "organization_id") + return cls( + user_id=_required_text(user_id, user_id_key), + api_token=_required_text(api_token, api_token_key), + organization_id=_optional_text(organization_id), + ) + + @classmethod + def from_json_payload(cls, payload: str) -> "QuantConnectCredentials": + values = json.loads(payload) + if not isinstance(values, Mapping): + raise ValueError("QuantConnect credential payload must decode to an object.") + return cls.from_mapping(values) + + def build_auth_headers( + self, + *, + clock: Callable[[], float] = time.time, + ) -> dict[str, str]: + timestamp = str(int(clock())) + time_stamped_token = f"{self.api_token}:{timestamp}".encode("utf-8") + hashed_token = hashlib.sha256(time_stamped_token).hexdigest() + auth_payload = f"{self.user_id}:{hashed_token}".encode("utf-8") + authentication = base64.b64encode(auth_payload).decode("ascii") + return { + "Authorization": f"Basic {authentication}", + "Timestamp": timestamp, + } + + def redacted(self) -> dict[str, str | None]: + return { + "user_id": self.user_id, + "api_token": "***", + "organization_id": self.organization_id, + } + + +@dataclass(frozen=True) +class CashAmount: + amount: float + currency: str = "USD" + + def to_payload(self) -> dict[str, Any]: + return { + "amount": float(self.amount), + "currency": _required_text(self.currency, "currency"), + } + + +@dataclass(frozen=True) +class BrokerageHolding: + symbol_id: str + symbol: str + quantity: float + average_price: float + + def to_payload(self) -> dict[str, Any]: + return { + "symbolId": _required_text(self.symbol_id, "symbol_id"), + "symbol": _required_text(self.symbol, "symbol"), + "quantity": float(self.quantity), + "averagePrice": float(self.average_price), + } + + +@dataclass(frozen=True) +class QuantConnectPaperBrokerageSettings: + cash: tuple[CashAmount, ...] = (CashAmount(amount=100000.0, currency="USD"),) + holdings: tuple[BrokerageHolding, ...] = () + + def to_payload(self) -> dict[str, Any]: + return { + "id": "QuantConnectBrokerage", + "holdings": [holding.to_payload() for holding in self.holdings], + "cash": [cash_amount.to_payload() for cash_amount in self.cash], + } + + +@dataclass(frozen=True) +class InteractiveBrokersBrokerageSettings: + user_name: str + account: str + password: str = field(repr=False) + weekly_restart_utc_time: str = "09:30:00" + financial_advisors_group_filter: str | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "user_name", _required_text(self.user_name, "user_name")) + object.__setattr__(self, "account", _required_text(self.account, "account")) + object.__setattr__(self, "password", _required_text(self.password, "password")) + object.__setattr__( + self, + "weekly_restart_utc_time", + _required_text(self.weekly_restart_utc_time, "weekly_restart_utc_time"), + ) + object.__setattr__( + self, + "financial_advisors_group_filter", + _optional_text(self.financial_advisors_group_filter), + ) + + @classmethod + def from_env( + cls, + env: Mapping[str, str | None], + *, + prefix: str = "QUANTCONNECT_IB", + ) -> "InteractiveBrokersBrokerageSettings": + normalized_prefix = _required_text(prefix, "prefix").upper() + return cls.from_mapping( + env, + user_name_key=f"{normalized_prefix}_USER_NAME", + account_key=f"{normalized_prefix}_ACCOUNT", + password_key=f"{normalized_prefix}_PASSWORD", + weekly_restart_utc_time_key=f"{normalized_prefix}_WEEKLY_RESTART_UTC_TIME", + financial_advisors_group_filter_key=f"{normalized_prefix}_FINANCIAL_ADVISORS_GROUP_FILTER", + ) + + @classmethod + def from_mapping( + cls, + values: Mapping[str, object], + *, + user_name_key: str = "user_name", + account_key: str = "account", + password_key: str = "password", + weekly_restart_utc_time_key: str = "weekly_restart_utc_time", + financial_advisors_group_filter_key: str = "financial_advisors_group_filter", + ) -> "InteractiveBrokersBrokerageSettings": + return cls( + user_name=_required_text( + _lookup(values, user_name_key, "ib-user-name", "ib_user_name", "userName"), + user_name_key, + ), + account=_required_text( + _lookup(values, account_key, "ib-account", "ib_account", "account"), + account_key, + ), + password=_required_text( + _lookup(values, password_key, "ib-password", "ib_password", "password"), + password_key, + ), + weekly_restart_utc_time=_required_text( + _lookup( + values, + weekly_restart_utc_time_key, + "ib-weekly-restart-utc-time", + "ib_weekly_restart_utc_time", + "weeklyRestartUtcTime", + ), + weekly_restart_utc_time_key, + ), + financial_advisors_group_filter=_optional_text( + _lookup( + values, + financial_advisors_group_filter_key, + "ib-financial-advisors-group-filter", + "ib_financial_advisors_group_filter", + "financialAdvisorsGroupFilter", + ) + ), + ) + + @classmethod + def from_json_payload(cls, payload: str) -> "InteractiveBrokersBrokerageSettings": + values = json.loads(payload) + if not isinstance(values, Mapping): + raise ValueError("Interactive Brokers brokerage payload must decode to an object.") + return cls.from_mapping(values) + + def to_payload(self) -> dict[str, Any]: + payload = { + "id": "InteractiveBrokersBrokerage", + "ib-user-name": self.user_name, + "ib-account": self.account, + "ib-password": self.password, + "ib-weekly-restart-utc-time": self.weekly_restart_utc_time, + } + if self.financial_advisors_group_filter is not None: + payload["ib-financial-advisors-group-filter"] = self.financial_advisors_group_filter + return payload + + def redacted_payload(self) -> dict[str, Any]: + return redact_sensitive_payload(self.to_payload()) + + +@dataclass(frozen=True) +class QuantConnectLiveDeployment: + project_id: int + compile_id: str + node_id: str + brokerage: Any + version_id: str = "-1" + data_providers: Mapping[str, Any] = field(default_factory=dict) + parameters: Mapping[str, Any] = field(default_factory=dict) + notification: Mapping[str, Any] = field(default_factory=dict) + + def to_payload(self) -> dict[str, Any]: + return { + "versionId": _required_text(self.version_id, "version_id"), + "projectId": int(self.project_id), + "compileId": _required_text(self.compile_id, "compile_id"), + "nodeId": _required_text(self.node_id, "node_id"), + "brokerage": _payload_from(self.brokerage), + "dataProviders": { + str(name): _payload_from(settings) + for name, settings in self.data_providers.items() + }, + "parameters": dict(self.parameters), + "notification": dict(self.notification), + } + + def redacted_payload(self) -> dict[str, Any]: + return redact_sensitive_payload(self.to_payload()) diff --git a/tests/test_quantconnect.py b/tests/test_quantconnect.py new file mode 100644 index 0000000..6d18771 --- /dev/null +++ b/tests/test_quantconnect.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +import base64 +import hashlib +import json +import unittest + +from quant_platform_kit.quantconnect import ( + BrokerageHolding, + CashAmount, + InteractiveBrokersBrokerageSettings, + QuantConnectApiError, + QuantConnectCredentials, + QuantConnectLiveConnector, + QuantConnectLiveDeployment, + QuantConnectPaperBrokerageSettings, + QuantConnectRestClient, +) + + +class _FakeResponse: + def __init__(self, payload: dict[str, object], *, status: int = 200) -> None: + self.payload = payload + self.status = status + + def __enter__(self): + return self + + def __exit__(self, *_args): + return False + + def read(self) -> bytes: + return json.dumps(self.payload).encode("utf-8") + + +class QuantConnectTests(unittest.TestCase): + def test_credentials_build_quantconnect_auth_headers(self) -> None: + credentials = QuantConnectCredentials(user_id="42", api_token="test-token") + headers = credentials.build_auth_headers(clock=lambda: 1234567890) + + hashed_token = hashlib.sha256(b"test-token:1234567890").hexdigest() + expected_auth = base64.b64encode(f"42:{hashed_token}".encode("utf-8")).decode("ascii") + self.assertEqual(headers["Timestamp"], "1234567890") + self.assertEqual(headers["Authorization"], f"Basic {expected_auth}") + + def test_credentials_load_from_env_style_mapping(self) -> None: + credentials = QuantConnectCredentials.from_env( + { + "QUANTCONNECT_USER_ID": "42", + "QUANTCONNECT_API_TOKEN": "test-token", + "QUANTCONNECT_ORGANIZATION_ID": "org-1", + } + ) + + self.assertEqual(credentials.user_id, "42") + self.assertEqual(credentials.organization_id, "org-1") + self.assertEqual(credentials.redacted()["api_token"], "***") + + def test_interactive_brokers_brokerage_payload_uses_quantconnect_keys(self) -> None: + settings = InteractiveBrokersBrokerageSettings( + user_name="ib-user", + account="U00000000", + password="ib-password", + weekly_restart_utc_time="08:30:00", + ) + + self.assertEqual( + settings.to_payload(), + { + "id": "InteractiveBrokersBrokerage", + "ib-user-name": "ib-user", + "ib-account": "U00000000", + "ib-password": "ib-password", + "ib-weekly-restart-utc-time": "08:30:00", + }, + ) + self.assertEqual(settings.redacted_payload()["ib-password"], "***") + + def test_live_deployment_builds_create_payload(self) -> None: + deployment = QuantConnectLiveDeployment( + project_id=123, + compile_id="compile-1", + node_id="LN-1", + brokerage=QuantConnectPaperBrokerageSettings( + cash=(CashAmount(amount=25000.0, currency="USD"),), + holdings=( + BrokerageHolding( + symbol_id="SPY R735QTJ8XC9X", + symbol="SPY", + quantity=1, + average_price=500, + ), + ), + ), + data_providers={ + "QuantConnectBrokerage": { + "id": "QuantConnectBrokerage", + }, + }, + parameters={"strategy": "tqqq_growth_income"}, + ) + + payload = deployment.to_payload() + + self.assertEqual(payload["versionId"], "-1") + self.assertEqual(payload["projectId"], 123) + self.assertEqual(payload["compileId"], "compile-1") + self.assertEqual(payload["nodeId"], "LN-1") + self.assertEqual(payload["brokerage"]["id"], "QuantConnectBrokerage") + self.assertEqual(payload["brokerage"]["cash"][0]["amount"], 25000.0) + self.assertEqual(payload["parameters"]["strategy"], "tqqq_growth_income") + + def test_rest_client_posts_authenticated_json(self) -> None: + requests = [] + + def opener(request, timeout): + requests.append((request, timeout)) + return _FakeResponse({"success": True, "deployId": "L-1"}) + + client = QuantConnectRestClient( + credentials=QuantConnectCredentials(user_id="42", api_token="test-token"), + api_base_url="https://qc.example.test/api/v2/", + timeout=3.0, + opener=opener, + clock=lambda: 1234567890, + ) + + result = client.create_live_algorithm( + { + "projectId": 123, + "compileId": "compile-1", + "nodeId": "LN-1", + "versionId": "-1", + "brokerage": {"id": "QuantConnectBrokerage"}, + } + ) + + self.assertEqual(result["deployId"], "L-1") + request, timeout = requests[0] + self.assertEqual(timeout, 3.0) + self.assertEqual(request.full_url, "https://qc.example.test/api/v2/live/create") + self.assertEqual(request.method, "POST") + self.assertIn("Authorization", request.headers) + self.assertEqual(json.loads(request.data.decode("utf-8"))["projectId"], 123) + + def test_rest_client_raises_for_unsuccessful_api_response(self) -> None: + def opener(_request, timeout=None): + del timeout + return _FakeResponse({"success": False, "errors": ["invalid credentials"]}) + + client = QuantConnectRestClient( + credentials=QuantConnectCredentials(user_id="42", api_token="test-token"), + opener=opener, + clock=lambda: 1234567890, + ) + + with self.assertRaises(QuantConnectApiError) as context: + client.authenticate() + + self.assertIn("invalid credentials", str(context.exception)) + + def test_live_connector_filters_running_deployments(self) -> None: + def opener(_request, timeout=None): + del timeout + return _FakeResponse( + { + "success": True, + "live": [ + {"projectId": 123, "deployId": "L-1", "status": "Running"}, + "malformed", + ], + } + ) + + connector = QuantConnectLiveConnector( + QuantConnectRestClient( + credentials=QuantConnectCredentials(user_id="42", api_token="test-token"), + opener=opener, + clock=lambda: 1234567890, + ) + ) + + self.assertEqual( + connector.running_deployments(project_id=123), + ({"projectId": 123, "deployId": "L-1", "status": "Running"},), + ) + + +if __name__ == "__main__": + unittest.main()