Skip to content

Commit 6c2bc67

Browse files
committed
feat(mcp): implement experimental tasks API for streaming workflow status to chat
Signed-off-by: betterclever <paliwal.pranjal83@gmail.com>
1 parent 557918a commit 6c2bc67

2 files changed

Lines changed: 276 additions & 58 deletions

File tree

backend/src/studio-mcp/__tests__/studio-mcp.service.spec.ts

Lines changed: 146 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
44
import type { AuthContext } from '../../auth/types';
55
import type { WorkflowsService } from '../../workflows/workflows.service';
66

7-
// Helper to access private _registeredTools on McpServer (plain object at runtime)
8-
type ToolHandler = (...args: unknown[]) => unknown;
9-
type RegisteredToolsMap = Record<string, { handler: ToolHandler }>;
7+
// Helper to access private _registeredTools and experimental tasks on McpServer (plain object at runtime)
8+
type RegisteredToolsMap = Record<string, any>;
9+
1010
function getRegisteredTools(server: McpServer): RegisteredToolsMap {
1111
return (server as unknown as { _registeredTools: RegisteredToolsMap })._registeredTools;
1212
}
@@ -60,12 +60,11 @@ describe('StudioMcpService Unit Tests', () => {
6060
expect(server).toBeInstanceOf(McpServer);
6161
});
6262

