diff --git a/README.md b/README.md index daeb3fa21..9f12e449f 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,7 @@ That's it. `chat.messages()` is an Angular Signal. Bind it directly in your temp | Tool call progress | `toolProgress()` | `toolProgress` | | Tool calls with results | `toolCalls()` | `toolCalls` | | Branch / history | `branch()` / `history()` | `branch` / `history` | +| Pending run queue | `queue()` | `queue` | | Subagent streaming | `subagents()` / `activeSubagents()` | `subagents` / `activeSubagents` | | Reactive thread switching | `Signal` input | prop | | Submit | `submit(values, opts?)` | `submit(values, opts?)` | @@ -116,7 +117,7 @@ That's it. `chat.messages()` is an Angular Signal. Bind it directly in your temp />

-`agent()` creates 12 `BehaviorSubject`s at injection-context time — once, at component construction. The `StreamManager` bridge (the only file that touches `@langchain/langgraph-sdk` internals) pushes stream events into those subjects. `toSignal()` converts each subject to an Angular Signal, also at construction time. Dynamic actions (`submit`, `stop`, `switchThread`) push into the existing subjects — no new subjects are ever created after construction. This architecture is required because `toSignal()` must be called in an injection context and cannot be called again later. +`agent()` creates its internal `BehaviorSubject`s at injection-context time — once, at component construction. The `StreamManager` bridge (the only file that touches `@langchain/langgraph-sdk` internals) pushes stream events into those subjects. `toSignal()` converts each subject to an Angular Signal, also at construction time. Dynamic actions (`submit`, `stop`, `switchThread`) push into the existing subjects — no new subjects are ever created after construction. This architecture is required because `toSignal()` must be called in an injection context and cannot be called again later. --- diff --git a/apps/website/content/docs/agent/api/api-docs.json b/apps/website/content/docs/agent/api/api-docs.json index 094889408..5d51691b3 100644 --- a/apps/website/content/docs/agent/api/api-docs.json +++ b/apps/website/content/docs/agent/api/api-docs.json @@ -22,6 +22,62 @@ ], "properties": [], "methods": [ + { + "name": "cancelRun", + "signature": "cancelRun(threadId: string, runId: string, signal: AbortSignal)", + "description": "Cancel a server-side run.", + "params": [ + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "runId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, + { + "name": "createQueuedRun", + "signature": "createQueuedRun(assistantId: string, threadId: string, payload: unknown, signal: AbortSignal)", + "description": "Create a pending server-side run using LangGraph's enqueue strategy.", + "params": [ + { + "name": "assistantId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "payload", + "type": "unknown", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, { "name": "joinStream", "signature": "joinStream(threadId: string, runId: string, lastEventId: string | undefined, signal: AbortSignal)", @@ -101,14 +157,89 @@ "examples": [ "```typescript\nconst transport = new MockAgentTransport([\n [{ type: 'values', data: { messages: [aiMsg('Hello')] } }],\n [{ type: 'values', data: { status: 'done' } }],\n]);\n```" ], - "properties": [], + "properties": [ + { + "name": "cancelledRuns", + "type": "object[]", + "description": "", + "optional": false + }, + { + "name": "createdQueuedRuns", + "type": "AgentQueueEntry[]", + "description": "", + "optional": false + }, + { + "name": "joinedRuns", + "type": "object[]", + "description": "", + "optional": false + } + ], "methods": [ + { + "name": "cancelRun", + "signature": "cancelRun(threadId: string, runId: string, signal: AbortSignal)", + "description": "Optional: cancel a server-side run.", + "params": [ + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "runId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, { "name": "close", "signature": "close()", "description": "Close the stream. Remaining queued events are drained before completion.", "params": [] }, + { + "name": "createQueuedRun", + "signature": "createQueuedRun(_assistantId: string, threadId: string, payload: unknown, signal: AbortSignal)", + "description": "Optional: create a server-side queued run without joining it immediately.", + "params": [ + { + "name": "_assistantId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "payload", + "type": "unknown", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, { "name": "emit", "signature": "emit(events: StreamEvent[])", @@ -141,6 +272,37 @@ "description": "Returns true if a stream is currently active.", "params": [] }, + { + "name": "joinStream", + "signature": "joinStream(threadId: string, runId: string, lastEventId: string | undefined, signal: AbortSignal)", + "description": "Optional: join an already-started run without creating a new one.", + "params": [ + { + "name": "threadId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "runId", + "type": "string", + "description": "", + "optional": false + }, + { + "name": "lastEventId", + "type": "string | undefined", + "description": "", + "optional": false + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": false + } + ] + }, { "name": "nextBatch", "signature": "nextBatch()", @@ -292,11 +454,93 @@ ], "examples": [] }, + { + "name": "AgentQueue", + "kind": "interface", + "description": "Public queue surface for pending server-side LangGraph runs.", + "properties": [ + { + "name": "cancel", + "type": "object", + "description": "Cancel a specific pending run by server run ID.", + "optional": false + }, + { + "name": "clear", + "type": "object", + "description": "Cancel all pending runs and clear the queue.", + "optional": false + }, + { + "name": "entries", + "type": "readonly AgentQueueEntry[]", + "description": "Read-only pending queue entries.", + "optional": false + }, + { + "name": "size", + "type": "number", + "description": "Number of pending queue entries.", + "optional": false + } + ], + "examples": [] + }, + { + "name": "AgentQueueEntry", + "kind": "interface", + "description": "A queued server-side LangGraph run.", + "properties": [ + { + "name": "createdAt", + "type": "Date", + "description": "Timestamp when the queued run was registered locally.", + "optional": false + }, + { + "name": "id", + "type": "string", + "description": "Server-side run ID.", + "optional": false + }, + { + "name": "options", + "type": "LangGraphSubmitOptions", + "description": "Submit options used when the queued run was created.", + "optional": true + }, + { + "name": "threadId", + "type": "string", + "description": "Thread that owns the queued run.", + "optional": false + }, + { + "name": "values", + "type": "T | null | undefined", + "description": "Values submitted for the queued run.", + "optional": false + } + ], + "examples": [] + }, { "name": "AgentTransport", "kind": "interface", "description": "Transport interface for connecting to a LangGraph agent.", "properties": [ + { + "name": "cancelRun", + "type": "unknown", + "description": "", + "optional": true + }, + { + "name": "createQueuedRun", + "type": "unknown", + "description": "", + "optional": true + }, { "name": "joinStream", "type": "unknown", @@ -483,6 +727,12 @@ "description": "", "optional": false }, + { + "name": "queue", + "type": "Signal>", + "description": "Pending server-side runs created via `multitaskStrategy: 'enqueue'`.", + "optional": false + }, { "name": "reload", "type": "object", @@ -552,6 +802,26 @@ ], "examples": [] }, + { + "name": "LangGraphSubmitOptions", + "kind": "interface", + "description": "Options accepted by LangGraph-backed submit calls.", + "properties": [ + { + "name": "multitaskStrategy", + "type": "LangGraphMultitaskStrategy", + "description": "Strategy for handling concurrent runs on the same thread.", + "optional": true + }, + { + "name": "signal", + "type": "AbortSignal", + "description": "", + "optional": true + } + ], + "examples": [] + }, { "name": "MockLangGraphAgent", "kind": "interface", @@ -665,6 +935,12 @@ "description": "", "optional": false }, + { + "name": "queue", + "type": "WritableSignal>", + "description": "Pending server-side runs created via `multitaskStrategy: 'enqueue'`.", + "optional": false + }, { "name": "reload", "type": "object", @@ -759,7 +1035,7 @@ }, { "name": "type", - "type": "\"error\" | \"values\" | `values|${string}` | \"messages\" | `messages|${string}` | `messages/${string}` | `messages/${string}|${string}` | \"updates\" | `updates|${string}` | \"tools\" | `tools|${string}` | \"custom\" | `custom|${string}` | `error|${string}` | \"metadata\" | \"checkpoints\" | `checkpoints|${string}` | \"tasks\" | `tasks|${string}` | \"debug\" | `debug|${string}` | \"events\" | `events|${string}` | \"interrupt\" | \"interrupts\"", + "type": "\"error\" | \"interrupt\" | \"values\" | `values|${string}` | \"messages\" | `messages|${string}` | `messages/${string}` | `messages/${string}|${string}` | \"updates\" | `updates|${string}` | \"tools\" | `tools|${string}` | \"custom\" | `custom|${string}` | `error|${string}` | \"metadata\" | \"checkpoints\" | `checkpoints|${string}` | \"tasks\" | `tasks|${string}` | \"debug\" | `debug|${string}` | \"events\" | `events|${string}` | \"interrupts\"", "description": "Event type identifier (e.g., 'values', 'messages', 'error', 'interrupt').", "optional": false } @@ -990,6 +1266,13 @@ "signature": "T extends { ~agentTypes: unknown } ? BagTemplate : B", "examples": [] }, + { + "name": "LangGraphMultitaskStrategy", + "kind": "type", + "description": "Strategy for handling concurrent LangGraph runs on the same thread.", + "signature": "\"reject\" | \"interrupt\" | \"rollback\" | \"enqueue\"", + "examples": [] + }, { "name": "ResourceStatus", "kind": "type", diff --git a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx index d92d62195..2e7bd5bc2 100644 --- a/apps/website/content/docs/agent/concepts/langgraph-basics.mdx +++ b/apps/website/content/docs/agent/concepts/langgraph-basics.mdx @@ -324,6 +324,7 @@ agent.branch() // Signal — time-travel branch agent.toolCalls() // Signal — tool results agent.toolProgress() // Signal — active tool execution +agent.queue() // Signal — pending enqueue runs agent.subagents() // Signal> — delegated agents agent.activeSubagents() // Signal — running workers ``` diff --git a/docs/limitations.md b/docs/limitations.md index dcdc388dc..96db98c93 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -66,20 +66,7 @@ SDK and depends on internal tree-diffing utilities not exported from --- -## 5. `queue` (QueueInterface) - -**React behavior:** `useStream()` exposes a `queue` property -(`QueueInterface`) for inspecting and managing the pending submission queue. - -**Angular behavior:** Queue management is handled internally in the -bridge but not exposed as a public signal in v1. The queue drains -automatically on `submit()` calls. - -**Workaround:** None in v1. Use `isLoading()` to gate UI interactions. - ---- - -### Limitation: subagent helper methods are not exposed +## 5. Subagent Helper Methods **Feature:** `getSubagent()` / `getSubagentsByType()` / `getSubagentsByMessage()` diff --git a/libs/langgraph/src/lib/agent.fn.spec.ts b/libs/langgraph/src/lib/agent.fn.spec.ts index 51e574a27..a1b987d8c 100644 --- a/libs/langgraph/src/lib/agent.fn.spec.ts +++ b/libs/langgraph/src/lib/agent.fn.spec.ts @@ -48,6 +48,24 @@ describe('agent', () => { expect(ref.isLoading()).toBe(true); }); + it('queue() exposes server-side enqueue submissions', async () => { + const transport = new MockAgentTransport(); + const ref = withInjectionContext(() => + agent({ apiUrl: '', assistantId: 'a', transport, threadId: 'thread-1' }) + ); + + ref.submit({ message: 'active' }); + await ref.submit({ message: 'queued' }, { multitaskStrategy: 'enqueue' }); + + expect(ref.queue().size).toBe(1); + expect(ref.queue().entries[0]).toMatchObject({ + values: { messages: [{ role: 'human', content: 'queued' }] }, + }); + + await ref.queue().clear(); + expect(ref.queue().size).toBe(0); + }); + it('hasValue becomes true after values event', async () => { const transport = new MockAgentTransport(); const ref = withInjectionContext(() => diff --git a/libs/langgraph/src/lib/agent.fn.ts b/libs/langgraph/src/lib/agent.fn.ts index c662976fd..6e5b32325 100644 --- a/libs/langgraph/src/lib/agent.fn.ts +++ b/libs/langgraph/src/lib/agent.fn.ts @@ -36,6 +36,8 @@ import { StreamSubjects, SubagentStreamRef, ResourceStatus, + AgentQueue, + LangGraphSubmitOptions, } from './agent.types'; import type { ThreadState, ToolProgress } from '@langchain/langgraph-sdk'; import type { MessageMetadata } from '@langchain/langgraph-sdk/ui'; @@ -100,6 +102,12 @@ export function agent< const toolCalls$ = new BehaviorSubject([]); const messageMetadata$ = new BehaviorSubject>>>(new Map()); const subagents$ = new BehaviorSubject>(new Map()); + const queue$ = new BehaviorSubject({ + entries: [], + size: 0, + cancel: async () => false, + clear: async () => undefined, + }); const custom$ = new BehaviorSubject([]); const hasValue$ = new BehaviorSubject(false); @@ -118,7 +126,7 @@ export function agent< const subjects: StreamSubjects> = { status$, values$, messages$, error$, interrupt$, interrupts$, branch$, history$, - isThreadLoading$, toolProgress$, toolCalls$, messageMetadata$, subagents$, custom$, + isThreadLoading$, toolProgress$, toolCalls$, messageMetadata$, subagents$, queue$, custom$, }; // threadId$ — resolved before bridge creation (injection context required for toObservable) @@ -165,6 +173,7 @@ export function agent< const toolProgSig = toSignal(toolProgress$, { initialValue: [] }); const rawToolCalls = toSignal(toolCalls$, { initialValue: [] }); const subagentsSig = toSignal(subagents$, { initialValue: new Map() }); + const queueSig = toSignal(queue$, { initialValue: queue$.value }); const customSig = toSignal(custom$, { initialValue: [] as CustomStreamEvent[] }); const isLoading = computed(() => statusSig() === ResourceStatus.Loading); @@ -214,8 +223,8 @@ export function agent< subagents: subagentsNeutral, events$, history: historyNeutral, - submit: (input: AgentSubmitInput, opts?: AgentSubmitOptions) => { - manager.submit(buildSubmitPayload(input), opts ? { signal: opts.signal } as never : undefined); + submit: (input: AgentSubmitInput, opts?: AgentSubmitOptions & LangGraphSubmitOptions) => { + manager.submit(buildSubmitPayload(input), opts); return Promise.resolve(); }, stop: () => manager.stop(), @@ -231,6 +240,7 @@ export function agent< hasValue: hasValueSig, reload: () => manager.resubmitLast(), toolProgress: toolProgSig, + queue: queueSig, activeSubagents, customEvents: customSig, branch: branchSig, diff --git a/libs/langgraph/src/lib/agent.types.ts b/libs/langgraph/src/lib/agent.types.ts index 8a1717639..9c1592128 100644 --- a/libs/langgraph/src/lib/agent.types.ts +++ b/libs/langgraph/src/lib/agent.types.ts @@ -73,6 +73,42 @@ export interface StreamEvent { [key: string]: unknown; } +/** Strategy for handling concurrent LangGraph runs on the same thread. */ +export type LangGraphMultitaskStrategy = 'reject' | 'interrupt' | 'rollback' | 'enqueue'; + +/** Options accepted by LangGraph-backed submit calls. */ +export interface LangGraphSubmitOptions { + signal?: AbortSignal; + /** Strategy for handling concurrent runs on the same thread. */ + multitaskStrategy?: LangGraphMultitaskStrategy; +} + +/** A queued server-side LangGraph run. */ +export interface AgentQueueEntry { + /** Server-side run ID. */ + id: string; + /** Thread that owns the queued run. */ + threadId: string; + /** Values submitted for the queued run. */ + values: T | null | undefined; + /** Submit options used when the queued run was created. */ + options?: LangGraphSubmitOptions; + /** Timestamp when the queued run was registered locally. */ + createdAt: Date; +} + +/** Public queue surface for pending server-side LangGraph runs. */ +export interface AgentQueue { + /** Read-only pending queue entries. */ + readonly entries: ReadonlyArray>; + /** Number of pending queue entries. */ + readonly size: number; + /** Cancel a specific pending run by server run ID. */ + cancel: (id: string) => Promise; + /** Cancel all pending runs and clear the queue. */ + clear: () => Promise; +} + /** A custom event emitted by the LangGraph backend via adispatch_custom_event(). */ export interface CustomStreamEvent { /** Event name set by the backend (e.g., 'state_update'). */ @@ -98,6 +134,21 @@ export interface AgentTransport { lastEventId: string | undefined, signal: AbortSignal, ): AsyncIterable; + + /** Optional: create a server-side queued run without joining it immediately. */ + createQueuedRun?( + assistantId: string, + threadId: string, + payload: unknown, + signal: AbortSignal, + ): Promise; + + /** Optional: cancel a server-side run. */ + cancelRun?( + threadId: string, + runId: string, + signal: AbortSignal, + ): Promise; } // ── Options ────────────────────────────────────────────────────────────────── @@ -184,6 +235,9 @@ export interface LangGraphAgent; + /** Pending server-side runs created via `multitaskStrategy: 'enqueue'`. */ + queue: Signal; + /** Filtered list of subagents with status 'running'. */ activeSubagents: Signal; @@ -230,5 +284,6 @@ export interface StreamSubjects; messageMetadata$: BehaviorSubject>>>; subagents$: BehaviorSubject>; + queue$: BehaviorSubject; custom$: BehaviorSubject; } diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts index 5574e216b..95a881a04 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.spec.ts @@ -20,6 +20,12 @@ function makeSubjects(): StreamSubjects> { toolCalls$: new BehaviorSubject([]), messageMetadata$: new BehaviorSubject(new Map()), subagents$: new BehaviorSubject(new Map()), + queue$: new BehaviorSubject({ + entries: [], + size: 0, + cancel: async () => false, + clear: async () => undefined, + }), custom$: new BehaviorSubject([]), }; } @@ -55,6 +61,132 @@ describe('createStreamManagerBridge', () => { destroy$.next(); }); + it('exposes enqueue submissions through queue$ without starting a second stream immediately', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({ messages: [{ type: 'human', content: 'active' }] }); + await bridge.submit( + { messages: [{ type: 'human', content: 'queued' }] }, + { multitaskStrategy: 'enqueue' }, + ); + + expect(subjects.queue$.value.size).toBe(1); + expect(subjects.queue$.value.entries[0]).toMatchObject({ + values: { messages: [{ type: 'human', content: 'queued' }] }, + }); + expect(transport.createdQueuedRuns).toHaveLength(1); + expect(transport.isStreaming()).toBe(true); + destroy$.next(); + }); + + it('can cancel a queued run through queue$', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({ messages: [{ type: 'human', content: 'active' }] }); + await bridge.submit( + { messages: [{ type: 'human', content: 'queued' }] }, + { multitaskStrategy: 'enqueue' }, + ); + + const queued = subjects.queue$.value.entries[0]; + await subjects.queue$.value.cancel(queued.id); + + expect(subjects.queue$.value.size).toBe(0); + expect(transport.cancelledRuns).toEqual([{ threadId: 'thread-1', runId: queued.id }]); + destroy$.next(); + }); + + it('cancels queued runs when switching threads', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({ messages: [{ type: 'human', content: 'active' }] }); + await bridge.submit( + { messages: [{ type: 'human', content: 'queued' }] }, + { multitaskStrategy: 'enqueue' }, + ); + + bridge.switchThread('thread-2'); + await Promise.resolve(); + + expect(subjects.queue$.value.size).toBe(0); + expect(transport.cancelledRuns).toEqual([{ threadId: 'thread-1', runId: 'queued-run-1' }]); + destroy$.next(); + }); + + it('cancels queued runs when stopping the active stream', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({ messages: [{ type: 'human', content: 'active' }] }); + await bridge.submit( + { messages: [{ type: 'human', content: 'queued' }] }, + { multitaskStrategy: 'enqueue' }, + ); + + await bridge.stop(); + + expect(subjects.queue$.value.size).toBe(0); + expect(transport.cancelledRuns).toEqual([{ threadId: 'thread-1', runId: 'queued-run-1' }]); + destroy$.next(); + }); + + it('joins queued runs in FIFO order after the active stream completes', async () => { + const transport = new MockAgentTransport(); + const subjects = makeSubjects(); + const destroy$ = new Subject(); + const bridge = createStreamManagerBridge({ + options: { apiUrl: '', assistantId: 'test', transport }, + subjects, + threadId$: of('thread-1'), + destroy$: destroy$.asObservable(), + }); + + bridge.submit({ messages: [{ type: 'human', content: 'active' }] }); + await bridge.submit( + { messages: [{ type: 'human', content: 'queued' }] }, + { multitaskStrategy: 'enqueue' }, + ); + + transport.close(); + await new Promise(r => setTimeout(r, 20)); + + expect(transport.joinedRuns).toEqual([{ threadId: 'thread-1', runId: 'queued-run-1' }]); + expect(subjects.queue$.value.size).toBe(0); + expect(subjects.values$.value).toMatchObject({ queued: true }); + destroy$.next(); + }); + it('sets status to Resolved when stream completes', async () => { const transport = new MockAgentTransport([ [{ type: 'values', values: { count: 1 } }], diff --git a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts index 8f04456a9..cf6edbeda 100644 --- a/libs/langgraph/src/lib/internals/stream-manager.bridge.ts +++ b/libs/langgraph/src/lib/internals/stream-manager.bridge.ts @@ -8,6 +8,9 @@ import { StreamEvent, AgentTransport, SubagentStreamRef, + AgentQueue, + AgentQueueEntry, + LangGraphSubmitOptions, } from '../agent.types'; import { FetchStreamTransport } from '../transport/fetch-stream.transport'; import { BagTemplate } from '@langchain/langgraph-sdk'; @@ -29,7 +32,7 @@ export interface StreamManagerBridgeOptions Promise; + submit: (values: unknown, opts?: LangGraphSubmitOptions) => Promise; stop: () => Promise; switchThread: (id: string | null) => void; joinStream: (runId: string, lastEventId?: string) => Promise; @@ -55,6 +58,8 @@ export function createStreamManagerBridge(); + const queuedRuns: AgentQueueEntry[] = []; + let drainingQueue = false; const subagentManager = new SubagentTracker({ subagentToolNames: options.subagentToolNames, onSubagentChange: publishSubagents, @@ -70,6 +75,8 @@ export function createStreamManagerBridge subjects.error$.next(err)); + publishQueue(); subjects.custom$.next([]); subjects.isThreadLoading$.next(false); toolProgressMap.clear(); @@ -93,6 +100,111 @@ export function createStreamManagerBridge { + if (!currentThreadId) { + throw new Error('Cannot enqueue a run before a LangGraph thread exists.'); + } + if (!transport.createQueuedRun) { + throw new Error('The configured LangGraph transport does not support server-side queueing.'); + } + + const controller = new AbortController(); + const entry = await transport.createQueuedRun( + options.assistantId, + currentThreadId, + payload, + opts?.signal ?? controller.signal, + ); + queuedRuns.push({ + ...entry, + values: payload, + options: { ...opts, multitaskStrategy: 'enqueue' }, + createdAt: entry.createdAt ?? new Date(), + }); + publishQueue(); + } + + async function cancelQueuedRun(id: string): Promise { + const index = queuedRuns.findIndex(entry => entry.id === id); + if (index === -1) return false; + + const [entry] = queuedRuns.splice(index, 1); + publishQueue(); + if (!entry || !transport.cancelRun) return false; + await cancelQueueEntries([entry]); + return true; + } + + async function clearQueue(): Promise { + const entries = takeQueuedRuns(); + publishQueue(); + await cancelQueueEntries(entries); + } + + function takeQueuedRuns(): AgentQueueEntry[] { + return queuedRuns.splice(0, queuedRuns.length); + } + + async function cancelQueueEntries(entries: AgentQueueEntry[]): Promise { + const cancelRun = transport.cancelRun?.bind(transport); + if (!cancelRun) return; + await Promise.all(entries.map(entry => + cancelRun(entry.threadId, entry.id, new AbortController().signal) + )); + } + + async function drainQueue(): Promise { + if (drainingQueue || queuedRuns.length === 0) return; + drainingQueue = true; + try { + while (queuedRuns.length > 0) { + const entry = queuedRuns.shift(); + publishQueue(); + if (!entry || !transport.joinStream) continue; + await joinQueuedRun(entry); + } + } finally { + drainingQueue = false; + } + } + + async function joinQueuedRun(entry: AgentQueueEntry): Promise { + abortController = new AbortController(); + subjects.custom$.next([]); + subjects.toolProgress$.next([]); + toolProgressMap.clear(); + subjects.status$.next(ResourceStatus.Loading); + + try { + const iter = transport.joinStream + ? transport.joinStream(entry.threadId, entry.id, undefined, abortController.signal) + : []; + for await (const event of iter) { + if (abortController.signal.aborted) break; + processEvent(event); + } + if (!abortController.signal.aborted) { + subjects.status$.next(ResourceStatus.Resolved); + } + } catch (err) { + subjects.error$.next(err); + subjects.status$.next(ResourceStatus.Error); + } + } + async function runStream(payload: unknown): Promise { abortController?.abort(); abortController = new AbortController(); @@ -127,6 +239,7 @@ export function createStreamManagerBridge runStream(payload), + submit: async (payload, opts) => { + if (opts?.multitaskStrategy === 'enqueue' && subjects.status$.value === ResourceStatus.Loading) { + await enqueueRun(payload, opts); + return; + } + await runStream(payload); + }, stop: async () => { abortController?.abort(); + await clearQueue(); subjects.status$.next(ResourceStatus.Resolved); }, diff --git a/libs/langgraph/src/lib/testing/mock-langgraph-agent.ts b/libs/langgraph/src/lib/testing/mock-langgraph-agent.ts index bd7de0d76..e7e426969 100644 --- a/libs/langgraph/src/lib/testing/mock-langgraph-agent.ts +++ b/libs/langgraph/src/lib/testing/mock-langgraph-agent.ts @@ -4,6 +4,7 @@ import { Subject } from 'rxjs'; import type { LangGraphAgent, SubagentStreamRef, + AgentQueue, Interrupt, ThreadState, CustomStreamEvent, @@ -41,6 +42,7 @@ export interface MockLangGraphAgent extends LangGraphAgent { toolCalls: WritableSignal; langGraphToolCalls: WritableSignal; toolProgress: WritableSignal; + queue: WritableSignal; branch: WritableSignal; history: WritableSignal; langGraphHistory: WritableSignal[]>; @@ -77,6 +79,12 @@ export function mockLangGraphAgent( const toolCalls$ = signal([]); const langGraphToolCalls$ = signal([]); const toolProgress$ = signal([]); + const queue$ = signal({ + entries: [], + size: 0, + cancel: async () => false, + clear: async () => undefined, + }); const branch$ = signal(''); const history$ = signal([]); const langGraphHistory$ = signal[]>([]); @@ -119,6 +127,7 @@ export function mockLangGraphAgent( // eslint-disable-next-line @typescript-eslint/no-empty-function reload: () => {}, toolProgress: toolProgress$, + queue: queue$, activeSubagents: activeSubagents$, customEvents: customEvents$, branch: branch$, diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts index e23b3c877..a7d8dae60 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.spec.ts @@ -4,6 +4,8 @@ import { FetchStreamTransport } from './fetch-stream.transport'; const mocks = vi.hoisted(() => ({ threadsCreate: vi.fn(), runsStream: vi.fn(), + runsCreate: vi.fn(), + runsCancel: vi.fn(), runsJoinStream: vi.fn(), clientCtor: vi.fn(function (_config: { apiUrl: string }) { return { @@ -12,6 +14,8 @@ const mocks = vi.hoisted(() => ({ }, runs: { stream: mocks.runsStream, + create: mocks.runsCreate, + cancel: mocks.runsCancel, joinStream: mocks.runsJoinStream, }, }; @@ -34,6 +38,8 @@ describe('FetchStreamTransport', () => { beforeEach(() => { mocks.threadsCreate.mockReset(); mocks.runsStream.mockReset(); + mocks.runsCreate.mockReset(); + mocks.runsCancel.mockReset(); mocks.runsJoinStream.mockReset(); mocks.clientCtor.mockClear(); }); @@ -199,4 +205,50 @@ describe('FetchStreamTransport', () => { { type: 'values', status: 'resumed', data: { status: 'resumed' } }, ]); }); + + it('creates a server-side queued run with enqueue multitask strategy', async () => { + mocks.runsCreate.mockResolvedValue({ + run_id: 'run-queued', + thread_id: 'thread-1', + created_at: '2026-05-02T00:00:00.000Z', + }); + + const transport = new FetchStreamTransport('http://example.test'); + const entry = await transport.createQueuedRun( + 'assistant-1', + 'thread-1', + { messages: [{ type: 'human', content: 'queued' }] }, + new AbortController().signal, + ); + + expect(mocks.runsCreate).toHaveBeenCalledWith( + 'thread-1', + 'assistant-1', + expect.objectContaining({ + input: { messages: [{ type: 'human', content: 'queued' }] }, + multitaskStrategy: 'enqueue', + streamSubgraphs: true, + }), + ); + expect(entry).toMatchObject({ + id: 'run-queued', + threadId: 'thread-1', + values: { messages: [{ type: 'human', content: 'queued' }] }, + }); + expect(entry.createdAt).toBeInstanceOf(Date); + }); + + it('cancels a queued run by thread and run id', async () => { + const transport = new FetchStreamTransport('http://example.test'); + + await transport.cancelRun('thread-1', 'run-queued', new AbortController().signal); + + expect(mocks.runsCancel).toHaveBeenCalledWith( + 'thread-1', + 'run-queued', + false, + 'interrupt', + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); }); diff --git a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts index 0ceef0e53..7edc2d99e 100644 --- a/libs/langgraph/src/lib/transport/fetch-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/fetch-stream.transport.ts @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MIT import { Client } from '@langchain/langgraph-sdk'; import type { StreamMode } from '@langchain/langgraph-sdk'; -import { AgentTransport, StreamEvent } from '../agent.types'; +import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; /** * Production transport that connects to a LangGraph Platform API via HTTP and SSE. @@ -84,6 +84,36 @@ export class FetchStreamTransport implements AgentTransport { yield normalizeSdkEvent(event.event as StreamEvent['type'], event.data); } } + + /** Create a pending server-side run using LangGraph's enqueue strategy. */ + async createQueuedRun( + assistantId: string, + threadId: string, + payload: unknown, + signal: AbortSignal, + ): Promise { + const streamMode = ['values', 'messages-tuple', 'updates', 'tools', 'custom'] satisfies StreamMode[]; + const run = await this.client.runs.create(threadId, assistantId, { + input: payload as Record, + streamMode: streamMode as unknown as 'values', + streamSubgraphs: true, + multitaskStrategy: 'enqueue', + signal, + }); + + return { + id: run.run_id, + threadId: run.thread_id ?? threadId, + values: payload, + options: { multitaskStrategy: 'enqueue', signal }, + createdAt: run.created_at ? new Date(run.created_at) : new Date(), + }; + } + + /** Cancel a server-side run. */ + async cancelRun(threadId: string, runId: string, signal: AbortSignal): Promise { + await this.client.runs.cancel(threadId, runId, false, 'interrupt', { signal }); + } } function normalizeSdkEvent(type: StreamEvent['type'], data: unknown): StreamEvent { diff --git a/libs/langgraph/src/lib/transport/mock-stream.transport.ts b/libs/langgraph/src/lib/transport/mock-stream.transport.ts index 2c4f3fbad..460d4d9d6 100644 --- a/libs/langgraph/src/lib/transport/mock-stream.transport.ts +++ b/libs/langgraph/src/lib/transport/mock-stream.transport.ts @@ -1,5 +1,5 @@ // SPDX-License-Identifier: MIT -import { AgentTransport, StreamEvent } from '../agent.types'; +import { AgentQueueEntry, AgentTransport, StreamEvent } from '../agent.types'; /** * Test transport for deterministic agent testing without a real LangGraph server. @@ -16,6 +16,9 @@ import { AgentTransport, StreamEvent } from '../agent.types'; * ``` */ export class MockAgentTransport implements AgentTransport { + readonly createdQueuedRuns: AgentQueueEntry[] = []; + readonly cancelledRuns: Array<{ threadId: string; runId: string }> = []; + readonly joinedRuns: Array<{ threadId: string; runId: string }> = []; private script: StreamEvent[][]; private scriptIndex = 0; private streaming = false; @@ -70,7 +73,8 @@ export class MockAgentTransport implements AgentTransport { while (!this.closed && !signal.aborted) { if (this.pendingError) throw this.pendingError; if (this.eventQueue.length > 0) { - yield this.eventQueue.shift()!; + const event = this.eventQueue.shift(); + if (event) yield event; } else { // Wait until flush() wakes us, then loop again to check state. await new Promise((resolve) => { @@ -81,12 +85,50 @@ export class MockAgentTransport implements AgentTransport { } if (signal.aborted) return; // Drain remaining events after close() - while (this.eventQueue.length > 0) yield this.eventQueue.shift()!; + while (this.eventQueue.length > 0) { + const event = this.eventQueue.shift(); + if (event) yield event; + } } finally { this.streaming = false; } } + async createQueuedRun( + _assistantId: string, + threadId: string, + payload: unknown, + signal: AbortSignal, + ): Promise { + void signal; + const entry: AgentQueueEntry = { + id: `queued-run-${this.createdQueuedRuns.length + 1}`, + threadId, + values: payload, + options: { multitaskStrategy: 'enqueue' }, + createdAt: new Date(), + }; + this.createdQueuedRuns.push(entry); + return entry; + } + + async cancelRun(threadId: string, runId: string, signal: AbortSignal): Promise { + void signal; + this.cancelledRuns.push({ threadId, runId }); + } + + async *joinStream( + threadId: string, + runId: string, + lastEventId: string | undefined, + signal: AbortSignal, + ): AsyncIterable { + void lastEventId; + void signal; + this.joinedRuns.push({ threadId, runId }); + yield { type: 'values', values: { queued: true } }; + } + private flush(): void { const resolve = this.resolvers.shift(); if (resolve) resolve(); diff --git a/libs/langgraph/src/public-api.ts b/libs/langgraph/src/public-api.ts index 72b748692..8567342a2 100644 --- a/libs/langgraph/src/public-api.ts +++ b/libs/langgraph/src/public-api.ts @@ -9,7 +9,11 @@ export type { AgentConfig } from './lib/agent.provider'; // Public types export type { AgentOptions, + AgentQueue, + AgentQueueEntry, LangGraphAgent, + LangGraphMultitaskStrategy, + LangGraphSubmitOptions, AgentTransport, CustomStreamEvent, StreamEvent,