Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/major-pianos-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chat": minor
---

add streaming options to thread.post() with platform-specific namespacing
5 changes: 5 additions & 0 deletions packages/chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ export type {
} from "./postable-object";
export { isPostableObject } from "./postable-object";
export { reviver } from "./reviver";
export {
StreamMessage,
type StreamMessageData,
type StreamMessageOptions,
} from "./stream-message";
export { StreamingMarkdownRenderer } from "./streaming-markdown";
export { type SerializedThread, ThreadImpl } from "./thread";

Expand Down
89 changes: 89 additions & 0 deletions packages/chat/src/stream-message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import {
POSTABLE_OBJECT,
type PostableObject,
type PostableObjectContext,
} from "./postable-object";
import type { Adapter, StreamChunk, StreamEvent } from "./types";

export interface StreamMessageOptions {
/**
* Block Kit elements to attach when the stream stops (Slack only).
* Useful for adding feedback buttons after a streamed response.
*/
endWith?: unknown[];
/**
* Controls how task_update chunks are displayed (Slack only).
* - `"plan"` - all tasks grouped into a single plan block
* - `"timeline"` - individual task cards shown inline with text (default)
*/
groupTasks?: "plan" | "timeline";
/**
* Minimum interval between updates in ms (default: 500).
* Used for fallback mode (post+edit on adapters without native streaming).
*/
updateIntervalMs?: number;
}

export interface StreamMessageData {
options: StreamMessageOptions;
stream: AsyncIterable<string | StreamChunk | StreamEvent>;
}

/**
* A StreamMessage wraps an async iterable with platform-specific streaming options.
*
* Use this when you need to pass options like task grouping or stop blocks
* to the streaming API. For simple streaming without options, pass the
* async iterable directly to `thread.post()`.
*
* @example
* ```typescript
* const stream = new StreamMessage(result.fullStream, {
* groupTasks: "plan",
* endWith: [feedbackBlock],
* });
* await thread.post(stream);
* ```
*/
export class StreamMessage implements PostableObject<StreamMessageData> {
readonly $$typeof = POSTABLE_OBJECT;
readonly kind = "stream";

private readonly _stream: AsyncIterable<string | StreamChunk | StreamEvent>;
private readonly _options: StreamMessageOptions;

constructor(
stream: AsyncIterable<string | StreamChunk | StreamEvent>,
options: StreamMessageOptions = {}
) {
this._stream = stream;
this._options = options;
}

get stream(): AsyncIterable<string | StreamChunk | StreamEvent> {
return this._stream;
}

get options(): StreamMessageOptions {
return this._options;
}

getFallbackText(): string {
return "";
}

getPostData(): StreamMessageData {
return {
stream: this._stream,
options: this._options,
};
}

isSupported(_adapter: Adapter): boolean {
return true;
}

onPosted(_context: PostableObjectContext): void {
// Streams are one-shot, no lifecycle binding needed
}
}
122 changes: 122 additions & 0 deletions packages/chat/src/thread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
mockLogger,
} from "./mock-adapter";
import { Plan } from "./plan";
import { StreamMessage } from "./stream-message";
import { ThreadImpl } from "./thread";
import type { Adapter, Message, ScheduledMessage, StreamChunk } from "./types";
import { NotImplementedError } from "./types";
Expand Down Expand Up @@ -638,6 +639,127 @@ describe("ThreadImpl", () => {
})
);
});

it("should pass StreamMessage PostableObject options to adapter.stream", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
const streamMsg = new StreamMessage(textStream, {
groupTasks: "plan",
endWith: [{ type: "actions" }],
updateIntervalMs: 1000,
});
await thread.post(streamMsg);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
taskDisplayMode: "plan",
stopBlocks: [{ type: "actions" }],
updateIntervalMs: 1000,
})
);
});

it("should pass StreamMessage with only groupTasks", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(
new StreamMessage(textStream, { groupTasks: "timeline" })
);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
taskDisplayMode: "timeline",
})
);
const options = mockStream.mock.calls[0][2];
expect(options.stopBlocks).toBeUndefined();
});

it("should pass StreamMessage with only endWith", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(
new StreamMessage(textStream, { endWith: [{ type: "actions" }] })
);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
stopBlocks: [{ type: "actions" }],
})
);
const options = mockStream.mock.calls[0][2];
expect(options.taskDisplayMode).toBeUndefined();
});

it("should pass StreamMessage PostableObject options to adapter.stream", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
const streamMsg = new StreamMessage(textStream, {
groupTasks: "plan",
endWith: [{ type: "actions" }],
});
await thread.post(streamMsg);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
taskDisplayMode: "plan",
stopBlocks: [{ type: "actions" }],
})
);
});

it("should still work without options (backward compat)", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(textStream);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.any(Object)
);
const options = mockStream.mock.calls[0][2];
expect(options.taskDisplayMode).toBeUndefined();
expect(options.stopBlocks).toBeUndefined();
});
});

describe("fallback streaming error logging", () => {
Expand Down
28 changes: 25 additions & 3 deletions packages/chat/src/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,27 @@ export class ThreadImpl<TState = Record<string, unknown>>
message: string | PostableMessage | ChatElement
): Promise<SentMessage | PostableObject> {
if (isPostableObject(message)) {
// StreamMessage PostableObject - route to streaming with options
if (message.kind === "stream") {
const data = message.getPostData() as {
stream: AsyncIterable<string | StreamChunk | StreamEvent>;
options: {
groupTasks?: "plan" | "timeline";
endWith?: unknown[];
updateIntervalMs?: number;
};
};
const streamOptions: StreamOptions = {
...(data.options.updateIntervalMs
? { updateIntervalMs: data.options.updateIntervalMs }
: {}),
...(data.options.groupTasks
? { taskDisplayMode: data.options.groupTasks }
: {}),
...(data.options.endWith ? { stopBlocks: data.options.endWith } : {}),
};
return this.handleStream(data.stream, streamOptions);
}
await this.handlePostableObject(message);
return message;
}
Expand Down Expand Up @@ -516,12 +537,13 @@ export class ThreadImpl<TState = Record<string, unknown>>
* then uses adapter's native streaming if available, otherwise falls back to post+edit.
*/
private async handleStream(
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>,
callerOptions?: StreamOptions
): Promise<SentMessage> {
// Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects
const textStream = fromFullStream(rawStream);
// Build streaming options from current message context
const options: StreamOptions = {};
// Build streaming options from current message context + caller options
const options: StreamOptions = { ...callerOptions };
if (this._currentMessage) {
options.recipientUserId = this._currentMessage.author.userId;
// Extract teamId from raw Slack payload
Expand Down
8 changes: 8 additions & 0 deletions packages/chat/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,14 @@ export interface Thread<TState = Record<string, unknown>, TRawMessage = unknown>
* const result = await agent.stream({ prompt: message.text });
* await thread.post(result.textStream);
*
* // Stream with platform-specific options
* // Stream with options via StreamMessage PostableObject
* const stream = new StreamMessage(result.fullStream, {
* groupTasks: "plan",
* endWith: [feedbackBlocks],
* });
* await thread.post(stream);
*
* // Plan with live updates
* const plan = new Plan({ initialMessage: "Working..." });
* await thread.post(plan);
Expand Down
Loading