From 8c67d2a5a48128d163cfb32889499ffc0123a25f Mon Sep 17 00:00:00 2001 From: Rohan Santhosh Kumar <181558744+Rohan5commit@users.noreply.github.com> Date: Tue, 19 May 2026 18:43:19 +0800 Subject: [PATCH] Add AI config validation and runtime decision visibility --- .github/workflows/daily_trading_bot.yml | 331 +++++++++++++++++++--- email_notifier.py | 89 +++--- ingest_prices.py | 34 ++- llm_trader.py | 52 +++- main.py | 38 ++- positions.py | 12 + quant_platform/scripts/plan_ai_runtime.py | 7 +- trained_model_client.py | 27 +- 8 files changed, 477 insertions(+), 113 deletions(-) diff --git a/.github/workflows/daily_trading_bot.yml b/.github/workflows/daily_trading_bot.yml index 1c8b6b9..795f7ee 100644 --- a/.github/workflows/daily_trading_bot.yml +++ b/.github/workflows/daily_trading_bot.yml @@ -20,6 +20,11 @@ on: required: false default: false type: boolean + disable_core_trading: + description: "Run only the AI bot and skip Core trading" + required: false + default: false + type: boolean reset_ai_positions: description: "Clear only AI positions before the manual run so the trained model must generate fresh trades" required: false @@ -136,7 +141,7 @@ jobs: - name: Run Trading Bot id: run_core_bot - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && !inputs.disable_core_trading }} continue-on-error: true timeout-minutes: 45 env: @@ -154,7 +159,7 @@ jobs: python main.py daily_job - name: Send Core Failure Report - if: ${{ always() && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && steps.run_core_bot.outcome == 'failure' }} + if: ${{ always() && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && steps.run_core_bot.outcome == 'failure' && steps.run_core_bot_retry.outcome != 'success' }} continue-on-error: true env: SMTP_SERVER: ${{ secrets.SMTP_SERVER }} @@ -169,6 +174,25 @@ jobs: --source "Run Trading Bot" \ --message "Core daily job failed before sending its report email. Run: ${RUN_URL}" + - name: Run Trading Bot (Core Retry) + id: run_core_bot_retry + if: ${{ always() && steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && !inputs.disable_core_trading && steps.run_core_bot.outcome == 'failure' }} + continue-on-error: true + timeout-minutes: 45 + env: + SMTP_SERVER: ${{ secrets.SMTP_SERVER }} + SMTP_PORT: ${{ secrets.SMTP_PORT }} + SENDER_EMAIL: ${{ secrets.SENDER_EMAIL }} + SENDER_PASSWORD: ${{ secrets.SENDER_PASSWORD }} + RECIPIENT_EMAIL: ${{ secrets.RECIPIENT_EMAIL }} + TRAINED_MODEL_API_KEY: ${{ secrets.TRAINED_MODEL_API_KEY }} + TWELVEDATA_API_KEYS: ${{ secrets.TWELVEDATA_API_KEYS }} + ARM_LIVE_TRADING: ${{ secrets.ARM_LIVE_TRADING }} + LOG_LEVEL: ${{ secrets.LOG_LEVEL }} + DISABLE_AI_TRADING: "1" + run: | + python main.py daily_job + - name: Install Lightning dependencies id: install_lightning_deps if: ${{ steps.install_base_dependencies.outcome == 'success' && ((github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) || (github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) }} @@ -182,40 +206,72 @@ jobs: if: ${{ !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) }} run: | mkdir -p results - if [ "${{ steps.install_lightning_deps.outcome }}" != "success" ]; then - if [ -n "${CEREBRIUM_INFERENCE_URL}" ]; then - runtime_mode="cerebrium_full" - selected_backend="cerebrium_full" - reason="cerebrium_primary_lightning_dependencies_unavailable" - else - runtime_mode="distilled_local" - selected_backend="distilled_local" - reason="lightning_dependencies_unavailable" - fi - { - echo "runtime_mode=${runtime_mode}" - echo "selected_backend=${selected_backend}" - echo "selected_compute_name=" - echo "selected_disk_gb=0" - echo "reason=${reason}" - } >> "$GITHUB_OUTPUT" - python -c 'import json, os, pathlib; payload={"runtime_mode": os.environ["runtime_mode"], "selected_backend": os.environ["selected_backend"], "selected_compute_name": "", "selected_disk_gb": 0, "reason": os.environ["reason"], "preflight": {"install_lightning_deps_outcome": "failure"}}; pathlib.Path("results/ai_runtime_plan.json").write_text(json.dumps(payload, indent=2)+"\n", encoding="utf-8"); print(json.dumps(payload, indent=2))' - cat results/ai_runtime_plan.json + configured_inference_url="${CEREBRIUM_INFERENCE_URL}" + if [ -z "${configured_inference_url}" ]; then + configured_inference_url="${CEREBRIUM_TRAINED_MODEL_URL}" + fi + if [ -z "${configured_inference_url}" ]; then + configured_inference_url="${TRAINED_MODEL_INFERENCE_URL}" + fi + if [ -n "${configured_inference_url}" ]; then + runtime_mode="cerebrium_full" + selected_backend="cerebrium_full" + reason="cerebrium_primary_configured" + else + runtime_mode="distilled_local" + selected_backend="distilled_local" + reason="cerebrium_inference_url_missing" + fi + { + echo "runtime_mode=${runtime_mode}" + echo "selected_backend=${selected_backend}" + echo "selected_compute_name=" + echo "selected_disk_gb=0" + echo "reason=${reason}" + } >> "$GITHUB_OUTPUT" + python -c 'import json, os, pathlib; payload={"runtime_mode": os.environ["runtime_mode"], "selected_backend": os.environ["selected_backend"], "selected_compute_name": "", "selected_disk_gb": 0, "reason": os.environ["reason"], "preflight": {"router_policy": "cerebrium_primary_then_distilled"}}; pathlib.Path("results/ai_runtime_plan.json").write_text(json.dumps(payload, indent=2)+"\n", encoding="utf-8"); print(json.dumps(payload, indent=2))' + cat results/ai_runtime_plan.json + + + - name: Validate Cerebrium primary configuration + id: validate_cerebrium_config + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) }} + run: | + configured_inference_url="${CEREBRIUM_INFERENCE_URL}" + if [ -z "${configured_inference_url}" ]; then + configured_inference_url="${CEREBRIUM_TRAINED_MODEL_URL}" + fi + if [ -z "${configured_inference_url}" ]; then + configured_inference_url="${TRAINED_MODEL_INFERENCE_URL}" + fi + if [ -n "${configured_inference_url}" ]; then + echo "has_inference_url=true" >> "$GITHUB_OUTPUT" + echo "inference_url_source=configured" else - python quant_platform/scripts/plan_ai_runtime.py \ - --json-out results/ai_runtime_plan.json \ - --github-output "$GITHUB_OUTPUT" + echo "has_inference_url=false" >> "$GITHUB_OUTPUT" + echo "inference_url_source=missing" + echo "AI primary inference URL is missing across CEREBRIUM_INFERENCE_URL/CEREBRIUM_TRAINED_MODEL_URL/TRAINED_MODEL_INFERENCE_URL" >&2 fi + - name: Emit AI runtime decision + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) }} + run: | + echo "AI runtime mode: ${{ steps.plan_ai_runtime.outputs.runtime_mode }}" + echo "AI backend: ${{ steps.plan_ai_runtime.outputs.selected_backend }}" + echo "AI reason: ${{ steps.plan_ai_runtime.outputs.reason }}" + echo "AI inference URL configured: ${{ steps.validate_cerebrium_config.outputs.has_inference_url }}" + - name: Warm Cerebrium inference app id: warm_cerebrium_inference - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true' }} continue-on-error: true timeout-minutes: 45 env: AI_PRIMARY_BACKEND: "cerebrium" CEREBRIUM_INFERENCE_URL: ${{ secrets.CEREBRIUM_INFERENCE_URL }} CEREBRIUM_API_KEY: ${{ secrets.CEREBRIUM_API_KEY || secrets.TRAINED_MODEL_API_KEY }} + TRAINED_MODEL_MAX_RETRIES: "2" + TRAINED_MODEL_BACKOFF_SECONDS: "10" TRAINED_MODEL_READY_TIMEOUT_SECONDS: "1800" TRAINED_MODEL_READY_POLL_SECONDS: "20" TRAINED_MODEL_TIMEOUT_SECONDS: "600" @@ -223,9 +279,70 @@ jobs: python wait_for_trained_model.py python warm_trained_model.py + - name: Verify Cerebrium predict endpoint + id: verify_cerebrium_predict + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true' }} + continue-on-error: true + timeout-minutes: 3 + env: + CEREBRIUM_INFERENCE_URL: ${{ secrets.CEREBRIUM_INFERENCE_URL }} + CEREBRIUM_API_KEY: ${{ secrets.CEREBRIUM_API_KEY || secrets.TRAINED_MODEL_API_KEY }} + run: | + python - <<'PYCODE' + import json, os, requests + url = (os.getenv('CEREBRIUM_INFERENCE_URL') or '').strip() + if not url: + raise SystemExit('CEREBRIUM_INFERENCE_URL is empty') + payload = { + 'task': 'trade_signal_classification', + 'candidates': [{'symbol':'AAPL','as_of_date':'2026-05-13','return_1d':0.01,'return_5d':0.02,'return_10d':0.03,'volatility_20d':0.02,'dist_ma_20':0.01,'dist_ma_50':0.02,'rsi_14':55,'volume_ratio':1.1,'news_count_7d':1,'news_sentiment_7d':0.1}] + } + headers = {'Content-Type':'application/json','Accept':'application/json'} + key = (os.getenv('CEREBRIUM_API_KEY') or '').strip() + if key: + headers['Authorization'] = f'Bearer {key}' + candidate_urls = [url] + if url.rstrip('/').endswith('/predict_trade_candidates'): + candidate_urls.append(url[: -len('/predict_trade_candidates')]) + else: + candidate_urls.append(url.rstrip('/') + '/predict_trade_candidates') + + last_error = None + max_attempts = 3 + preflight_ok = False + for test_url in candidate_urls: + for attempt in range(1, max_attempts + 1): + try: + r = requests.post(test_url, json=payload, headers=headers, timeout=45) + print('preflight url', test_url, 'attempt', attempt) + print('status', r.status_code) + print((r.text or '')[:500]) + r.raise_for_status() + data = r.json() + if not isinstance(data, dict): + raise SystemExit('Predict response was not JSON object') + has_signals = isinstance(data.get('signals'), list) + has_signal = data.get('signal') is not None + if not (has_signals or has_signal): + raise SystemExit('Predict response missing signal/signals') + print('predict preflight ok') + preflight_ok = True + break + except Exception as exc: + last_error = exc + print('preflight attempt failed:', exc) + if attempt < max_attempts: + import time + time.sleep(10 * attempt) + if preflight_ok: + break + if not preflight_ok: + raise SystemExit(f'Predict preflight failed for all candidate URLs: {last_error}') + PYCODE + - name: Run AI Trading Bot on Cerebrium id: run_ai_bot_cerebrium - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.warm_cerebrium_inference.outcome == 'success' }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true' && steps.verify_cerebrium_predict.outcome == 'success' }} continue-on-error: true timeout-minutes: 90 env: @@ -248,7 +365,39 @@ jobs: TRAINED_MODEL_TIMEOUT_SECONDS: "600" TRAINED_MODEL_READY_TIMEOUT_SECONDS: "1800" TRAINED_MODEL_READY_POLL_SECONDS: "20" - TRAINED_MODEL_MAX_RETRIES: "0" + TRAINED_MODEL_MAX_RETRIES: "2" + TRAINED_MODEL_BACKOFF_SECONDS: "10" + TRAINED_MODEL_BATCH_SIZE: "4" + run: | + python main.py daily_job + + - name: Run AI Trading Bot on Cerebrium (Retry) + id: run_ai_bot_cerebrium_retry + if: ${{ always() && steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true' && steps.verify_cerebrium_predict.outcome == 'success' && steps.run_ai_bot_cerebrium.outcome == 'failure' }} + continue-on-error: true + timeout-minutes: 90 + env: + SMTP_SERVER: ${{ secrets.SMTP_SERVER }} + SMTP_PORT: ${{ secrets.SMTP_PORT }} + SENDER_EMAIL: ${{ secrets.SENDER_EMAIL }} + SENDER_PASSWORD: ${{ secrets.SENDER_PASSWORD }} + RECIPIENT_EMAIL: ${{ secrets.RECIPIENT_EMAIL }} + CEREBRIUM_INFERENCE_URL: ${{ secrets.CEREBRIUM_INFERENCE_URL }} + CEREBRIUM_API_KEY: ${{ secrets.CEREBRIUM_API_KEY || secrets.TRAINED_MODEL_API_KEY }} + TRAINED_MODEL_API_KEY: ${{ secrets.TRAINED_MODEL_API_KEY }} + TWELVEDATA_API_KEYS: ${{ secrets.TWELVEDATA_API_KEYS }} + ARM_LIVE_TRADING: ${{ secrets.ARM_LIVE_TRADING }} + LOG_LEVEL: ${{ secrets.LOG_LEVEL }} + DISABLE_CORE_TRADING: "1" + AI_RUNTIME_MODE: "full" + AI_PRIMARY_BACKEND: "cerebrium" + AI_ROUTER_REASON: "cerebrium_retry_after_failure" + AI_PROMPT_CANDIDATES_LIMIT: "200" + TRAINED_MODEL_TIMEOUT_SECONDS: "600" + TRAINED_MODEL_READY_TIMEOUT_SECONDS: "1800" + TRAINED_MODEL_READY_POLL_SECONDS: "20" + TRAINED_MODEL_MAX_RETRIES: "2" + TRAINED_MODEL_BACKOFF_SECONDS: "10" TRAINED_MODEL_BATCH_SIZE: "4" run: | python main.py daily_job @@ -318,7 +467,7 @@ jobs: - name: Run AI Trading Bot (Distilled Local Fallback) id: run_ai_bot_distilled_local - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && (steps.plan_ai_runtime.outputs.runtime_mode == 'distilled_local' || (steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && (steps.warm_cerebrium_inference.outcome == 'failure' || steps.run_ai_bot_cerebrium.outcome == 'failure')) || (steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full' && (steps.launch_lightning_inference.outcome == 'failure' || steps.run_ai_bot_in_lightning_studio.outcome == 'failure'))) }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && (steps.plan_ai_runtime.outputs.runtime_mode == 'distilled_local' || (steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true' && (steps.warm_cerebrium_inference.outcome == 'failure' || steps.verify_cerebrium_predict.outcome == 'failure' || (steps.run_ai_bot_cerebrium.outcome == 'failure' && steps.run_ai_bot_cerebrium_retry.outcome != 'success'))) || (steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full' && (steps.launch_lightning_inference.outcome == 'failure' || steps.run_ai_bot_in_lightning_studio.outcome == 'failure'))) }} continue-on-error: true timeout-minutes: 60 env: @@ -333,13 +482,37 @@ jobs: LOG_LEVEL: ${{ secrets.LOG_LEVEL }} DISABLE_CORE_TRADING: "1" AI_RUNTIME_MODE: "distilled_local" - AI_ROUTER_REASON: ${{ steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && 'distilled_after_cerebrium_failure' || (steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full' && 'distilled_after_lightning_failure' || steps.plan_ai_runtime.outputs.reason) }} + AI_ROUTER_REASON: ${{ steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true' && 'distilled_after_cerebrium_failure' || (steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full' && 'distilled_after_lightning_failure' || steps.plan_ai_runtime.outputs.reason) }} + run: | + python quant_platform/scripts/prepare_ai_only_retry.py --results-dir results + python main.py daily_job + + - name: Run AI Trading Bot (Emergency Distilled Retry) + id: run_ai_bot_distilled_retry + if: ${{ always() && steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.run_ai_bot_distilled_local.outcome == 'failure' }} + continue-on-error: true + timeout-minutes: 60 + env: + SMTP_SERVER: ${{ secrets.SMTP_SERVER }} + SMTP_PORT: ${{ secrets.SMTP_PORT }} + SENDER_EMAIL: ${{ secrets.SENDER_EMAIL }} + SENDER_PASSWORD: ${{ secrets.SENDER_PASSWORD }} + RECIPIENT_EMAIL: ${{ secrets.RECIPIENT_EMAIL }} + TRAINED_MODEL_API_KEY: ${{ secrets.TRAINED_MODEL_API_KEY }} + TWELVEDATA_API_KEYS: ${{ secrets.TWELVEDATA_API_KEYS }} + ARM_LIVE_TRADING: ${{ secrets.ARM_LIVE_TRADING }} + LOG_LEVEL: ${{ secrets.LOG_LEVEL }} + DISABLE_CORE_TRADING: "1" + AI_RUNTIME_MODE: "distilled_local" + AI_ROUTER_REASON: "distilled_emergency_retry_after_failure" + TRAINED_MODEL_TIMEOUT_SECONDS: "450" + TRAINED_MODEL_MAX_RETRIES: "1" run: | python quant_platform/scripts/prepare_ai_only_retry.py --results-dir results python main.py daily_job - name: Send AI Failure Report - if: ${{ always() && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && (((steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full') && (steps.warm_cerebrium_inference.outcome == 'failure' || steps.run_ai_bot_cerebrium.outcome == 'failure') && steps.run_ai_bot_distilled_local.outcome == 'failure') || ((steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full') && (steps.launch_lightning_inference.outcome == 'failure' || steps.run_ai_bot_in_lightning_studio.outcome == 'failure') && steps.run_ai_bot_distilled_local.outcome == 'failure') || ((steps.plan_ai_runtime.outputs.runtime_mode == 'distilled_local') && steps.run_ai_bot_distilled_local.outcome == 'failure')) }} + if: ${{ always() && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && (((steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.validate_cerebrium_config.outputs.has_inference_url == 'true') && (steps.warm_cerebrium_inference.outcome == 'failure' || steps.verify_cerebrium_predict.outcome == 'failure' || (steps.run_ai_bot_cerebrium.outcome == 'failure' && steps.run_ai_bot_cerebrium_retry.outcome != 'success')) && steps.run_ai_bot_distilled_local.outcome == 'failure' && steps.run_ai_bot_distilled_retry.outcome != 'success') || ((steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full') && (steps.launch_lightning_inference.outcome == 'failure' || steps.run_ai_bot_in_lightning_studio.outcome == 'failure') && steps.run_ai_bot_distilled_local.outcome == 'failure' && steps.run_ai_bot_distilled_retry.outcome != 'success') || ((steps.plan_ai_runtime.outputs.runtime_mode == 'distilled_local') && steps.run_ai_bot_distilled_local.outcome == 'failure' && steps.run_ai_bot_distilled_retry.outcome != 'success')) }} continue-on-error: true env: SMTP_SERVER: ${{ secrets.SMTP_SERVER }} @@ -353,8 +526,11 @@ jobs: LAUNCH_LIGHTNING_OUTCOME: ${{ steps.launch_lightning_inference.outcome }} RUN_AI_STUDIO_OUTCOME: ${{ steps.run_ai_bot_in_lightning_studio.outcome }} WARM_CEREBRIUM_OUTCOME: ${{ steps.warm_cerebrium_inference.outcome }} + VERIFY_CEREBRIUM_OUTCOME: ${{ steps.verify_cerebrium_predict.outcome }} RUN_AI_CEREBRIUM_OUTCOME: ${{ steps.run_ai_bot_cerebrium.outcome }} + RUN_AI_CEREBRIUM_RETRY_OUTCOME: ${{ steps.run_ai_bot_cerebrium_retry.outcome }} RUN_AI_DISTILLED_OUTCOME: ${{ steps.run_ai_bot_distilled_local.outcome }} + RUN_AI_DISTILLED_RETRY_OUTCOME: ${{ steps.run_ai_bot_distilled_retry.outcome }} run: | source_step="Run AI Trading Bot" detail="AI daily job failed before sending its report email. Run: ${RUN_URL}" @@ -437,65 +613,138 @@ jobs: model_registry.json key: ${{ runner.os }}-trading-bot-${{ github.run_id }} + + - name: Ensure Daily Email Coverage + if: ${{ always() && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) }} + continue-on-error: true + env: + SMTP_SERVER: ${{ secrets.SMTP_SERVER }} + SMTP_PORT: ${{ secrets.SMTP_PORT }} + SENDER_EMAIL: ${{ secrets.SENDER_EMAIL }} + SENDER_PASSWORD: ${{ secrets.SENDER_PASSWORD }} + RECIPIENT_EMAIL: ${{ secrets.RECIPIENT_EMAIL }} + RUN_URL: ${{ format('{0}/{1}/actions/runs/{2}', github.server_url, github.repository, github.run_id) }} + CORE_EXPECTED: ${{ (github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_core_trading) }} + AI_EXPECTED: ${{ (github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading) }} + BASE_DEPS_OUTCOME: ${{ steps.install_base_dependencies.outcome }} + CORE_OUTCOME: ${{ steps.run_core_bot.outcome }} + CORE_RETRY_OUTCOME: ${{ steps.run_core_bot_retry.outcome }} + AI_RUNTIME_MODE: ${{ steps.plan_ai_runtime.outputs.runtime_mode }} + RUN_AI_CEREBRIUM_OUTCOME: ${{ steps.run_ai_bot_cerebrium.outcome }} + RUN_AI_CEREBRIUM_RETRY_OUTCOME: ${{ steps.run_ai_bot_cerebrium_retry.outcome }} + RUN_AI_STUDIO_OUTCOME: ${{ steps.run_ai_bot_in_lightning_studio.outcome }} + RUN_AI_DISTILLED_OUTCOME: ${{ steps.run_ai_bot_distilled_local.outcome }} + RUN_AI_DISTILLED_RETRY_OUTCOME: ${{ steps.run_ai_bot_distilled_retry.outcome }} + run: | + core_ok="false" + ai_ok="false" + + if [ "${CORE_EXPECTED}" = "false" ]; then + core_ok="true" + elif [ "${CORE_OUTCOME}" = "success" -o "${CORE_RETRY_OUTCOME}" = "success" ]; then + core_ok="true" + fi + + if [ "${AI_EXPECTED}" = "false" ]; then + ai_ok="true" + elif [ "${AI_RUNTIME_MODE}" = "cerebrium_full" ] && [ "${RUN_AI_CEREBRIUM_OUTCOME}" = "success" -o "${RUN_AI_CEREBRIUM_RETRY_OUTCOME}" = "success" -o "${RUN_AI_DISTILLED_OUTCOME}" = "success" -o "${RUN_AI_DISTILLED_RETRY_OUTCOME}" = "success" ]; then + ai_ok="true" + elif [ "${AI_RUNTIME_MODE}" = "lightning_full" ] && [ "${RUN_AI_STUDIO_OUTCOME}" = "success" -o "${RUN_AI_DISTILLED_OUTCOME}" = "success" -o "${RUN_AI_DISTILLED_RETRY_OUTCOME}" = "success" ]; then + ai_ok="true" + elif [ "${AI_RUNTIME_MODE}" = "distilled_local" ] && [ "${RUN_AI_DISTILLED_OUTCOME}" = "success" -o "${RUN_AI_DISTILLED_RETRY_OUTCOME}" = "success" ]; then + ai_ok="true" + fi + + if [ "${BASE_DEPS_OUTCOME}" != "success" ]; then + # Base dependency failure reports are emitted earlier; do not duplicate here. + exit 0 + fi + + if [ "${core_ok}" != "true" ]; then + python send_failure_report.py \ + --strategy-tag Core \ + --source "Email Coverage Guard" \ + --message "Core report email was not confirmed from workflow outcomes. Run: ${RUN_URL}" + fi + + if [ "${ai_ok}" != "true" ]; then + python send_failure_report.py \ + --strategy-tag AI \ + --source "Email Coverage Guard" \ + --message "AI report email was not confirmed from workflow outcomes (fallback may also have failed). Run: ${RUN_URL}" \ + --model-used "quant-trained-trading-model" + fi + - name: Evaluate Scheduled/Runner Run Status if: ${{ always() && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) }} env: BASE_DEPS_OUTCOME: ${{ steps.install_base_dependencies.outcome }} CORE_OUTCOME: ${{ steps.run_core_bot.outcome }} + CORE_RETRY_OUTCOME: ${{ steps.run_core_bot_retry.outcome }} + CORE_EXPECTED: ${{ (github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_core_trading) }} AI_EXPECTED: ${{ (github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading) }} INSTALL_LIGHTNING_OUTCOME: ${{ steps.install_lightning_deps.outcome }} AI_RUNTIME_MODE: ${{ steps.plan_ai_runtime.outputs.runtime_mode }} LAUNCH_LIGHTNING_OUTCOME: ${{ steps.launch_lightning_inference.outcome }} RUN_AI_STUDIO_OUTCOME: ${{ steps.run_ai_bot_in_lightning_studio.outcome }} WARM_CEREBRIUM_OUTCOME: ${{ steps.warm_cerebrium_inference.outcome }} + VERIFY_CEREBRIUM_OUTCOME: ${{ steps.verify_cerebrium_predict.outcome }} RUN_AI_CEREBRIUM_OUTCOME: ${{ steps.run_ai_bot_cerebrium.outcome }} + RUN_AI_CEREBRIUM_RETRY_OUTCOME: ${{ steps.run_ai_bot_cerebrium_retry.outcome }} RUN_AI_DISTILLED_OUTCOME: ${{ steps.run_ai_bot_distilled_local.outcome }} + RUN_AI_DISTILLED_RETRY_OUTCOME: ${{ steps.run_ai_bot_distilled_retry.outcome }} run: | rc=0 if [ "${BASE_DEPS_OUTCOME}" != "success" ]; then echo "Base dependency install failed: ${BASE_DEPS_OUTCOME}" rc=1 fi - if [ "${BASE_DEPS_OUTCOME}" = "success" ] && [ "${CORE_OUTCOME}" != "success" ]; then - echo "Core path failed: ${CORE_OUTCOME}" + if [ "${BASE_DEPS_OUTCOME}" = "success" ] && [ "${CORE_EXPECTED}" = "true" ] && [ "${CORE_OUTCOME}" != "success" ] && [ "${CORE_RETRY_OUTCOME}" != "success" ]; then + echo "Core path failed: initial=${CORE_OUTCOME}, retry=${CORE_RETRY_OUTCOME}" rc=1 fi if [ "${BASE_DEPS_OUTCOME}" = "success" ] && [ "${AI_EXPECTED}" = "true" ]; then if [ "${AI_RUNTIME_MODE}" = "cerebrium_full" ]; then if [ "${WARM_CEREBRIUM_OUTCOME}" != "success" ]; then echo "Cerebrium warmup failed: ${WARM_CEREBRIUM_OUTCOME}" - if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ]; then + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then + rc=1 + fi + fi + if [ "${WARM_CEREBRIUM_OUTCOME}" = "success" ] && [ "${VERIFY_CEREBRIUM_OUTCOME}" != "success" ]; then + echo "Cerebrium predict preflight failed: ${VERIFY_CEREBRIUM_OUTCOME}" + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then rc=1 fi fi - if [ "${WARM_CEREBRIUM_OUTCOME}" = "success" ] && [ "${RUN_AI_CEREBRIUM_OUTCOME}" != "success" ]; then - echo "AI Cerebrium run failed: ${RUN_AI_CEREBRIUM_OUTCOME}" - if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ]; then + if [ "${WARM_CEREBRIUM_OUTCOME}" = "success" ] && [ "${VERIFY_CEREBRIUM_OUTCOME}" = "success" ] && [ "${RUN_AI_CEREBRIUM_OUTCOME}" != "success" ] && [ "${RUN_AI_CEREBRIUM_RETRY_OUTCOME}" != "success" ]; then + echo "AI Cerebrium run failed: initial=${RUN_AI_CEREBRIUM_OUTCOME}, retry=${RUN_AI_CEREBRIUM_RETRY_OUTCOME}" + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then rc=1 fi fi elif [ "${AI_RUNTIME_MODE}" = "lightning_full" ]; then if [ "${INSTALL_LIGHTNING_OUTCOME}" != "success" ]; then echo "Lightning dependency install failed: ${INSTALL_LIGHTNING_OUTCOME}" - if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ]; then + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then rc=1 fi fi if [ "${LAUNCH_LIGHTNING_OUTCOME}" != "success" ]; then echo "Lightning launch failed: ${LAUNCH_LIGHTNING_OUTCOME}" - if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ]; then + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then rc=1 fi fi if [ "${LAUNCH_LIGHTNING_OUTCOME}" = "success" ] && [ "${RUN_AI_STUDIO_OUTCOME}" != "success" ]; then echo "AI studio run failed: ${RUN_AI_STUDIO_OUTCOME}" - if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ]; then + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then rc=1 fi fi else - if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ]; then - echo "AI distilled fallback run failed: ${RUN_AI_DISTILLED_OUTCOME}" + if [ "${RUN_AI_DISTILLED_OUTCOME}" != "success" ] && [ "${RUN_AI_DISTILLED_RETRY_OUTCOME}" != "success" ]; then + echo "AI distilled fallback run failed: ${RUN_AI_DISTILLED_OUTCOME}, retry: ${RUN_AI_DISTILLED_RETRY_OUTCOME}" rc=1 fi fi diff --git a/email_notifier.py b/email_notifier.py index fd0e930..8c13b1b 100644 --- a/email_notifier.py +++ b/email_notifier.py @@ -82,60 +82,17 @@ def _ai_view_text(row): def _manager_style_reason(pos): - """Generate a concise one-line rationale for AI entries in email output.""" + """Return model-provided reason text for AI entries without rewriting.""" if not isinstance(pos, dict): return "" - raw_reason = str(pos.get("reason") or "").strip() - if not raw_reason: - return "" - side = str(pos.get("side") or "LONG").upper() - conf = pos.get("decision_confidence", pos.get("confidence")) - alloc_pct = pos.get("allocation_pct") - try: - conf_txt = f"{float(conf):.2f}" - except (TypeError, ValueError): - conf_txt = "N/A" - try: - alloc_txt = f"{float(alloc_pct):.1f}%" - except (TypeError, ValueError): - alloc_txt = "N/A" - - tokens = [] - low = raw_reason.lower() - if "momentum" in low: - tokens.append("momentum continuation") - if "trend" in low: - tokens.append("trend persistence") - if "volume" in low: - tokens.append("volume confirmation") - if "rsi" in low: - tokens.append("RSI regime support") - if "feature balance" in low: - tokens.append("cross-factor alignment") - if not tokens: - tokens.append("multi-factor model signal") - thesis = ", ".join(tokens[:3]) - direction = "Upside" if side == "LONG" else "Downside" - return f"{direction} thesis: {thesis}; conviction={conf_txt}; size={alloc_txt}." + return str(pos.get("decision_reason") or pos.get("reason") or "").strip() def _manager_style_close_reason(pos): - """Generate a concise one-line rationale for AI position exits in email output.""" + """Return model-provided reason text for AI exits without rewriting.""" if not isinstance(pos, dict): return "" - raw_reason = str(pos.get("reason") or "").strip() - side = str(pos.get("side") or "LONG").upper() - try: - pnl_pct = float(pos.get("realized_pnl", 0.0) or 0.0) - pnl_txt = f"{pnl_pct:+.2%}" - except (TypeError, ValueError): - pnl_txt = "N/A" - if "AI rotation" in raw_reason: - direction = "long" if side == "LONG" else "short" - return f"Portfolio rebalance exit: {direction} position rotated out after model target update; realized={pnl_txt}." - if raw_reason: - return f"Model-driven exit: {raw_reason}; realized={pnl_txt}." - return f"Model-driven exit executed; realized={pnl_txt}." + return str(pos.get("decision_reason") or pos.get("reason") or "").strip() class EmailNotifier: @@ -216,13 +173,42 @@ def send_daily_report( stocks_scanned_str = str(stocks_scanned_today) if stocks_scanned_today is not None else "N/A" # Build body without leading indentation (some email clients render leading spaces poorly). + period_start = str(report_data.get("performance_period_start") or "").strip() + period_label = f"Performance Period Since Last Reset: {period_start}" if period_start else "Performance Period Since Last Reset: N/A" body_lines = [ "Daily Trading Bot Report", "========================", f"Date: {report_data['date']}", + period_label, "", ] + if str(subject_tag or "").strip().upper() == "AI" and isinstance(pipeline_stats, dict): + ai_status = pipeline_stats.get("ai_trading_llm_status") + if isinstance(ai_status, dict): + backend = str(ai_status.get("selected_backend") or ai_status.get("backend") or "unknown").strip() or "unknown" + provider = str(ai_status.get("backend_provider") or "").strip() + model = str(ai_status.get("model_used") or ai_status.get("model") or "unknown").strip() or "unknown" + fallback_from = str(ai_status.get("fallback_from_backend") or "").strip() + router_reason = str(ai_status.get("router_reason") or "").strip() + skipped_reason = str(ai_status.get("skipped_reason") or "").strip() + if provider and provider.lower() not in backend.lower(): + backend = f"{backend} ({provider})" + body_lines.extend([ + "AI RUNTIME", + "-" * 40, + f"Backend Used: {backend}", + f"Model Used: {model}", + f"Status: {'OK' if ai_status.get('ok') else 'ERROR'}", + ]) + if fallback_from: + body_lines.append(f"Fallback From: {fallback_from}") + if router_reason: + body_lines.append(f"Router Reason: {router_reason}") + if skipped_reason: + body_lines.append(f"Skipped Reason: {skipped_reason}") + body_lines.append("") + if strategies: body_lines.append("This report includes multiple strategy accounts. See STRATEGY DETAILS below.") body_lines.append(f"Stocks Scanned Today: {stocks_scanned_str}") @@ -258,7 +244,7 @@ def send_daily_report( ]) body_lines.append("") - if pipeline_stats: + if pipeline_stats and str(subject_tag or "").strip().upper() != "AI": body_lines.extend([ "PIPELINE SUMMARY", "-" * 40, @@ -410,13 +396,6 @@ def send_daily_report( body_lines.append("No open positions.") body_lines.append("") - if meta_insights and not strategies and str(subject_tag or "").strip().upper() == "AI": - body_lines.extend([ - "META-LEARNER INSIGHTS", - "-" * 40, - str(meta_insights).strip(), - "", - ]) ai_autonomous = _ai_autonomous_mode(report_data, pipeline_stats, subject_tag=subject_tag) diff --git a/ingest_prices.py b/ingest_prices.py index 688cf12..9ca3ec7 100644 --- a/ingest_prices.py +++ b/ingest_prices.py @@ -128,29 +128,37 @@ def fetch_stooq_data(self, symbol): tickers.append(c) seen.add(c) - try: - for tkr in tickers: - stooq_symbol = f"{tkr}.us" - url = f"https://stooq.com/q/d/l/?s={stooq_symbol}&f=sd2t2ohlcv&h&e=csv" - logger.info(f"Downloading {symbol} from Stooq (s={stooq_symbol})...") + for tkr in tickers: + stooq_symbol = f"{tkr}.us" + url = f"https://stooq.com/q/d/l/?s={stooq_symbol}&f=sd2t2ohlcv&h&e=csv" + logger.info(f"Downloading {symbol} from Stooq (s={stooq_symbol})...") + try: response = requests.get(url, timeout=15) response.raise_for_status() if "No data" in response.text or len(response.text) < 50: continue - df = pd.read_csv(StringIO(response.text)) + df = pd.read_csv(StringIO(response.text), on_bad_lines="skip") df.columns = [c.lower() for c in df.columns] + required = {"date", "open", "high", "low", "close", "volume"} + if not required.issubset(set(df.columns)): + logger.warning( + "Stooq returned unexpected columns for %s (s=%s): %s", + symbol, + stooq_symbol, + ",".join(df.columns), + ) + continue # Preserve canonical symbol as provided by universe file (usually uppercase). df['symbol'] = str(symbol or "").strip().upper() return df[['symbol', 'date', 'open', 'high', 'low', 'close', 'volume']] + except Exception as e: + logger.warning("Stooq fetch attempt failed for %s (s=%s): %s", symbol, stooq_symbol, e) + continue - logger.warning(f"No data found for {symbol} (tried: {', '.join([f'{t}.us' for t in tickers])})") - return None - - except Exception as e: - logger.error(f"Failed to fetch {symbol} from Stooq: {e}") - return None + logger.warning(f"No data found for {symbol} (tried: {', '.join([f'{t}.us' for t in tickers])})") + return None def fetch_stooq_latest(self, symbol): """ @@ -186,7 +194,7 @@ def fetch_stooq_latest(self, symbol): text = response.text.strip() if "No data" in text or len(text.splitlines()) < 2: continue - df = pd.read_csv(StringIO(text)) + df = pd.read_csv(StringIO(text), on_bad_lines="skip") df.columns = [c.lower() for c in df.columns] # Expected columns: symbol,date,time,open,high,low,close,volume if "date" not in df.columns: diff --git a/llm_trader.py b/llm_trader.py index 62c0fe3..2140cc0 100644 --- a/llm_trader.py +++ b/llm_trader.py @@ -302,6 +302,8 @@ def _predict_trades_from_client( neutral_predictions = 0 neutral_breakouts = 0 failures = [] + has_open_positions = any(bool(str(c.get("current_position_side") or "").strip()) for c in prompt_candidates if isinstance(c, dict)) + soft_cash_enabled = bool(ai_cfg.get("soft_cash_deploy_enabled", True)) if isinstance(manager_context, dict) and manager_context: try: batch_predictions = client.predict_candidates(prompt_candidates, manager_context=manager_context) @@ -325,7 +327,55 @@ def _predict_trades_from_client( reason = prediction.get("reason") if score == 0.0: neutral_predictions += 1 - continue + breakout = _neutral_breakout_score(prediction, ai_cfg) + if breakout is None: + if soft_cash_enabled and not has_open_positions: + probs = prediction.get("class_probabilities") if isinstance(prediction, dict) else None + if isinstance(probs, dict): + directional = [] + for lbl in _DIRECTIONAL_LABELS: + try: + directional.append((lbl, float(probs.get(lbl, 0.0) or 0.0))) + except (TypeError, ValueError): + directional.append((lbl, 0.0)) + best_label, best_prob = max(directional, key=lambda item: item[1]) + try: + neutral_prob = float(probs.get("NEUTRAL", 0.0) or 0.0) + except (TypeError, ValueError): + neutral_prob = 0.0 + if best_prob >= 0.18 and (neutral_prob - best_prob) <= 0.20: + neutral_breakouts += 1 + score = float(_LABEL_TO_SCORE.get(best_label, 0.0)) + confidence = max(confidence, min(0.99, best_prob)) + prediction = { + **prediction, + "label": best_label, + "reason": prediction.get("reason") + or ( + f"Soft cash deployment bias: directional_prob={best_prob:.2f}, neutral_prob={neutral_prob:.2f}." + ), + } + breakout = { + "label": best_label, + "score": float(_LABEL_TO_SCORE.get(best_label, 0.0)), + "confidence": confidence, + "directional_prob": best_prob, + "neutral_prob": neutral_prob, + } + if breakout is None: + continue + neutral_breakouts += 1 + score = float(breakout.get("score", 0.0) or 0.0) + confidence = max(confidence, float(breakout.get("confidence", confidence) or confidence)) + prediction = { + **prediction, + "label": breakout.get("label") or prediction.get("label"), + "reason": prediction.get("reason") + or ( + f"Neutral breakout override: directional_prob={float(breakout.get('directional_prob', 0.0) or 0.0):.2f}, " + f"neutral_prob={float(breakout.get('neutral_prob', 0.0) or 0.0):.2f}." + ), + } side = "LONG" if score > 0 else "SHORT" diff --git a/main.py b/main.py index 20cdafb..1d45328 100644 --- a/main.py +++ b/main.py @@ -831,8 +831,10 @@ def run_daily_test(self, test_date=None, pipeline_stats=None, backtest_signals=N invested_notional = float((open_positions_now["entry_price"] * open_positions_now["quantity"]).sum() or 0.0) available_cash = float(current_capital) - invested_notional + period_start_core = self.core_tracker.get_performance_period_start() report = { 'date': test_date.date(), + 'performance_period_start': period_start_core, 'new_positions_opened': len(new_positions), 'positions_topped_up': len(top_up_positions), 'positions_closed_at_tp': len(closed_positions), @@ -1230,6 +1232,10 @@ def add_symbol(value): if isinstance(ai_llm_status, dict): ai_llm_status["entries_blocked_due_to_llm_error"] = True else: + skipped_reason = str((ai_llm_status or {}).get("skipped_reason") or "").strip().lower() + suppress_rotation_closes = skipped_reason in {"all_neutral", "no_action"} or bool( + (ai_llm_status or {}).get("entries_blocked_due_to_llm_error") + ) target_map = { str(trade.get("symbol") or "").strip().upper(): trade for trade in ai_trades @@ -1250,6 +1256,19 @@ def add_symbol(value): if exec_price is None: exec_price = _pyfloat(exec_row.get("close")) if target is None or str(target.get("side") or "LONG").upper() != current_side: + if suppress_rotation_closes and target is None: + self.ai_tracker.update_position_decision( + symbol, + trade_date_str, + decision_label="HOLD", + decision_confidence=None, + decision_reason=( + "AI rotation close suppressed because manager produced no actionable targets " + f"(skipped_reason={skipped_reason or 'none'})." + ), + target_price=_pyfloat(pos.get("entry_price")), + ) + continue if exec_price is None: _record_pipeline_issue( pipeline_stats, @@ -1414,8 +1433,10 @@ def add_symbol(value): ai_llm_status["positions_closed_by_ai"] = len(ai_closed) ai_llm_status["positions_topped_up"] = len(ai_topups) + period_start_ai = self.ai_tracker.get_performance_period_start() ai_report = { "date": test_date.date(), + "performance_period_start": period_start_ai, "new_positions_opened": len(ai_new), "positions_topped_up": len(ai_topups), "positions_closed_at_tp": len(ai_closed), @@ -1491,17 +1512,32 @@ def add_symbol(value): ai_email_sent = True if ai_report is not None: + backend_provider = str(ai_llm_status.get('backend_provider') or '').strip() + selected_backend = str(ai_llm_status.get('selected_backend') or ai_llm_status.get('backend') or '').strip() + model_used = str(ai_llm_status.get('model_used') or ai_llm_status.get('model') or '').strip() + fallback_from_backend = str(ai_llm_status.get('fallback_from_backend') or '').strip() + backend_summary = selected_backend or backend_provider or "unknown" + if backend_provider and backend_provider.lower() not in backend_summary.lower(): + backend_summary = f"{backend_summary} ({backend_provider})" if ai_llm_status.get("ok"): ai_insight = ( "AI trading engine status: OK" f" | mode={ai_llm_status.get('manager_mode') or 'unknown'}" + f" | backend={backend_summary}" + f" | model={model_used or 'unknown'}" f" | target_positions={ai_llm_status.get('target_positions', 0)}" f" | closed={ai_llm_status.get('positions_closed_by_ai', 0)}" f" | opened={ai_llm_status.get('positions_opened', 0)}" f" | topped_up={ai_llm_status.get('positions_topped_up', 0)}" ) + if fallback_from_backend: + ai_insight += f" | fallback_from={fallback_from_backend}" else: - ai_insight = f"AI trading engine status: ERROR - {ai_llm_status.get('error')}" + ai_insight = ( + f"AI trading engine status: ERROR - {ai_llm_status.get('error')}" + f" | backend={backend_summary}" + f" | model={model_used or 'unknown'}" + ) ai_email_positions = list(ai_new) + list(ai_topups) ai_email_sent = notifier.send_daily_report( report_data=ai_report, diff --git a/positions.py b/positions.py index 091a37b..cff9b8b 100644 --- a/positions.py +++ b/positions.py @@ -532,6 +532,18 @@ def get_open_positions(self): conn.close() return open_positions + def get_performance_period_start(self): + """Best-effort period start date since the last reset for this table.""" + conn = sqlite3.connect(self.db_path) + try: + row = conn.execute( + f"SELECT MIN(COALESCE(entry_date, '')) FROM {self.table_name} WHERE COALESCE(entry_date, '') <> ''" + ).fetchone() + value = row[0] if row else None + return str(value).strip() if value else None + finally: + conn.close() + def get_portfolio_summary(self): """Get summary of all positions (open and closed)""" conn = sqlite3.connect(self.db_path) diff --git a/quant_platform/scripts/plan_ai_runtime.py b/quant_platform/scripts/plan_ai_runtime.py index d128e70..dfac42f 100644 --- a/quant_platform/scripts/plan_ai_runtime.py +++ b/quant_platform/scripts/plan_ai_runtime.py @@ -28,7 +28,12 @@ def _load_router_config() -> dict: def choose_runtime() -> dict: router_cfg = _load_router_config() - cerebrium_url = str(os.getenv("CEREBRIUM_INFERENCE_URL") or os.getenv("CEREBRIUM_TRAINED_MODEL_URL") or "").strip() + cerebrium_url = str( + os.getenv("CEREBRIUM_INFERENCE_URL") + or os.getenv("CEREBRIUM_TRAINED_MODEL_URL") + or os.getenv("TRAINED_MODEL_INFERENCE_URL") + or "" + ).strip() if cerebrium_url: return { "runtime_mode": "cerebrium_full", diff --git a/trained_model_client.py b/trained_model_client.py index 7faf836..fff1c88 100644 --- a/trained_model_client.py +++ b/trained_model_client.py @@ -77,7 +77,7 @@ def model_identifier(self) -> str: return self.model_name or self.inference_url or "trained-model-http" def is_ready(self) -> bool: - if self.backend not in {"http", "cerebrium", "cerebrium_full"}: + if self.backend not in {"http", "cerebrium", "cerebrium_full", "cerebrum"}: self.last_error = f"Unsupported trained model backend: {self.backend}. Use remote HTTP inference only." return False if not self.inference_url: @@ -128,6 +128,7 @@ def _predict_batch_http(self, candidates: List[dict], manager_context: Optional[ data = None last_exc = None prediction_url = self._prediction_url() + tried_urls = set() for attempt in range(self.max_retries + 1): attempt_started = time.time() logger.info( @@ -162,6 +163,14 @@ def _predict_batch_http(self, candidates: List[dict], manager_context: Optional[ break except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as exc: last_exc = exc + status_code = getattr(getattr(exc, 'response', None), 'status_code', None) + if status_code == 404: + alt_url = self._alternate_prediction_url(prediction_url) + if alt_url and alt_url not in tried_urls: + tried_urls.add(prediction_url) + prediction_url = alt_url + logger.warning("Trained model 404 on %s; retrying once with alternate url %s", tried_urls, prediction_url) + continue if attempt >= self.max_retries: raise sleep_seconds = self.backoff_seconds * (attempt + 1) @@ -213,6 +222,12 @@ def wait_until_ready(self, timeout_seconds: int = 600, poll_seconds: float = 10. last_error = str(payload.get("error") or payload) except Exception as exc: last_error = str(exc) + if self.provider in {"cerebrium", "cerebrium_full", "cerebrum"} and "404" in last_error: + # Some Cerebrium endpoint shapes do not expose a direct /health route. + # Allow inference run to continue and rely on the actual predict call as truth. + self.last_error = None + self.last_model_used = self.model_identifier + return {"ok": True, "model": self.model_identifier, "health_probe": "skipped_after_404"} self.last_error = last_error remaining = deadline - time.time() if remaining <= 0: @@ -232,6 +247,16 @@ def _prediction_url(self) -> str: return url.rstrip("/") + "/predict_trade_candidates" return url + def _alternate_prediction_url(self, url: str) -> str | None: + raw = str(url or "").strip() + if not raw: + return None + if raw.endswith('/predict_trade_candidates'): + return raw[: -len('/predict_trade_candidates')] + if self.provider in {"cerebrium", "cerebrium_full", "cerebrum"}: + return raw.rstrip('/') + '/predict_trade_candidates' + return None + def _health_url(self) -> str: url = (self.inference_url or "").strip() if not url: