diff --git a/new_ui/backend/api/routes/workflows.py b/new_ui/backend/api/routes/workflows.py
index 7ee9f977..f4448deb 100644
--- a/new_ui/backend/api/routes/workflows.py
+++ b/new_ui/backend/api/routes/workflows.py
@@ -99,6 +99,7 @@ async def get_workflow_status(task_id: str):
         "message": task.message,
         "result": task.result,
         "error": task.error,
+        "error_details": task.error_details,
         "started_at": task.started_at.isoformat() if task.started_at else None,
         "completed_at": task.completed_at.isoformat() if task.completed_at else None,
     }
@@ -238,6 +239,7 @@ async def get_recent_tasks(limit: int = 10):
             "message": task.message,
             "result": task.result,
             "error": task.error,
+            "error_details": task.error_details,
             "started_at": task.started_at,
             "completed_at": task.completed_at,
         }
diff --git a/new_ui/backend/api/websockets/workflow_ws.py b/new_ui/backend/api/websockets/workflow_ws.py
index 974e6a7e..85e76795 100644
--- a/new_ui/backend/api/websockets/workflow_ws.py
+++ b/new_ui/backend/api/websockets/workflow_ws.py
@@ -102,11 +102,13 @@ async def workflow_websocket(websocket: WebSocket, task_id: str):
                 "type": "status",
                 "task_id": task_id,
                 "status": task.status,
-                "progress": task.progress,
-                "message": task.message,
-                "timestamp": datetime.utcnow().isoformat(),
-            }
-        )
+                "progress": task.progress,
+                "message": task.message,
+                "error": task.error,
+                "error_details": task.error_details,
+                "timestamp": datetime.utcnow().isoformat(),
+            }
+        )

     # Send pending interaction if any (fixes race condition where interaction_required
     # was broadcast before WebSocket connected)
@@ -149,6 +151,7 @@ async def workflow_websocket(websocket: WebSocket, task_id: str):
                 "type": "error",
                 "task_id": task_id,
                 "error": task.error,
+                "error_details": task.error_details,
                 "timestamp": datetime.utcnow().isoformat(),
             }
         )
diff --git a/new_ui/backend/models/responses.py b/new_ui/backend/models/responses.py
index c996c0db..4e57039a 100644
--- a/new_ui/backend/models/responses.py
+++ b/new_ui/backend/models/responses.py
@@ -25,6 +25,7 @@ class WorkflowStatusResponse(BaseModel):
     message: str = ""
     result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
+    error_details: Optional[Dict[str, Any]] = None
     started_at: Optional[datetime] = None
     completed_at: Optional[datetime] = None
diff --git a/new_ui/backend/services/workflow_service.py b/new_ui/backend/services/workflow_service.py
index 768012ee..5e179261 100644
--- a/new_ui/backend/services/workflow_service.py
+++ b/new_ui/backend/services/workflow_service.py
@@ -10,6 +10,7 @@
 import asyncio
 import uuid
 import os
+import re
 import shutil
 from datetime import datetime
 from pathlib import Path
@@ -30,6 +31,7 @@ class WorkflowTask:
     message: str = ""
     result: Optional[Dict[str, Any]] = None
     error: Optional[str] = None
+    error_details: Optional[Dict[str, Any]] = None
     started_at: Optional[datetime] = None
     completed_at: Optional[datetime] = None
     cancel_event: asyncio.Event = field(default_factory=asyncio.Event)
@@ -181,7 +183,13 @@ def callback(progress: int, message: str, error: Optional[str] = None):
             task.progress = progress
             task.message = message
             if error:
-                task.error = error
+                task.error = self._sanitize_error_message(error)
+                task.error_details = self._build_error_details(
+                    task,
+                    error,
+                    stage=message,
+                    progress=progress,
+                )

             # Broadcast to all subscribers
             asyncio.create_task(
@@ -192,7 +200,8 @@
                         "task_id": task_id,
                         "progress": progress,
                         "message": message,
-                        "error": error,
+                        "error": self._sanitize_error_message(error) if error else None,
+                        "error_details": task.error_details if error else None,
                         "timestamp": datetime.utcnow().isoformat(),
                     },
                 )
