Skip to content

Commit 0a737f3

Browse files
committed
feat(wecom): add streaming message support with StreamReplier interface
- Add StreamReplier interface for channels supporting streaming (StartStream, AppendStream, FinishStream) - Implement streaming methods in WSChannel with full accumulated content updates - Reply now delegates to StartStream + FinishStream (DRY) - Extract replyCtx helper to eliminate type assertion duplication - generateReqID now uses prefix format (prefix_id) - Add handleStreamingReply with tool call loop support - Use settings.Current.MaxLoopIterations for loop limit - Extract buildChatMessagesAndTools to eliminate code duplication
1 parent cccef3f commit 0a737f3

4 files changed

Lines changed: 519 additions & 41 deletions

File tree

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
---
2+
title: feat: WeCom WebSocket 流式消息支持
3+
type: feat
4+
status: active
5+
date: 2026-04-02
6+
---
7+
8+
# WeCom WebSocket 流式消息支持
9+
10+
## Overview
11+
12+
改造 WeCom WebSocket 的 `Reply` 方法,实现真正的流式消息:首次 `finish=false` 创建流,后续追加,最终 `finish=true` 结束。
13+
14+
## Problem Statement
15+
16+
当前 `WSChannel.Reply()` (`websocket.go:370`) 直接发送 `finish=true` 的流式消息,无法实现内容接续效果。
17+
18+
## Proposed Solution
19+
20+
### 1. WSChannel 添加三个方法
21+
22+
`pkg/services/channels/wecom/websocket.go` 中新增:
23+
24+
```go
25+
// startStream 首次发送,finish=false
26+
func (p *WSChannel) startStream(ctx context.Context, rctx wsReplyContext, content string) (string, error) {
27+
streamID := p.generateReqID("stream")
28+
frame := p.buildStreamFrame(rctx, streamID, content, false)
29+
if err := p.writeJSON(frame); err != nil {
30+
return "", err
31+
}
32+
return streamID, nil
33+
}
34+
35+
// appendStream 后续发送,finish=false
36+
func (p *WSChannel) appendStream(ctx context.Context, rctx wsReplyContext, streamID string, content string) error {
37+
frame := p.buildStreamFrame(rctx, streamID, content, false)
38+
return p.writeJSON(frame)
39+
}
40+
41+
// finishStream 结束发送,finish=true
42+
func (p *WSChannel) finishStream(ctx context.Context, rctx wsReplyContext, streamID string) error {
43+
frame := p.buildStreamFrame(rctx, streamID, "", true)
44+
return p.writeJSON(frame)
45+
}
46+
47+
// wsStreamFrame 预定义流式消息帧结构
48+
type wsStreamFrame struct {
49+
Cmd string `json:"cmd"`
50+
Headers wsFrameHeaders `json:"headers"`
51+
Body wsStreamBody `json:"body"`
52+
}
53+
54+
type wsStreamBody struct {
55+
MsgType string `json:"msgtype"`
56+
Stream wsStreamContent `json:"stream"`
57+
}
58+
59+
type wsStreamContent struct {
60+
ID string `json:"id"`
61+
Finish bool `json:"finish"`
62+
Content string `json:"content"`
63+
}
64+
65+
func (p *WSChannel) buildStreamFrame(rc wsReplyContext, streamID, content string, finish bool) wsStreamFrame {
66+
return wsStreamFrame{
67+
Cmd: "aibot_respond_msg",
68+
Headers: wsFrameHeaders{ReqID: rc.reqID},
69+
Body: wsStreamBody{
70+
MsgType: "stream",
71+
Stream: wsStreamContent{
72+
ID: streamID,
73+
Finish: finish,
74+
Content: content,
75+
},
76+
},
77+
}
78+
}
79+
```
80+
81+
### 2. StreamReplier 接口(可选)
82+
83+
`pkg/models/channel/channel.go` 添加:
84+
85+
```go
86+
// StreamReplier is an optional interface for channels that support streaming.
87+
type StreamReplier interface {
88+
StartStream(ctx context.Context, replyCtx any, content string) (streamID string, err error)
89+
AppendStream(ctx context.Context, replyCtx any, streamID string, content string) error
90+
FinishStream(ctx context.Context, replyCtx any, streamID string) error
91+
}
92+
```
93+
94+
WSChannel 实现该接口。
95+
96+
### 3. MessageHandler 流式分支
97+
98+
`handle_platform.go` 中检测 `StreamReplier` 接口,走流式处理分支:
99+
100+
```go
101+
// handleStreamingReply 处理 WeCom WebSocket 流式回复
102+
func (chh *channelHandler) handleStreamingReply(p channel.Channel, msg *channel.Message) {
103+
ctx := context.Background()
104+
105+
// 构建消息(复用现有逻辑)...
106+
messages, tools := chh.buildChatMessages(msg)
107+
108+
// 流式调用 LLM
109+
stream, err := chh.llm.StreamChat(ctx, messages, tools)
110+
if err != nil {
111+
channelReplyError(p, msg, "AI processing failed")
112+
return
113+
}
114+
115+
sr := p.(channel.StreamReplier)
116+
var streamID string
117+
var content strings.Builder // 本地累积完整内容
118+
119+
for result := range stream {
120+
if result.Error != nil {
121+
slog.Warn("channel: stream error", "err", result.Error)
122+
break
123+
}
124+
125+
content.WriteString(result.Delta) // 累积内容
126+
127+
if streamID == "" {
128+
// 首次:startStream,发送累积内容
129+
sid, err := sr.StartStream(ctx, msg.ReplyCtx, content.String())
130+
if err != nil {
131+
slog.Error("channel: start stream failed", "err", err)
132+
channelReplyError(p, msg, "Failed to start streaming")
133+
return
134+
}
135+
streamID = sid
136+
} else {
137+
// 后续:appendStream,每次发送完整累积内容(覆盖更新)
138+
if err := sr.AppendStream(ctx, msg.ReplyCtx, streamID, content.String()); err != nil {
139+
slog.Warn("channel: append stream failed", "err", err)
140+
}
141+
}
142+
}
143+
144+
// 结束流
145+
if streamID != "" {
146+
if err := sr.FinishStream(ctx, msg.ReplyCtx, streamID); err != nil {
147+
slog.Warn("channel: finish stream failed", "err", err)
148+
}
149+
}
150+
151+
// 保存历史...
152+
}
153+
```
154+
155+
`MessageHandler` 中检测:
156+
157+
```go
158+
func (chh *channelHandler) MessageHandler(p channel.Channel, msg *channel.Message) {
159+
// ... 命令检测等前置逻辑不变 ...
160+
161+
if sr, ok := p.(channel.StreamReplier); ok {
162+
chh.handleStreamingReply(p, msg)
163+
} else {
164+
chh.handleRegularReply(p, msg)
165+
}
166+
}
167+
```
168+
169+
## File Changes
170+
171+
| File | Change |
172+
|------|--------|
173+
| `pkg/models/channel/channel.go` | 添加 `StreamReplier` 接口 |
174+
| `pkg/services/channels/wecom/websocket.go` | 实现 `StreamReplier` 三个方法 |
175+
| `pkg/web/api/handle_platform.go` | `MessageHandler` 增加流式分支检测 |
176+
177+
## Acceptance Criteria
178+
179+
- [ ] WeCom WebSocket 首次发送 `finish=false`
180+
- [ ] 后续增量追加
181+
- [ ] 最终发送 `finish=true`
182+
- [ ] HTTP 模式和其他通道不受影响
183+
184+
## ⚠️ 待验证
185+
186+
- [x] ~~**WeCom 流式接续语义**~~ 官方文档明确:继续使用相同 `stream.id` 推送会**覆盖**消息内容,需本地累积完整内容后发送。设计已按此思路实现,验证不符时再修订。