63-
it('registers all 9 expected tools', () => {
63+
it('registers all expected tools and tasks', () => {
6464
const server = service.createServer(mockAuthContext);
6565
const registeredTools = getRegisteredTools(server);
6666

6767
expect(registeredTools).toBeDefined();
68-
expect(Object.keys(registeredTools).length).toBe(9);
6968

7069
const toolNames = Object.keys(registeredTools).sort();
7170
expect(toolNames).toEqual([
@@ -110,16 +109,27 @@ describe('StudioMcpService Unit Tests', () => {
110109
expect(workflowsService.findById).toHaveBeenCalledWith(workflowId, mockAuthContext);
111110
});
112111

113-
it('run_workflow tool uses auth context passed at creation time', async () => {
112+
it('run_workflow task uses auth context passed at creation time', async () => {
114113
const workflowId = '11111111-1111-4111-8111-111111111111';
115114
const inputs = { key: 'value' };
116115

117116
const server = service.createServer(mockAuthContext);
118117
const registeredTools = getRegisteredTools(server);
119-
const runWorkflowTool = registeredTools['run_workflow'];
118+
const runWorkflowTask = registeredTools['run_workflow'];
119+
120+
expect(runWorkflowTask).toBeDefined();
121+
122+
// Need to mock the extra params for the experimental tasks
123+
const mockExtra = {
124+
taskStore: {
125+
createTask: jest.fn().mockResolvedValue({ taskId: 'mockTaskId', status: 'working' }),
126+
getTask: jest.fn().mockResolvedValue({ taskId: 'mockTaskId', status: 'working' }),
127+
updateTaskStatus: jest.fn().mockResolvedValue(true),
128+
storeTaskResult: jest.fn().mockResolvedValue(true),
129+
},
130+
};
120131

121-
expect(runWorkflowTool).toBeDefined();
122-
await runWorkflowTool.handler({ workflowId, inputs });
132+
await runWorkflowTask.handler.createTask({ workflowId, inputs }, mockExtra);
123133

124134
expect(workflowsService.run).toHaveBeenCalledWith(
125135
workflowId,
@@ -129,7 +139,7 @@ describe('StudioMcpService Unit Tests', () => {
129139
trigger: {
130140
type: 'api',
131141
sourceId: mockAuthContext.userId,
132-
label: 'Studio MCP',
142+
label: 'Studio MCP Task',
133143
},
134144
},
135145
);
@@ -230,12 +240,21 @@ describe('StudioMcpService Unit Tests', () => {
230240

231241
it('denies run_workflow when workflows.run is false', async () => {
232242
const server = service.createServer(restrictedAuth);
233-
const tools = getRegisteredTools(server);
234-
const result = (await tools['run_workflow'].handler({
235-
workflowId: '11111111-1111-4111-8111-111111111111',
236-
})) as { isError?: boolean; content: { text: string }[] };
237-
expect(result.isError).toBe(true);
238-
expect(result.content[0].text).toContain('workflows.run');
243+
const tasks = getRegisteredTools(server);
244+
245+
let errorThrown = false;
246+
try {
247+
await tasks['run_workflow'].handler.createTask(
248+
{
249+
workflowId: '11111111-1111-4111-8111-111111111111',
250+
},
251+
{} as any,
252+
);
253+
} catch (_e: any) {
254+
errorThrown = true;
255+
expect(_e.message).toContain('workflows.run');
256+
}
257+
expect(errorThrown).toBe(true);
239258
});
240259

241260
it('denies cancel_run when runs.cancel is false', async () => {
@@ -260,15 +279,28 @@ describe('StudioMcpService Unit Tests', () => {
260279
it('allows all tools when no apiKeyPermissions (non-API-key auth)', async () => {
261280
const server = service.createServer(mockAuthContext); // no apiKeyPermissions
262281
const tools = getRegisteredTools(server);
282+
const tasks = getRegisteredTools(server);
263283

264284
// All workflow/run tools should work without permission errors
265285
const listResult = (await tools['list_workflows'].handler({})) as { isError?: boolean };
266286
expect(listResult.isError).toBeUndefined();
267287

268-
const runResult = (await tools['run_workflow'].handler({
269-
workflowId: '11111111-1111-4111-8111-111111111111',
270-
})) as { isError?: boolean };
271-
expect(runResult.isError).toBeUndefined();
288+
const mockExtra = {
289+
taskStore: {
290+
createTask: jest.fn().mockResolvedValue({ taskId: 'mock', status: 'working' }),
291+
getTask: jest.fn().mockResolvedValue({ taskId: 'mock', status: 'working' }),
292+
updateTaskStatus: jest.fn().mockResolvedValue(true),
293+
storeTaskResult: jest.fn().mockResolvedValue(true),
294+
},
295+
};
296+
297+
const runResult = await tasks['run_workflow'].handler.createTask(
298+
{
299+
workflowId: '11111111-1111-4111-8111-111111111111',
300+
},
301+
mockExtra,
302+
);
303+
expect(runResult.task.taskId).toEqual('mock');
272304

273305
const cancelResult = (await tools['cancel_run'].handler({
274306
runId: 'test-run-id',
@@ -308,11 +340,11 @@ describe('StudioMcpService Unit Tests', () => {
308340
};
309341
const server = service.createServer(noPermsAuth);
310342
const tools = getRegisteredTools(server);
343+
const tasks = getRegisteredTools(server);
311344

312345
const gatedTools = [
313346
'list_workflows',
314347
'get_workflow',
315-
'run_workflow',
316348
'list_runs',
317349
'get_run_status',
318350
'get_run_result',
@@ -326,6 +358,20 @@ describe('StudioMcpService Unit Tests', () => {
326358
})) as { isError?: boolean };
327359
expect(result.isError).toBe(true);
328360
}
361+
362+
// Test run_workflow separately since it's a task now
363+
let errorThrown = false;
364+
try {
365+
await tasks['run_workflow'].handler.createTask(
366+
{
367+
workflowId: '11111111-1111-4111-8111-111111111111',
368+
},
369+
{} as any,
370+
);
371+
} catch (_e: any) {
372+
errorThrown = true;
373+
}
374+
expect(errorThrown).toBe(true);
329375
});
330376
});
331377

@@ -366,4 +412,83 @@ describe('StudioMcpService Unit Tests', () => {
366412
expect(workflowsService.list).toHaveBeenNthCalledWith(2, authContext2);
367413
});
368414
});
415+
416+
describe('monitorWorkflowRun', () => {
417+
it('polls status and saves result on completion', async () => {
418+
const mockTaskStore = {
419+
updateTaskStatus: jest.fn().mockResolvedValue(true),
420+
storeTaskResult: jest.fn().mockResolvedValue(true),
421+
};
422+
423+
const mockServer = {} as McpServer;
424+
const taskId = 'test-task-id';
425+
const runId = 'test-run-id';
426+
427+
// Mock getRunStatus to return RUNNING first, then COMPLETED
428+
let callCount = 0;
429+
(workflowsService.getRunStatus as jest.Mock).mockImplementation(() => {
430+
callCount++;
431+
return Promise.resolve({
432+
status: callCount === 1 ? 'RUNNING' : 'COMPLETED',
433+
});
434+
});
435+
436+
(workflowsService.getRunResult as jest.Mock).mockResolvedValue({
437+
output: 'test-output',
438+
});
439+
440+
// We overwrite the 2000ms timeout temporarily for the test to avoid slow running loop
441+
const originalSetTimeout = global.setTimeout;
442+
(global as any).setTimeout = (fn: any) => originalSetTimeout(fn, 1);
443+
444+
try {
445+
await (service as any).monitorWorkflowRun(
446+
runId,
447+
undefined,
448+
taskId,
449+
mockTaskStore,
450+
mockServer,
451+
mockAuthContext,
452+
);
453+
} finally {
454+
global.setTimeout = originalSetTimeout as any;
455+
}
456+
457+
expect(mockTaskStore.updateTaskStatus).toHaveBeenCalledWith(taskId, 'working', 'RUNNING');
458+
expect(mockTaskStore.updateTaskStatus).toHaveBeenCalledWith(taskId, 'completed', 'COMPLETED');
459+
expect(workflowsService.getRunResult).toHaveBeenCalledWith(runId, undefined, mockAuthContext);
460+
expect(mockTaskStore.storeTaskResult).toHaveBeenCalledWith(taskId, 'completed', {
461+
content: [{ type: 'text', text: JSON.stringify({ output: 'test-output' }, null, 2) }],
462+
});
463+
});
464+
465+
it('handles failures by storing the failure reason', async () => {
466+
const mockTaskStore = {
467+
updateTaskStatus: jest.fn().mockResolvedValue(true),
468+
storeTaskResult: jest.fn().mockResolvedValue(true),
469+
};
470+
471+
const taskId = 'test-task-id';
472+
const runId = 'test-run-id';
473+
474+
(workflowsService.getRunStatus as jest.Mock).mockResolvedValue({
475+
status: 'FAILED',
476+
failure: { message: 'boom' },
477+
});
478+
479+
await (service as any).monitorWorkflowRun(
480+
runId,
481+
undefined,
482+
taskId,
483+
mockTaskStore,
484+
{} as McpServer,
485+
mockAuthContext,
486+
);
487+
488+
expect(mockTaskStore.updateTaskStatus).toHaveBeenCalledWith(taskId, 'failed', 'FAILED');
489+
expect(mockTaskStore.storeTaskResult).toHaveBeenCalledWith(taskId, 'failed', {
490+
content: [{ type: 'text', text: JSON.stringify({ message: 'boom' }, null, 2) }],
491+
});
492+
});
493+
});
369494
});

0 commit comments

Comments
 (0)