@@ -297,6 +306,9 @@ async def execute_paper_to_code(
         task.status = "running"
         task.started_at = datetime.utcnow()
+        task.error = None
+        task.error_details = None
+        original_cwd = os.getcwd()

         # Bind task_id into the async context so every loguru call and
         # provider/MCP record made downstream is automatically attributed
@@ -326,7 +338,6 @@
             progress_callback = await self._create_progress_callback(task_id)

             # Change to project root directory for MCP server paths to work correctly
-            original_cwd = os.getcwd()
             os.chdir(PROJECT_ROOT)

             # Create MCP app context with explicit config path
@@ -350,58 +361,32 @@
                 ),
             )

-            result_status = self._pipeline_status(result)
-            task.status = result_status
-            task.progress = 100 if result_status == "completed" else 95
-            task.result = self._build_workflow_result(result, result_status)
-            task.completed_at = datetime.utcnow()
-            self._record_session_outcome(
-                task,
-                role="assistant",
-                body=self._pipeline_summary(result),
-                metadata=self._pipeline_metadata(result),
+            return await self._finish_task_with_pipeline_result(
+                task_id, task, result
             )

-            # Broadcast completion signal to all subscribers
-            await self._broadcast(
-                task_id,
-                {
-                    "type": "complete",
-                    "task_id": task_id,
-                    "status": result_status,
-                    "result": task.result,
-                    "timestamp": datetime.utcnow().isoformat(),
-                },
-            )
-            # Give WebSocket handlers time to receive the completion message
-            await asyncio.sleep(0.5)
-
-            return task.result
-
         except (PlanReviewCancelled, asyncio.CancelledError) as e:
             reason = getattr(e, "reason", None) or str(e) or "Workflow cancelled"
             return await self._mark_task_cancelled(task_id, reason)

         except Exception as e:
-            task.status = "error"
-            task.error = str(e)
+            self._mark_task_error(task, e)
             task.completed_at = datetime.utcnow()
             self._record_session_outcome(
-                task, role="system", body=f"Workflow failed: {e}"
+                task,
+                role="system",
+                body=f"Workflow failed: {task.error}",
+                metadata=self._pipeline_metadata(None, task),
             )

             # Broadcast error signal to all subscribers
-            await self._broadcast(
-                task_id,
-                {
-                    "type": "error",
-                    "task_id": task_id,
-                    "error": str(e),
-                    "timestamp": datetime.utcnow().isoformat(),
-                },
-            )
+            await self._broadcast(task_id, self._error_payload(task))

-            return {"status": "error", "error": str(e)}
+            return {
+                "status": "error",
+                "error": task.error,
+                "error_details": task.error_details,
+            }

         finally:
             # Restore original working directory
@@ -435,6 +420,9 @@ async def execute_chat_planning(
         task.status = "running"
         task.started_at = datetime.utcnow()
+        task.error = None
+        task.error_details = None
+        original_cwd = os.getcwd()

         short_task_id = str(task_id)[:8] if task_id else None
         task.task_short_id = short_task_id
@@ -459,7 +447,6 @@
             progress_callback = await self._create_progress_callback(task_id)

             # Change to project root directory for MCP server paths to work correctly
-            original_cwd = os.getcwd()
             os.chdir(PROJECT_ROOT)

             # Create MCP app context with explicit config path
@@ -531,58 +518,32 @@
                 ),
             )

