From 03ea68ce71a21835aaadf345e413ae87a87a64af Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 19 Jan 2026 22:28:58 +0000 Subject: [PATCH 1/3] feat: add mlflow tracing and eval logging Co-authored-by: m --- Makefile | 4 + examples/python/a2a_server.py | 80 ++++++----------- examples/python/compose.yml | 3 + examples/python/test_client.py | 87 ++++++++++++++++-- examples/typescript/a2a_server.ts | 86 ++++++++++++++++-- examples/typescript/compose.yml | 2 + examples/typescript/mlflow.ts | 140 +++++++++++++++++++++++++++++ examples/typescript/test_client.ts | 83 ++++++++++++++++- 8 files changed, 415 insertions(+), 70 deletions(-) create mode 100644 examples/typescript/mlflow.ts diff --git a/Makefile b/Makefile index 74c6ecfc..d78e975d 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,10 @@ help: @echo "Environment variables:" @echo " TEST_MESSAGE - Custom message to send to the test client (default: 'What's the weather in Oakland?')" @echo " OPENAI_API_KEY - OpenAI API key (required)" + @echo " MLFLOW_TRACKING_URI - MLflow tracking server URI" + @echo " MLFLOW_EXPERIMENT_NAME - MLflow experiment name" + @echo " MLFLOW_TRACING_ENABLED - Enable MLflow tracing (default: true)" + @echo " MLFLOW_EVAL_ENABLED - Enable MLflow evals (default: true)" @echo "" @echo "Note: TypeScript example is pending MCP SDK v2 release (expected Q1 2026)" diff --git a/examples/python/a2a_server.py b/examples/python/a2a_server.py index 1f113ae6..91f3e2e8 100644 --- a/examples/python/a2a_server.py +++ b/examples/python/a2a_server.py @@ -1,6 +1,7 @@ # /// script # dependencies = [ # "a2a-sdk[http-server]", +# "mlflow", # "openai", # "uvicorn", # ] @@ -13,13 +14,12 @@ import os import uuid -import json -import datetime -from pathlib import Path from typing import Dict, List, Any, Optional from fastapi import FastAPI, Request, HTTPException, APIRouter from fastapi.responses import JSONResponse from openai import OpenAI +import mlflow +import mlflow.openai from a2a.server.agent_execution.agent_executor import AgentExecutor from a2a.server.agent_execution.context import RequestContext from a2a.server.events.event_queue import EventQueue @@ -32,6 +32,28 @@ # Initialize OpenAI client openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) +# MLflow tracing configuration +MLFLOW_EXPERIMENT_NAME = os.getenv("MLFLOW_EXPERIMENT_NAME", "timestep-a2a") +MLFLOW_TRACING_ENABLED = os.getenv("MLFLOW_TRACING_ENABLED", "true").lower() in {"1", "true", "yes"} +_MLFLOW_TRACING_CONFIGURED = False + + +def setup_mlflow_tracing() -> None: + """Configure MLflow tracing for OpenAI calls.""" + global _MLFLOW_TRACING_CONFIGURED + if _MLFLOW_TRACING_CONFIGURED or not MLFLOW_TRACING_ENABLED: + return + + tracking_uri = os.getenv("MLFLOW_TRACKING_URI") + if tracking_uri: + mlflow.set_tracking_uri(tracking_uri) + mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME) + mlflow.openai.autolog() + _MLFLOW_TRACING_CONFIGURED = True + + +setup_mlflow_tracing() + # Agent IDs PERSONAL_ASSISTANT_ID = "00000000-0000-0000-0000-000000000000" WEATHER_ASSISTANT_ID = "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF" @@ -148,35 +170,6 @@ def build_system_message(agent_id: str, tools: List[Dict[str, Any]]) -> str: # Track all task IDs per agent for listing agent_task_ids: Dict[str, List[str]] = {} -def write_trace(task_id: str, agent_id: str, input_messages: List[Dict], input_tools: List[Dict], output_message: Dict) -> None: - """Write trace to traces/ folder.""" - traces_dir = Path("/workspace/traces") - traces_dir.mkdir(exist_ok=True) - - timestamp = datetime.datetime.now().isoformat().replace(":", "-") - # Use short task_id for filename (first 8 chars) - task_id_short = task_id[:8] if task_id else "unknown" - agent_id_short = agent_id[:8] if agent_id else "unknown" - trace_file = traces_dir / f"{timestamp}_{task_id_short}_{agent_id_short}.json" - - trace = { - "task_id": task_id, - "agent_id": agent_id, - "timestamp": timestamp, - "input": { - "messages": input_messages, - "tools": input_tools, - }, - "output": { - "content": output_message.get("content", ""), - "tool_calls": output_message.get("tool_calls", []), - } - } - - with open(trace_file, "w") as f: - json.dump(trace, f, indent=2) - - class MultiAgentExecutor(AgentExecutor): """Agent executor that uses OpenAI directly and configures tools based on agent_id.""" @@ -291,29 +284,6 @@ async def execute( # Convert OpenAI response to A2A format assistant_content = assistant_message.content or "" - # Capture trace: input messages + output message - output_message_dict = { - "content": assistant_content, - "tool_calls": [ - { - "id": tc.id, - "type": "function", - "function": { - "name": tc.function.name, - "arguments": tc.function.arguments, - }, - } - for tc in tool_calls - ] if tool_calls else [], - } - write_trace( - task_id=task_id or "", - agent_id=self.agent_id, - input_messages=openai_messages_with_system, - input_tools=self.tools or [], - output_message=output_message_dict, - ) - # Build A2A message using helper function # Role.agent is the correct role for assistant messages in A2A a2a_message = create_text_message_object( diff --git a/examples/python/compose.yml b/examples/python/compose.yml index dab7ba62..48b0729c 100644 --- a/examples/python/compose.yml +++ b/examples/python/compose.yml @@ -10,6 +10,9 @@ services: - "8000:8000" environment: - OPENAI_API_KEY=${OPENAI_API_KEY} + - MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI} + - MLFLOW_EXPERIMENT_NAME=${MLFLOW_EXPERIMENT_NAME} + - MLFLOW_TRACING_ENABLED=${MLFLOW_TRACING_ENABLED} - UV_CACHE_DIR=/workspace/.cache/uv develop: watch: diff --git a/examples/python/test_client.py b/examples/python/test_client.py index d8bd968c..a3253c13 100644 --- a/examples/python/test_client.py +++ b/examples/python/test_client.py @@ -1,6 +1,8 @@ # /// script # dependencies = [ # "a2a-sdk", +# "mlflow", +# "pandas", # "mcp", # "httpx", # ] @@ -19,6 +21,8 @@ import datetime from pathlib import Path from typing import Dict, Any, List, Optional +import mlflow +import pandas as pd from a2a.client import ClientFactory, ClientConfig from a2a.client.helpers import create_text_message_object from a2a.types import TransportProtocol, Role @@ -35,6 +39,24 @@ PERSONAL_ASSISTANT_ID = "00000000-0000-0000-0000-000000000000" WEATHER_ASSISTANT_ID = "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF" +# MLflow configuration +MLFLOW_EXPERIMENT_NAME = os.getenv("MLFLOW_EXPERIMENT_NAME", "timestep-evals") +MLFLOW_EVAL_ENABLED = os.getenv("MLFLOW_EVAL_ENABLED", "true").lower() in {"1", "true", "yes"} +_MLFLOW_CONFIGURED = False + + +def setup_mlflow() -> None: + """Configure MLflow tracking for evals.""" + global _MLFLOW_CONFIGURED + if _MLFLOW_CONFIGURED: + return + + tracking_uri = os.getenv("MLFLOW_TRACKING_URI") + if tracking_uri: + mlflow.set_tracking_uri(tracking_uri) + mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME) + _MLFLOW_CONFIGURED = True + def write_task(task: Any, agent_id: str) -> None: """Write task to tasks/ folder in proper A2A Task format.""" @@ -135,6 +157,52 @@ def parse_tool_call(tool_call: Dict[str, Any]) -> tuple[Optional[str], Dict[str, return tool_name, tool_args +def run_mlflow_eval(prompt: str, response: str, agent_id: str, task_id: Optional[str]) -> None: + """Run MLflow evals and log results.""" + if not MLFLOW_EVAL_ENABLED: + return + + try: + setup_mlflow() + eval_df = pd.DataFrame( + [ + { + "inputs": prompt, + "predictions": response, + "targets": "", + } + ] + ) + run_name = f"eval-{agent_id[:8]}-{task_id[:8] if task_id else 'unknown'}" + + with mlflow.start_run(run_name=run_name): + mlflow.set_tags( + { + "a2a.agent_id": agent_id, + "a2a.task_id": task_id or "", + } + ) + mlflow.log_text(prompt, "prompt.txt") + mlflow.log_text(response, "response.txt") + mlflow.log_metric("response_length", float(len(response))) + + try: + from mlflow.metrics.genai import relevance + + mlflow.evaluate( + data=eval_df, + model_type="question-answering", + targets="targets", + predictions="predictions", + extra_metrics=[relevance()], + ) + except Exception as eval_error: + mlflow.log_param("eval_error", str(eval_error)) + print(f"[MLflow eval skipped: {eval_error}]", file=sys.stderr) + except Exception as e: + print(f"[MLflow eval setup failed: {e}]", file=sys.stderr) + + async def mcp_sampling_callback( context: RequestContext["ClientSession", Any], params: mcp_types.CreateMessageRequestParams, @@ -303,8 +371,9 @@ async def run_client_loop( message = create_text_message_object(role="user", content=initial_message) print(f"\n[DEBUG: Starting to send message to A2A server]", file=sys.stderr) - async def process_with_output(a2a_client: Any, message_obj: Any, agent_id: str) -> None: - """Process message stream and print output.""" + async def process_with_output(a2a_client: Any, message_obj: Any, agent_id: str) -> str: + """Process message stream, print output, and return final response.""" + final_message = "" async for event in a2a_client.send_message(message_obj): task = extract_task_from_event(event) print(f"\n[DEBUG: Received task, id={getattr(task, 'id', 'NO_ID')}, type={type(task)}]", file=sys.stderr) @@ -326,6 +395,7 @@ async def process_with_output(a2a_client: Any, message_obj: Any, agent_id: str) if task.status.state.value == "completed": print("\n[Task completed]") + final_message = extract_final_message(task) break if task.status.state.value == "input-required": @@ -349,10 +419,17 @@ async def process_with_output(a2a_client: Any, message_obj: Any, agent_id: str) tool_result_msg.context_id = task.context_id # Recursively process tool result - await process_with_output(a2a_client, tool_result_msg, agent_id) + result_message = await process_with_output(a2a_client, tool_result_msg, agent_id) + if result_message: + final_message = result_message break - - await process_with_output(a2a_client, message, agent_id) + + return final_message.strip() + + final_message = await process_with_output(a2a_client, message, agent_id) + if final_message: + task_id = task_ids[-1] if task_ids else None + run_mlflow_eval(initial_message, final_message, agent_id, task_id) except Exception as e: print(f"\n[Error in client loop: {e}]") raise diff --git a/examples/typescript/a2a_server.ts b/examples/typescript/a2a_server.ts index 54885922..8e393a79 100644 --- a/examples/typescript/a2a_server.ts +++ b/examples/typescript/a2a_server.ts @@ -26,6 +26,7 @@ import { } from '@a2a-js/sdk/server'; import { agentCardHandler, jsonRpcHandler, UserBuilder } from '@a2a-js/sdk/server/express'; import { Message, Task } from '@a2a-js/sdk'; +import { getMlflowClient } from './mlflow'; // Initialize OpenAI client const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); @@ -91,6 +92,8 @@ const AGENT_DESCRIPTIONS: Record = { [WEATHER_ASSISTANT_ID]: 'Weather Assistant', }; +const MLFLOW_TRACING_ENABLED = ['1', 'true', 'yes'].includes((process.env.MLFLOW_TRACING_ENABLED || 'true').toLowerCase()); + function buildSystemMessage(agentId: string, tools: any[]): string { /**Build system message explaining who the agent is and what tools are available.*/ const agentName = AGENT_DESCRIPTIONS[agentId] || 'Assistant'; @@ -145,16 +148,77 @@ const taskMessages: Record = {}; // Track all task IDs per agent for listing const agentTaskIds: Record = {}; -function writeTrace( +function truncateText(value: string, maxLength: number = 500): string { + if (value.length <= maxLength) { + return value; + } + return `${value.slice(0, maxLength)}...`; +} + +async function writeTrace( taskId: string, agentId: string, + model: string, inputMessages: any[], inputTools: any[], - outputMessage: any -): void { - /**Write trace to traces/ folder.*/ - // Implementation would write to traces/ folder - // Similar to Python version + outputMessage: any, + usage?: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number } +): Promise { + /**Write trace to MLflow if configured.""" + if (!MLFLOW_TRACING_ENABLED) { + return; + } + try { + const client = await getMlflowClient(); + if (!client) { + return; + } + + const runName = `trace-${agentId.substring(0, 8)}-${taskId ? taskId.substring(0, 8) : 'unknown'}`; + const runId = await client.createRun(runName, { + 'a2a.agent_id': agentId, + 'a2a.task_id': taskId || '', + }); + + const userInput = inputMessages + .filter((msg) => msg.role === 'user') + .map((msg) => msg.content || '') + .join('\n'); + const outputContent = outputMessage?.content || ''; + const toolNames = inputTools + .map((tool) => tool?.function?.name) + .filter((name) => typeof name === 'string') + .join(','); + + const metrics: Record = { + input_messages: inputMessages.length, + tool_count: inputTools.length, + tool_calls: Array.isArray(outputMessage?.tool_calls) ? outputMessage.tool_calls.length : 0, + response_length: outputContent.length, + }; + + if (usage?.prompt_tokens !== undefined) { + metrics.prompt_tokens = usage.prompt_tokens; + } + if (usage?.completion_tokens !== undefined) { + metrics.completion_tokens = usage.completion_tokens; + } + if (usage?.total_tokens !== undefined) { + metrics.total_tokens = usage.total_tokens; + } + + await client.logBatch(runId, { + params: { + model: model, + tool_names: toolNames, + input_preview: truncateText(userInput), + output_preview: truncateText(outputContent), + }, + metrics, + }); + } catch (error) { + console.error('MLflow trace logging failed:', error); + } } class MultiAgentExecutor implements AgentExecutor { @@ -273,7 +337,15 @@ class MultiAgentExecutor implements AgentExecutor { })), }; - writeTrace(taskId || '', this.agent_id, openai_messages_with_system, this.tools || [], output_message_dict); + await writeTrace( + taskId || '', + this.agent_id, + this.model, + openai_messages_with_system, + this.tools || [], + output_message_dict, + response.usage + ); // Build A2A message const agentMessage: Message = { diff --git a/examples/typescript/compose.yml b/examples/typescript/compose.yml index a9223ff3..8f051741 100644 --- a/examples/typescript/compose.yml +++ b/examples/typescript/compose.yml @@ -10,6 +10,8 @@ services: - "8000:8000" environment: - OPENAI_API_KEY=${OPENAI_API_KEY} + - MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI} + - MLFLOW_EXPERIMENT_NAME=${MLFLOW_EXPERIMENT_NAME} develop: watch: - action: restart diff --git a/examples/typescript/mlflow.ts b/examples/typescript/mlflow.ts new file mode 100644 index 00000000..fac5e361 --- /dev/null +++ b/examples/typescript/mlflow.ts @@ -0,0 +1,140 @@ +type MlflowConfig = { + trackingUri: string; + experimentName: string; +}; + +type MlflowTag = { key: string; value: string }; +type MlflowParam = { key: string; value: string }; +type MlflowMetric = { key: string; value: number; timestamp: number; step: number }; + +type LogBatchPayload = { + run_id: string; + metrics?: MlflowMetric[]; + params?: MlflowParam[]; + tags?: MlflowTag[]; +}; + +type CreateRunResponse = { + run: { + info: { + run_id: string; + }; + }; +}; + +type GetExperimentResponse = { + experiment: { + experiment_id: string; + }; +}; + +type CreateExperimentResponse = { + experiment_id: string; +}; + +function getMlflowConfig(): MlflowConfig | null { + const trackingUri = process.env.MLFLOW_TRACKING_URI; + if (!trackingUri || !(trackingUri.startsWith('http://') || trackingUri.startsWith('https://'))) { + return null; + } + return { + trackingUri, + experimentName: process.env.MLFLOW_EXPERIMENT_NAME || 'timestep-evals', + }; +} + +async function postJson(url: string, body: unknown): Promise { + const response = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const text = await response.text(); + throw new Error(`MLflow request failed (${response.status}): ${text}`); + } + return (await response.json()) as T; +} + +async function getOrCreateExperimentId(trackingUri: string, experimentName: string): Promise { + try { + const data = await postJson(`${trackingUri}/api/2.0/mlflow/experiments/get-by-name`, { + experiment_name: experimentName, + }); + return data.experiment.experiment_id; + } catch (error) { + const created = await postJson(`${trackingUri}/api/2.0/mlflow/experiments/create`, { + name: experimentName, + }); + return created.experiment_id; + } +} + +export class MlflowClient { + private trackingUri: string; + private experimentId: string; + + private constructor(trackingUri: string, experimentId: string) { + this.trackingUri = trackingUri; + this.experimentId = experimentId; + } + + static async create(): Promise { + const config = getMlflowConfig(); + if (!config) { + return null; + } + const experimentId = await getOrCreateExperimentId(config.trackingUri, config.experimentName); + return new MlflowClient(config.trackingUri, experimentId); + } + + async createRun(runName: string, tags: Record): Promise { + const tagList: MlflowTag[] = [ + { key: 'mlflow.runName', value: runName }, + ...Object.entries(tags).map(([key, value]) => ({ key, value })), + ]; + const data = await postJson(`${this.trackingUri}/api/2.0/mlflow/runs/create`, { + experiment_id: this.experimentId, + tags: tagList, + }); + return data.run.info.run_id; + } + + async logBatch(runId: string, payload: { params?: Record; metrics?: Record; tags?: Record }): Promise { + const timestamp = Date.now(); + const params = payload.params + ? Object.entries(payload.params).map(([key, value]) => ({ key, value: String(value) })) + : undefined; + const metrics = payload.metrics + ? Object.entries(payload.metrics).map(([key, value]) => ({ key, value: Number(value), timestamp, step: 0 })) + : undefined; + const tags = payload.tags + ? Object.entries(payload.tags).map(([key, value]) => ({ key, value })) + : undefined; + + const body: LogBatchPayload = { + run_id: runId, + ...(params ? { params } : {}), + ...(metrics ? { metrics } : {}), + ...(tags ? { tags } : {}), + }; + + if (!params && !metrics && !tags) { + return; + } + await postJson(`${this.trackingUri}/api/2.0/mlflow/runs/log-batch`, body); + } +} + +let cachedClient: Promise | null = null; + +export async function getMlflowClient(): Promise { + if (!cachedClient) { + cachedClient = MlflowClient.create().catch((error) => { + console.error('MLflow client init failed:', error); + return null; + }); + } + return cachedClient; +} diff --git a/examples/typescript/test_client.ts b/examples/typescript/test_client.ts index e0bbcd41..0f33b1bf 100644 --- a/examples/typescript/test_client.ts +++ b/examples/typescript/test_client.ts @@ -27,6 +27,7 @@ import { } from '@modelcontextprotocol/sdk/client'; import { writeFileSync, mkdirSync } from 'fs'; import { join } from 'path'; +import { getMlflowClient } from './mlflow'; // Server URLs const A2A_BASE_URL = process.env.A2A_URL || 'http://localhost:8000'; @@ -36,6 +37,8 @@ const MCP_URL = process.env.MCP_URL || 'http://localhost:8080/mcp'; const PERSONAL_ASSISTANT_ID = '00000000-0000-0000-0000-000000000000'; const WEATHER_ASSISTANT_ID = 'FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF'; +const MLFLOW_EVAL_ENABLED = ['1', 'true', 'yes'].includes((process.env.MLFLOW_EVAL_ENABLED || 'true').toLowerCase()); + function writeTask(task: any, agent_id: string): void { /**Write task to tasks/ folder in proper A2A Task format.*/ const tasks_dir = 'tasks'; @@ -146,6 +149,67 @@ function parseToolCall(tool_call: Record): [string | null, Record + text + .toLowerCase() + .split(/\W+/) + .map((token) => token.trim()) + .filter((token) => token.length > 0); + + const promptTokens = new Set(tokenize(prompt)); + if (promptTokens.size === 0) { + return 0; + } + const responseTokens = new Set(tokenize(response)); + let overlap = 0; + for (const token of promptTokens) { + if (responseTokens.has(token)) { + overlap += 1; + } + } + return overlap / promptTokens.size; +} + +async function runMlflowEval(prompt: string, response: string, agentId: string, taskId?: string): Promise { + if (!MLFLOW_EVAL_ENABLED) { + return; + } + const client = await getMlflowClient(); + if (!client) { + return; + } + + try { + const runName = `eval-${agentId.substring(0, 8)}-${taskId ? taskId.substring(0, 8) : 'unknown'}`; + const runId = await client.createRun(runName, { + 'a2a.agent_id': agentId, + 'a2a.task_id': taskId || '', + }); + + await client.logBatch(runId, { + params: { + prompt_preview: truncateText(prompt), + response_preview: truncateText(response), + }, + metrics: { + prompt_length: prompt.length, + response_length: response.length, + prompt_overlap: promptOverlapScore(prompt, response), + }, + }); + } catch (error) { + console.error('MLflow eval logging failed:', error); + } +} + // MCP client for calling tools let mcpClient: Client | null = null; let mcpTransport: StreamableHTTPClientTransport | null = null; @@ -335,8 +399,9 @@ async function runClientLoop(initial_message: string, agent_id: string = PERSONA }; console.error('\n[DEBUG: Starting to send message to A2A server]'); - async function processWithOutput(a2a_client: any, message_obj: any, agent_id: string): Promise { + async function processWithOutput(a2a_client: any, message_obj: any, agent_id: string): Promise { /**Process message stream and print output.*/ + let finalMessage = ''; for await (const event of a2a_client.sendMessageStream(message_obj)) { const task = extractTaskFromEvent(event); console.error(`\n[DEBUG: Received task, id=${task.id || task.taskId || 'NO_ID'}, type=${task.kind || typeof task}]`); @@ -364,6 +429,7 @@ async function runClientLoop(initial_message: string, agent_id: string = PERSONA if (task.kind === 'status-update' && task.status?.state === 'completed') { console.log('\n[Task completed]'); + finalMessage = extractFinalMessage(task); break; } @@ -389,15 +455,26 @@ async function runClientLoop(initial_message: string, agent_id: string = PERSONA }; // Recursively process tool result - await processWithOutput(a2a_client, tool_result_msg, agent_id); + const resultMessage = await processWithOutput(a2a_client, tool_result_msg, agent_id); + if (resultMessage) { + finalMessage = resultMessage; + } break; } } } + if (finalMessage) { + break; + } } + return finalMessage.trim(); } - await processWithOutput(a2a_client, message, agent_id); + const finalMessage = await processWithOutput(a2a_client, message, agent_id); + if (finalMessage) { + const lastTaskId = task_ids.length > 0 ? task_ids[task_ids.length - 1] : undefined; + await runMlflowEval(initial_message, finalMessage, agent_id, lastTaskId); + } } catch (e: any) { console.error(`\n[Error in client loop: ${e}]`); throw e; From 229ed89e19aecc66a69e4cf43622c593937e1c90 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 19 Jan 2026 22:38:30 +0000 Subject: [PATCH 2/3] chore: revert non-python mlflow changes Co-authored-by: m --- examples/typescript/mlflow.ts | 140 ---------------------------------- 1 file changed, 140 deletions(-) delete mode 100644 examples/typescript/mlflow.ts diff --git a/examples/typescript/mlflow.ts b/examples/typescript/mlflow.ts deleted file mode 100644 index fac5e361..00000000 --- a/examples/typescript/mlflow.ts +++ /dev/null @@ -1,140 +0,0 @@ -type MlflowConfig = { - trackingUri: string; - experimentName: string; -}; - -type MlflowTag = { key: string; value: string }; -type MlflowParam = { key: string; value: string }; -type MlflowMetric = { key: string; value: number; timestamp: number; step: number }; - -type LogBatchPayload = { - run_id: string; - metrics?: MlflowMetric[]; - params?: MlflowParam[]; - tags?: MlflowTag[]; -}; - -type CreateRunResponse = { - run: { - info: { - run_id: string; - }; - }; -}; - -type GetExperimentResponse = { - experiment: { - experiment_id: string; - }; -}; - -type CreateExperimentResponse = { - experiment_id: string; -}; - -function getMlflowConfig(): MlflowConfig | null { - const trackingUri = process.env.MLFLOW_TRACKING_URI; - if (!trackingUri || !(trackingUri.startsWith('http://') || trackingUri.startsWith('https://'))) { - return null; - } - return { - trackingUri, - experimentName: process.env.MLFLOW_EXPERIMENT_NAME || 'timestep-evals', - }; -} - -async function postJson(url: string, body: unknown): Promise { - const response = await fetch(url, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(body), - }); - - if (!response.ok) { - const text = await response.text(); - throw new Error(`MLflow request failed (${response.status}): ${text}`); - } - return (await response.json()) as T; -} - -async function getOrCreateExperimentId(trackingUri: string, experimentName: string): Promise { - try { - const data = await postJson(`${trackingUri}/api/2.0/mlflow/experiments/get-by-name`, { - experiment_name: experimentName, - }); - return data.experiment.experiment_id; - } catch (error) { - const created = await postJson(`${trackingUri}/api/2.0/mlflow/experiments/create`, { - name: experimentName, - }); - return created.experiment_id; - } -} - -export class MlflowClient { - private trackingUri: string; - private experimentId: string; - - private constructor(trackingUri: string, experimentId: string) { - this.trackingUri = trackingUri; - this.experimentId = experimentId; - } - - static async create(): Promise { - const config = getMlflowConfig(); - if (!config) { - return null; - } - const experimentId = await getOrCreateExperimentId(config.trackingUri, config.experimentName); - return new MlflowClient(config.trackingUri, experimentId); - } - - async createRun(runName: string, tags: Record): Promise { - const tagList: MlflowTag[] = [ - { key: 'mlflow.runName', value: runName }, - ...Object.entries(tags).map(([key, value]) => ({ key, value })), - ]; - const data = await postJson(`${this.trackingUri}/api/2.0/mlflow/runs/create`, { - experiment_id: this.experimentId, - tags: tagList, - }); - return data.run.info.run_id; - } - - async logBatch(runId: string, payload: { params?: Record; metrics?: Record; tags?: Record }): Promise { - const timestamp = Date.now(); - const params = payload.params - ? Object.entries(payload.params).map(([key, value]) => ({ key, value: String(value) })) - : undefined; - const metrics = payload.metrics - ? Object.entries(payload.metrics).map(([key, value]) => ({ key, value: Number(value), timestamp, step: 0 })) - : undefined; - const tags = payload.tags - ? Object.entries(payload.tags).map(([key, value]) => ({ key, value })) - : undefined; - - const body: LogBatchPayload = { - run_id: runId, - ...(params ? { params } : {}), - ...(metrics ? { metrics } : {}), - ...(tags ? { tags } : {}), - }; - - if (!params && !metrics && !tags) { - return; - } - await postJson(`${this.trackingUri}/api/2.0/mlflow/runs/log-batch`, body); - } -} - -let cachedClient: Promise | null = null; - -export async function getMlflowClient(): Promise { - if (!cachedClient) { - cachedClient = MlflowClient.create().catch((error) => { - console.error('MLflow client init failed:', error); - return null; - }); - } - return cachedClient; -} From 0339a343f2710c7905b3aa32d539271713c9501f Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 19 Jan 2026 22:39:03 +0000 Subject: [PATCH 3/3] chore: restore non-python files Co-authored-by: m --- Makefile | 4 -- examples/typescript/a2a_server.ts | 86 +++--------------------------- examples/typescript/compose.yml | 2 - examples/typescript/test_client.ts | 83 ++-------------------------- 4 files changed, 10 insertions(+), 165 deletions(-) diff --git a/Makefile b/Makefile index d78e975d..74c6ecfc 100644 --- a/Makefile +++ b/Makefile @@ -13,10 +13,6 @@ help: @echo "Environment variables:" @echo " TEST_MESSAGE - Custom message to send to the test client (default: 'What's the weather in Oakland?')" @echo " OPENAI_API_KEY - OpenAI API key (required)" - @echo " MLFLOW_TRACKING_URI - MLflow tracking server URI" - @echo " MLFLOW_EXPERIMENT_NAME - MLflow experiment name" - @echo " MLFLOW_TRACING_ENABLED - Enable MLflow tracing (default: true)" - @echo " MLFLOW_EVAL_ENABLED - Enable MLflow evals (default: true)" @echo "" @echo "Note: TypeScript example is pending MCP SDK v2 release (expected Q1 2026)" diff --git a/examples/typescript/a2a_server.ts b/examples/typescript/a2a_server.ts index 8e393a79..54885922 100644 --- a/examples/typescript/a2a_server.ts +++ b/examples/typescript/a2a_server.ts @@ -26,7 +26,6 @@ import { } from '@a2a-js/sdk/server'; import { agentCardHandler, jsonRpcHandler, UserBuilder } from '@a2a-js/sdk/server/express'; import { Message, Task } from '@a2a-js/sdk'; -import { getMlflowClient } from './mlflow'; // Initialize OpenAI client const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); @@ -92,8 +91,6 @@ const AGENT_DESCRIPTIONS: Record = { [WEATHER_ASSISTANT_ID]: 'Weather Assistant', }; -const MLFLOW_TRACING_ENABLED = ['1', 'true', 'yes'].includes((process.env.MLFLOW_TRACING_ENABLED || 'true').toLowerCase()); - function buildSystemMessage(agentId: string, tools: any[]): string { /**Build system message explaining who the agent is and what tools are available.*/ const agentName = AGENT_DESCRIPTIONS[agentId] || 'Assistant'; @@ -148,77 +145,16 @@ const taskMessages: Record = {}; // Track all task IDs per agent for listing const agentTaskIds: Record = {}; -function truncateText(value: string, maxLength: number = 500): string { - if (value.length <= maxLength) { - return value; - } - return `${value.slice(0, maxLength)}...`; -} - -async function writeTrace( +function writeTrace( taskId: string, agentId: string, - model: string, inputMessages: any[], inputTools: any[], - outputMessage: any, - usage?: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number } -): Promise { - /**Write trace to MLflow if configured.""" - if (!MLFLOW_TRACING_ENABLED) { - return; - } - try { - const client = await getMlflowClient(); - if (!client) { - return; - } - - const runName = `trace-${agentId.substring(0, 8)}-${taskId ? taskId.substring(0, 8) : 'unknown'}`; - const runId = await client.createRun(runName, { - 'a2a.agent_id': agentId, - 'a2a.task_id': taskId || '', - }); - - const userInput = inputMessages - .filter((msg) => msg.role === 'user') - .map((msg) => msg.content || '') - .join('\n'); - const outputContent = outputMessage?.content || ''; - const toolNames = inputTools - .map((tool) => tool?.function?.name) - .filter((name) => typeof name === 'string') - .join(','); - - const metrics: Record = { - input_messages: inputMessages.length, - tool_count: inputTools.length, - tool_calls: Array.isArray(outputMessage?.tool_calls) ? outputMessage.tool_calls.length : 0, - response_length: outputContent.length, - }; - - if (usage?.prompt_tokens !== undefined) { - metrics.prompt_tokens = usage.prompt_tokens; - } - if (usage?.completion_tokens !== undefined) { - metrics.completion_tokens = usage.completion_tokens; - } - if (usage?.total_tokens !== undefined) { - metrics.total_tokens = usage.total_tokens; - } - - await client.logBatch(runId, { - params: { - model: model, - tool_names: toolNames, - input_preview: truncateText(userInput), - output_preview: truncateText(outputContent), - }, - metrics, - }); - } catch (error) { - console.error('MLflow trace logging failed:', error); - } + outputMessage: any +): void { + /**Write trace to traces/ folder.*/ + // Implementation would write to traces/ folder + // Similar to Python version } class MultiAgentExecutor implements AgentExecutor { @@ -337,15 +273,7 @@ class MultiAgentExecutor implements AgentExecutor { })), }; - await writeTrace( - taskId || '', - this.agent_id, - this.model, - openai_messages_with_system, - this.tools || [], - output_message_dict, - response.usage - ); + writeTrace(taskId || '', this.agent_id, openai_messages_with_system, this.tools || [], output_message_dict); // Build A2A message const agentMessage: Message = { diff --git a/examples/typescript/compose.yml b/examples/typescript/compose.yml index 8f051741..a9223ff3 100644 --- a/examples/typescript/compose.yml +++ b/examples/typescript/compose.yml @@ -10,8 +10,6 @@ services: - "8000:8000" environment: - OPENAI_API_KEY=${OPENAI_API_KEY} - - MLFLOW_TRACKING_URI=${MLFLOW_TRACKING_URI} - - MLFLOW_EXPERIMENT_NAME=${MLFLOW_EXPERIMENT_NAME} develop: watch: - action: restart diff --git a/examples/typescript/test_client.ts b/examples/typescript/test_client.ts index 0f33b1bf..e0bbcd41 100644 --- a/examples/typescript/test_client.ts +++ b/examples/typescript/test_client.ts @@ -27,7 +27,6 @@ import { } from '@modelcontextprotocol/sdk/client'; import { writeFileSync, mkdirSync } from 'fs'; import { join } from 'path'; -import { getMlflowClient } from './mlflow'; // Server URLs const A2A_BASE_URL = process.env.A2A_URL || 'http://localhost:8000'; @@ -37,8 +36,6 @@ const MCP_URL = process.env.MCP_URL || 'http://localhost:8080/mcp'; const PERSONAL_ASSISTANT_ID = '00000000-0000-0000-0000-000000000000'; const WEATHER_ASSISTANT_ID = 'FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF'; -const MLFLOW_EVAL_ENABLED = ['1', 'true', 'yes'].includes((process.env.MLFLOW_EVAL_ENABLED || 'true').toLowerCase()); - function writeTask(task: any, agent_id: string): void { /**Write task to tasks/ folder in proper A2A Task format.*/ const tasks_dir = 'tasks'; @@ -149,67 +146,6 @@ function parseToolCall(tool_call: Record): [string | null, Record - text - .toLowerCase() - .split(/\W+/) - .map((token) => token.trim()) - .filter((token) => token.length > 0); - - const promptTokens = new Set(tokenize(prompt)); - if (promptTokens.size === 0) { - return 0; - } - const responseTokens = new Set(tokenize(response)); - let overlap = 0; - for (const token of promptTokens) { - if (responseTokens.has(token)) { - overlap += 1; - } - } - return overlap / promptTokens.size; -} - -async function runMlflowEval(prompt: string, response: string, agentId: string, taskId?: string): Promise { - if (!MLFLOW_EVAL_ENABLED) { - return; - } - const client = await getMlflowClient(); - if (!client) { - return; - } - - try { - const runName = `eval-${agentId.substring(0, 8)}-${taskId ? taskId.substring(0, 8) : 'unknown'}`; - const runId = await client.createRun(runName, { - 'a2a.agent_id': agentId, - 'a2a.task_id': taskId || '', - }); - - await client.logBatch(runId, { - params: { - prompt_preview: truncateText(prompt), - response_preview: truncateText(response), - }, - metrics: { - prompt_length: prompt.length, - response_length: response.length, - prompt_overlap: promptOverlapScore(prompt, response), - }, - }); - } catch (error) { - console.error('MLflow eval logging failed:', error); - } -} - // MCP client for calling tools let mcpClient: Client | null = null; let mcpTransport: StreamableHTTPClientTransport | null = null; @@ -399,9 +335,8 @@ async function runClientLoop(initial_message: string, agent_id: string = PERSONA }; console.error('\n[DEBUG: Starting to send message to A2A server]'); - async function processWithOutput(a2a_client: any, message_obj: any, agent_id: string): Promise { + async function processWithOutput(a2a_client: any, message_obj: any, agent_id: string): Promise { /**Process message stream and print output.*/ - let finalMessage = ''; for await (const event of a2a_client.sendMessageStream(message_obj)) { const task = extractTaskFromEvent(event); console.error(`\n[DEBUG: Received task, id=${task.id || task.taskId || 'NO_ID'}, type=${task.kind || typeof task}]`); @@ -429,7 +364,6 @@ async function runClientLoop(initial_message: string, agent_id: string = PERSONA if (task.kind === 'status-update' && task.status?.state === 'completed') { console.log('\n[Task completed]'); - finalMessage = extractFinalMessage(task); break; } @@ -455,26 +389,15 @@ async function runClientLoop(initial_message: string, agent_id: string = PERSONA }; // Recursively process tool result - const resultMessage = await processWithOutput(a2a_client, tool_result_msg, agent_id); - if (resultMessage) { - finalMessage = resultMessage; - } + await processWithOutput(a2a_client, tool_result_msg, agent_id); break; } } } - if (finalMessage) { - break; - } } - return finalMessage.trim(); } - const finalMessage = await processWithOutput(a2a_client, message, agent_id); - if (finalMessage) { - const lastTaskId = task_ids.length > 0 ? task_ids[task_ids.length - 1] : undefined; - await runMlflowEval(initial_message, finalMessage, agent_id, lastTaskId); - } + await processWithOutput(a2a_client, message, agent_id); } catch (e: any) { console.error(`\n[Error in client loop: ${e}]`); throw e;