diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index ecbd18e78..358b35b4d 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -11,7 +11,12 @@ import { serving, WRITE_ACTIONS, } from "@databricks/appkit"; -import { agents, createAgent, tool } from "@databricks/appkit/beta"; +import { + agents, + createAgent, + DatabricksAdapter, + tool, +} from "@databricks/appkit/beta"; import { WorkspaceClient } from "@databricks/sdk-experimental"; import { z } from "zod"; import { lakebaseExamples } from "./lakebase-examples-plugin"; @@ -68,6 +73,35 @@ const helper = createAgent({ }, }); +// Supervisor API demo agent. The Databricks AI Gateway executes hosted +// tools server-side; declare them via `createAgent({ tools })` like any +// other agent tool — the agents plugin classifies the tagged record and +// routes it to the adapter via AgentInput.extensions. Import +// `supervisorTools` from '@databricks/appkit/beta' and uncomment an +// entry below to give the model real powers. +// +// `createAgent({ model })` accepts an adapter promise: the factory call below +// starts at module load, but its host/credential resolution is only awaited +// on first dispatch (via `resolveAdapter` in the agents plugin), so a +// misconfigured workspace surfaces at the first chat request, not at module init. +const supervisor = createAgent({ + instructions: + "You are an assistant powered by the Databricks Supervisor API.", + model: DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4-5", + }), + tools: () => ({ + // nyc: supervisorTools.genieSpace({ + // id: "01ABCDEF12345678", + // description: "NYC taxi trip records and zones", + // }), + // add: supervisorTools.ucFunction({ + // name: "main.default.add", + // description: "Adds two integers and returns the sum.", + // }), + }), +}); + /* * Smart-Dashboard agents. 
* @@ -385,7 +419,7 @@ createApp({ }), serving(), agents({ - agents: { helper, sql_analyst, dashboard_pilot }, + agents: { helper, sql_analyst, dashboard_pilot, supervisor }, // `query` (markdown dispatcher) + `sql_analyst` + `dashboard_pilot` // wire the /smart-dashboard route. `insights` and `anomaly` are // ephemeral markdown agents auto-fired by the route's AgentSidebar. diff --git a/docs/docs/api/appkit/Class.DatabricksAdapter.md b/docs/docs/api/appkit/Class.DatabricksAdapter.md index ba4a8a187..8492aa875 100644 --- a/docs/docs/api/appkit/Class.DatabricksAdapter.md +++ b/docs/docs/api/appkit/Class.DatabricksAdapter.md @@ -150,3 +150,40 @@ serving surface — no bespoke `fetch()` + `authenticate()` plumbing. #### Returns `Promise`\<`DatabricksAdapter`\> + +*** + +### fromSupervisorApi() + +```ts +static fromSupervisorApi(options: SupervisorApiAdapterOptions): Promise; +``` + +Discoverability shim for the Supervisor API adapter. Returns a +import("./supervisor-api").SupervisorApiAdapter, NOT a +DatabricksAdapter — the two are separate classes (different +wire formats, different lifecycle). Surfaced here so application +developers see a single `DatabricksAdapter.from*` autocomplete root. + +Dynamic-imports `./supervisor-api` to avoid forming a load-time cycle: +both files share `connectors/serving/client.ts`. 
+ +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `options` | [`SupervisorApiAdapterOptions`](Interface.SupervisorApiAdapterOptions.md) | + +#### Returns + +`Promise`\<[`SupervisorApiAdapter`](Class.SupervisorApiAdapter.md)\> + +#### Example + +```ts +import { DatabricksAdapter } from "@databricks/appkit/beta"; + +const model = await DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4-5", +}); +``` diff --git a/docs/docs/api/appkit/Class.SupervisorApiAdapter.md b/docs/docs/api/appkit/Class.SupervisorApiAdapter.md new file mode 100644 index 000000000..e1721b51b --- /dev/null +++ b/docs/docs/api/appkit/Class.SupervisorApiAdapter.md @@ -0,0 +1,141 @@ +# Class: SupervisorApiAdapter + +Adapter that calls the Databricks AI Gateway Responses API +(`/ai-gateway/mlflow/v1/responses`). + +Streams SSE events in the OpenAI Responses API wire format and maps them +to the AppKit `AgentEvent` protocol. Tool execution is handled +server-side, so the adapter ignores the agents-plugin tool index. + +Authentication is handled via the Databricks SDK credential chain — the +same mechanism used by `DatabricksAdapter.fromModelServing`. The transport +is injected via [SupervisorApiAdapterCtorOptions.streamBody](Interface.SupervisorApiAdapterCtorOptions.md#streambody); the +[fromSupervisorApi](Function.fromSupervisorApi.md) factory wires it through the SDK's +`apiClient.request({ raw: true })`. + +Set `DEBUG=appkit:agents:supervisor-api` to log the outbound request +shape (model, instructions length, input shape, tool count) and to be +notified when the recovery path engages (no incremental deltas, text +pulled from `response.completed.output[]`). The no-delta warning includes +a per-turn event-type histogram and the SA-reported status/error/ +incomplete_details, so it's already actionable without DEBUG. + +Tools are not configured on the adapter. 
Declare them via +`createAgent({ tools: () => ({ key: supervisorTools.genieSpace({...}) }) })` +(or markdown frontmatter referencing an ambient `supervisorTools.*` entry); +the agents plugin / standalone `runAgent` aggregates hosted-supervisor +entries and routes them to the adapter via +`AgentInput.extensions[SUPERVISOR_EXTENSION_KEY]`. Advanced callers +invoking `adapter.run(...)` directly populate that key themselves. + +## Example + +```ts +import { createApp, createAgent } from "@databricks/appkit"; +import { + agents, + DatabricksAdapter, + supervisorTools, +} from "@databricks/appkit/beta"; + +await createApp({ + plugins: [ + agents({ + agents: { + assistant: createAgent({ + instructions: "You are a helpful assistant.", + model: DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4", + }), + tools: () => ({ + nyc: supervisorTools.genieSpace({ + id: "01ABCDEF12345678", + description: "NYC taxi trip records and zones", + }), + }), + }), + }, + }), + ], +}); +``` + +## Implements + +- [`AgentAdapter`](Interface.AgentAdapter.md) + +## Constructors + +### Constructor + +```ts +new SupervisorApiAdapter(options: SupervisorApiAdapterCtorOptions): SupervisorApiAdapter; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `options` | [`SupervisorApiAdapterCtorOptions`](Interface.SupervisorApiAdapterCtorOptions.md) | + +#### Returns + +`SupervisorApiAdapter` + +## Properties + +### acceptsExtensions + +```ts +readonly acceptsExtensions: readonly ["databricks.supervisor"]; +``` + +Capability negotiation: the adapter reads its hosted-tool payload +from [AgentInput.extensions](Interface.AgentInput.md#extensions) under [SUPERVISOR\_EXTENSION\_KEY](Variable.SUPERVISOR_EXTENSION_KEY.md). +The agents plugin uses this list to warn at registration when the tool +index produces extensions the adapter wouldn't consume. 
+ +#### Implementation of + +[`AgentAdapter`](Interface.AgentAdapter.md).[`acceptsExtensions`](Interface.AgentAdapter.md#acceptsextensions) + +*** + +### consumesInputTools + +```ts +readonly consumesInputTools: false = false; +``` + +Capability negotiation: the adapter does not consume `input.tools`. +Tool execution is owned by the Databricks AI Gateway server-side, so +any function tools or local sub-agents declared on this agent would +be silently dropped — the agents plugin warns at registration when +that combination is detected. + +#### Implementation of + +[`AgentAdapter`](Interface.AgentAdapter.md).[`consumesInputTools`](Interface.AgentAdapter.md#consumesinputtools) + +## Methods + +### run() + +```ts +run(input: AgentInput, context: AgentRunContext): AsyncGenerator<AgentEvent, void, unknown>; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `input` | [`AgentInput`](Interface.AgentInput.md) | +| `context` | [`AgentRunContext`](Interface.AgentRunContext.md) | + +#### Returns + +`AsyncGenerator`\<[`AgentEvent`](TypeAlias.AgentEvent.md), `void`, `unknown`\> + +#### Implementation of + +[`AgentAdapter`](Interface.AgentAdapter.md).[`run`](Interface.AgentAdapter.md#run) diff --git a/docs/docs/api/appkit/Function.fromSupervisorApi.md b/docs/docs/api/appkit/Function.fromSupervisorApi.md new file mode 100644 index 000000000..cb54e7b60 --- /dev/null +++ b/docs/docs/api/appkit/Function.fromSupervisorApi.md @@ -0,0 +1,73 @@ +# Function: fromSupervisorApi() + +```ts +function fromSupervisorApi(options: SupervisorApiAdapterOptions): Promise<SupervisorApiAdapter>; +``` + +Creates an [AgentAdapter](Interface.AgentAdapter.md) backed by the Databricks AI Gateway +Responses API (`/ai-gateway/mlflow/v1/responses`). + +Uses the SDK's default credential chain for auth (reads DATABRICKS_HOST, +DATABRICKS_TOKEN, OAuth config, etc.). Tools are declared on the agent +(via `createAgent({ tools })`), not on this factory. 
+ +Application code should prefer the +[DatabricksAdapter.fromSupervisorApi](Class.DatabricksAdapter.md#fromsupervisorapi) static — it delegates here +and keeps a single `DatabricksAdapter.from*` autocomplete root for all +Databricks-backed adapters. This free function is the implementation +behind the static and remains exported for callers that want to import +it directly without pulling in [DatabricksAdapter](Class.DatabricksAdapter.md). + +## Parameters + +| Parameter | Type | +| ------ | ------ | +| `options` | [`SupervisorApiAdapterOptions`](Interface.SupervisorApiAdapterOptions.md) | + +## Returns + +`Promise`\<[`SupervisorApiAdapter`](Class.SupervisorApiAdapter.md)\> + +## Example + +```ts +import { createApp, createAgent } from "@databricks/appkit"; +import { + agents, + DatabricksAdapter, + supervisorTools, +} from "@databricks/appkit/beta"; + +await createApp({ + plugins: [ + agents({ + agents: { + assistant: createAgent({ + instructions: "You are a helpful assistant.", + model: DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4", + }), + tools: () => ({ + nyc: supervisorTools.genieSpace({ + id: "01ABCDEF12345678", + description: "NYC taxi trip records and zones", + }), + }), + }), + }, + }), + ], +}); +``` + +## Remarks + +⚠ When passing your own `workspaceClient`, see the warning on +[SupervisorApiAdapterOptions.workspaceClient](Interface.SupervisorApiAdapterOptions.md#workspaceclient) — the client is +captured once and reused, so per-request OBO clients would leak +identity across requests. + +## See + +[DatabricksAdapter.fromSupervisorApi](Class.DatabricksAdapter.md#fromsupervisorapi) — the recommended +application-facing entry point. 
diff --git a/docs/docs/api/appkit/Function.isSupervisorTool.md b/docs/docs/api/appkit/Function.isSupervisorTool.md new file mode 100644 index 000000000..e34696a71 --- /dev/null +++ b/docs/docs/api/appkit/Function.isSupervisorTool.md @@ -0,0 +1,20 @@ +# Function: isSupervisorTool() + +```ts +function isSupervisorTool(value: unknown): value is HostedSupervisorTool; +``` + +Type guard for [HostedSupervisorTool](Interface.HostedSupervisorTool.md). Used by the agents plugin +(`buildToolIndex`) and standalone `runAgent` (`classifyTool`) to route +supervisor-hosted tools to the extensions payload rather than the +adapter's `tools` array. + +## Parameters + +| Parameter | Type | +| ------ | ------ | +| `value` | `unknown` | + +## Returns + +`value is HostedSupervisorTool` diff --git a/docs/docs/api/appkit/Interface.AgentAdapter.md b/docs/docs/api/appkit/Interface.AgentAdapter.md index 52083157e..7e9f6ef22 100644 --- a/docs/docs/api/appkit/Interface.AgentAdapter.md +++ b/docs/docs/api/appkit/Interface.AgentAdapter.md @@ -1,5 +1,34 @@ # Interface: AgentAdapter +## Properties + +### acceptsExtensions? + +```ts +readonly optional acceptsExtensions: readonly string[]; +``` + +Extension keys this adapter consumes from [AgentInput.extensions](Interface.AgentInput.md#extensions). +The agents plugin (and standalone `runAgent`) warns at registration +if the tool index produces extensions whose keys aren't listed here. + +Adapters that don't read extensions can omit this field. + +*** + +### consumesInputTools? + +```ts +readonly optional consumesInputTools: boolean; +``` + +Whether the adapter consumes tools from `input.tools`. Defaults to +true. Adapters whose tool execution happens elsewhere (e.g. the +Supervisor API, where SA owns the tool loop server-side) declare +false; the agents plugin warns at registration if the agent declares +function tools or local sub-agents alongside such an adapter, since +those tools would never reach the model. 
+ ## Methods ### run() diff --git a/docs/docs/api/appkit/Interface.AgentInput.md b/docs/docs/api/appkit/Interface.AgentInput.md index 6d2eff8b0..17102bb87 100644 --- a/docs/docs/api/appkit/Interface.AgentInput.md +++ b/docs/docs/api/appkit/Interface.AgentInput.md @@ -2,6 +2,23 @@ ## Properties +### extensions? + +```ts +optional extensions: Readonly<Record<string, unknown>>; +``` + +Adapter-specific opaque payloads, keyed by adapter namespace. The +shared contract intentionally does not enumerate keys — see each +adapter's docs for which keys it reads and the shape of each value. + +The agents plugin and standalone `runAgent` populate this from the +agent's tool index when entries declare an adapter-side spec (e.g. +Supervisor API hosted tools). Adapters that don't read extensions +should leave it untouched. + +*** + ### messages ```ts diff --git a/docs/docs/api/appkit/Interface.HostedSupervisorTool.md b/docs/docs/api/appkit/Interface.HostedSupervisorTool.md new file mode 100644 index 000000000..2f1b45d25 --- /dev/null +++ b/docs/docs/api/appkit/Interface.HostedSupervisorTool.md @@ -0,0 +1,24 @@ +# Interface: HostedSupervisorTool + +Tagged record returned by every [supervisorTools](Variable.supervisorTools.md) factory. The +`__kind` discriminator lets the agents plugin (and standalone +`runAgent`) classify these tools without a structural match against the +wire format — keeps the SA wire shape free to evolve and avoids +namespace collisions with MCP hosted tools (which use `type: "genie-space"` +hyphenated, vs SA's `type: "genie_space"` underscored). 
+ +## Properties + +### \_\_kind + +```ts +readonly __kind: "hosted-supervisor"; +``` + +*** + +### spec + +```ts +readonly spec: SupervisorTool; +``` diff --git a/docs/docs/api/appkit/Interface.SupervisorApiAdapterCtorOptions.md b/docs/docs/api/appkit/Interface.SupervisorApiAdapterCtorOptions.md new file mode 100644 index 000000000..6e332d67c --- /dev/null +++ b/docs/docs/api/appkit/Interface.SupervisorApiAdapterCtorOptions.md @@ -0,0 +1,17 @@ +# Interface: SupervisorApiAdapterCtorOptions + +## Properties + +### model + +```ts +model: string; +``` + +*** + +### streamBody + +```ts +streamBody: StreamBody; +``` diff --git a/docs/docs/api/appkit/Interface.SupervisorApiAdapterOptions.md b/docs/docs/api/appkit/Interface.SupervisorApiAdapterOptions.md new file mode 100644 index 000000000..d5884d872 --- /dev/null +++ b/docs/docs/api/appkit/Interface.SupervisorApiAdapterOptions.md @@ -0,0 +1,31 @@ +# Interface: SupervisorApiAdapterOptions + +## Properties + +### model + +```ts +model: string; +``` + +Model identifier to pass in the request body +(e.g. "databricks-claude-sonnet-4"). + +*** + +### workspaceClient? + +```ts +optional workspaceClient: WorkspaceClientLike; +``` + +A WorkspaceClient (or structural equivalent) used for host resolution +and per-request authentication. When omitted, a `WorkspaceClient({})` +is created internally using the default SDK credential chain +(`DATABRICKS_HOST`, OAuth, PAT, etc.). + +⚠ The `workspaceClient` is captured at construction and reused across +every request. Passing a per-request OBO (On-Behalf-Of) client here +would silently leak the first request's identity into all subsequent +requests served by this adapter instance. Use the default credential +chain or pass a service-principal client. 
(CWE-664) diff --git a/docs/docs/api/appkit/Interface.SupervisorExtension.md b/docs/docs/api/appkit/Interface.SupervisorExtension.md new file mode 100644 index 000000000..28cc6c7cc --- /dev/null +++ b/docs/docs/api/appkit/Interface.SupervisorExtension.md @@ -0,0 +1,13 @@ +# Interface: SupervisorExtension + +Shape of the value at `AgentInput.extensions[SUPERVISOR_EXTENSION_KEY]`. +The agents plugin / `runAgent` build this from the tool index; advanced +callers invoking `adapter.run(...)` directly populate it themselves. + +## Properties + +### hostedTools? + +```ts +optional hostedTools: SupervisorTool[]; +``` diff --git a/docs/docs/api/appkit/TypeAlias.AgentTool.md b/docs/docs/api/appkit/TypeAlias.AgentTool.md index e165cec66..060c457ec 100644 --- a/docs/docs/api/appkit/TypeAlias.AgentTool.md +++ b/docs/docs/api/appkit/TypeAlias.AgentTool.md @@ -4,9 +4,11 @@ type AgentTool = | FunctionTool | HostedTool - | ToolkitEntry; + | ToolkitEntry + | HostedSupervisorTool; ``` Any tool an agent can invoke: inline function tools (`tool()`), hosted MCP -tools (`mcpServer()` / raw hosted), or toolkit references from plugins -(`analytics().toolkit()`). +tools (`mcpServer()` / raw hosted), toolkit references from plugins +(`analytics().toolkit()`), or adapter-hosted Supervisor-API tools +(`supervisorTools.*`). diff --git a/docs/docs/api/appkit/TypeAlias.ResolvedToolEntry.md b/docs/docs/api/appkit/TypeAlias.ResolvedToolEntry.md index e97b3ef97..d03c0afda 100644 --- a/docs/docs/api/appkit/TypeAlias.ResolvedToolEntry.md +++ b/docs/docs/api/appkit/TypeAlias.ResolvedToolEntry.md @@ -22,7 +22,160 @@ type ResolvedToolEntry = agentName: string; def: AgentToolDefinition; source: "subagent"; +} + | { + def: AgentToolDefinition; + source: "hosted-supervisor"; + spec: SupervisorTool; }; ``` Internal tool-index entry after a tool record has been resolved to a dispatchable form. 
+ +## Type Declaration + +```ts +{ + def: AgentToolDefinition; + localName: string; + pluginName: string; + source: "toolkit"; +} +``` + +### def + +```ts +def: AgentToolDefinition; +``` + +### localName + +```ts +localName: string; +``` + +### pluginName + +```ts +pluginName: string; +``` + +### source + +```ts +source: "toolkit"; +``` + +```ts +{ + def: AgentToolDefinition; + functionTool: FunctionTool; + source: "function"; +} +``` + +### def + +```ts +def: AgentToolDefinition; +``` + +### functionTool + +```ts +functionTool: FunctionTool; +``` + +### source + +```ts +source: "function"; +``` + +```ts +{ + def: AgentToolDefinition; + mcpToolName: string; + source: "mcp"; +} +``` + +### def + +```ts +def: AgentToolDefinition; +``` + +### mcpToolName + +```ts +mcpToolName: string; +``` + +### source + +```ts +source: "mcp"; +``` + +```ts +{ + agentName: string; + def: AgentToolDefinition; + source: "subagent"; +} +``` + +### agentName + +```ts +agentName: string; +``` + +### def + +```ts +def: AgentToolDefinition; +``` + +### source + +```ts +source: "subagent"; +``` + +```ts +{ + def: AgentToolDefinition; + source: "hosted-supervisor"; + spec: SupervisorTool; +} +``` + +### def + +```ts +def: AgentToolDefinition; +``` + +### source + +```ts +source: "hosted-supervisor"; +``` + +Adapter-side hosted tool (executed by the model-host, not by the +Node process). Today: Supervisor API hosted tools (Genie spaces, +UC functions, etc.). The `spec` is opaque to the agents plugin — +it routes the entry into `AgentInput.extensions` for the adapter +that declared the matching `acceptsExtensions` key. `def` is a +synthetic placeholder kept so the index has a uniform shape; it +is intentionally NOT included in the `tools` array passed to +`adapter.run()` (those entries are not callable functions). 
+ +### spec + +```ts +spec: SupervisorTool; +``` diff --git a/docs/docs/api/appkit/TypeAlias.SupervisorTool.md b/docs/docs/api/appkit/TypeAlias.SupervisorTool.md new file mode 100644 index 000000000..b4fe497b1 --- /dev/null +++ b/docs/docs/api/appkit/TypeAlias.SupervisorTool.md @@ -0,0 +1,49 @@ +# Type Alias: SupervisorTool + +```ts +type SupervisorTool = + | { + genie_space: { + description: string; + id: string; + }; + type: "genie_space"; +} + | { + type: "uc_function"; + uc_function: { + description: string; + name: string; + }; +} + | { + knowledge_assistant: { + description: string; + knowledge_assistant_id: string; + }; + type: "knowledge_assistant"; +} + | { + app: { + description: string; + name: string; + }; + type: "app"; +} + | { + type: "uc_connection"; + uc_connection: { + description: string; + name: string; + }; +}; +``` + +Tools supported by the Databricks AI Gateway Responses API. The shapes match +the wire format the endpoint expects, so the adapter passes the array +straight into the request body. + +This is an adapter-internal wire type. Application code authors tools via +the [supervisorTools](Variable.supervisorTools.md) factories, which return tagged +[HostedSupervisorTool](Interface.HostedSupervisorTool.md) records — the agents plugin then unwraps +the `.spec` when routing through [AgentInput.extensions](Interface.AgentInput.md#extensions). diff --git a/docs/docs/api/appkit/Variable.SUPERVISOR_EXTENSION_KEY.md b/docs/docs/api/appkit/Variable.SUPERVISOR_EXTENSION_KEY.md new file mode 100644 index 000000000..40d610b56 --- /dev/null +++ b/docs/docs/api/appkit/Variable.SUPERVISOR_EXTENSION_KEY.md @@ -0,0 +1,10 @@ +# Variable: SUPERVISOR\_EXTENSION\_KEY + +```ts +const SUPERVISOR_EXTENSION_KEY: "databricks.supervisor"; +``` + +Namespace key under which the adapter reads its hosted-tool payload +from [AgentInput.extensions](Interface.AgentInput.md#extensions). 
Exported so the agents plugin and +standalone `runAgent` (the producers) can write under the same key the +adapter reads. diff --git a/docs/docs/api/appkit/Variable.supervisorTools.md b/docs/docs/api/appkit/Variable.supervisorTools.md new file mode 100644 index 000000000..666dbe025 --- /dev/null +++ b/docs/docs/api/appkit/Variable.supervisorTools.md @@ -0,0 +1,178 @@ +# Variable: supervisorTools + +```ts +const supervisorTools: { + app: (__namedParameters: { + description: string; + name: string; + }) => HostedSupervisorTool; + genieSpace: (__namedParameters: { + description: string; + id: string; + }) => HostedSupervisorTool; + knowledgeAssistant: (__namedParameters: { + description: string; + knowledgeAssistantId: string; + }) => HostedSupervisorTool; + ucConnection: (__namedParameters: { + description: string; + name: string; + }) => HostedSupervisorTool; + ucFunction: (__namedParameters: { + description: string; + name: string; + }) => HostedSupervisorTool; +}; +``` + +Concise factories for declaring Supervisor API tools. + +Each factory accepts a single named-options object: routing-critical +strings (`id`, `name`, `description`) get labels at the call site so +"we swapped the args and didn't notice for two weeks" bugs are +impossible. + +`description` is required: SA's protobuf validation rejects `null`/`""`, +AND the LLM running on SA reads this string to decide when to route to +the tool. Two genie spaces both labelled "Genie space" give the model +nothing to discriminate on, so callers always own the routing hint. + +⚠ The `description` is read by the LLM at routing time — it is a +prompt-injection sink. Do **not** derive it from untrusted input (user +messages, request bodies, external systems). Treat it as application +configuration. 
(CWE-1427) + +## Type Declaration + +### app() + +```ts +app: (__namedParameters: { + description: string; + name: string; +}) => HostedSupervisorTool; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `__namedParameters` | \{ `description`: `string`; `name`: `string`; \} | +| `__namedParameters.description` | `string` | +| `__namedParameters.name` | `string` | + +#### Returns + +[`HostedSupervisorTool`](Interface.HostedSupervisorTool.md) + +### genieSpace() + +```ts +genieSpace: (__namedParameters: { + description: string; + id: string; +}) => HostedSupervisorTool; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `__namedParameters` | \{ `description`: `string`; `id`: `string`; \} | +| `__namedParameters.description` | `string` | +| `__namedParameters.id` | `string` | + +#### Returns + +[`HostedSupervisorTool`](Interface.HostedSupervisorTool.md) + +### knowledgeAssistant() + +```ts +knowledgeAssistant: (__namedParameters: { + description: string; + knowledgeAssistantId: string; +}) => HostedSupervisorTool; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `__namedParameters` | \{ `description`: `string`; `knowledgeAssistantId`: `string`; \} | +| `__namedParameters.description` | `string` | +| `__namedParameters.knowledgeAssistantId` | `string` | + +#### Returns + +[`HostedSupervisorTool`](Interface.HostedSupervisorTool.md) + +### ucConnection() + +```ts +ucConnection: (__namedParameters: { + description: string; + name: string; +}) => HostedSupervisorTool; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `__namedParameters` | \{ `description`: `string`; `name`: `string`; \} | +| `__namedParameters.description` | `string` | +| `__namedParameters.name` | `string` | + +#### Returns + +[`HostedSupervisorTool`](Interface.HostedSupervisorTool.md) + +### ucFunction() + +```ts +ucFunction: (__namedParameters: { + description: string; + name: string; +}) => HostedSupervisorTool; +``` 
+ +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `__namedParameters` | \{ `description`: `string`; `name`: `string`; \} | +| `__namedParameters.description` | `string` | +| `__namedParameters.name` | `string` | + +#### Returns + +[`HostedSupervisorTool`](Interface.HostedSupervisorTool.md) + +## Example + +```ts +import { createAgent } from "@databricks/appkit"; +import { + agents, + DatabricksAdapter, + supervisorTools, +} from "@databricks/appkit/beta"; + +const assistant = createAgent({ + instructions: "You are a helpful assistant.", + model: DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4", + }), + tools: () => ({ + nyc: supervisorTools.genieSpace({ + id: "01ABCDEF12345678", + description: "NYC taxi trip records and zones", + }), + add: supervisorTools.ucFunction({ + name: "main.default.add", + description: "Adds two integers and returns the sum.", + }), + }), +}); +``` diff --git a/docs/docs/api/appkit/index.md b/docs/docs/api/appkit/index.md index 6ac54fa47..e246d4c83 100644 --- a/docs/docs/api/appkit/index.md +++ b/docs/docs/api/appkit/index.md @@ -26,6 +26,7 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | [PolicyDeniedError](Class.PolicyDeniedError.md) | Thrown when a policy denies an action. | | [ResourceRegistry](Class.ResourceRegistry.md) | Central registry for tracking plugin resource requirements. Deduplication uses type + resourceKey (machine-stable); alias is for display only. | | [ServerError](Class.ServerError.md) | Error thrown when server lifecycle operations fail. Use for server start/stop issues, configuration conflicts, etc. | +| [SupervisorApiAdapter](Class.SupervisorApiAdapter.md) | Adapter that calls the Databricks AI Gateway Responses API (`/ai-gateway/mlflow/v1/responses`). | | [TunnelError](Class.TunnelError.md) | Error thrown when remote tunnel operations fail. Use for tunnel connection issues, message parsing failures, etc. 
| | [ValidationError](Class.ValidationError.md) | Error thrown when input validation fails. Use for invalid parameters, missing required fields, or type mismatches. | @@ -48,6 +49,7 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | [FileResource](Interface.FileResource.md) | Describes the file or directory being acted upon. | | [FunctionTool](Interface.FunctionTool.md) | - | | [GenerateDatabaseCredentialRequest](Interface.GenerateDatabaseCredentialRequest.md) | Request parameters for generating database OAuth credentials | +| [HostedSupervisorTool](Interface.HostedSupervisorTool.md) | Tagged record returned by every [supervisorTools](Variable.supervisorTools.md) factory. The `__kind` discriminator lets the agents plugin (and standalone `runAgent`) classify these tools without a structural match against the wire format — keeps the SA wire shape free to evolve and avoids namespace collisions with MCP hosted tools (which use `type: "genie-space"` hyphenated, vs SA's `type: "genie_space"` underscored). | | [IJobsConfig](Interface.IJobsConfig.md) | Configuration for the Jobs plugin. | | [ITelemetry](Interface.ITelemetry.md) | Plugin-facing interface for OpenTelemetry instrumentation. Provides a thin abstraction over OpenTelemetry APIs for plugins. | | [JobAPI](Interface.JobAPI.md) | User-facing API for a single configured job. | @@ -72,6 +74,9 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | [ServingEndpointEntry](Interface.ServingEndpointEntry.md) | Shape of a single registry entry. | | [ServingEndpointRegistry](Interface.ServingEndpointRegistry.md) | Registry interface for serving endpoint type generation. Empty by default — augmented by the Vite type generator's `.d.ts` output via module augmentation. When populated, provides autocomplete for alias names and typed request/response/chunk per endpoint. 
| | [StreamExecutionSettings](Interface.StreamExecutionSettings.md) | Execution settings for streaming endpoints. Extends PluginExecutionSettings with SSE stream configuration. | +| [SupervisorApiAdapterCtorOptions](Interface.SupervisorApiAdapterCtorOptions.md) | - | +| [SupervisorApiAdapterOptions](Interface.SupervisorApiAdapterOptions.md) | - | +| [SupervisorExtension](Interface.SupervisorExtension.md) | Shape of the value at `AgentInput.extensions[SUPERVISOR_EXTENSION_KEY]`. The agents plugin / `runAgent` build this from the tool index; advanced callers invoking `adapter.run(...)` directly populate it themselves. | | [TelemetryConfig](Interface.TelemetryConfig.md) | OpenTelemetry configuration for AppKit applications | | [Thread](Interface.Thread.md) | - | | [ThreadStore](Interface.ThreadStore.md) | - | @@ -88,7 +93,7 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | Type Alias | Description | | ------ | ------ | | [AgentEvent](TypeAlias.AgentEvent.md) | - | -| [AgentTool](TypeAlias.AgentTool.md) | Any tool an agent can invoke: inline function tools (`tool()`), hosted MCP tools (`mcpServer()` / raw hosted), or toolkit references from plugins (`analytics().toolkit()`). | +| [AgentTool](TypeAlias.AgentTool.md) | Any tool an agent can invoke: inline function tools (`tool()`), hosted MCP tools (`mcpServer()` / raw hosted), toolkit references from plugins (`analytics().toolkit()`), or adapter-hosted Supervisor-API tools (`supervisorTools.*`). | | [AgentTools](TypeAlias.AgentTools.md) | Per-agent tool record. String keys map to inline tools, toolkit entries, hosted tools, etc. | | [AgentToolsFn](TypeAlias.AgentToolsFn.md) | Function form of `AgentDefinition.tools`. Receives the typed [Plugins](TypeAlias.Plugins.md) map and returns a tool record. Invoked exactly once at setup (or once per `runAgent` call in standalone mode); the result is cached as the agent's resolved tool record. 
| | [BaseSystemPromptOption](TypeAlias.BaseSystemPromptOption.md) | - | @@ -105,6 +110,7 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | [ResolvedToolEntry](TypeAlias.ResolvedToolEntry.md) | Internal tool-index entry after a tool record has been resolved to a dispatchable form. | | [ResourcePermission](TypeAlias.ResourcePermission.md) | Union of all possible permission levels across all resource types. | | [ServingFactory](TypeAlias.ServingFactory.md) | Factory function returned by `AppKit.serving`. | +| [SupervisorTool](TypeAlias.SupervisorTool.md) | Tools supported by the Databricks AI Gateway Responses API. The shapes match the wire format the endpoint expects, so the adapter passes the array straight into the request body. | | [ToolRegistry](TypeAlias.ToolRegistry.md) | - | | [ToPlugin](TypeAlias.ToPlugin.md) | Factory function type returned by `toPlugin()`. Accepts optional config and returns a PluginData tuple. | @@ -115,6 +121,8 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | [agents](Variable.agents.md) | Plugin factory for the agents plugin. Reads `config/agents/*.md` by default, resolves toolkits/tools from registered plugins, exposes `appkit.agents.*` runtime API and mounts `/invocations`. | | [READ\_ACTIONS](Variable.READ_ACTIONS.md) | Actions that only read data. | | [sql](Variable.sql.md) | SQL helper namespace | +| [SUPERVISOR\_EXTENSION\_KEY](Variable.SUPERVISOR_EXTENSION_KEY.md) | Namespace key under which the adapter reads its hosted-tool payload from [AgentInput.extensions](Interface.AgentInput.md#extensions). Exported so the agents plugin and standalone `runAgent` (the producers) can write under the same key the adapter reads. | +| [supervisorTools](Variable.supervisorTools.md) | Concise factories for declaring Supervisor API tools. | | [WRITE\_ACTIONS](Variable.WRITE_ACTIONS.md) | Actions that mutate data. | ## Functions @@ -132,6 +140,7 @@ surface with `@databricks/appkit/beta`. 
Not meant for application imports. | [executeFromRegistry](Function.executeFromRegistry.md) | Validates tool-call arguments against the entry's schema and invokes its handler. On validation failure, returns an LLM-friendly error string (matching the behavior of `tool()`) rather than throwing, so the model can self-correct on its next turn. | | [extractServingEndpoints](Function.extractServingEndpoints.md) | Extract serving endpoint config from a server file by AST-parsing it. Looks for `serving({ endpoints: { alias: { env: "..." }, ... } })` calls and extracts the endpoint alias names and their environment variable mappings. | | [findServerFile](Function.findServerFile.md) | Find the server entry file by checking candidate paths in order. | +| [fromSupervisorApi](Function.fromSupervisorApi.md) | Creates an [AgentAdapter](Interface.AgentAdapter.md) backed by the Databricks AI Gateway Responses API (`/ai-gateway/mlflow/v1/responses`). | | [functionToolToDefinition](Function.functionToolToDefinition.md) | - | | [generateDatabaseCredential](Function.generateDatabaseCredential.md) | Generate OAuth credentials for Postgres database connection using the proper Postgres API. | | [getExecutionContext](Function.getExecutionContext.md) | Get the current execution context. | @@ -144,6 +153,7 @@ surface with `@databricks/appkit/beta`. Not meant for application imports. | [isFunctionTool](Function.isFunctionTool.md) | - | | [isHostedTool](Function.isHostedTool.md) | - | | [isSQLTypeMarker](Function.isSQLTypeMarker.md) | Type guard to check if a value is a SQL type marker | +| [isSupervisorTool](Function.isSupervisorTool.md) | Type guard for [HostedSupervisorTool](Interface.HostedSupervisorTool.md). Used by the agents plugin (`buildToolIndex`) and standalone `runAgent` (`classifyTool`) to route supervisor-hosted tools to the extensions payload rather than the adapter's `tools` array. 
| | [isToolkitEntry](Function.isToolkitEntry.md) | Type guard for `ToolkitEntry` — used by the agents plugin to differentiate toolkit references from inline tools in a mixed `tools` record. | | [loadAgentFromFile](Function.loadAgentFromFile.md) | Loads a single markdown agent file and resolves its frontmatter against registered plugin toolkits + ambient tool library. | | [loadAgentsFromDir](Function.loadAgentsFromDir.md) | Scans a directory for one subdirectory per agent, each containing `agent.md` (frontmatter + body). Produces an `AgentDefinition` record keyed by agent id (folder name). Throws on frontmatter errors or unresolved references. Returns an empty map if the directory does not exist. | diff --git a/docs/docs/api/appkit/typedoc-sidebar.ts b/docs/docs/api/appkit/typedoc-sidebar.ts index e7c06eefc..c1cbd88be 100644 --- a/docs/docs/api/appkit/typedoc-sidebar.ts +++ b/docs/docs/api/appkit/typedoc-sidebar.ts @@ -81,6 +81,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Class.ServerError", label: "ServerError" }, + { + type: "doc", + id: "api/appkit/Class.SupervisorApiAdapter", + label: "SupervisorApiAdapter" + }, { type: "doc", id: "api/appkit/Class.TunnelError", @@ -172,6 +177,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Interface.GenerateDatabaseCredentialRequest", label: "GenerateDatabaseCredentialRequest" }, + { + type: "doc", + id: "api/appkit/Interface.HostedSupervisorTool", + label: "HostedSupervisorTool" + }, { type: "doc", id: "api/appkit/Interface.IJobsConfig", @@ -292,6 +302,21 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Interface.StreamExecutionSettings", label: "StreamExecutionSettings" }, + { + type: "doc", + id: "api/appkit/Interface.SupervisorApiAdapterCtorOptions", + label: "SupervisorApiAdapterCtorOptions" + }, + { + type: "doc", + id: "api/appkit/Interface.SupervisorApiAdapterOptions", + label: "SupervisorApiAdapterOptions" + }, + { + type: "doc", + id: 
"api/appkit/Interface.SupervisorExtension", + label: "SupervisorExtension" + }, { type: "doc", id: "api/appkit/Interface.TelemetryConfig", @@ -438,6 +463,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/TypeAlias.ServingFactory", label: "ServingFactory" }, + { + type: "doc", + id: "api/appkit/TypeAlias.SupervisorTool", + label: "SupervisorTool" + }, { type: "doc", id: "api/appkit/TypeAlias.ToolRegistry", @@ -469,6 +499,16 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Variable.sql", label: "sql" }, + { + type: "doc", + id: "api/appkit/Variable.SUPERVISOR_EXTENSION_KEY", + label: "SUPERVISOR_EXTENSION_KEY" + }, + { + type: "doc", + id: "api/appkit/Variable.supervisorTools", + label: "supervisorTools" + }, { type: "doc", id: "api/appkit/Variable.WRITE_ACTIONS", @@ -535,6 +575,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Function.findServerFile", label: "findServerFile" }, + { + type: "doc", + id: "api/appkit/Function.fromSupervisorApi", + label: "fromSupervisorApi" + }, { type: "doc", id: "api/appkit/Function.functionToolToDefinition", @@ -595,6 +640,11 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Function.isSQLTypeMarker", label: "isSQLTypeMarker" }, + { + type: "doc", + id: "api/appkit/Function.isSupervisorTool", + label: "isSupervisorTool" + }, { type: "doc", id: "api/appkit/Function.isToolkitEntry", diff --git a/docs/docs/plugins/agents.md b/docs/docs/plugins/agents.md index 0ba2ab301..cf2123366 100644 --- a/docs/docs/plugins/agents.md +++ b/docs/docs/plugins/agents.md @@ -16,6 +16,8 @@ This page covers the full lifecycle. For the hand-written primitives (`tool()`, The agents plugin drives the LLM over Server-Sent Events. Foundation Model APIs (Claude, Llama, GPT, etc.) and other chat-style endpoints support streaming and work out of the box. Custom model endpoints that return a single JSON response (e.g. 
typical `sklearn` or MLflow `pyfunc` deployments) do **not** stream — pointing an agent at one will fail with "Response body is null — streaming not supported" on the first turn. If you list a serving endpoint in `apps init`, pick one whose model implements the chat-completions streaming protocol; the agents plugin reads its name from `DATABRICKS_SERVING_ENDPOINT_NAME` whenever an agent doesn't pin `model:` itself. For the non-streaming path against a custom endpoint, use the `serving` plugin's `/invoke` route with `useServingInvoke` instead. + +Or skip serving-endpoint setup entirely with the managed [Supervisor API adapter](#managed-agents-the-supervisor-api-adapter) (beta). ::: ## Install @@ -215,7 +217,131 @@ const result = await runAgent(classifier, { `runAgent` eagerly constructs each plugin in `RunAgentInput.plugins`, runs the standard `attachContext({})` + `await setup()` lifecycle, and shares the instances across the top-level run and every sub-agent dispatch. Plugins whose `setup()` requires `createApp`-only runtime (e.g. `WorkspaceClient`, `ServiceContext`) throw at standalone-init with a clear "use createApp instead" message rather than mid-stream. -Hosted tools (MCP) are still `agents()`-only since they require the live MCP client. Plugin tool dispatch in standalone mode runs as the service principal (no OBO) and **bypasses the agents-plugin approval gate** — treat standalone runAgent as a trusted-prompt environment (CI, batch eval, internal scripts), not as an exposed user-facing surface. +MCP hosted tools (`mcpServer(...)`) still require `agents()` (they need a live MCP client). Supervisor-API hosted tools (`supervisorTools.*`), by contrast, **work in standalone `runAgent`** — the adapter only forwards their specs in the request body, and the AI Gateway executes them server-side. This makes batch-eval / CI use of supervisor agents possible without `createApp`. 
Plugin tool dispatch in standalone mode runs as the service principal (no OBO) and **bypasses the agents-plugin approval gate** — treat standalone runAgent as a trusted-prompt environment (CI, batch eval, internal scripts), not as an exposed user-facing surface. + +## Managed agents: the Supervisor API adapter + +`DatabricksAdapter.fromSupervisorApi` (beta) is the zero-config way to run an agent: instead of provisioning and pointing at a model-serving endpoint, you run the agentic loop in the Databricks workspace by targeting the AI Gateway Responses API (`/ai-gateway/mlflow/v1/responses`), which runs the LLM — and any hosted tools — as a managed service on Databricks. No `DATABRICKS_SERVING_ENDPOINT_NAME`, no stream-capability check, no JS tool plumbing for the common cases. + +The minimal agent is one extra line versus a markdown agent: + +```ts +import { createApp, createAgent } from "@databricks/appkit"; +import { agents, DatabricksAdapter } from "@databricks/appkit/beta"; + +await createApp({ + plugins: [ + agents({ + agents: { + assistant: createAgent({ + instructions: "You are a helpful assistant.", + model: DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4-5", + }), + }), + }, + }), + ], +}); +``` + +`createAgent({ model })` already accepts adapters and adapter promises in addition to the model-name string used in earlier examples, so you can drop the factory result straight in. The factory resolves credentials through the SDK chain (`DATABRICKS_HOST`, OAuth, PAT, …); pass `workspaceClient` to reuse an existing client. + +### Hosted tools + +Expose Genie spaces, Unity Catalog functions/connections, Knowledge Assistants, or other AppKit apps to the model by declaring them as agent tools — same place every other tool is declared. 
Execution stays server-side; you write no tool code: + +```ts +import { createAgent } from "@databricks/appkit"; +import { + DatabricksAdapter, + supervisorTools, +} from "@databricks/appkit/beta"; + +const assistant = createAgent({ + instructions: "You are a helpful data assistant.", + model: DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4-5", + }), + tools: () => ({ + nyc: supervisorTools.genieSpace({ + id: "01ABCDEF12345678", + description: "NYC taxi trip records and zones", + }), + add: supervisorTools.ucFunction({ + name: "main.default.add", + description: "Adds two integers and returns the sum.", + }), + }), +}); +``` + +Each `supervisorTools.*` factory takes a single named-options object — routing-critical strings get labels at the call site, so positional-argument swap bugs are impossible. + +`description` is **required and non-empty** — the LLM uses it to route between tools, so two Genie spaces both labelled "Genie space" will be indistinguishable. + +:::warning Hosted-tool descriptions are trusted application configuration (CWE-1427) +A hosted tool's `description` is read by the LLM to decide when to route to that tool. **Do not derive it from untrusted input** — user messages, request bodies, freeform fields from external systems, or any value an attacker could influence. Treat `description` (and `id`/`name`) as application-controlled, alongside the agent's `instructions`. Allowing a user-controlled string here is a prompt-injection sink: a hostile description can convince the model to route to (or away from) a tool for any future request handled by the agent. + +The same caution applies to MCP `description`s and to any other field the model reads at routing time. 
+::: + +| Factory | Tool kind | Identifier | +|---|---|---| +| `supervisorTools.genieSpace({ id, description })` | Genie space | space id | +| `supervisorTools.ucFunction({ name, description })` | Unity Catalog function | three-part name | +| `supervisorTools.knowledgeAssistant({ knowledgeAssistantId, description })` | Knowledge Assistant | assistant id | +| `supervisorTools.app({ name, description })` | Databricks App | app name | +| `supervisorTools.ucConnection({ name, description })` | UC connection | connection name | + +### Declaring hosted tools in markdown agents + +Hosted-supervisor tools also work in markdown-driven agents: declare the tool in code (under `agents({ tools: { ... } })`) and reference its key in frontmatter: + +```ts +// server.ts +agents({ + agents: { /* ... */ }, + tools: { + nyc_taxi: supervisorTools.genieSpace({ + id: "01ABCDEF12345678", + description: "NYC taxi trip records and zones", + }), + }, +}); +``` + +```md +--- +endpoint: databricks-claude-sonnet-4-5 +tools: + - nyc_taxi +--- + +You answer questions about NYC taxi data using the Genie space. +``` + +No new frontmatter syntax — the ambient-tool lookup in `tools:` already resolves bare keys against `agents({ tools })`, and the tagged-record shape of `supervisorTools.*` lets the plugin classify them automatically. + +### What does *not* apply to Supervisor-API agents + +The managed runtime owns its own tool execution, so the adapter intentionally **ignores function tools and sub-agents from the agents-plugin tool index**. For any agent whose `model:` is a Supervisor adapter: + +- Only `supervisorTools.*` entries reach the model. Function tools (`tool({...})`), MCP hosted tools (`mcpServer(...)`), and local sub-agents (`agents: { ... }`) declared alongside a supervisor adapter will trigger a registration-time warning and **will not be exposed to the model**. The capability check fires from `consumesInputTools: false` on the adapter. 
+- The **human-in-the-loop approval gate** does not fire (tool calls never enter the Node process; `effect: "destructive"` annotations are irrelevant for hosted tools). +- `limits.maxToolCalls` is not enforced (the managed runtime accounts for its own calls). +- Per-call **OBO** does not apply to hosted tools; they run with the credentials the managed runtime uses for the target resource. + +### Cross-adapter sub-agent composition + +Supervisor and chat-completions adapters can both appear in the same `agents({ agents: { ... } })` map, but composition only goes in one direction: + +- **Chat-completions parent → supervisor sub-agent** works natively. The parent dispatches via `agent-{key}` as a regular function tool; the child's adapter runs entirely on the AI Gateway. +- **Supervisor parent → function-tool / local sub-agent children** is not yet wired. The capability check warns at registration; those tools will not reach the supervisor model. Future work will lift this restriction by routing the Supervisor API's `response.function_call` events through `context.executeTool`. + +:::note Recovery path for non-streaming tool turns +Some hosted tool kinds return their final assistant text without incremental `output_text.delta` events. The adapter has a recovery path that pulls the text out of `response.completed.output[]` so the turn is not silently empty. Set `DEBUG=appkit:agents:supervisor-api` to log when the recovery path engages (the adapter reports how many characters it recovered) if you want to verify which path a turn took; if a turn still ends with no text at all, the adapter logs a warning with the per-turn event-type histogram even without `DEBUG`. +::: ## Configuration reference diff --git a/packages/appkit/src/agents/databricks.ts b/packages/appkit/src/agents/databricks.ts index 6e2e78d60..b2f50663e 100644 --- a/packages/appkit/src/agents/databricks.ts +++ b/packages/appkit/src/agents/databricks.ts @@ -373,6 +373,32 @@ export class DatabricksAdapter implements AgentAdapter { }); } + /** + * Discoverability shim for the Supervisor API adapter. 
Returns a + * {@link import("./supervisor-api").SupervisorApiAdapter}, NOT a + * {@link DatabricksAdapter} — the two are separate classes (different + * wire formats, different lifecycle). Surfaced here so application + * developers see a single `DatabricksAdapter.from*` autocomplete root. + * + * Dynamic-imports `./supervisor-api` to avoid forming a load-time cycle: + * both files share `connectors/serving/client.ts`. + * + * @example + * ```ts + * import { DatabricksAdapter } from "@databricks/appkit/beta"; + * + * const model = await DatabricksAdapter.fromSupervisorApi({ + * model: "databricks-claude-sonnet-4-5", + * }); + * ``` + */ + static async fromSupervisorApi( + options: import("./supervisor-api").SupervisorApiAdapterOptions, + ): Promise { + const { fromSupervisorApi } = await import("./supervisor-api"); + return fromSupervisorApi(options); + } + async *run( input: AgentInput, context: AgentRunContext, diff --git a/packages/appkit/src/agents/supervisor-api.ts b/packages/appkit/src/agents/supervisor-api.ts new file mode 100644 index 000000000..b59d2dd10 --- /dev/null +++ b/packages/appkit/src/agents/supervisor-api.ts @@ -0,0 +1,824 @@ +import type { + AgentAdapter, + AgentEvent, + AgentInput, + AgentRunContext, + Message, + ResponseStreamEvent, +} from "shared"; +import { type ApiClientLike, streamPath } from "../connectors/serving/client"; +import { createLogger } from "../logging/logger"; +import { readSseEvents } from "../stream"; + +const logger = createLogger("agents:supervisor-api"); + +/** + * Stable client-facing error codes. We never surface raw upstream error + * strings to the client (CWE-209) — the helper logs the verbose detail + * server-side and returns one of these codes in the {@link AgentEvent}. + */ +type SupervisorErrorCode = + | "transport" + | "upstream_failed" + | "upstream_tool" + | "upstream_unknown"; + +/** + * Single sink for all error events emitted by the adapter. Logs the verbose + * detail (stack, upstream payload, etc.) 
at `warn` level and returns a + * sanitised {@link AgentEvent} carrying only a stable code so the client + * never sees raw upstream text. + */ +function emitError(code: SupervisorErrorCode, detail: unknown): AgentEvent { + logger.warn("supervisor-api error code=%s detail=%O", code, detail); + return { + type: "status", + status: "error", + error: `Supervisor API error (${code})`, + }; +} + +/** + * Renders an upstream error / incomplete_details payload as a short + * single-line string for log lines. Avoids dumping the full JSON tree + * (CWE-532): we keep the discriminator (`type`/`code`) plus a trimmed + * message, and that's it. Full payloads are still available via + * `DEBUG=appkit:agents:supervisor-api`. + */ +function summariseErrorPayload(payload: unknown): string { + if (payload == null) return ""; + if (typeof payload === "string") { + return payload.length > 80 ? `${payload.slice(0, 80)}…` : payload; + } + if (typeof payload !== "object") return String(payload); + const obj = payload as Record; + const kind = + (typeof obj.type === "string" && obj.type) || + (typeof obj.code === "string" && obj.code) || + (typeof obj.reason === "string" && obj.reason) || + "object"; + const message = + (typeof obj.message === "string" && obj.message) || + (typeof obj.detail === "string" && obj.detail) || + ""; + const trimmed = message.length > 80 ? `${message.slice(0, 80)}…` : message; + return trimmed ? `${kind}: ${trimmed}` : kind; +} + +/** + * Transport shim: given a request body, returns the raw SSE byte stream from + * the Supervisor API endpoint. Injected at construction time so callers can + * swap in the workspace SDK (the {@link fromSupervisorApi} factory), a bare + * `fetch` (a reverse proxy / mock), or a test fake. Mirrors `StreamBody` in + * `agents/databricks.ts` so both adapters share one transport surface. 
+ */ +type StreamBody = ( + body: Record, + signal?: AbortSignal, +) => Promise>; + +/** + * Structural shape of a Databricks SDK client used by {@link fromSupervisorApi}. + * Only what we need: `apiClient.request` for streaming and + * `config.ensureResolved` to materialise the host/credentials. + */ +interface WorkspaceClientLike extends ApiClientLike { + config: { ensureResolved(): Promise }; +} + +// --------------------------------------------------------------------------- +// Supervisor API tool surface (wire format) +// --------------------------------------------------------------------------- + +/** + * Tools supported by the Databricks AI Gateway Responses API. The shapes match + * the wire format the endpoint expects, so the adapter passes the array + * straight into the request body. + * + * This is an adapter-internal wire type. Application code authors tools via + * the {@link supervisorTools} factories, which return tagged + * {@link HostedSupervisorTool} records — the agents plugin then unwraps + * the `.spec` when routing through {@link AgentInput.extensions}. + */ +export type SupervisorTool = + | { type: "genie_space"; genie_space: { id: string; description: string } } + | { type: "uc_function"; uc_function: { name: string; description: string } } + | { + type: "knowledge_assistant"; + knowledge_assistant: { + knowledge_assistant_id: string; + description: string; + }; + } + | { type: "app"; app: { name: string; description: string } } + | { + type: "uc_connection"; + uc_connection: { name: string; description: string }; + }; + +/** + * Tagged record returned by every {@link supervisorTools} factory. 
The + * `__kind` discriminator lets the agents plugin (and standalone + * `runAgent`) classify these tools without a structural match against the + * wire format — keeps the SA wire shape free to evolve and avoids + * namespace collisions with MCP hosted tools (which use `type: "genie-space"` + * hyphenated, vs SA's `type: "genie_space"` underscored). + */ +export interface HostedSupervisorTool { + readonly __kind: "hosted-supervisor"; + readonly spec: SupervisorTool; +} + +/** + * Type guard for {@link HostedSupervisorTool}. Used by the agents plugin + * (`buildToolIndex`) and standalone `runAgent` (`classifyTool`) to route + * supervisor-hosted tools to the extensions payload rather than the + * adapter's `tools` array. + */ +export function isSupervisorTool( + value: unknown, +): value is HostedSupervisorTool { + return ( + typeof value === "object" && + value !== null && + (value as Record).__kind === "hosted-supervisor" + ); +} + +/** + * Concise factories for declaring Supervisor API tools. + * + * Each factory accepts a single named-options object: routing-critical + * strings (`id`, `name`, `description`) get labels at the call site so + * "we swapped the args and didn't notice for two weeks" bugs are + * impossible. + * + * `description` is required: SA's protobuf validation rejects `null`/`""`, + * AND the LLM running on SA reads this string to decide when to route to + * the tool. Two genie spaces both labelled "Genie space" give the model + * nothing to discriminate on, so callers always own the routing hint. + * + * ⚠ The `description` is read by the LLM at routing time — it is a + * prompt-injection sink. Do **not** derive it from untrusted input (user + * messages, request bodies, external systems). Treat it as application + * configuration. 
(CWE-1427) + * + * @example + * ```ts + * import { createAgent } from "@databricks/appkit"; + * import { + * agents, + * DatabricksAdapter, + * supervisorTools, + * } from "@databricks/appkit/beta"; + * + * const assistant = createAgent({ + * instructions: "You are a helpful assistant.", + * model: DatabricksAdapter.fromSupervisorApi({ + * model: "databricks-claude-sonnet-4", + * }), + * tools: () => ({ + * nyc: supervisorTools.genieSpace({ + * id: "01ABCDEF12345678", + * description: "NYC taxi trip records and zones", + * }), + * add: supervisorTools.ucFunction({ + * name: "main.default.add", + * description: "Adds two integers and returns the sum.", + * }), + * }), + * }); + * ``` + */ +export const supervisorTools = { + genieSpace: ({ + id, + description, + }: { + id: string; + description: string; + }): HostedSupervisorTool => ({ + __kind: "hosted-supervisor", + spec: { type: "genie_space", genie_space: { id, description } }, + }), + ucFunction: ({ + name, + description, + }: { + name: string; + description: string; + }): HostedSupervisorTool => ({ + __kind: "hosted-supervisor", + spec: { type: "uc_function", uc_function: { name, description } }, + }), + knowledgeAssistant: ({ + knowledgeAssistantId, + description, + }: { + knowledgeAssistantId: string; + description: string; + }): HostedSupervisorTool => ({ + __kind: "hosted-supervisor", + spec: { + type: "knowledge_assistant", + knowledge_assistant: { + knowledge_assistant_id: knowledgeAssistantId, + description, + }, + }, + }), + app: ({ + name, + description, + }: { + name: string; + description: string; + }): HostedSupervisorTool => ({ + __kind: "hosted-supervisor", + spec: { type: "app", app: { name, description } }, + }), + ucConnection: ({ + name, + description, + }: { + name: string; + description: string; + }): HostedSupervisorTool => ({ + __kind: "hosted-supervisor", + spec: { type: "uc_connection", uc_connection: { name, description } }, + }), +}; + +// 
--------------------------------------------------------------------------- +// AgentInput.extensions integration +// --------------------------------------------------------------------------- + +/** + * Namespace key under which the adapter reads its hosted-tool payload + * from {@link AgentInput.extensions}. Exported so the agents plugin and + * standalone `runAgent` (the producers) can write under the same key the + * adapter reads. + */ +export const SUPERVISOR_EXTENSION_KEY = "databricks.supervisor" as const; + +/** + * Shape of the value at `AgentInput.extensions[SUPERVISOR_EXTENSION_KEY]`. + * The agents plugin / `runAgent` build this from the tool index; advanced + * callers invoking `adapter.run(...)` directly populate it themselves. + */ +export interface SupervisorExtension { + hostedTools?: SupervisorTool[]; +} + +function readSupervisorExtension(input: AgentInput): SupervisorExtension { + const raw = input.extensions?.[SUPERVISOR_EXTENSION_KEY]; + // Single cast at the boundary. The contract on `extensions` is opaque; + // we trust the producer (agents plugin / runAgent / caller) to use the + // shape declared here. + if (!raw || typeof raw !== "object") return {}; + return raw as SupervisorExtension; +} + +// --------------------------------------------------------------------------- +// Adapter +// --------------------------------------------------------------------------- + +export interface SupervisorApiAdapterOptions { + /** + * Model identifier to pass in the request body + * (e.g. "databricks-claude-sonnet-4"). + */ + model: string; + /** + * A WorkspaceClient (or structural equivalent) used for host resolution + * and per-request authentication. When omitted, a `WorkspaceClient({})` + * is created internally using the default SDK credential chain + * (`DATABRICKS_HOST`, OAuth, PAT, etc.). + * + * ⚠ The `workspaceClient` is captured at construction and reused across + * every request. 
Passing a per-request OBO (On-Behalf-Of) client here + * would silently leak the first request's identity into all subsequent + * requests served by this adapter instance. Use the default credential + * chain or pass a service-principal client. (CWE-664) + */ + workspaceClient?: WorkspaceClientLike; +} + +export interface SupervisorApiAdapterCtorOptions { + streamBody: StreamBody; + model: string; +} + +/** + * Adapter that calls the Databricks AI Gateway Responses API + * (`/ai-gateway/mlflow/v1/responses`). + * + * Streams SSE events in the OpenAI Responses API wire format and maps them + * to the AppKit `AgentEvent` protocol. Tool execution is handled + * server-side, so the adapter ignores the agents-plugin tool index. + * + * Authentication is handled via the Databricks SDK credential chain — the + * same mechanism used by `DatabricksAdapter.fromModelServing`. The transport + * is injected via {@link SupervisorApiAdapterCtorOptions.streamBody}; the + * {@link fromSupervisorApi} factory wires it through the SDK's + * `apiClient.request({ raw: true })`. + * + * Set `DEBUG=appkit:agents:supervisor-api` to log the outbound request + * shape (model, instructions length, input shape, tool count) and to be + * notified when the recovery path engages (no incremental deltas, text + * pulled from `response.completed.output[]`). The no-delta warning includes + * a per-turn event-type histogram and the SA-reported status/error/ + * incomplete_details, so it's already actionable without DEBUG. + * + * Tools are not configured on the adapter. Declare them via + * `createAgent({ tools: () => ({ key: supervisorTools.genieSpace({...}) }) })` + * (or markdown frontmatter referencing an ambient `supervisorTools.*` entry); + * the agents plugin / standalone `runAgent` aggregates hosted-supervisor + * entries and routes them to the adapter via + * `AgentInput.extensions[SUPERVISOR_EXTENSION_KEY]`. 
Advanced callers + * invoking `adapter.run(...)` directly populate that key themselves. + * + * @example + * ```ts + * import { createApp, createAgent } from "@databricks/appkit"; + * import { + * agents, + * DatabricksAdapter, + * supervisorTools, + * } from "@databricks/appkit/beta"; + * + * await createApp({ + * plugins: [ + * agents({ + * agents: { + * assistant: createAgent({ + * instructions: "You are a helpful assistant.", + * model: DatabricksAdapter.fromSupervisorApi({ + * model: "databricks-claude-sonnet-4", + * }), + * tools: () => ({ + * nyc: supervisorTools.genieSpace({ + * id: "01ABCDEF12345678", + * description: "NYC taxi trip records and zones", + * }), + * }), + * }), + * }, + * }), + * ], + * }); + * ``` + */ +export class SupervisorApiAdapter implements AgentAdapter { + private streamBody: StreamBody; + private model: string; + + /** + * Capability negotiation: the adapter reads its hosted-tool payload + * from {@link AgentInput.extensions} under {@link SUPERVISOR_EXTENSION_KEY}. + * The agents plugin uses this list to warn at registration when the tool + * index produces extensions the adapter wouldn't consume. + */ + readonly acceptsExtensions = [SUPERVISOR_EXTENSION_KEY] as const; + + /** + * Capability negotiation: the adapter does not consume `input.tools`. + * Tool execution is owned by the Databricks AI Gateway server-side, so + * any function tools or local sub-agents declared on this agent would + * be silently dropped — the agents plugin warns at registration when + * that combination is detected. 
+ */ + readonly consumesInputTools = false; + + constructor(options: SupervisorApiAdapterCtorOptions) { + this.streamBody = options.streamBody; + this.model = options.model; + } + + async *run( + input: AgentInput, + context: AgentRunContext, + ): AsyncGenerator { + if (context.signal?.aborted) return; + + yield { type: "status", status: "running" }; + + const { instructions, input: payloadInput } = this.buildInput( + input.messages, + ); + const hostedTools = readSupervisorExtension(input).hostedTools ?? []; + yield* this.streamResponse( + instructions, + payloadInput, + hostedTools, + context.signal, + ); + } + + private async *streamResponse( + instructions: string | undefined, + input: ResponseInput, + hostedTools: SupervisorTool[], + signal?: AbortSignal, + ): AsyncGenerator { + const body: Record = { + model: this.model, + input, + stream: true, + }; + if (instructions) { + body.instructions = instructions; + } + // SA's protobuf validation rejects `tools: []` and `tools: null`. Only + // include the field when at least one tool is configured. + if (hostedTools.length > 0) { + body.tools = hostedTools; + } + + logger.debug( + "model=%s instructionsLen=%d inputType=%s tools=%d", + this.model, + instructions?.length ?? 0, + typeof input === "string" ? "string" : `array[${input.length}]`, + hostedTools.length, + ); + + let stream: ReadableStream; + try { + stream = await this.streamBody(body, signal); + } catch (err) { + // Aborts surface as exceptions thrown by `fetch`/SDK transports when + // the consumer cancels mid-request. Treat as a clean stop so consumers + // don't see a contradictory terminal `error` after their own abort. + if (signal?.aborted) return; + yield emitError("transport", err); + return; + } + + let receivedAnyDelta = false; + // Tracks `item_id`s we've already streamed text deltas for. 
Used by + // `mapEvent` to fall back to the final item text on `output_item.done` + // only when no incremental deltas streamed for that item — avoids + // double-emitting text when SA does both delta and done. + const streamedItemIds = new Set(); + // Histogram of received event types — surfaced in the no-delta warning + // so it's actionable without re-running with DEBUG. + const eventCounts = new Map(); + // Set to true once we've yielded a terminal `{status:"error"}` event so + // the recovery / completion / no-delta-warning blocks below all bail + // out — the consumer's already seen the terminal status, anything + // further would contradict the protocol's terminal-event semantics. + let terminated = false; + // Diagnostic snapshot of the last `response.completed` event. SA stuffs + // the final assistant message into `response.output[]` even when it + // didn't emit any deltas (e.g. when a tool failed or the model produced + // nothing). Keeping it lets us recover the text and surface useful + // errors instead of a silent empty turn. + let lastCompleted: + | { + status?: string; + output?: Array<{ + type?: string; + content?: Array<{ type?: string; text?: string }>; + }>; + error?: unknown; + incomplete_details?: unknown; + } + | undefined; + + for await (const { event, data } of readSseEvents(stream, signal)) { + if (data === "[DONE]") continue; + + let parsed: Record; + try { + parsed = JSON.parse(data); + } catch (err) { + logger.debug( + "Failed to parse SSE data line: %s (%O)", + data.slice(0, 200), + err, + ); + continue; + } + + const eventType = event || (parsed.type as string) || ""; + eventCounts.set(eventType, (eventCounts.get(eventType) ?? 0) + 1); + + // `response.completed` is held back until after the loop so we can + // synthesise a `message_delta` from `response.output[]` when the + // stream produced no incremental deltas (intermittent SA behaviour). 
+ // Emitting `complete` first would let UIs finalise the turn before the + // recovered text arrives. + if (eventType === "response.completed") { + lastCompleted = parsed.response as typeof lastCompleted; + continue; + } + + const out = mapEvent(eventType, parsed, streamedItemIds); + if (out) { + if (out.type === "message_delta") receivedAnyDelta = true; + yield out; + if (out.type === "status" && out.status === "error") { + terminated = true; + break; + } + } + } + + if (signal?.aborted) return; + + if (eventCounts.size === 0) { + logger.warn( + "Supervisor API stream closed without emitting any SSE events.", + ); + return; + } + + if (terminated) return; + + // Recovery path: no deltas streamed but SA finished — pull the assistant + // text out of `response.completed.response.output[]`. + if (!receivedAnyDelta) { + const recovered = extractTextFromCompletedResponse(lastCompleted); + if (recovered) { + logger.debug( + "Recovered %d chars from response.completed.output[]", + recovered.length, + ); + yield { type: "message_delta", content: recovered }; + receivedAnyDelta = true; + } + } + + if (eventCounts.has("response.completed")) { + // SA sometimes signals a failed turn via `response.completed` with a + // nested `status: "failed"` (or a populated `error`/`incomplete_details`) + // rather than emitting `response.failed`. Without this gate the + // adapter would silently yield `complete` on a server-side failure. 
+ if ( + lastCompleted?.status === "failed" || + lastCompleted?.error != null || + lastCompleted?.incomplete_details != null + ) { + yield emitError("upstream_failed", { + status: lastCompleted?.status, + error: lastCompleted?.error, + incomplete_details: lastCompleted?.incomplete_details, + }); + return; + } + yield { type: "status", status: "complete" }; + } + + if (!receivedAnyDelta) { + const histogram = [...eventCounts.entries()] + .map(([t, n]) => `${t}=${n}`) + .join(", "); + logger.warn( + "Supervisor API stream completed without any output_text deltas. " + + "events={%s} completed.status=%s completed.error=%s completed.incomplete=%s", + histogram, + lastCompleted?.status ?? "", + summariseErrorPayload(lastCompleted?.error), + summariseErrorPayload(lastCompleted?.incomplete_details), + ); + logger.debug( + "Supervisor API no-delta full payload: error=%O incomplete=%O", + lastCompleted?.error, + lastCompleted?.incomplete_details, + ); + } + } + + /** + * Splits the agent's message list into a Responses-API payload. System + * messages are concatenated (in order) into the top-level `instructions` + * field; user/assistant turns become `input` (as a plain string for the + * common single-user-turn case, otherwise as `{role,content}[]`). Tool-role + * messages are skipped — SA owns its own tool history server-side, so + * re-feeding our tool-result records would only confuse it. + */ + private buildInput(messages: Message[]): { + instructions: string | undefined; + input: ResponseInput; + } { + const instructionsParts: string[] = []; + const turns: Array<{ + role: "user" | "assistant" | "system"; + content: string; + }> = []; + + for (const m of messages) { + if (m.role === "system") instructionsParts.push(m.content); + else if (m.role !== "tool") + turns.push({ role: m.role, content: m.content }); + } + + const instructions = instructionsParts.length + ? 
/**
 * Pulls the final assistant text out of the `response` payload attached to a
 * `response.completed` event. This is the last-resort recovery path when the
 * stream produced neither `output_text.delta` nor an actionable
 * `output_item.done` (observed intermittently with tool-enabled SA agents).
 *
 * The payload comes from an unchecked cast of upstream JSON, so its shape is
 * validated at runtime: a missing or non-array `output`, non-`message` items,
 * non-array `content`, and parts that are not `output_text` with a string
 * `text` are all skipped instead of throwing.
 *
 * @param response - The `response` object from `response.completed`, or
 *   `undefined` when no such event was seen.
 * @returns The concatenated `output_text` parts of every `message` item, in
 *   order; `""` when nothing could be extracted.
 */
function extractTextFromCompletedResponse(
  response:
    | {
        output?: Array<{
          type?: string;
          content?: Array<{ type?: string; text?: string }>;
        }>;
      }
    | undefined,
): string {
  const output = response?.output;
  // A truthy non-array (`{}`, `7`) would otherwise blow up in `for…of`.
  if (!Array.isArray(output)) return "";

  let text = "";
  for (const item of output) {
    if (item?.type !== "message" || !Array.isArray(item.content)) continue;
    for (const part of item.content) {
      if (part?.type === "output_text" && typeof part.text === "string") {
        text += part.text;
      }
    }
  }
  return text;
}
/**
 * Translates one parsed Supervisor API SSE event into at most one agent
 * event.
 *
 * - `response.output_text.delta` becomes a `message_delta` and records the
 *   event's `item_id` in `streamedItemIds`.
 * - `response.failed` and `error` become `emitError(...)` results.
 * - `response.output_item.done` becomes an `upstream_tool` error for error
 *   items, or a single `message_delta` carrying the item's full text when no
 *   delta was streamed for that item id.
 * - Everything else maps to `null`.
 *
 * `data` is upstream JSON, so field shapes are checked at runtime rather than
 * trusted: a non-string `delta` maps to empty content, and a non-array
 * `content` or malformed content part is ignored.
 *
 * @param eventType - SSE event name (or the payload's `type`).
 * @param data - Parsed JSON payload of the event.
 * @param streamedItemIds - Mutable set of item ids that already produced
 *   text; updated by this function.
 * @returns The mapped agent event, or `null` when the event is ignored.
 */
function mapEvent(
  eventType: string,
  data: Record<string, unknown>,
  streamedItemIds: Set<string>,
): AgentEvent | null {
  // The cast restricts the switch domain to the closed wire-event union
  // exported by `shared`, so typos in case clauses (e.g. `response.faled`)
  // become compile errors instead of silent string mismatches. Unknown
  // event names still fall through to `default` at runtime — we don't
  // require exhaustive matching since SA emits more lifecycle events
  // than we care to map.
  switch (eventType as ResponseStreamEvent["type"]) {
    case "response.output_text.delta": {
      const itemId = data.item_id;
      if (typeof itemId === "string" && itemId.length > 0) {
        streamedItemIds.add(itemId);
      }
      // `content` must be a string; anything else from the wire maps to "".
      const delta = typeof data.delta === "string" ? data.delta : "";
      return { type: "message_delta", content: delta };
    }

    // `response.completed` is intentionally absent: `streamResponse` holds
    // it back so it can synthesise a delta from `response.output[]` when
    // the stream produced none, then emits `{status:"complete"}` itself.

    case "response.failed":
      return emitError("upstream_failed", data);

    case "error": {
      // Branch detail extraction so a missing `error` field doesn't surface
      // the JSON-stringified literal `'"Unknown error"'` (with quotes) in
      // server logs. The client never sees this string — `emitError`
      // sanitises it to a stable code.
      const detail =
        typeof data.error === "string"
          ? data.error
          : data.error == null
            ? "Unknown error"
            : data.error;
      return emitError("upstream_unknown", detail);
    }

    case "response.output_item.done": {
      const item = data.item as
        | {
            id?: string;
            type?: string;
            content?: Array<{ text?: string; type?: string }>;
          }
        | null
        | undefined;

      // SA's contract reserves `item.id === "error"` for tool failures, but
      // a 5-char identifier collision is too small a margin. Require either
      // an explicit `type === "error"` or pair the reserved id with a
      // non-message type (a normal assistant message uses `type: "message"`).
      if (
        item?.type === "error" ||
        (item?.id === "error" && item?.type !== "message")
      ) {
        return emitError("upstream_tool", item);
      }

      // Fallback: when SA produces a tool-driven response (e.g. Genie space),
      // it often omits `response.output_text.delta` events and only emits the
      // final assistant message via `output_item.done`. Surface that text as
      // a single delta so the UI sees the answer.
      if (
        item?.type === "message" &&
        item.id &&
        !streamedItemIds.has(item.id)
      ) {
        // Guard against a non-array `content` and null/odd parts so a
        // malformed item is ignored instead of throwing mid-stream.
        const parts = Array.isArray(item.content) ? item.content : [];
        const text = parts
          .map((part) =>
            part?.type === "output_text" && typeof part.text === "string"
              ? part.text
              : "",
          )
          .join("");
        if (text.length > 0) {
          streamedItemIds.add(item.id);
          return { type: "message_delta", content: text };
        }
      }
      return null;
    }

    // All other event types are intentionally ignored. Notable lifecycle
    // events we drop on the floor: `response.created`, `response.in_progress`,
    // `response.output_text.done`, `response.output_item.added`,
    // `response.content_part.added`, `response.content_part.done`.
    default:
      return null;
  }
}
/**
 * Creates an {@link AgentAdapter} backed by the Databricks AI Gateway
 * Responses API (`/ai-gateway/mlflow/v1/responses`).
 *
 * When `options.workspaceClient` is not supplied, a `WorkspaceClient` is
 * constructed from a dynamically imported `@databricks/sdk-experimental`
 * with an empty config object. Either way, `config.ensureResolved()` is
 * awaited before the adapter is returned, so resolution failures reject
 * this promise rather than the first request. Tools are declared on the
 * agent (via `createAgent({ tools })`), not on this factory.
 *
 * Application code should prefer the
 * {@link DatabricksAdapter.fromSupervisorApi} static — it delegates here
 * and keeps a single `DatabricksAdapter.from*` autocomplete root for all
 * Databricks-backed adapters. This free function is the implementation
 * behind the static and remains exported for callers that want to import
 * it directly without pulling in {@link DatabricksAdapter}.
 *
 * @param options - Adapter options; `model` is forwarded to the adapter and
 *   `workspaceClient`, when present, is used instead of a default client.
 * @returns A promise for a ready-to-use {@link SupervisorApiAdapter}.
 *
 * @example
 * ```ts
 * import { createApp, createAgent } from "@databricks/appkit";
 * import {
 *   agents,
 *   DatabricksAdapter,
 *   supervisorTools,
 * } from "@databricks/appkit/beta";
 *
 * await createApp({
 *   plugins: [
 *     agents({
 *       agents: {
 *         assistant: createAgent({
 *           instructions: "You are a helpful assistant.",
 *           model: DatabricksAdapter.fromSupervisorApi({
 *             model: "databricks-claude-sonnet-4",
 *           }),
 *           tools: () => ({
 *             nyc: supervisorTools.genieSpace({
 *               id: "01ABCDEF12345678",
 *               description: "NYC taxi trip records and zones",
 *             }),
 *           }),
 *         }),
 *       },
 *     }),
 *   ],
 * });
 * ```
 *
 * @remarks
 * ⚠ When passing your own `workspaceClient`, see the warning on
 * {@link SupervisorApiAdapterOptions.workspaceClient} — the client is
 * captured once and reused, so per-request OBO clients would leak
 * identity across requests.
 *
 * @see {@link DatabricksAdapter.fromSupervisorApi} — the recommended
 * application-facing entry point.
 */
export async function fromSupervisorApi(
  options: SupervisorApiAdapterOptions,
): Promise<SupervisorApiAdapter> {
  let client = options.workspaceClient;
  if (!client) {
    // Loaded lazily so the SDK is only imported when no client is supplied.
    const sdk = await import("@databricks/sdk-experimental");
    client = new sdk.WorkspaceClient({}) as unknown as WorkspaceClientLike;
  }

  await client.config.ensureResolved();

  // Capture the resolved client so the closure doesn't depend on the outer
  // `let` binding being reassigned later.
  const resolved = client;
  return new SupervisorApiAdapter({
    streamBody: (body, signal) =>
      streamPath(resolved, "/ai-gateway/mlflow/v1/responses", body, signal),
    model: options.model,
  });
}
+ */ +function makeStreamBody(chunks: string[]): { + streamBody: ReturnType; + lastBody: () => Record | undefined; +} { + let captured: Record | undefined; + const streamBody = vi.fn(async (body: Record) => { + captured = body; + return createReadableStream(chunks); + }); + return { streamBody, lastBody: () => captured }; +} + +function createInput(overrides: Partial = {}): AgentInput { + return { + messages: [ + { id: "1", role: "user", content: "Hello", createdAt: new Date() }, + ], + tools: [], + threadId: "thread-1", + ...overrides, + }; +} + +/** + * Convenience to build the `extensions` payload the agents plugin / runAgent + * produce, so tests don't have to repeat the key/shape boilerplate. + */ +function withSupervisorTools( + hostedTools: SupervisorTool[], +): Pick { + const ext: SupervisorExtension = { hostedTools }; + return { extensions: { [SUPERVISOR_EXTENSION_KEY]: ext } }; +} + +async function collect( + gen: AsyncGenerator, +): Promise { + const out: AgentEvent[] = []; + for await (const e of gen) out.push(e); + return out; +} + +describe("supervisorTools factories", () => { + test("genieSpace returns a tagged record wrapping the wire spec", () => { + const tool = supervisorTools.genieSpace({ + id: "space123", + description: "NYC taxi data", + }); + expect(tool).toEqual({ + __kind: "hosted-supervisor", + spec: { + type: "genie_space", + genie_space: { id: "space123", description: "NYC taxi data" }, + }, + }); + }); + + test("ucFunction returns a tagged record wrapping the wire spec", () => { + const tool = supervisorTools.ucFunction({ + name: "main.default.add", + description: "Adds two integers.", + }); + expect(tool).toEqual({ + __kind: "hosted-supervisor", + spec: { + type: "uc_function", + uc_function: { + name: "main.default.add", + description: "Adds two integers.", + }, + }, + }); + }); + + test("knowledgeAssistant maps knowledgeAssistantId into the wire field", () => { + const tool = supervisorTools.knowledgeAssistant({ + 
knowledgeAssistantId: "ka-1", + description: "Internal docs Q&A", + }); + expect(tool).toEqual({ + __kind: "hosted-supervisor", + spec: { + type: "knowledge_assistant", + knowledge_assistant: { + knowledge_assistant_id: "ka-1", + description: "Internal docs Q&A", + }, + }, + }); + }); + + test("app returns a tagged record wrapping the wire spec", () => { + const tool = supervisorTools.app({ + name: "my-app", + description: "Demo Databricks app.", + }); + expect(tool).toEqual({ + __kind: "hosted-supervisor", + spec: { + type: "app", + app: { name: "my-app", description: "Demo Databricks app." }, + }, + }); + }); + + test("ucConnection returns a tagged record wrapping the wire spec", () => { + const tool = supervisorTools.ucConnection({ + name: "my-conn", + description: "Connection to external DB.", + }); + expect(tool).toEqual({ + __kind: "hosted-supervisor", + spec: { + type: "uc_connection", + uc_connection: { + name: "my-conn", + description: "Connection to external DB.", + }, + }, + }); + }); +}); + +describe("isSupervisorTool", () => { + test("accepts every supervisorTools.* factory output", () => { + expect( + isSupervisorTool( + supervisorTools.genieSpace({ id: "g", description: "d" }), + ), + ).toBe(true); + expect( + isSupervisorTool( + supervisorTools.ucFunction({ name: "main.x.y", description: "d" }), + ), + ).toBe(true); + expect( + isSupervisorTool( + supervisorTools.knowledgeAssistant({ + knowledgeAssistantId: "ka", + description: "d", + }), + ), + ).toBe(true); + expect( + isSupervisorTool(supervisorTools.app({ name: "a", description: "d" })), + ).toBe(true); + expect( + isSupervisorTool( + supervisorTools.ucConnection({ name: "c", description: "d" }), + ), + ).toBe(true); + }); + + test("rejects plain wire-format objects (no __kind tag)", () => { + const wireOnly: SupervisorTool = { + type: "genie_space", + genie_space: { id: "g", description: "d" }, + }; + expect(isSupervisorTool(wireOnly)).toBe(false); + }); + + test("rejects MCP hosted tools and 
other shapes", () => { + expect(isSupervisorTool({ type: "genie-space", genie_space: {} })).toBe( + false, + ); + expect(isSupervisorTool(null)).toBe(false); + expect(isSupervisorTool(undefined)).toBe(false); + expect(isSupervisorTool("hosted-supervisor")).toBe(false); + expect(isSupervisorTool({})).toBe(false); + expect(isSupervisorTool({ __kind: "function" })).toBe(false); + }); +}); + +describe("SupervisorApiAdapter", () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + test("declares capability negotiation fields (acceptsExtensions, consumesInputTools)", () => { + const adapter = new SupervisorApiAdapter({ + streamBody: vi.fn(), + model: "databricks-claude-sonnet-4", + }); + expect(adapter.acceptsExtensions).toEqual([SUPERVISOR_EXTENSION_KEY]); + expect(adapter.consumesInputTools).toBe(false); + }); + + test("posts model, input, and stream:true through streamBody", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.output_text.delta", { delta: "Hi" }), + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + + await collect(adapter.run(createInput(), { executeTool: vi.fn() })); + + expect(streamBody).toHaveBeenCalledTimes(1); + expect(lastBody()).toMatchObject({ + model: "databricks-claude-sonnet-4", + input: "Hello", + stream: true, + }); + // No tools wired via extensions -> no `tools` field on the wire. 
+ expect(lastBody()).not.toHaveProperty("tools"); + }); + + test("reads hosted tools from AgentInput.extensions and posts them in the request body", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + + const genie = supervisorTools.genieSpace({ + id: "g1", + description: "Test genie space", + }); + const uc = supervisorTools.ucFunction({ + name: "main.x.add", + description: "Adds two integers.", + }); + + await collect( + adapter.run(createInput(withSupervisorTools([genie.spec, uc.spec])), { + executeTool: vi.fn(), + }), + ); + + expect(lastBody()?.tools).toEqual([genie.spec, uc.spec]); + }); + + test("ignores extensions written under a different key (key namespacing)", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + + await collect( + adapter.run( + createInput({ + extensions: { + "other.namespace": { hostedTools: [{ type: "ignored" }] }, + }, + }), + { executeTool: vi.fn() }, + ), + ); + + expect(lastBody()).not.toHaveProperty("tools"); + }); + + test("omits the tools field entirely when extensions carry an empty hostedTools array", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + await collect( + adapter.run(createInput(withSupervisorTools([])), { + executeTool: vi.fn(), + }), + ); + expect(lastBody()).not.toHaveProperty("tools"); + }); + + test("hoists system messages into the top-level instructions field", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + 
streamBody, + model: "databricks-claude-sonnet-4", + }); + await collect( + adapter.run( + { + messages: [ + { + id: "s", + role: "system", + content: "Be terse.", + createdAt: new Date(), + }, + { id: "u", role: "user", content: "Hi", createdAt: new Date() }, + ], + tools: [], + threadId: "t", + }, + { executeTool: vi.fn() }, + ), + ); + const body = lastBody(); + expect(body?.instructions).toBe("Be terse."); + expect(body?.input).toBe("Hi"); + }); + + test("omits instructions when there is no system message", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + await collect(adapter.run(createInput(), { executeTool: vi.fn() })); + expect(lastBody()).not.toHaveProperty("instructions"); + }); + + test("emits message_delta and complete on the happy path", async () => { + const { streamBody } = makeStreamBody([ + sseEvent("response.output_text.delta", { delta: "Hello" }), + sseEvent("response.output_text.delta", { delta: " world" }), + sseEvent("response.completed", {}), + ]); + + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + + expect(events).toEqual([ + { type: "status", status: "running" }, + { type: "message_delta", content: "Hello" }, + { type: "message_delta", content: " world" }, + { type: "status", status: "complete" }, + ]); + }); + + test("maps response.failed to a sanitised status:error event", async () => { + // The verbose upstream payload must NOT reach the client (CWE-209) — + // only the stable `upstream_failed` code does. Server logs still keep + // the full detail via logger.warn. 
+ const { streamBody } = makeStreamBody([ + sseEvent("response.failed", { + response: { error: { message: "secret-internal-stack-trace" } }, + }), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toContainEqual({ + type: "status", + status: "error", + error: "Supervisor API error (upstream_failed)", + }); + // Belt-and-braces: the leaky upstream string is never in the event. + for (const e of events) { + if (e.type === "status" && "error" in e) { + expect(e.error).not.toContain("secret-internal-stack-trace"); + } + } + }); + + test("maps top-level error events to sanitised upstream_unknown code", async () => { + const { streamBody } = makeStreamBody([ + sseEvent("error", { error: "rate limited (workspace abc-123)" }), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toContainEqual({ + type: "status", + status: "error", + error: "Supervisor API error (upstream_unknown)", + }); + for (const e of events) { + if (e.type === "status" && "error" in e) { + expect(e.error).not.toContain("workspace abc-123"); + } + } + }); + + test("maps response.output_item.done error item to sanitised upstream_tool code", async () => { + const { streamBody } = makeStreamBody([ + sseEvent("response.output_item.done", { + item: { + id: "error", + type: "error", + content: [{ text: "Tool stack trace with /home/user paths" }], + }, + }), + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toContainEqual({ + type: "status", + status: "error", + error: 
"Supervisor API error (upstream_tool)", + }); + for (const e of events) { + if (e.type === "status" && "error" in e) { + expect(e.error).not.toContain("/home/user"); + } + } + }); + + test("does NOT treat output_item.done id:'error' as error when type:'message' (collision guard)", async () => { + // SA reserves `id === "error"` for tool failures, but the 5-char id + // could collide with a legitimately-id'd assistant message. The guard + // requires `type === "error"` (or a non-message type alongside the + // reserved id) so a stray message with id="error" is not mis-classified. + const { streamBody } = makeStreamBody([ + sseEvent("response.output_item.done", { + item: { + id: "error", + type: "message", + role: "assistant", + content: [{ type: "output_text", text: "hello from error-id msg" }], + }, + }), + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toEqual([ + { type: "status", status: "running" }, + { type: "message_delta", content: "hello from error-id msg" }, + { type: "status", status: "complete" }, + ]); + }); + + test("falls back to output_item.done text when no deltas streamed (tool-driven SA response)", async () => { + const { streamBody } = makeStreamBody([ + sseEvent("response.output_item.added", { + item: { type: "message", id: "msg-1", role: "assistant", content: [] }, + }), + sseEvent("response.output_item.done", { + item: { + type: "message", + id: "msg-1", + status: "completed", + role: "assistant", + content: [{ type: "output_text", text: "Genie says hi." 
}], + }, + }), + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toEqual([ + { type: "status", status: "running" }, + { type: "message_delta", content: "Genie says hi." }, + { type: "status", status: "complete" }, + ]); + }); + + test("does not double-emit when both deltas and output_item.done arrive for the same item", async () => { + const { streamBody } = makeStreamBody([ + sseEvent("response.output_text.delta", { + item_id: "msg-1", + delta: "Hello", + }), + sseEvent("response.output_text.delta", { + item_id: "msg-1", + delta: " world", + }), + sseEvent("response.output_item.done", { + item: { + type: "message", + id: "msg-1", + status: "completed", + role: "assistant", + content: [{ type: "output_text", text: "Hello world" }], + }, + }), + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toEqual([ + { type: "status", status: "running" }, + { type: "message_delta", content: "Hello" }, + { type: "message_delta", content: " world" }, + { type: "status", status: "complete" }, + ]); + }); + + test("emits sanitised transport error when the underlying streamBody throws", async () => { + const streamBody = vi + .fn() + .mockRejectedValue( + new Error( + "HTTP 500 from https://workspace-internal.foo: stack trace ...", + ), + ); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toContainEqual({ + type: "status", + status: "error", + error: "Supervisor API error (transport)", + }); + for (const e of 
events) { + if (e.type === "status" && "error" in e) { + expect(e.error).not.toContain("workspace-internal.foo"); + expect(e.error).not.toContain("stack trace"); + } + } + }); + + test("does NOT emit a terminal error when the consumer aborts before streamBody resolves", async () => { + // Regression: previously the streamBody catch yielded a sanitised + // `{status:"error"}` even when the underlying rejection was an abort + // triggered by the consumer. Consumers that issued the abort must see + // a clean stop (zero further events after their abort), not a + // contradictory terminal error. + const controller = new AbortController(); + const streamBody = vi.fn(async (_body, signal?: AbortSignal) => { + controller.abort(); + // Simulate the SDK transport rejecting because the signal aborted. + // The catch path must observe `signal.aborted` and return silently. + throw new DOMException( + signal?.aborted ? "aborted" : "fetch failed", + "AbortError", + ); + }); + + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { + executeTool: vi.fn(), + signal: controller.signal, + }), + ); + + expect(events).toEqual([{ type: "status", status: "running" }]); + }); + + test("short-circuits when the signal is already aborted", async () => { + const streamBody = vi.fn(); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + + const controller = new AbortController(); + controller.abort(); + + const events = await collect( + adapter.run(createInput(), { + executeTool: vi.fn(), + signal: controller.signal, + }), + ); + + expect(events).toEqual([]); + expect(streamBody).not.toHaveBeenCalled(); + }); + + test("multi-turn input (user + assistant + user) is sent as a structured array", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new 
SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + + await collect( + adapter.run( + { + messages: [ + { id: "u1", role: "user", content: "Hi", createdAt: new Date() }, + { + id: "a", + role: "assistant", + content: "Hello!", + createdAt: new Date(), + }, + { + id: "u2", + role: "user", + content: "Tell me more", + createdAt: new Date(), + }, + ], + tools: [], + threadId: "t", + }, + { executeTool: vi.fn() }, + ), + ); + + expect(lastBody()?.input).toEqual([ + { role: "user", content: "Hi" }, + { role: "assistant", content: "Hello!" }, + { role: "user", content: "Tell me more" }, + ]); + }); + + test("drops tool-role messages from the request payload", async () => { + const { streamBody, lastBody } = makeStreamBody([ + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + await collect( + adapter.run( + { + messages: [ + { id: "u", role: "user", content: "Hi", createdAt: new Date() }, + { + id: "t1", + role: "tool", + content: "(genie result)", + createdAt: new Date(), + }, + ], + tools: [], + threadId: "t", + }, + { executeTool: vi.fn() }, + ), + ); + expect(lastBody()?.input).toBe("Hi"); + }); + + test("recovers final assistant text from response.completed.output when no deltas streamed", async () => { + // Real-world flake: SA occasionally finishes a turn with zero + // `output_text.delta` events and no `output_item.done` for the message, + // but still mirrors the full assistant text in `response.completed`. + // Without recovery the UI sees a silent empty turn. + const { streamBody } = makeStreamBody([ + sseEvent("response.created", {}), + sseEvent("response.in_progress", {}), + sseEvent("response.completed", { + response: { + status: "completed", + output: [ + { + type: "message", + id: "msg-x", + role: "assistant", + content: [ + { type: "output_text", text: "Recovered " }, + { type: "output_text", text: "answer." 
}, + ], + }, + ], + }, + }), + ]); + + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + + expect(events).toEqual([ + { type: "status", status: "running" }, + { type: "message_delta", content: "Recovered answer." }, + { type: "status", status: "complete" }, + ]); + }); + + test("does not recover from response.completed when deltas already streamed", async () => { + const { streamBody } = makeStreamBody([ + sseEvent("response.output_text.delta", { + item_id: "msg-x", + delta: "Hi", + }), + sseEvent("response.completed", { + response: { + status: "completed", + output: [ + { + type: "message", + id: "msg-x", + role: "assistant", + content: [{ type: "output_text", text: "Hi" }], + }, + ], + }, + }), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + const deltas = events.filter((e) => e.type === "message_delta"); + expect(deltas).toHaveLength(1); + expect(deltas[0]).toEqual({ type: "message_delta", content: "Hi" }); + }); + + test("treats response.failed as terminal: no events follow the error", async () => { + // SA may keep sending events after `response.failed` (and even a stray + // `response.completed`). The adapter must stop yielding once it has + // surfaced a terminal `status: error` so consumers don't see contradictory + // `message_delta`/`complete` events after the failure. 
+ const { streamBody } = makeStreamBody([ + sseEvent("response.failed", {}), + sseEvent("response.output_text.delta", { delta: "ignored" }), + sseEvent("response.completed", {}), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toEqual([ + { type: "status", status: "running" }, + { + type: "status", + status: "error", + error: "Supervisor API error (upstream_failed)", + }, + ]); + }); + + test("does NOT yield complete when response.completed carries status:'failed'", async () => { + // Regression for the silent-success-on-failure bug: SA occasionally + // reports a failed turn via `response.completed.status = "failed"` + // (with optional `error`/`incomplete_details`) rather than emitting + // `response.failed`. The adapter must surface this as a terminal + // error and NOT yield `{status:"complete"}`. + const { streamBody } = makeStreamBody([ + sseEvent("response.completed", { + response: { + status: "failed", + error: { message: "tool timeout" }, + }, + }), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toEqual([ + { type: "status", status: "running" }, + { + type: "status", + status: "error", + error: "Supervisor API error (upstream_failed)", + }, + ]); + }); + + test("does NOT yield complete when response.completed carries a populated error", async () => { + // Variant: status reported as "completed" but `error` is non-null. + // Treat as a terminal failure rather than silently completing. 
+ const { streamBody } = makeStreamBody([ + sseEvent("response.completed", { + response: { + status: "completed", + error: { code: "internal" }, + }, + }), + ]); + const adapter = new SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toContainEqual({ + type: "status", + status: "error", + error: "Supervisor API error (upstream_failed)", + }); + expect(events).not.toContainEqual({ + type: "status", + status: "complete", + }); + }); + + test("does not yield complete when the consumer aborts mid-stream", async () => { + // Stream that yields one delta, then waits forever — the consumer aborts + // after the first event arrives. The adapter must NOT subsequently yield + // a synthesised `complete` from a buffered `response.completed`. + const controller = new AbortController(); + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(c) { + c.enqueue( + encoder.encode( + sseEvent("response.output_text.delta", { delta: "Hi" }), + ), + ); + }, + pull() { + return new Promise(() => { + /* never resolves until cancel() */ + }); + }, + }); + + const adapter = new SupervisorApiAdapter({ + streamBody: async () => stream, + model: "databricks-claude-sonnet-4", + }); + + const events: AgentEvent[] = []; + for await (const e of adapter.run(createInput(), { + executeTool: vi.fn(), + signal: controller.signal, + })) { + events.push(e); + if (e.type === "message_delta") controller.abort(); + } + + expect(events).toEqual([ + { type: "status", status: "running" }, + { type: "message_delta", content: "Hi" }, + ]); + }); + + test("recovers when event: and data: lines arrive in separate chunks", async () => { + const { streamBody } = makeStreamBody([ + "event: response.output_text.delta\n", + `data: ${JSON.stringify({ delta: "split" })}\n\n`, + "event: response.completed\ndata: {}\n\n", + ]); + + const adapter = new 
SupervisorApiAdapter({ + streamBody, + model: "databricks-claude-sonnet-4", + }); + const events = await collect( + adapter.run(createInput(), { executeTool: vi.fn() }), + ); + expect(events).toContainEqual({ + type: "message_delta", + content: "split", + }); + expect(events).toContainEqual({ type: "status", status: "complete" }); + }); +}); + +describe("fromSupervisorApi", () => { + test("calls ensureResolved on the supplied workspace client", async () => { + const ensureResolved = vi.fn(async () => {}); + const adapter = await fromSupervisorApi({ + model: "databricks-claude-sonnet-4", + workspaceClient: { + config: { ensureResolved }, + apiClient: { request: vi.fn() }, + }, + }); + expect(ensureResolved).toHaveBeenCalledTimes(1); + expect(adapter).toBeInstanceOf(SupervisorApiAdapter); + }); + + test("routes streaming through apiClient.request with the SA path", async () => { + const encoder = new TextEncoder(); + const contents = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(sseEvent("response.completed", {}))); + controller.close(); + }, + }); + const request = vi.fn().mockResolvedValue({ contents }); + + const adapter = await fromSupervisorApi({ + model: "databricks-claude-sonnet-4", + workspaceClient: { + config: { ensureResolved: vi.fn(async () => {}) }, + apiClient: { request }, + }, + }); + + await collect(adapter.run(createInput(), { executeTool: vi.fn() })); + + expect(request).toHaveBeenCalledTimes(1); + const [requestArgs] = request.mock.calls[0]; + expect(requestArgs.path).toBe("/ai-gateway/mlflow/v1/responses"); + expect(requestArgs.method).toBe("POST"); + expect(requestArgs.raw).toBe(true); + expect(requestArgs.payload).toMatchObject({ + model: "databricks-claude-sonnet-4", + input: "Hello", + stream: true, + }); + expect(requestArgs.payload).not.toHaveProperty("tools"); + }); +}); + +describe("DatabricksAdapter.fromSupervisorApi", () => { + test("returns a SupervisorApiAdapter instance", async () => { + const { 
DatabricksAdapter } = await import("../databricks"); + const adapter = await DatabricksAdapter.fromSupervisorApi({ + model: "databricks-claude-sonnet-4", + workspaceClient: { + config: { ensureResolved: vi.fn(async () => {}) }, + apiClient: { request: vi.fn() }, + }, + }); + expect(adapter).toBeInstanceOf(SupervisorApiAdapter); + }); +}); diff --git a/packages/appkit/src/beta.ts b/packages/appkit/src/beta.ts index 3f5bba80c..74f7a5d21 100644 --- a/packages/appkit/src/beta.ts +++ b/packages/appkit/src/beta.ts @@ -19,6 +19,20 @@ export type { ToolProvider, } from "shared"; export { DatabricksAdapter, parseTextToolCalls } from "./agents/databricks"; +export type { + HostedSupervisorTool, + SupervisorApiAdapterCtorOptions, + SupervisorApiAdapterOptions, + SupervisorExtension, + SupervisorTool, +} from "./agents/supervisor-api"; +export { + fromSupervisorApi, + isSupervisorTool, + SUPERVISOR_EXTENSION_KEY, + SupervisorApiAdapter, + supervisorTools, +} from "./agents/supervisor-api"; // Agent runtime export { createAgent } from "./core/agent/create-agent"; diff --git a/packages/appkit/src/connectors/serving/client.ts b/packages/appkit/src/connectors/serving/client.ts index 83f065e69..3c556da79 100644 --- a/packages/appkit/src/connectors/serving/client.ts +++ b/packages/appkit/src/connectors/serving/client.ts @@ -41,6 +41,20 @@ function cancellationTokenFromAbortSignal( }; } +/** + * Structural shape of a Databricks SDK client we need for the low-level + * `apiClient.request` call. Lets `streamPath` be reused by adapters that + * don't want a hard dependency on the concrete `WorkspaceClient` type. + */ +export interface ApiClientLike { + apiClient: { + request( + options: Record, + context?: unknown, + ): Promise; + }; +} + /** * Invokes a serving endpoint using the SDK's high-level query API. * Returns a typed QueryEndpointResponse. @@ -62,22 +76,32 @@ export async function invoke( } /** - * Returns the raw SSE byte stream from a serving endpoint. 
- * No parsing is performed — bytes are passed through as-is. + * POSTs `body` as JSON to an arbitrary workspace API path and returns the raw + * SSE byte stream. No parsing is performed — bytes are passed through as-is. + * + * Uses the SDK's low-level `apiClient.request({ raw: true })` so callers + * inherit URL resolution, the SDK credential chain (PAT/OAuth/OIDC), and + * any future retries/telemetry baked into the SDK transport. * - * Uses the SDK's low-level `apiClient.request({ raw: true })` because - * the high-level `servingEndpoints.query()` returns `Promise` - * and does not support SSE streaming. + * When `signal` is provided it is bridged to the SDK's `Context` / + * `CancellationToken` so aborts cancel the outbound HTTP request. + * + * @internal + * + * Not part of the public AppKit surface. `path` is passed through to the + * SDK without any allowlist — exposing this to user-controlled input would + * turn it into workspace-credentialled SSRF (CWE-918). Internal callers + * must hard-code the path (or build it from a closed enum). New callers + * inside the package: keep this constraint, and do not re-export from + * `beta.ts` or any other entry point. */ -export async function stream( - client: WorkspaceClient, - endpointName: string, +export async function streamPath( + client: ApiClientLike, + path: string, body: Record, signal?: AbortSignal, ): Promise> { - const { stream: _stream, ...cleanBody } = body; - - logger.debug("Streaming from endpoint %s", endpointName); + logger.debug("Streaming from path %s", path); const context = signal ? 
new Context({ @@ -87,17 +111,17 @@ export async function stream( const response = (await client.apiClient.request( { - path: `/serving-endpoints/${encodeURIComponent(endpointName)}/invocations`, + path, method: "POST", headers: new Headers({ "Content-Type": "application/json", Accept: "text/event-stream", }), - payload: { ...cleanBody, stream: true }, + payload: body, raw: true, }, context, - )) as { contents: ReadableStream }; + )) as { contents: ReadableStream | null }; if (!response.contents) { throw new Error("Response body is null — streaming not supported"); @@ -105,3 +129,23 @@ export async function stream( return response.contents; } + +/** + * Returns the raw SSE byte stream from a serving endpoint. Thin wrapper over + * {@link streamPath} that handles serving-specific URL encoding and forces + * `stream: true` in the payload. + */ +export async function stream( + client: WorkspaceClient, + endpointName: string, + body: Record, + signal?: AbortSignal, +): Promise> { + const { stream: _stream, ...cleanBody } = body; + return streamPath( + client as unknown as ApiClientLike, + `/serving-endpoints/${encodeURIComponent(endpointName)}/invocations`, + { ...cleanBody, stream: true }, + signal, + ); +} diff --git a/packages/appkit/src/core/agent/run-agent.ts b/packages/appkit/src/core/agent/run-agent.ts index 4dd4401ac..5675edd36 100644 --- a/packages/appkit/src/core/agent/run-agent.ts +++ b/packages/appkit/src/core/agent/run-agent.ts @@ -8,6 +8,12 @@ import type { PluginData, ToolProvider, } from "shared"; +import { + isSupervisorTool, + SUPERVISOR_EXTENSION_KEY, + type SupervisorTool, +} from "../../agents/supervisor-api"; +import { createLogger } from "../../logging/logger"; import { consumeAdapterStream } from "./consume-adapter-stream"; import { createPluginsProxy } from "./plugins-map"; import { resolveToolkitFromProvider } from "./toolkit-resolver"; @@ -26,6 +32,8 @@ import type { } from "./types"; import { isToolkitEntry } from "./types"; +const logger = 
createLogger("agent:run-agent"); + export interface RunAgentInput { /** Seed messages for the run. Either a single user string or a full message list. */ messages: string | Message[]; @@ -102,7 +110,13 @@ async function runAgentInternal( input.plugins ?? [], providerCache, ); - const tools = Array.from(toolIndex.values()).map((e) => e.def); + // Hosted-supervisor entries are routed via `extensions`, not as callable + // tools — exclude their placeholder `def` from the wire `tools` array. + const tools = Array.from(toolIndex.values()) + .filter((e) => e.kind !== "hosted-supervisor") + .map((e) => e.def); + + warnOnCapabilityMismatch(def.name ?? "", adapter, toolIndex); const signal = input.signal; @@ -139,6 +153,15 @@ async function runAgentInternal( ); return res.text; } + if (entry.kind === "hosted-supervisor") { + // Defense-in-depth: should never fire. The placeholder def is + // filtered out of `tools` above, so the model never sees a callable + // schema for hosted-supervisor entries. If we ever reach here, the + // model was somehow handed the def and tried to invoke it directly. + throw new Error( + `runAgent: tool "${name}" is a hosted-supervisor tool, executed server-side by the Databricks AI Gateway. It must not be invoked from the Node process.`, + ); + } throw new Error( `runAgent: tool "${name}" is a ${entry.kind} tool. ` + "Hosted/MCP tools are only usable via createApp({ plugins: [..., agents(...)] }).", @@ -153,6 +176,7 @@ async function runAgentInternal( tools, threadId: randomUUID(), signal, + extensions: buildStandaloneExtensions(toolIndex), }, { executeTool, signal }, ); @@ -295,6 +319,19 @@ type StandaloneEntry = | { kind: "hosted"; def: AgentToolDefinition; + } + | { + /** + * Adapter-side hosted tool. 
Standalone `runAgent` accepts these + * (unlike MCP hosted tools, which need a live MCP client) because + * the adapter has everything it needs to execute them server-side: + * the spec travels via `AgentInput.extensions` and the SA endpoint + * runs the tool loop. Enables batch-eval / CI use of supervisor + * agents without `createApp`. + */ + kind: "hosted-supervisor"; + def: AgentToolDefinition; + spec: SupervisorTool; }; /** @@ -420,6 +457,23 @@ function classifyTool( def: { ...functionToolToDefinition(tool), name: key }, }; } + // Supervisor-API hosted tools work in standalone mode: the adapter + // executes them server-side via `AgentInput.extensions`, no MCP client + // required. Checked before the `isHostedTool` MCP rejection. The two + // predicates classify disjoint values (`isSupervisorTool` matches the + // `__kind` tag; `isHostedTool` matches the wire-format `type` field), so + // the order should not matter; the placement just makes the intent explicit. + if (isSupervisorTool(tool)) { + return { + kind: "hosted-supervisor", + spec: tool.spec, + def: { + name: key, + description: supervisorToolDescription(tool.spec), + parameters: { type: "object", properties: {} }, + }, + }; + } if (isHostedTool(tool)) { // Hosted tools (e.g. MCP `mcpServer(...)`) need a live MCP client that // only exists inside the agents plugin's lifecycle. In standalone @@ -433,6 +487,79 @@ function classifyTool( throw new Error(`runAgent: unrecognized tool shape at key "${key}"`); } +/** Mirrors `agents.ts`'s `supervisorToolDescription`. */ +function supervisorToolDescription(spec: SupervisorTool): string { + switch (spec.type) { + case "genie_space": + return spec.genie_space.description; + case "uc_function": + return spec.uc_function.description; + case "knowledge_assistant": + return spec.knowledge_assistant.description; + case "app": + return spec.app.description; + case "uc_connection": + return spec.uc_connection.description; + } +} + +/** Mirrors `agents.ts`'s `buildAdapterExtensions`.
*/ +function buildStandaloneExtensions( + toolIndex: Map, +): Readonly> | undefined { + const supervisorSpecs: SupervisorTool[] = []; + for (const entry of toolIndex.values()) { + if (entry.kind === "hosted-supervisor") { + supervisorSpecs.push(entry.spec); + } + } + if (supervisorSpecs.length === 0) return undefined; + return { + [SUPERVISOR_EXTENSION_KEY]: { hostedTools: supervisorSpecs }, + }; +} + +/** + * Mirrors the agents-plugin capability check: standalone `runAgent` warns under + * the same conditions (adapter capabilities don't match the tool index), with + * slightly different wording. Warn-not-throw: doesn't abort batch evals. + */ +function warnOnCapabilityMismatch( + agentName: string, + adapter: AgentAdapter, + toolIndex: Map, +): void { + const accepted = new Set(adapter.acceptsExtensions ?? []); + + const hostedSupervisorKeys: string[] = []; + const inputToolKeys: string[] = []; + for (const [key, entry] of toolIndex) { + if (entry.kind === "hosted-supervisor") { + hostedSupervisorKeys.push(key); + } else { + inputToolKeys.push(key); + } + } + + if ( + hostedSupervisorKeys.length > 0 && + !accepted.has(SUPERVISOR_EXTENSION_KEY) + ) { + logger.warn( + `Agent '${agentName}' declares hosted-supervisor tools (${hostedSupervisorKeys.join(", ")}) ` + + "but its model adapter does not accept the 'databricks.supervisor' extension. " + + "Pair them with `DatabricksAdapter.fromSupervisorApi(...)`, or remove them.", + ); + } + + if (adapter.consumesInputTools === false && inputToolKeys.length > 0) { + logger.warn( + `Agent '${agentName}' declares function tools / sub-agents (${inputToolKeys.join(", ")}) ` + + "but its model adapter does not consume input.tools.
These tools will not be exposed to the model.", + ); + } +} + function providerCacheLookup( pluginName: string, cache: Map, diff --git a/packages/appkit/src/core/agent/tests/run-agent.test.ts b/packages/appkit/src/core/agent/tests/run-agent.test.ts index 4d60a8c96..efd3e4202 100644 --- a/packages/appkit/src/core/agent/tests/run-agent.test.ts +++ b/packages/appkit/src/core/agent/tests/run-agent.test.ts @@ -434,4 +434,119 @@ describe("runAgent", () => { // Both parent and child reported the same instance id. expect(result.text).toBe("parent-id=1;child-id=1"); }); + + test("hosted-supervisor tools are routed via AgentInput.extensions and filtered out of input.tools", async () => { + // Standalone runAgent must accept Supervisor-API hosted tools — unlike + // MCP hosted tools (which need a live MCP client). The tagged record + // gets classified as `hosted-supervisor`; its placeholder def is kept + // out of `input.tools` (the spec doesn't expose a callable function) + // and the spec is routed via `input.extensions[SUPERVISOR_EXTENSION_KEY]`. 
+ const { supervisorTools, SUPERVISOR_EXTENSION_KEY } = await import( + "../../../agents/supervisor-api" + ); + + let captured: AgentInput | null = null; + const adapter: AgentAdapter = { + acceptsExtensions: [SUPERVISOR_EXTENSION_KEY], + consumesInputTools: false, + async *run(input, _context) { + captured = input; + yield { type: "message_delta", content: "ok" }; + }, + }; + + const def = createAgent({ + instructions: "x", + model: adapter, + tools: { + nyc: supervisorTools.genieSpace({ + id: "01ABC", + description: "NYC taxi", + }), + }, + }); + + const result = await runAgent(def, { messages: "hi" }); + expect(result.text).toBe("ok"); + + expect(captured).not.toBeNull(); + // biome-ignore lint/style/noNonNullAssertion: asserted above + const inp = captured!; + expect(inp.tools).toEqual([]); + expect(inp.extensions?.[SUPERVISOR_EXTENSION_KEY]).toEqual({ + hostedTools: [ + { + type: "genie_space", + genie_space: { id: "01ABC", description: "NYC taxi" }, + }, + ], + }); + }); + + test("warns when hosted-supervisor tools are paired with an adapter that does not accept the extension", async () => { + const { supervisorTools } = await import("../../../agents/supervisor-api"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + const adapter: AgentAdapter = { + // No `acceptsExtensions` declared. 
+ async *run(_input, _context) { + yield { type: "message_delta", content: "" }; + }, + }; + + const def = createAgent({ + name: "mismatched", + instructions: "x", + model: adapter, + tools: { + nyc: supervisorTools.genieSpace({ + id: "01ABC", + description: "NYC taxi", + }), + }, + }); + + await runAgent(def, { messages: "hi" }); + + const warning = warnSpy.mock.calls + .map((args) => args.join(" ")) + .find((s) => s.includes("hosted-supervisor")); + expect(warning).toBeTruthy(); + expect(warning).toContain("'mismatched'"); + warnSpy.mockRestore(); + }); + + test("warns when function tools are paired with an adapter that opts out of input.tools", async () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + const adapter: AgentAdapter = { + consumesInputTools: false, + async *run(_input, _context) { + yield { type: "message_delta", content: "" }; + }, + }; + + const def = createAgent({ + name: "leaky", + instructions: "x", + model: adapter, + tools: { + get_weather: tool({ + name: "get_weather", + description: "Weather", + schema: z.object({ city: z.string() }), + execute: async () => "sunny", + }), + }, + }); + + await runAgent(def, { messages: "hi" }); + + const warning = warnSpy.mock.calls + .map((args) => args.join(" ")) + .find((s) => s.includes("does not consume input.tools")); + expect(warning).toBeTruthy(); + expect(warning).toContain("'leaky'"); + warnSpy.mockRestore(); + }); }); diff --git a/packages/appkit/src/core/agent/types.ts b/packages/appkit/src/core/agent/types.ts index cf47845f7..eccac82b4 100644 --- a/packages/appkit/src/core/agent/types.ts +++ b/packages/appkit/src/core/agent/types.ts @@ -32,10 +32,15 @@ export interface ToolkitEntry { /** * Any tool an agent can invoke: inline function tools (`tool()`), hosted MCP - * tools (`mcpServer()` / raw hosted), or toolkit references from plugins - * (`analytics().toolkit()`). 
+ * tools (`mcpServer()` / raw hosted), toolkit references from plugins + * (`analytics().toolkit()`), or adapter-hosted Supervisor-API tools + * (`supervisorTools.*`). */ -export type AgentTool = FunctionTool | HostedTool | ToolkitEntry; +export type AgentTool = + | FunctionTool + | HostedTool + | ToolkitEntry + | import("../../agents/supervisor-api").HostedSupervisorTool; export interface ToolkitOptions { /** Key prefix to prepend to each tool's local name. Defaults to `${pluginName}.`. */ @@ -299,6 +304,21 @@ export type ResolvedToolEntry = source: "subagent"; agentName: string; def: AgentToolDefinition; + } + | { + /** + * Adapter-side hosted tool (executed by the model-host, not by the + * Node process). Today: Supervisor API hosted tools (Genie spaces, + * UC functions, etc.). The `spec` is opaque to the agents plugin — + * it routes the entry into `AgentInput.extensions` for the adapter + * that declared the matching `acceptsExtensions` key. `def` is a + * synthetic placeholder kept so the index has a uniform shape; it + * is intentionally NOT included in the `tools` array passed to + * `adapter.run()` (those entries are not callable functions). 
+ */ + source: "hosted-supervisor"; + spec: import("../../agents/supervisor-api").SupervisorTool; + def: AgentToolDefinition; }; export interface RegisteredAgent { diff --git a/packages/appkit/src/plugins/agents/agents.ts b/packages/appkit/src/plugins/agents/agents.ts index 40217e54e..a690ece05 100644 --- a/packages/appkit/src/plugins/agents/agents.ts +++ b/packages/appkit/src/plugins/agents/agents.ts @@ -15,6 +15,11 @@ import type { ToolAnnotations, ToolProvider, } from "shared"; +import { + isSupervisorTool, + SUPERVISOR_EXTENSION_KEY, + type SupervisorTool, +} from "../../agents/supervisor-api"; import { AppKitMcpClient, buildMcpHostPolicy } from "../../connectors/mcp"; import { getWorkspaceClient } from "../../context"; import { consumeAdapterStream } from "../../core/agent/consume-adapter-stream"; @@ -437,6 +442,8 @@ export class AgentsPlugin extends Plugin implements ToolProvider { const adapter = await this.resolveAdapter(def, name); const toolIndex = await this.buildToolIndex(name, def, src); + warnOnCapabilityMismatch(name, adapter, toolIndex); + return { name, instructions: def.instructions, @@ -556,6 +563,22 @@ export class AgentsPlugin extends Plugin implements ToolProvider { }); continue; } + if (isSupervisorTool(tool)) { + index.set(key, { + source: "hosted-supervisor", + spec: tool.spec, + def: { + // `def` is a placeholder so the index has a uniform shape; it + // is intentionally not passed to the adapter's `tools` array + // (the SA endpoint owns its own tool execution and would + // reject our synthetic schema). 
+ name: key, + description: supervisorToolDescription(tool.spec), + parameters: { type: "object", properties: {} }, + }, + }); + continue; + } if (isHostedTool(tool)) { hostedToCollect.push(tool); continue; @@ -977,7 +1000,14 @@ export class AgentsPlugin extends Plugin implements ToolProvider { const requestId = randomUUID(); this.trackStream(requestId, userId, abortController); - const tools = Array.from(registered.toolIndex.values()).map((e) => e.def); + // `hosted-supervisor` entries are not callable from the Node process + // (the SA endpoint executes them server-side). Their `def` is a + // placeholder; including it in the adapter's `tools` array would + // make the SA endpoint reject the request with a schema mismatch. + // The hosted-tool specs are routed via `AgentInput.extensions` below. + const tools = Array.from(registered.toolIndex.values()) + .filter((e) => e.source !== "hosted-supervisor") + .map((e) => e.def); const approvalPolicy = this.resolvedApprovalPolicy; const limits = this.resolvedLimits; const outboundEvents = new EventChannel(); @@ -1047,6 +1077,7 @@ export class AgentsPlugin extends Plugin implements ToolProvider { tools, threadId: thread.id, signal, + extensions: buildAdapterExtensions(registered.toolIndex), }, { executeTool, signal }, ); @@ -1224,6 +1255,18 @@ export class AgentsPlugin extends Plugin implements ToolProvider { if (!childAgent) throw new Error(`Sub-agent not found: ${entry.agentName}`); result = await this.runSubAgent(runState, childAgent, args, depth + 1); + } else if (entry.source === "hosted-supervisor") { + // Defense-in-depth: should never fire. Hosted-supervisor entries are + // routed via `AgentInput.extensions` and the SA endpoint executes + // them server-side; their `def` is filtered out of the adapter's + // `tools` array, so the model never sees a callable schema for them. 
+ // If we reach here, the agent is paired with a non-SA adapter that + // somehow surfaced the placeholder def to the model — surface a + // clear error rather than crash later in `normalizeToolResult`. + throw new Error( + `Tool '${name}' is a hosted-supervisor tool and cannot be invoked from the Node process. ` + + "It is executed server-side by the Databricks AI Gateway and is only reachable when the agent's model is a Supervisor API adapter.", + ); } return normalizeToolResult(result); @@ -1262,7 +1305,12 @@ export class AgentsPlugin extends Plugin implements ToolProvider { typeof (args as { input?: unknown }).input === "string" ? (args as { input: string }).input : JSON.stringify(args); - const childTools = Array.from(child.toolIndex.values()).map((e) => e.def); + // Same filter as the top-level path: hosted-supervisor `def` is a + // placeholder, not a callable function — exclude from the adapter's + // `tools` array. The specs are routed via `extensions` instead. + const childTools = Array.from(child.toolIndex.values()) + .filter((e) => e.source !== "hosted-supervisor") + .map((e) => e.def); const childExecute = (name: string, childArgs: unknown): Promise => this.dispatchToolCall(runState, child.toolIndex, name, childArgs, depth); @@ -1309,6 +1357,7 @@ export class AgentsPlugin extends Plugin implements ToolProvider { tools: childTools, threadId: randomUUID(), signal: runState.signal, + extensions: buildAdapterExtensions(child.toolIndex), }, runContext, ), @@ -1525,6 +1574,93 @@ function composePromptForAgent( return composeSystemPrompt(base, registered.instructions); } +/** + * Pulls the LLM-readable description off any {@link SupervisorTool} kind. + * Used to populate the synthetic placeholder `def.description` on + * hosted-supervisor tool-index entries. 
+ */ +function supervisorToolDescription(spec: SupervisorTool): string { + switch (spec.type) { + case "genie_space": + return spec.genie_space.description; + case "uc_function": + return spec.uc_function.description; + case "knowledge_assistant": + return spec.knowledge_assistant.description; + case "app": + return spec.app.description; + case "uc_connection": + return spec.uc_connection.description; + } +} + +/** + * Builds the `AgentInput.extensions` payload from a tool index, aggregating + * the hosted-supervisor specs under {@link SUPERVISOR_EXTENSION_KEY}. Returns + * `undefined` when there are no adapter-side hosted tools so the field stays + * absent on the wire — adapters that don't read extensions never see it. + */ +function buildAdapterExtensions( + toolIndex: Map, +): Readonly> | undefined { + const supervisorSpecs: SupervisorTool[] = []; + for (const entry of toolIndex.values()) { + if (entry.source === "hosted-supervisor") { + supervisorSpecs.push(entry.spec); + } + } + if (supervisorSpecs.length === 0) return undefined; + return { + [SUPERVISOR_EXTENSION_KEY]: { hostedTools: supervisorSpecs }, + }; +} + +/** + * Compares the adapter's declared capabilities against the tool index and + * logs a warning when the agent's tool declarations would be silently + * dropped at runtime. Warn-not-throw: misconfiguration is loud enough to + * notice without taking the whole app down. + */ +function warnOnCapabilityMismatch( + agentName: string, + adapter: AgentAdapter, + toolIndex: Map, +): void { + const accepted = new Set(adapter.acceptsExtensions ?? 
[]); + + const hostedSupervisorKeys: string[] = []; + const inputToolKeys: string[] = []; + for (const [key, entry] of toolIndex) { + if (entry.source === "hosted-supervisor") { + hostedSupervisorKeys.push(key); + } else { + inputToolKeys.push(key); + } + } + + if ( + hostedSupervisorKeys.length > 0 && + !accepted.has(SUPERVISOR_EXTENSION_KEY) + ) { + logger.warn( + `Agent '${agentName}' declares hosted-supervisor tools (${hostedSupervisorKeys.join(", ")}) ` + + "but its model adapter does not accept the 'databricks.supervisor' extension. " + + "These tools will not reach the model. Pair them with `DatabricksAdapter.fromSupervisorApi(...)`, or remove them.", + ); + } + + // `consumesInputTools` defaults to true. Only warn when an adapter + // explicitly opts out (`false`) and an input tool would be silently + // ignored. + if (adapter.consumesInputTools === false && inputToolKeys.length > 0) { + logger.warn( + `Agent '${agentName}' declares function tools / sub-agents / MCP tools (${inputToolKeys.join(", ")}) ` + + "but its model adapter does not consume input.tools (Supervisor API owns its own tool loop). " + + "These tools will not be exposed to the model. See docs/plugins/agents.md.", + ); + } +} + /** * Plugin factory for the agents plugin. 
Reads `config/agents/*.md` by default, * resolves toolkits/tools from registered plugins, exposes `appkit.agents.*` diff --git a/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts b/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts index c654e477f..7f6b453b4 100644 --- a/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts +++ b/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts @@ -665,4 +665,128 @@ describe("AgentsPlugin", () => { expect(toolsFn).toHaveBeenCalledTimes(1); }); }); + + describe("hosted-supervisor tools and capability negotiation", () => { + test("indexes supervisorTools.* entries with source 'hosted-supervisor'", async () => { + const { supervisorTools, SUPERVISOR_EXTENSION_KEY } = await import( + "../../../agents/supervisor-api" + ); + const ctx = fakeContext([]); + + const saAdapter: AgentAdapter = { + acceptsExtensions: [SUPERVISOR_EXTENSION_KEY], + consumesInputTools: false, + async *run(_input, _ctx) { + yield { type: "message_delta", content: "" }; + }, + }; + + const plugin = instantiate( + { + dir: false, + agents: { + assistant: { + instructions: "x", + model: saAdapter, + tools: { + nyc: supervisorTools.genieSpace({ + id: "01ABC", + description: "NYC taxi", + }), + }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const api = plugin.exports() as { + // biome-ignore lint/suspicious/noExplicitAny: structural test access + get: (name: string) => any; + }; + const entry = api.get("assistant").toolIndex.get("nyc"); + expect(entry.source).toBe("hosted-supervisor"); + expect(entry.spec).toEqual({ + type: "genie_space", + genie_space: { id: "01ABC", description: "NYC taxi" }, + }); + }); + + test("warns at setup when hosted-supervisor tools paired with non-supervisor adapter", async () => { + const { supervisorTools } = await import( + "../../../agents/supervisor-api" + ); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const ctx = fakeContext([]); + + const plugin 
= instantiate( + { + dir: false, + agents: { + mismatched: { + instructions: "x", + model: stubAdapter(), // does NOT declare acceptsExtensions + tools: { + nyc: supervisorTools.genieSpace({ + id: "01ABC", + description: "NYC taxi", + }), + }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const warning = warnSpy.mock.calls + .map((args) => args.join(" ")) + .find((s) => s.includes("hosted-supervisor")); + expect(warning).toBeTruthy(); + expect(warning).toContain("'mismatched'"); + warnSpy.mockRestore(); + }); + + test("warns at setup when function tools paired with consumesInputTools:false adapter", async () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const ctx = fakeContext([]); + + const saLikeAdapter: AgentAdapter = { + consumesInputTools: false, + async *run(_input, _ctx) { + yield { type: "message_delta", content: "" }; + }, + }; + + const plugin = instantiate( + { + dir: false, + agents: { + leaky: { + instructions: "x", + model: saLikeAdapter, + tools: { + weather: tool({ + name: "weather", + description: "w", + schema: z.object({ city: z.string() }), + execute: async () => "sunny", + }), + }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const warning = warnSpy.mock.calls + .map((args) => args.join(" ")) + .find((s) => s.includes("does not consume input.tools")); + expect(warning).toBeTruthy(); + expect(warning).toContain("'leaky'"); + warnSpy.mockRestore(); + }); + }); }); diff --git a/packages/appkit/src/stream/index.ts b/packages/appkit/src/stream/index.ts index cc756130a..75ad8b5c4 100644 --- a/packages/appkit/src/stream/index.ts +++ b/packages/appkit/src/stream/index.ts @@ -1 +1,2 @@ +export { readSseEvents } from "./sse-reader"; export { StreamManager } from "./stream-manager"; diff --git a/packages/appkit/src/stream/sse-reader.ts b/packages/appkit/src/stream/sse-reader.ts new file mode 100644 index 000000000..f80f0738e --- /dev/null +++ b/packages/appkit/src/stream/sse-reader.ts @@ -0,0 +1,177 @@ 
/**
 * One parsed Server-Sent Event. Field names follow the spec:
 * https://html.spec.whatwg.org/multipage/server-sent-events.html
 *
 * The reader does not interpret `data` (no JSON parsing), so callers control
 * the wire shape they expect.
 */
export interface SseEvent {
  /** Value of the most recent `event:` field, or `""` for an unnamed event. */
  event: string;
  /** Joined `data:` lines for the event (empty string when no data was set). */
  data: string;
  /** Value of the most recent `id:` field, or `undefined` if none. */
  id?: string;
}

/**
 * Configuration for {@link readSseEvents}. All limits are in UTF-16 code
 * units (JS string `.length`) and exist as a DoS guard (CWE-770) for
 * untrusted upstreams that might stream arbitrarily large lines or never
 * emit a block terminator.
 */
interface ReadSseEventsOptions {
  /**
   * Maximum length of any single SSE event block (i.e. the text between
   * two `\n\n` separators). Exceeding this throws.
   *
   * @default 1 MiB (1_048_576)
   */
  maxLineChars?: number;
  /**
   * Maximum length of the rolling input buffer when no block terminator
   * has been seen yet. Exceeding this throws — protects against an
   * upstream that streams indefinitely without ever sending `\n\n`.
   *
   * @default 8 MiB (8_388_608)
   */
  maxBufferChars?: number;
}

const DEFAULT_MAX_SSE_LINE_CHARS = 1024 * 1024;
const DEFAULT_MAX_SSE_BUFFER_CHARS = 8 * 1024 * 1024;

/**
 * Async-iterates Server-Sent Events from a UTF-8 byte stream.
 *
 * Block-oriented parser: events are delimited by blank lines (`\n\n` after
 * CRLF normalization), so an `event:` line in chunk N pairs correctly with a
 * `data:` line in chunk N+1 — no hoisted state needed.
 *
 * The reader passes through the sentinel string `[DONE]` as `event=""`,
 * `data="[DONE]"`. Callers that care about it should match `data === "[DONE]"`
 * after destructuring.
 *
 * Terminates when the stream closes or `signal` aborts; releases the reader
 * lock in either case. When the stream closes normally, a trailing block that
 * was not followed by a blank line is still emitted. When `signal` aborts,
 * nothing further is emitted — in particular a partially received block is
 * discarded rather than surfaced as a truncated event.
 *
 * @param stream - Byte stream carrying the SSE wire format.
 * @param signal - Optional abort signal; aborting cancels the reader.
 * @param options - Optional size limits, see {@link ReadSseEventsOptions}.
 * @returns An async generator of parsed events, in wire order.
 * @throws Error when {@link ReadSseEventsOptions.maxLineChars} or
 *   {@link ReadSseEventsOptions.maxBufferChars} are exceeded.
 */
export async function* readSseEvents(
  stream: ReadableStream<Uint8Array>,
  signal?: AbortSignal,
  options?: ReadSseEventsOptions,
): AsyncGenerator<SseEvent> {
  const maxLineChars = options?.maxLineChars ?? DEFAULT_MAX_SSE_LINE_CHARS;
  const maxBufferChars =
    options?.maxBufferChars ?? DEFAULT_MAX_SSE_BUFFER_CHARS;

  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  // Cancel the reader on abort so an in-flight `reader.read()` returns
  // immediately instead of waiting for the next chunk. Without this, an
  // aborted consumer would only notice between reads — fine for chatty
  // streams, but unbounded for an idle/heartbeat-less upstream.
  const onAbort = () => {
    reader.cancel().catch(() => {
      // `cancel()` rejects if the stream is already errored/closed; ignore.
    });
  };
  if (signal) {
    if (signal.aborted) onAbort();
    else signal.addEventListener("abort", onAbort, { once: true });
  }

  try {
    while (true) {
      if (signal?.aborted) break;
      const { done, value } = await reader.read();

      // An abort that lands while the read is pending resolves it with
      // `done: true` (via `reader.cancel()`). That is NOT a clean end of
      // stream: whatever sits in `buffer` is an incomplete block, so bail
      // out before the end-of-stream flush can emit it as a truncated event.
      if (signal?.aborted) break;

      if (done) {
        // Flush the streaming decoder so a dangling partial UTF-8 sequence
        // becomes U+FFFD instead of being dropped silently.
        buffer += decoder.decode();
        if (buffer.length > maxLineChars) {
          throw new Error(
            `readSseEvents: trailing SSE block exceeds maxLineChars (${maxLineChars} UTF-16 code units)`,
          );
        }
        const tail = parseSseBlock(buffer);
        if (tail) yield tail;
        break;
      }

      buffer += decoder.decode(value, { stream: true });

      // Gate the CRLF normalize on `\r` presence — saves a full-buffer
      // regex scan on every chunk for the common LF-only steady state.
      // A lone trailing `\r` (CRLF split across chunks) stays in the buffer
      // and is normalized once its `\n` arrives.
      const normalized =
        buffer.indexOf("\r") !== -1 ? buffer.replace(/\r\n/g, "\n") : buffer;
      const blocks = normalized.split("\n\n");
      // Last entry is either an incomplete block or "" (when the chunk ended
      // exactly on a boundary). Either way, keep it for the next iteration.
      buffer = blocks.pop() ?? "";

      if (buffer.length > maxBufferChars) {
        throw new Error(
          `readSseEvents: incomplete SSE block exceeds maxBufferChars (${maxBufferChars} UTF-16 code units) without a terminator`,
        );
      }

      for (const block of blocks) {
        if (block.length > maxLineChars) {
          throw new Error(
            `readSseEvents: SSE block exceeds maxLineChars (${maxLineChars} UTF-16 code units)`,
          );
        }
        const event = parseSseBlock(block);
        if (event) yield event;
      }
    }
  } finally {
    if (signal) signal.removeEventListener("abort", onAbort);
    reader.releaseLock();
  }
}

/**
 * Per the SSE spec, only a single leading `U+0020` is stripped from a field
 * value — not arbitrary whitespace. `trimStart()` would also strip tabs,
 * NBSP, etc.; for callers that feed binary or whitespace-prefixed payloads
 * this is a footgun.
 */
function stripOneLeadingSpace(s: string): string {
  return s.startsWith(" ") ? s.slice(1) : s;
}

/**
 * Parses one SSE block (the text between two blank-line separators) into an
 * event.
 *
 * @param block - Block text with line endings already normalized to `\n`.
 * @returns The parsed event, or `null` when the block carries no `data:`
 *   line (empty block, comments only, or only `event:`/`id:` fields).
 */
function parseSseBlock(block: string): SseEvent | null {
  if (block.length === 0) return null;
  // CRLF was already normalised at the buffer level, so each `line` here is
  // already free of trailing `\r` — no per-line strip needed.
  const lines = block.split("\n");

  let eventName = "";
  let id: string | undefined;
  const dataLines: string[] = [];

  for (const line of lines) {
    // Blank lines and `:`-prefixed comment lines (e.g. heartbeats) carry no
    // field.
    if (line === "" || line.startsWith(":")) continue;

    if (line.startsWith("event:")) {
      eventName = stripOneLeadingSpace(line.slice(6));
    } else if (line.startsWith("data:")) {
      dataLines.push(stripOneLeadingSpace(line.slice(5)));
    } else if (line.startsWith("id:")) {
      id = stripOneLeadingSpace(line.slice(3));
    }
    // Other fields (`retry:`, custom) are ignored by design.
  }

  // Per the SSE spec, a block is only dispatched when the data buffer is
  // non-empty. Blocks containing only `event:`/`id:` (or comments) do not
  // surface as events.
  if (dataLines.length === 0) return null;

  return {
    event: eventName,
    data: dataLines.join("\n"),
    id,
  };
}
collect( + readSseEvents(streamOf(["id: abc-123\nevent: ping\ndata: hi\n\n"])), + ); + expect(events).toEqual([{ event: "ping", data: "hi", id: "abc-123" }]); + }); + + test("falls back to empty event name when only data: is present", async () => { + const events = await collect(readSseEvents(streamOf(["data: 1\n\n"]))); + expect(events).toEqual([{ event: "", data: "1", id: undefined }]); + }); + + test("joins multi-line data: payloads with \\n", async () => { + const events = await collect( + readSseEvents(streamOf(["data: line1\ndata: line2\n\n"])), + ); + expect(events).toEqual([ + { event: "", data: "line1\nline2", id: undefined }, + ]); + }); + + test("normalises CRLF line endings", async () => { + const events = await collect( + readSseEvents(streamOf(["event: x\r\ndata: y\r\n\r\n"])), + ); + expect(events).toEqual([{ event: "x", data: "y", id: undefined }]); + }); + + test("emits a trailing event when the stream closes without a final blank line", async () => { + const events = await collect( + readSseEvents(streamOf(["event: ping\ndata: hi"])), + ); + expect(events).toEqual([{ event: "ping", data: "hi", id: undefined }]); + }); + + test("passes through [DONE] sentinels as data", async () => { + const events = await collect(readSseEvents(streamOf(["data: [DONE]\n\n"]))); + expect(events).toEqual([{ event: "", data: "[DONE]", id: undefined }]); + }); + + test("aborts when the signal fires before the next read", async () => { + const controller = new AbortController(); + let pulls = 0; + const stream = new ReadableStream({ + pull(c) { + pulls++; + if (pulls === 1) { + c.enqueue(new TextEncoder().encode("event: a\ndata: 1\n\n")); + } else { + controller.abort(); + c.enqueue(new TextEncoder().encode("event: b\ndata: 2\n\n")); + } + }, + }); + + const out: SseEvent[] = []; + for await (const e of readSseEvents(stream, controller.signal)) { + out.push(e); + if (out.length === 1) controller.abort(); + } + expect(out.map((e) => e.event)).toEqual(["a"]); + }); + + 
test("aborts an idle reader immediately via reader.cancel()", async () => { + // Stream that sends one event then never resolves further reads — models + // an upstream that has stopped sending data. Without `reader.cancel()` + // the consumer would block forever after aborting. + const controller = new AbortController(); + let cancelled = false; + const stream = new ReadableStream({ + start(c) { + c.enqueue(new TextEncoder().encode("event: a\ndata: 1\n\n")); + }, + pull() { + return new Promise(() => { + /* never resolves */ + }); + }, + cancel() { + cancelled = true; + }, + }); + + const out: SseEvent[] = []; + const iterator = readSseEvents(stream, controller.signal); + const first = await iterator.next(); + if (!first.done) out.push(first.value); + controller.abort(); + const second = await iterator.next(); + expect(second.done).toBe(true); + expect(out.map((e) => e.event)).toEqual(["a"]); + expect(cancelled).toBe(true); + }); + + test("does not dispatch a block whose only field is id: (spec compliance)", async () => { + const events = await collect( + readSseEvents(streamOf(["id: only\n\nevent: ping\ndata: hi\n\n"])), + ); + expect(events).toEqual([{ event: "ping", data: "hi", id: undefined }]); + }); + + test("decodes a multi-byte UTF-8 character split across chunks", async () => { + const checkBytes = new TextEncoder().encode("✓"); + expect(checkBytes.length).toBe(3); + const stream = new ReadableStream({ + start(c) { + c.enqueue(new TextEncoder().encode("data: ")); + c.enqueue(checkBytes.subarray(0, 1)); + c.enqueue(checkBytes.subarray(1)); + c.enqueue(new TextEncoder().encode("\n\n")); + c.close(); + }, + }); + const events = await collect(readSseEvents(stream)); + expect(events).toEqual([{ event: "", data: "✓", id: undefined }]); + }); + + test("throws when a single block exceeds maxLineChars (DoS guard)", async () => { + // A complete block whose total length exceeds the cap must throw rather + // than silently propagate to the consumer — protects 
callers from + // upstreams that stream arbitrarily large payloads (CWE-770). + const huge = `data: ${"x".repeat(200)}\n\n`; + await expect(async () => { + for await (const _ of readSseEvents(streamOf([huge]), undefined, { + maxLineChars: 100, + })) { + /* iterate */ + } + }).rejects.toThrow(/exceeds maxLineChars/); + }); + + test("throws when the rolling buffer exceeds maxBufferChars without a terminator", async () => { + // An upstream that streams forever without ever sending the `\n\n` + // block separator must not grow the buffer unboundedly — throw once + // the cap is exceeded. + const stream = new ReadableStream({ + pull(c) { + c.enqueue(new TextEncoder().encode("x".repeat(50))); + // No close() — keep feeding until the cap fires. + }, + }); + await expect(async () => { + for await (const _ of readSseEvents(stream, undefined, { + maxBufferChars: 200, + maxLineChars: 10_000, + })) { + /* iterate */ + } + }).rejects.toThrow(/exceeds maxBufferChars/); + }); + + test("strips only a single leading U+0020 from field values (spec compliance)", async () => { + // `trimStart()` would strip tabs / NBSP / multi-space prefixes, which + // is wrong per the SSE spec — only one leading U+0020 may be removed. + const events = await collect( + readSseEvents(streamOf(["data: with-leading-spaces\n\n"])), + ); + // First space is stripped; second is preserved. 
+ expect(events).toEqual([ + { event: "", data: " with-leading-spaces", id: undefined }, + ]); + }); + + test("preserves tab-prefixed data values (trimStart would have stripped)", async () => { + const events = await collect( + readSseEvents(streamOf(["data:\t\tvalue\n\n"])), + ); + expect(events).toEqual([{ event: "", data: "\t\tvalue", id: undefined }]); + }); +}); diff --git a/packages/shared/src/agent.ts b/packages/shared/src/agent.ts index 6486b1b29..5ec2caf35 100644 --- a/packages/shared/src/agent.ts +++ b/packages/shared/src/agent.ts @@ -275,6 +275,17 @@ export interface AgentInput { tools: AgentToolDefinition[]; threadId: string; signal?: AbortSignal; + /** + * Adapter-specific opaque payloads, keyed by adapter namespace. The + * shared contract intentionally does not enumerate keys — see each + * adapter's docs for which keys it reads and the shape of each value. + * + * The agents plugin and standalone `runAgent` populate this from the + * agent's tool index when entries declare an adapter-side spec (e.g. + * Supervisor API hosted tools). Adapters that don't read extensions + * should leave it untouched. + */ + extensions?: Readonly>; } export interface AgentRunContext { @@ -288,4 +299,23 @@ export interface AgentAdapter { input: AgentInput, context: AgentRunContext, ): AsyncGenerator; + + /** + * Extension keys this adapter consumes from {@link AgentInput.extensions}. + * The agents plugin (and standalone `runAgent`) warns at registration + * if the tool index produces extensions whose keys aren't listed here. + * + * Adapters that don't read extensions can omit this field. + */ + readonly acceptsExtensions?: readonly string[]; + + /** + * Whether the adapter consumes tools from `input.tools`. Defaults to + * true. Adapters whose tool execution happens elsewhere (e.g. 
the + * Supervisor API, where SA owns the tool loop server-side) declare + * false; the agents plugin warns at registration if the agent declares + * function tools or local sub-agents alongside such an adapter, since + * those tools would never reach the model. + */ + readonly consumesInputTools?: boolean; }