Skip to content

Commit 44267f4

Browse files
committed
feat: Enhance workflow logging and progress tracking across multiple workflows
- Added custom logging for workflow step start, progress, and completion in the content studio workflow. - Implemented detailed progress updates for each step in the document processing workflow, including error handling. - Enhanced the financial report workflow with custom logging for fetching price data, including parallel progress tracking. - Improved the governed RAG index workflow with detailed progress updates during document indexing. - Updated the weather workflow to include progress tracking for fetching weather data and planning activities. - Standardized the logging format across workflows to include step IDs and status messages for better traceability.
1 parent 75a06f6 commit 44267f4

12 files changed

Lines changed: 1213 additions & 248 deletions

app/globals.css

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
@import 'tailwindcss';
22
@import 'tw-animate-css';
3-
3+
@source "../node_modules/streamdown/dist/index.js";
44
/* Tailwind CSS v4 Custom Variants */
55
@custom-variant dark (&:is(.dark *));
66
@custom-variant starting (&:is(:popover-open, :modal, [data-state="open"]):starting-style);

app/workflows/providers/workflow-context.tsx

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
useEffect,
1212
type ReactNode,
1313
} from "react"
14+
import type { AgentDataPart } from "@mastra/ai-sdk"
1415
import {
1516
getWorkflowConfig,
1617
WORKFLOW_CONFIGS,
@@ -61,6 +62,15 @@ export interface WorkflowRun {
6162
error?: string
6263
}
6364

65+
export interface WorkflowDataPart {
66+
messageId: string
67+
partIndex: number
68+
part: {
69+
type: string
70+
data?: unknown
71+
}
72+
}
73+
6474
/* eslint-disable no-unused-vars */
6575
export interface WorkflowContextValue {
6676
selectedWorkflow: WorkflowId
@@ -70,6 +80,7 @@ export interface WorkflowContextValue {
7080
activeStepIndex: number
7181
progressEvents: WorkflowProgressEvent[]
7282
suspendPayload: WorkflowSuspendPayload | null
83+
dataParts: WorkflowDataPart[]
7384
selectWorkflow: (workflowId: WorkflowId) => void
7485
runWorkflow: (inputData?: Record<string, unknown>) => void
7586
pauseWorkflow: () => void
@@ -204,6 +215,7 @@ export function WorkflowProvider({
204215
const [activeStepIndex, setActiveStepIndex] = useState(-1)
205216
const [progressEvents, setProgressEvents] = useState<WorkflowProgressEvent[]>([])
206217
const [suspendPayload, setSuspendPayload] = useState<WorkflowSuspendPayload | null>(null)
218+
const [dataParts, setDataParts] = useState<WorkflowDataPart[]>([])
207219

208220
const workflowConfig = useMemo(
209221
() => getWorkflowConfig(selectedWorkflow),
@@ -253,14 +265,24 @@ export function WorkflowProvider({
253265
}
254266
}, [status, workflowStatus])
255267

256-
// Extract progress events from custom data parts
268+
// Extract progress events and data parts from custom data parts
257269
useEffect(() => {
258270
const allProgressEvents: WorkflowProgressEvent[] = []
271+
const allDataParts: WorkflowDataPart[] = []
259272
let detectedSuspendPayload: WorkflowSuspendPayload | null = null
260273

261274
for (const message of messages) {
262275
if (message.role === "assistant" && message.parts !== null) {
263276
for (const [partIndex, part] of message.parts.entries()) {
277+
// Collect all data parts for custom component rendering
278+
if (typeof part.type === "string" && part.type.startsWith("data-")) {
279+
allDataParts.push({
280+
messageId: message.id,
281+
partIndex,
282+
part,
283+
})
284+
}
285+
264286
// Handle workflow progress events (data-workflow, data-tool-workflow)
265287
if (part.type === "data-workflow" || part.type === "data-tool-workflow") {
266288
const workflowPart = part as { data?: { text?: string; status?: string; stepId?: string } }
@@ -279,6 +301,77 @@ export function WorkflowProvider({
279301
}
280302
}
281303

304+
// Handle network aggregation events (data-network)
305+
if (part.type === "data-network") {
306+
const networkPart = part as { data?: { text?: string; status?: string; networkId?: string } }
307+
const eventData = networkPart.data
308+
309+
if (eventData && eventData.text !== null && typeof eventData.text === "string" && eventData.text.trim()) {
310+
allProgressEvents.push({
311+
id: `${message.id}-${part.type}-${partIndex}`,
312+
stage: "network",
313+
status: "in-progress",
314+
message: eventData.text,
315+
stepId: eventData.networkId,
316+
timestamp: new Date(),
317+
data: eventData,
318+
})
319+
}
320+
}
321+
322+
// Handle tool agent events (data-tool-agent)
323+
if (part.type === "data-tool-agent") {
324+
const agentPart = part as { data?: AgentDataPart }
325+
const eventData = agentPart.data
326+
327+
if (eventData) {
328+
allProgressEvents.push({
329+
id: `${message.id}-${part.type}-${partIndex}`,
330+
stage: "tool agent",
331+
status: "in-progress",
332+
message: `Agent executing tool`,
333+
timestamp: new Date(),
334+
data: eventData,
335+
})
336+
}
337+
}
338+
339+
// Handle nested workflow events (data-tool-workflow)
340+
if (part.type === "data-tool-workflow") {
341+
const nestedWorkflowPart = part as { data?: { text?: string; status?: string; workflowId?: string } }
342+
const eventData = nestedWorkflowPart.data
343+
344+
if (eventData && eventData.text !== null && typeof eventData.text === "string" && eventData.text.trim()) {
345+
allProgressEvents.push({
346+
id: `${message.id}-${part.type}-${partIndex}`,
347+
stage: "nested workflow",
348+
status: "in-progress",
349+
message: eventData.text,
350+
stepId: eventData.workflowId,
351+
timestamp: new Date(),
352+
data: eventData,
353+
})
354+
}
355+
}
356+
357+
// Handle nested network events (data-tool-network)
358+
if (part.type === "data-tool-network") {
359+
const nestedNetworkPart = part as { data?: { text?: string; status?: string; networkId?: string } }
360+
const eventData = nestedNetworkPart.data
361+
362+
if (eventData && eventData.text !== null && typeof eventData.text === "string" && eventData.text.trim()) {
363+
allProgressEvents.push({
364+
id: `${message.id}-${part.type}-${partIndex}`,
365+
stage: "nested network",
366+
status: "in-progress",
367+
message: eventData.text,
368+
stepId: eventData.networkId,
369+
timestamp: new Date(),
370+
data: eventData,
371+
})
372+
}
373+
}
374+
282375
// Handle suspend payloads (data-workflow-suspend)
283376
if (part.type === "data-workflow-suspend") {
284377
const suspendPart = part as { data?: { message?: string; requestId?: string; stepId?: string } }
@@ -313,11 +406,29 @@ export function WorkflowProvider({
313406
})
314407
}
315408
}
409+
410+
// Handle custom tool progress events
411+
if (typeof part.type === "string" && part.type.startsWith("data-tool-progress")) {
412+
const toolProgressPart = part as { type: string; data?: { status?: string; message?: string; toolName?: string } }
413+
const eventData = toolProgressPart.data
414+
415+
if (eventData && eventData.status !== null && typeof eventData.status === "string") {
416+
allProgressEvents.push({
417+
id: `${message.id}-${part.type}-${partIndex}`,
418+
stage: "tool",
419+
status: eventData.status === "success" ? "done" : eventData.status === "pending" ? "in-progress" : "error",
420+
message: eventData.message ?? `${eventData.toolName ?? 'Tool'} ${eventData.status}`,
421+
timestamp: new Date(),
422+
data: eventData,
423+
})
424+
}
425+
}
316426
}
317427
}
318428
}
319429

320430
setProgressEvents(allProgressEvents)
431+
setDataParts(allDataParts)
321432
if (detectedSuspendPayload) {
322433
setSuspendPayload(detectedSuspendPayload)
323434
}
@@ -331,6 +442,7 @@ export function WorkflowProvider({
331442
setActiveStepIndex(-1)
332443
setProgressEvents([])
333444
setSuspendPayload(null)
445+
setDataParts([])
334446
}
335447
}, [])
336448

@@ -351,6 +463,7 @@ export function WorkflowProvider({
351463
setActiveStepIndex(0)
352464
setProgressEvents([]) // Clear previous progress events
353465
setSuspendPayload(null) // Clear any previous suspend state
466+
setDataParts([]) // Clear previous data parts
354467

355468
// Send message to trigger workflow via AI SDK
356469
const inputText = inputData?.input?.toString() ?? `Run ${workflowConfig.name}`
@@ -394,6 +507,7 @@ export function WorkflowProvider({
394507
setActiveStepIndex(-1)
395508
setProgressEvents([])
396509
setSuspendPayload(null)
510+
setDataParts([])
397511
}, [stop])
398512

399513
const runStep = useCallback(
@@ -475,6 +589,7 @@ export function WorkflowProvider({
475589
activeStepIndex,
476590
progressEvents,
477591
suspendPayload,
592+
dataParts,
478593
selectWorkflow,
479594
runWorkflow,
480595
pauseWorkflow,
@@ -496,6 +611,7 @@ export function WorkflowProvider({
496611
activeStepIndex,
497612
progressEvents,
498613
suspendPayload,
614+
dataParts,
499615
selectWorkflow,
500616
runWorkflow,
501617
pauseWorkflow,

src/mastra/config/upstash.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ export const upstashVector = new UpstashVector({
1818
token: process.env.UPSTASH_VECTOR_REST_TOKEN ?? 'your-vector-token'
1919
});
2020

21-
await upstashVector.createIndex({
22-
indexName: "vector_messages",
23-
dimension: 1536,
24-
metric: 'cosine'
25-
});
21+
export async function initializeUpstashVector() {
22+
await upstashVector.createIndex({
23+
indexName: "vector_messages",
24+
dimension: 1536,
25+
metric: 'cosine'
26+
});
27+
}
2628

2729
/**
2830
* Shared Mastra agent memory instance using Upstash for distributed storage and [Pinecone] for vector search.
@@ -154,7 +156,7 @@ export const upstashQueryTool = createVectorQueryTool({
154156
'PostgreSQL vector similarity search using PgVector for semantic content retrieval and question answering.',
155157
// Supported vector store and index options
156158
vectorStoreName: 'vector_messages',
157-
indexName: 'messages',
159+
indexName: 'vector_messages',
158160
model: google.textEmbedding('gemini-embedding-001'),
159161
// Supported database configuration for PgVector
160162
providerOptions: {

src/mastra/tools/document-chunking.tool.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
rerankWithScorer as rerank,
88
MastraAgentRelevanceScorer,
99
} from '@mastra/rag';
10+
import { ModelRouterEmbeddingModel } from "@mastra/core/llm";
1011
import { embed, embedMany } from 'ai';
1112
import { z } from 'zod';
1213
import {
@@ -777,6 +778,7 @@ Use this tool to improve retrieval quality by re-ranking initial search results.
777778
processingTimeMs: processingTime,
778779
}
779780
}
781+
// Create a relevance scorer
780782

781783
// Step 3: Re-rank results using semantic relevance scorer
782784
await context?.writer?.custom({ type: 'data-tool-progress', data: { message: `⚖️ Reranking ${initialResults.length} documents` } });
@@ -789,7 +791,7 @@ Use this tool to improve retrieval quality by re-ranking initial search results.
789791
score: result.score || 0,
790792
})),
791793
query: inputData.userQuery,
792-
scorer: new MastraAgentRelevanceScorer('relevance-scorer', google('gemini-1.5-flash')),
794+
scorer: new MastraAgentRelevanceScorer('relevance-scorer', google('gemini-2.5-flash')),
793795
options: {
794796
weights: {
795797
semantic: inputData.semanticWeight,

0 commit comments

Comments
 (0)