-            result_status = self._pipeline_status(result)
-            task.status = result_status
-            task.progress = 100 if result_status == "completed" else 95
-            task.result = self._build_workflow_result(result, result_status)
-            task.completed_at = datetime.utcnow()
-            self._record_session_outcome(
-                task,
-                role="assistant",
-                body=self._pipeline_summary(result),
-                metadata=self._pipeline_metadata(result),
-            )
-
-            # Broadcast completion signal to all subscribers
-            await self._broadcast(
-                task_id,
-                {
-                    "type": "complete",
-                    "task_id": task_id,
-                    "status": result_status,
-                    "result": task.result,
-                    "timestamp": datetime.utcnow().isoformat(),
-                },
+            return await self._finish_task_with_pipeline_result(
+                task_id, task, result
             )

-            # Give WebSocket handlers time to receive the completion message
-            await asyncio.sleep(0.5)
-
-            return task.result
-
         except (PlanReviewCancelled, asyncio.CancelledError) as e:
             reason = getattr(e, "reason", None) or str(e) or "Workflow cancelled"
             return await self._mark_task_cancelled(task_id, reason)

         except Exception as e:
-            task.status = "error"
-            task.error = str(e)
+            self._mark_task_error(task, e)
             task.completed_at = datetime.utcnow()
             self._record_session_outcome(
-                task, role="system", body=f"Workflow failed: {e}"
+                task,
+                role="system",
+                body=f"Workflow failed: {task.error}",
+                metadata=self._pipeline_metadata(None, task),
             )

             # Broadcast error signal to all subscribers
-            await self._broadcast(
-                task_id,
-                {
-                    "type": "error",
-                    "task_id": task_id,
-                    "error": str(e),
-                    "timestamp": datetime.utcnow().isoformat(),
-                },
-            )
+            await self._broadcast(task_id, self._error_payload(task))

-            return {"status": "error", "error": str(e)}
+            return {
+                "status": "error",
+                "error": task.error,
+                "error_details": task.error_details,
+            }

         finally:
             # Restore original working directory
@@ -780,14 +741,20 @@ def _pipeline_summary(result: Any) -> str:
             return str(result.get("summary") or result)
         return str(result)

-    @staticmethod
-    def _pipeline_metadata(result: Any) -> Dict[str, Any]:
+    def _pipeline_metadata(
+        self, result: Any, task: Optional["WorkflowTask"] = None
+    ) -> Dict[str, Any]:
         if not isinstance(result, dict):
-            return {}
+            metadata: Dict[str, Any] = {}
+            if task and task.error_details:
+                metadata["error_details"] = task.error_details
+            return metadata
         metadata: Dict[str, Any] = {}
         implementation = result.get("implementation")
         if isinstance(implementation, dict):
             metadata["implementation"] = implementation
+        if task and task.error_details:
+            metadata["error_details"] = task.error_details
         return metadata

     @staticmethod
@@ -800,6 +767,235 @@ def _build_workflow_result(result: Any, result_status: str) -> Dict[str, Any]:
             payload["implementation"] = result["implementation"]
         return payload

+    async def _finish_task_with_pipeline_result(
+        self,
+        task_id: str,
+        task: "WorkflowTask",
+        result: Any,
+    ) -> Dict[str, Any]:
+        result_status = self._pipeline_status(result)
+        task.status = result_status
+        task.progress = 100 if result_status == "completed" else 95
+        task.result = self._build_workflow_result(result, result_status)
+        task.completed_at = datetime.utcnow()
+
+        if result_status == "error":
+            error_message = self._extract_result_error(result)
+            task.error = error_message
+            task.error_details = self._build_error_details(
+                task,
+                error_message,
+                stage=task.message or "Workflow failed",
+                progress=task.progress,
+                result=result,
+            )
+
+        self._record_session_outcome(
+            task,
+            role="system" if result_status == "error" else "assistant",
+            body=(
+                f"Workflow failed: {task.error}"
+                if result_status == "error"
+                else self._pipeline_summary(result)
+            ),
+            metadata=self._pipeline_metadata(result, task),
+        )
+
+        if result_status == "error":
+            await self._broadcast(task_id, self._error_payload(task))
+        else:
+            await self._broadcast(
+                task_id,
+                {
+                    "type": "complete",
+                    "task_id": task_id,
+                    "status": result_status,
+                    "result": task.result,
+                    "timestamp": datetime.utcnow().isoformat(),
+                },
+            )
+
+        # Give WebSocket handlers time to receive the terminal message.
+        await asyncio.sleep(0.5)
+        return task.result
+
+    @classmethod
+    def _sanitize_error_message(cls, error: Any) -> str:
+        message = str(error or "Unknown error").strip() or "Unknown error"
+        # The more specific OpenRouter-style prefix must be redacted before the
+        # generic "sk-" pattern, or it would never match.
+        replacements = [
+            (r"sk-or-v1-[A-Za-z0-9_-]{12,}", "sk-or-v1-***"),
+            (r"sk-[A-Za-z0-9_-]{12,}", "sk-***"),
+            (r"(?i)(api[_-]?key\s*[=:]\s*)([^\s,'\"}]+)", r"\1***"),
+            (r"(?i)(authorization:\s*bearer\s+)([^\s,'\"}]+)", r"\1***"),
+            (r"(?i)(token\s*[=:]\s*)([^\s,'\"}]+)", r"\1***"),
+        ]
+        for pattern, replacement in replacements:
+            message = re.sub(pattern, replacement, message)
+        return message[:4000]
+
+    @staticmethod
+    def _extract_result_error(result: Any) -> str:
+        if not isinstance(result, dict):
+            return "Workflow failed"
+
+        candidates = [
+            result.get("error"),
+            result.get("message"),
+        ]
+        implementation = result.get("implementation")
+        if isinstance(implementation, dict):
+            candidates.extend(
+                [
+                    implementation.get("message"),
+                    implementation.get("abort_reason"),
+                    implementation.get("error"),
+                ]
+            )
+        candidates.append(result.get("summary"))
+
+        for candidate in candidates:
+            if candidate:
+                return WorkflowService._sanitize_error_message(candidate)
+        return "Workflow failed; see task logs for details"
+
+    def _mark_task_error(self, task: "WorkflowTask", error: Any) -> None:
+        task.status = "error"
+        task.error = self._sanitize_error_message(error)
+        task.error_details = self._build_error_details(
+            task,
+            error,
+            stage=task.message or "Workflow failed",
+            progress=task.progress,
+        )
+
+    def _error_payload(self, task: "WorkflowTask") -> Dict[str, Any]:
+        return {
+            "type": "error",
+            "task_id": task.task_id,
+            "error": task.error,
+            "error_details": task.error_details,
+            "timestamp": datetime.utcnow().isoformat(),
+        }
+
+    def _build_error_details(
+        self,
+        task: "WorkflowTask",
+        error: Any,
+        *,
+        stage: Optional[str] = None,
+        progress: Optional[int] = None,
+        result: Any = None,
+    ) -> Dict[str, Any]:
+        message = self._sanitize_error_message(error)
+        error_type = type(error).__name__ if isinstance(error, BaseException) else None
+        category = self._classify_error(message)
+        short_id = task.task_short_id or task.task_id[:8]
+        task_dir = self._resolve_task_dir(task)
+        log_paths = self._build_log_paths(task_dir)
+
+        details: Dict[str, Any] = {
+            "message": message,
+            "category": category,
+            "stage": stage or task.message or "Workflow failed",
+            "progress": progress if progress is not None else task.progress,
+            "task_id": task.task_id,
+            "task_short_id": short_id,
+            "task_kind": task.task_kind,
+            "session_id": task.session_id,
+            "hint": self._error_hint(category),
+        }
+        if error_type:
+            details["error_type"] = error_type
+        if task_dir:
+            details["task_dir"] = str(task_dir)
+        details["log_stream_url"] = f"/ws/tasks/{short_id}/logs"
+        if log_paths:
+            details["log_paths"] = log_paths
+        if isinstance(result, dict):
+            details["result_status"] = result.get("status")
+            implementation = result.get("implementation")
+            if isinstance(implementation, dict):
+                details["implementation_status"] = implementation.get("status")
+                details["implementation_message"] = implementation.get("message")
+        return details
+
+    @staticmethod
+    def _classify_error(message: str) -> str:
+        lowered = message.lower()
+        if (
+            "504" in lowered
+            or "gateway time-out" in lowered
+            or "gateway timeout" in lowered
+        ):
+            return "provider_timeout"
+        if "timeout" in lowered or "timed out" in lowered:
+            return "timeout"
+        if "rate limit" in lowered or "429" in lowered:
+            return "rate_limit"
+        if "quota" in lowered or "insufficient" in lowered or "billing" in lowered:
+            return "quota_or_billing"
+        if "api" in lowered or "llm" in lowered or "provider" in lowered:
+            return "llm_provider"
+        if "document" in lowered or "pdf" in lowered or "preprocessing" in lowered:
+            return "document_preprocessing"
+        return "workflow"
+
+    @staticmethod
+    def _error_hint(category: str) -> str:
+        hints = {
+            "provider_timeout": (
+                "The LLM provider or upstream gateway timed out. Check provider "
+                "status, proxy settings, and task logs; retrying with a smaller "
+                "input or another model may help."
+            ),
+            "timeout": (
+                "A workflow step exceeded its wait time. Check the task logs to "
+                "see which agent or tool stalled."
+            ),
+            "rate_limit": (
+                "The LLM provider rejected the request due to rate limits. Wait "
+                "before retrying or use another provider/model."
+            ),
+            "quota_or_billing": (
+                "The provider appears to be out of quota or billing credit. "
+                "Check the configured API account."
+            ),
+            "llm_provider": (
+                "The failure came from the model provider path. Check provider "
+                "configuration, model id, API base, and task LLM logs."
+            ),
+            "document_preprocessing": (
+                "The input document could not be processed cleanly. Check the "
+                "uploaded file and preprocessing logs."
+            ),
+        }
+        return hints.get(
+            category,
+            "Check the task logs for the failing stage and retry after fixing the underlying issue.",
+        )
+
+    @staticmethod
+    def _build_log_paths(task_dir: Optional[Path]) -> Dict[str, str]:
+        if not task_dir:
+            return {}
+        return {
+            channel: str(task_dir / "logs" / f"{channel}.jsonl")
+            for channel in ("system", "llm", "mcp")
+        }
+
+    @staticmethod
+    def _resolve_task_dir(task: "WorkflowTask") -> Optional[Path]:
+        if task.task_dir:
+            return Path(task.task_dir)
+        short_id = task.task_short_id or task.task_id[:8]
+        session = session_store.find_session_by_task(short_id)
+        if session is None:
+            return None
+        for stored_task in session.tasks:
+            if stored_task.task_id == short_id and stored_task.task_dir:
+                return Path(stored_task.task_dir)
+        return None
+
     def _record_session_outcome(
         self,
         task: "WorkflowTask",
diff --git a/new_ui/frontend/src/components/streaming/WorkflowErrorDetails.tsx b/new_ui/frontend/src/components/streaming/WorkflowErrorDetails.tsx
new file mode 100644
index 00000000..87e82d5b
--- /dev/null
+++ b/new_ui/frontend/src/components/streaming/WorkflowErrorDetails.tsx
@@ -0,0 +1,95 @@
+import type { WorkflowErrorDetails as WorkflowErrorDetailsType } from "../../types/api";
+
+interface WorkflowErrorDetailsProps {
+  error: string;
+  details?: WorkflowErrorDetailsType | null;
+}
+
+function labelForCategory(category?: string): string {
+  const labels: Record<string, string> = {
+    provider_timeout: "Provider timeout",
+    timeout: "Timeout",
+    rate_limit: "Rate limit",
+    quota_or_billing: "Quota or billing",
+    llm_provider: "LLM provider",
+    document_preprocessing: "Document preprocessing",
+    workflow: "Workflow",
+  };
+  return category ? labels[category] || category : "Workflow";
+}
+
+export function WorkflowErrorDetails({
+  error,
+  details,
+}: WorkflowErrorDetailsProps) {
+  const logEntries = details?.log_paths
+    ? Object.entries(details.log_paths)
+    : [];
+
+  return (
+    <div>
+      <div>
+        {error}
+      </div>
+
+      {details && (
+        <div>
+          <div>
+            <div>
+              <div>Category</div>
+              <div>{labelForCategory(details.category)}</div>
+            </div>
+            {details.stage && (
+              <div>
+                <div>Failed stage</div>
+                <div>{details.stage}</div>
+              </div>
+            )}
+            {typeof details.progress === "number" && (
+              <div>
+                <div>Progress</div>
+                <div>{details.progress}%</div>
+              </div>
+            )}
+            {details.error_type && (
+              <div>
+                <div>Error type</div>
+                <div>{details.error_type}</div>
+              </div>
+            )}
+            {details.task_short_id && (
+              <div>
+                <div>Task</div>
+                <div>{details.task_short_id}</div>
+              </div>
+            )}
+            {details.log_stream_url && (
+              <div>
+                <div>Log stream</div>
+                <div>{details.log_stream_url}</div>
+              </div>
+            )}
+          </div>
+
+          {details.hint && (
+            <div>
+              <div>What to check</div>
+              <div>{details.hint}</div>
+            </div>
+          )}
+
+          {logEntries.length > 0 && (
+            <div>
+              <div>Task logs</div>
+              <div>
+                {logEntries.map(([channel, path]) => (
+                  <div key={channel}>
+                    {channel}: {path}
+                  </div>
+                ))}
+              </div>
+            </div>
+          )}
+        </div>
+      )}
+    </div>
+  );
+}
diff --git a/new_ui/frontend/src/components/streaming/index.ts b/new_ui/frontend/src/components/streaming/index.ts
index dc158c24..02242266 100644
--- a/new_ui/frontend/src/components/streaming/index.ts
+++ b/new_ui/frontend/src/components/streaming/index.ts
@@ -2,3 +2,4 @@ export { default as CodeStreamViewer } from './CodeStreamViewer';
 export { default as ProgressTracker } from './ProgressTracker';
 export { default as LogViewer } from './LogViewer';
 export { default as ActivityLogViewer } from './ActivityLogViewer';
+export { WorkflowErrorDetails } from './WorkflowErrorDetails';
diff --git a/new_ui/frontend/src/hooks/useStreaming.ts b/new_ui/frontend/src/hooks/useStreaming.ts
index 5c330b48..728c39ab 100644
--- a/new_ui/frontend/src/hooks/useStreaming.ts
+++ b/new_ui/frontend/src/hooks/useStreaming.ts
@@ -95,6 +95,14 @@ export function useStreaming(taskId: string | null) {
         break;

       case 'complete':
+        if (message.status === 'error') {
+          const fallbackError = 'Workflow failed; see task logs for details';
+          setStatus('error');
+          setError(fallbackError);
+          clearInteraction();
+          addActivityLog(`Error: ${fallbackError}`, 0, 'error');
+          break;
+        }
         console.log('[useStreaming] Workflow complete!');
         console.log('[useStreaming] Result:', JSON.stringify(message.result, null, 2));
         setStatus(
@@ -130,7 +138,7 @@
         } else {
           // Real error - mark as error state
           setStatus('error'); // This will make isFinished = true
-          setError(message.error);
+          setError(message.error, message.error_details ?? null);
           clearInteraction(); // Clear any pending interaction
           addActivityLog(`❌ Error: ${message.error}`, 0, 'error');
         }
diff --git a/new_ui/frontend/src/hooks/useTaskRecovery.ts b/new_ui/frontend/src/hooks/useTaskRecovery.ts
index e59d4d78..5b2b0a63 100644
--- a/new_ui/frontend/src/hooks/useTaskRecovery.ts
+++ b/new_ui/frontend/src/hooks/useTaskRecovery.ts
@@ -165,7 +165,7 @@ export function useTaskRecovery() {
       console.log('[TaskRecovery] Task errored, syncing error state...');

       setStatus('error');
-      setError(taskStatus.error || 'Unknown error');
+      setError(taskStatus.error || 'Unknown error', taskStatus.error_details ?? null);
       setNeedsRecovery(false);

       setRecoveryState({
diff --git a/new_ui/frontend/src/pages/ChatPlanningPage.tsx b/new_ui/frontend/src/pages/ChatPlanningPage.tsx
index 089465f5..74c8d238 100644
--- a/new_ui/frontend/src/pages/ChatPlanningPage.tsx
+++ b/new_ui/frontend/src/pages/ChatPlanningPage.tsx
@@ -2,7 +2,7 @@ import { useState, useEffect, useRef } from 'react';
 import { motion, AnimatePresence } from 'framer-motion';
 import { Card } from '../components/common';
 import { ChatInput } from '../components/input';
-import { ProgressTracker, ActivityLogViewer } from '../components/streaming';
+import { ProgressTracker, ActivityLogViewer, WorkflowErrorDetails } from '../components/streaming';
 import { FileTree } from '../components/results';
 import { InlineChatInteraction } from '../components/interaction';
 import { useWorkflowStore } from '../stores/workflowStore';
@@ -37,6 +37,7 @@ export default function ChatPlanningPage() {
     isWaitingForInput,
     result,
     error,
+    errorDetails,
     setActiveTask,
     setSteps,
     setStatus,
@@ -407,9 +408,9 @@ export default function ChatPlanningPage() {
               <div>
                 <h3>Generation Failed</h3>
               </div>
-              <div>
-                {error}
-              </div>
+              <div>
+                <WorkflowErrorDetails error={error || 'Unknown error'} details={errorDetails} />
+              </div>
diff --git a/new_ui/frontend/src/pages/PaperToCodePage.tsx b/new_ui/frontend/src/pages/PaperToCodePage.tsx
index c8271bb7..dfadc63f 100644
--- a/new_ui/frontend/src/pages/PaperToCodePage.tsx
+++ b/new_ui/frontend/src/pages/PaperToCodePage.tsx
@@ -2,7 +2,7 @@ import { useState, useEffect } from 'react';
 import { motion, AnimatePresence } from 'framer-motion';
 import { Card, Button } from '../components/common';
 import { FileUploader, UrlInput } from '../components/input';
-import { ProgressTracker, ActivityLogViewer } from '../components/streaming';
+import { ProgressTracker, ActivityLogViewer, WorkflowErrorDetails } from '../components/streaming';
 import { FileTree } from '../components/results';
 import { InteractionPanel } from '../components/interaction';
 import { useWorkflowStore } from '../stores/workflowStore';
@@ -35,6 +35,7 @@ export default function PaperToCodePage() {
     isWaitingForInput,
     result,
     error,
+    errorDetails,
     setActiveTask,
     setSteps,
     setStatus,
@@ -348,9 +349,9 @@ export default function PaperToCodePage() {
              <div>
                <h3>Processing Failed</h3>
              </div>
-             <div>
-               {error}
-             </div>
+             <div>
+               <WorkflowErrorDetails error={error || 'Unknown error'} details={errorDetails} />
+             </div>
diff --git a/new_ui/frontend/src/stores/workflowStore.ts b/new_ui/frontend/src/stores/workflowStore.ts
index dfdac90f..805c15c6 100644
--- a/new_ui/frontend/src/stores/workflowStore.ts
+++ b/new_ui/frontend/src/stores/workflowStore.ts
@@ -4,6 +4,7 @@ import type {
   WorkflowStatus,
   WorkflowStep,
 } from '../types/workflow';
+import type { WorkflowErrorDetails } from '../types/api';

 // Activity log entry type
 interface ActivityLogEntry {
@@ -63,6 +64,7 @@ interface WorkflowState {
   // Results
   result: Record<string, any> | null;
   error: string | null;
+  errorDetails: WorkflowErrorDetails | null;

   // Recovery
   needsRecovery: boolean; // Flag to indicate if we need to recover a task
@@ -80,7 +82,7 @@ interface WorkflowState {
   setPendingInteraction: (interaction: PendingInteraction | null) => void;
   clearInteraction: () => void;
   setResult: (result: Record<string, any> | null) => void;
-  setError: (error: string | null) => void;
+  setError: (error: string | null, details?: WorkflowErrorDetails | null) => void;
   setNeedsRecovery: (needs: boolean) => void;
   reset: () => void;
 }
@@ -101,6 +103,7 @@ const initialState = {
   isWaitingForInput: false,
   result: null,
   error: null,
+  errorDetails: null,
   needsRecovery: false,
 };
@@ -212,7 +215,7 @@ export const useWorkflowStore = create<WorkflowState>()(
     set({ result });
   },

-  setError: (error) => set({ error, status: error ? 'error' : get().status }),
+  setError: (error, details = null) => set({ error, errorDetails: details, status: error ? 'error' : get().status }),

   setNeedsRecovery: (needs) => set({ needsRecovery: needs }),
diff --git a/new_ui/frontend/src/types/api.ts b/new_ui/frontend/src/types/api.ts
index 881f304a..a050718a 100644
--- a/new_ui/frontend/src/types/api.ts
+++ b/new_ui/frontend/src/types/api.ts
@@ -18,10 +18,30 @@ export interface WorkflowStatusResponse {
   message: string;
   result?: Record<string, any>;
   error?: string;
+  error_details?: WorkflowErrorDetails;
   started_at?: string;
   completed_at?: string;
 }

+export interface WorkflowErrorDetails {
+  message: string;
+  category?: string;
+  error_type?: string;
+  stage?: string;
+  progress?: number;
+  task_id?: string;
+  task_short_id?: string;
+  task_kind?: string;
+  session_id?: string | null;
+  task_dir?: string;
+  log_stream_url?: string;
+  log_paths?: Record<string, string>;
+  hint?: string;
+  result_status?: string;
+  implementation_status?: string;
+  implementation_message?: string;
+}
+
 export interface QuestionsResponse {
   questions: Question[];
   status: string;
@@ -160,6 +180,7 @@ export interface WSErrorMessage {
   type: 'error';
   task_id: string;
   error: string;
+  error_details?: WorkflowErrorDetails;
   timestamp: string;
 }
diff --git a/tests/ui_session_resume_test.py b/tests/ui_session_resume_test.py
index f8687556..40a07a53 100644
--- a/tests/ui_session_resume_test.py
+++ b/tests/ui_session_resume_test.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import sys
 from pathlib import Path

@@ -62,3 +63,85 @@ def test_hydrate_marks_running_session_tasks_interrupted(tmp_path, monkeypatch):
     assert reloaded is not None
     assert reloaded.tasks[0].status == "interrupted"
     assert reloaded.tasks[0].metadata["interrupted"] is True
+
+
+def test_workflow_error_details_classify_timeout_and_include_logs(tmp_path, monkeypatch):
+    store = SessionStore(root=tmp_path / "sessions")
+    session = store.create_session(title="debug errors")
+    task_dir = tmp_path / "deepcode_lab" / "tasks" / "paper_deadbeef"
+    store.attach_task(
+        session.session_id,
+        "deadbeef",
+        task_kind="paper",
+        task_dir=str(task_dir),
+        status="running",
+    )
+    monkeypatch.setattr(workflow_service_module, "session_store", store)
+
+    service = WorkflowService()
+    task = service.create_task(session_id=session.session_id, task_kind="paper")
+    task.task_short_id = "deadbeef"
+    details = service._build_error_details(
+        task,
+        RuntimeError(
+            "Workflow execution failed: "
+            "<html><body><h1>504 Gateway Time-out</h1></body></html> "
+            "api_key=sk-secret123456"
+        ),
+        stage="Planning",
+        progress=65,
+    )
+
+    assert details["category"] == "provider_timeout"
+    assert details["stage"] == "Planning"
+    assert details["progress"] == 65
+    assert details["task_short_id"] == "deadbeef"
+    assert details["task_dir"] == str(task_dir)
+    assert details["log_stream_url"] == "/ws/tasks/deadbeef/logs"
+    assert Path(details["log_paths"]["llm"]).name == "llm.jsonl"
+    assert "sk-secret" not in details["message"]
+    assert "provider" in details["hint"].lower()
+
+
+def test_workflow_error_payload_preserves_legacy_error_string():
+    service = WorkflowService()
+    task = service.create_task(session_id="sess-1", task_kind="chat")
+    task.task_short_id = "cafebabe"
+
+    service._mark_task_error(task, ValueError("LLM provider returned 429 rate limit"))
+    payload = service._error_payload(task)
+
+    assert task.status == "error"
+    assert payload["type"] == "error"
+    assert payload["error"] == "LLM provider returned 429 rate limit"
+    assert payload["error_details"]["category"] == "rate_limit"
+
+
+def test_pipeline_error_result_broadcasts_error_message(monkeypatch):
+    service = WorkflowService()
+    task = service.create_task(session_id="sess-1", task_kind="paper")
+    messages = []
+
+    async def fake_broadcast(task_id, message):
+        messages.append((task_id, message))
+
+    monkeypatch.setattr(service, "_broadcast", fake_broadcast)
+
+    result = asyncio.run(
+        service._finish_task_with_pipeline_result(
+            task.task_id,
+            task,
+            {
+                "status": "error",
+                "summary": "Pipeline failed after planning",
+                "implementation": {
+                    "status": "error",
+                    "message": "File tree structure not found",
+                },
+            },
+        )
+    )
+
+    assert result["status"] == "error"
+    assert task.status == "error"
+    assert task.error == "File tree structure not found"
+    assert messages[0][1]["type"] == "error"
+    assert messages[0][1]["error_details"]["result_status"] == "error"