pkg/models/channel/channel.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,13 @@ type MessageUpdater interface {
4646
UpdateMessage(ctx context.Context, replyCtx any, content string) error
4747
}
4848

49+
// StreamReplier is an optional interface for channels that support streaming message updates.
50+
// The channel accumulates content locally and sends full accumulated content on each update.
51+
type StreamReplier interface {
52+
StartStream(ctx context.Context, replyCtx any, content string) (streamID string, err error)
53+
AppendStream(ctx context.Context, replyCtx any, streamID string, content string) error
54+
FinishStream(ctx context.Context, replyCtx any, streamID string, finalContent string) error
55+
}
56+
4957
// MessageHandler is called by channels when a new message arrives.
5058
type MessageHandler func(p Channel, msg *Message)

pkg/services/channels/wecom/websocket.go

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,24 @@ type wsMsgCallbackBody struct {
8080
CreateTime int64 `json:"create_time"`
8181
}
8282

83+
// wsStreamFrame is the frame structure for streaming message responses.
84+
type wsStreamFrame struct {
85+
Cmd string `json:"cmd"`
86+
Headers wsFrameHeaders `json:"headers"`
87+
Body wsStreamBody `json:"body"`
88+
}
89+
90+
type wsStreamBody struct {
91+
MsgType string `json:"msgtype"`
92+
Stream wsStreamContent `json:"stream"`
93+
}
94+
95+
type wsStreamContent struct {
96+
ID string `json:"id"`
97+
Finish bool `json:"finish"`
98+
Content string `json:"content"`
99+
}
100+
83101
func newWebSocket(opts map[string]any) (channel.Channel, error) {
84102
botID, _ := opts["bot_id"].(string)
85103
secret, _ := opts["bot_secret"].(string)
@@ -98,9 +116,15 @@ func newWebSocket(opts map[string]any) (channel.Channel, error) {
98116

99117
func (p *WSChannel) generateReqID(prefix string) string {
100118
id := oid.NewID(oid.OtEvent)
101-
return id.String()
102-
// seq := p.reqSeq.Add(1)
103-
// return fmt.Sprintf("%s_%d", prefix, seq)
119+
return fmt.Sprintf("%s_%s", prefix, id)
120+
}
121+
122+
func (p *WSChannel) replyCtx(rctx any) (wsReplyContext, error) {
123+
rc, ok := rctx.(wsReplyContext)
124+
if !ok {
125+
return wsReplyContext{}, fmt.Errorf("wecom-ws: invalid reply context type %T", rctx)
126+
}
127+
return rc, nil
104128
}
105129

106130
func (p *WSChannel) Name() string { return "wecom" }
@@ -368,39 +392,82 @@ func (p *WSChannel) handleMsgCallback(frame wsFrame) {
368392
}
369393

370394
func (p *WSChannel) Reply(ctx context.Context, rctx any, content string) error {
371-
rc, ok := rctx.(wsReplyContext)
372-
if !ok {
373-
return fmt.Errorf("wecom-ws: invalid reply context type %T", rctx)
374-
}
375395
if content == "" {
376396
return nil
377397
}
398+
streamID, err := p.StartStream(ctx, rctx, content)
399+
if err != nil {
400+
return err
401+
}
402+
return p.FinishStream(ctx, rctx, streamID, content)
403+
}
378404

379-
streamID := p.generateReqID("stream")
380-
frame := map[string]any{
381-
"cmd": "aibot_respond_msg",
382-
"headers": map[string]string{"req_id": rc.reqID},
383-
"body": map[string]any{
384-
"msgtype": "stream",
385-
"stream": map[string]any{
386-
"id": streamID,
387-
"finish": true,
388-
"content": content,
405+
// buildStreamFrame builds a streaming message frame.
406+
func (p *WSChannel) buildStreamFrame(rc wsReplyContext, streamID, content string, finish bool) wsStreamFrame {
407+
return wsStreamFrame{
408+
Cmd: "aibot_respond_msg",
409+
Headers: wsFrameHeaders{ReqID: rc.reqID},
410+
Body: wsStreamBody{
411+
MsgType: "stream",
412+
Stream: wsStreamContent{
413+
ID: streamID,
414+
Finish: finish,
415+
Content: content,
389416
},
390417
},
391418
}
419+
}
420+
421+
// StartStream starts a new streaming message, returns the stream ID.
422+
func (p *WSChannel) StartStream(ctx context.Context, rctx any, content string) (string, error) {
423+
rc, err := p.replyCtx(rctx)
424+
if err != nil {
425+
return "", err
426+
}
427+
streamID := p.generateReqID("stream")
428+
frame := p.buildStreamFrame(rc, streamID, content, false)
429+
if err := p.writeJSON(frame); err != nil {
430+
slog.Error("wecom-ws: start stream failed", "user", rc.userID, "error", err)
431+
return "", err
432+
}
433+
slog.Debug("wecom-ws: stream started", "user", rc.userID, "streamID", streamID, "len", len(content))
434+
return streamID, nil
435+
}
436+
437+
// AppendStream appends content to an existing stream.
438+
func (p *WSChannel) AppendStream(ctx context.Context, rctx any, streamID string, content string) error {
439+
rc, err := p.replyCtx(rctx)
440+
if err != nil {
441+
return err
442+
}
443+
frame := p.buildStreamFrame(rc, streamID, content, false)
444+
if err := p.writeJSON(frame); err != nil {
445+
slog.Warn("wecom-ws: append stream failed", "user", rc.userID, "streamID", streamID, "error", err)
446+
return err
447+
}
448+
slog.Debug("wecom-ws: stream appended", "user", rc.userID, "streamID", streamID, "len", len(content))
449+
return nil
450+
}
451+
452+
// FinishStream ends the streaming message with final content.
453+
func (p *WSChannel) FinishStream(ctx context.Context, rctx any, streamID string, finalContent string) error {
454+
rc, err := p.replyCtx(rctx)
455+
if err != nil {
456+
return err
457+
}
458+
frame := p.buildStreamFrame(rc, streamID, finalContent, true)
392459
if err := p.writeJSON(frame); err != nil {
393-
slog.Error("wecom-ws: reply failed", "user", rc.userID, "error", err)
460+
slog.Error("wecom-ws: finish stream failed", "user", rc.userID, "streamID", streamID, "error", err)
394461
return err
395462
}
396-
slog.Debug("wecom-ws: reply sent", "user", rc.userID, "len", len(content))
463+
slog.Debug("wecom-ws: stream finished", "user", rc.userID, "streamID", streamID, "content_len", len(finalContent))
397464
return nil
398465
}
399466

400467
func (p *WSChannel) Send(ctx context.Context, rctx any, content string) error {
401-
rc, ok := rctx.(wsReplyContext)
402-
if !ok {
403-
return fmt.Errorf("wecom-ws: invalid reply context type %T", rctx)
468+
rc, err := p.replyCtx(rctx)
469+
if err != nil {
470+
return err
404471
}
405472
if content == "" {
406473
return nil

0 commit comments

Comments
 (0)