Skip to content

Commit b3f900d

Browse files
committed
fix(#2053): filter sub-agent streaming events from parent session persistence
During task transfers, sub-agent streaming events (AgentChoiceEvent, AgentChoiceReasoningEvent, MessageAddedEvent) were forwarded through the parent session's event channel. The PersistentRuntime's handleEvent checked sess.IsSubSession(), but sess was always the parent session, so the guard never triggered. This caused sub-agent assistant messages to be persisted directly into the parent session's message history, corrupting it. On session restore, the parent session contained interleaved sub-agent messages with tool_use blocks that had no corresponding tool_result messages in the parent context, causing Anthropic API errors: "unexpected tool_use_id found in tool_result blocks". Add SessionID field to AgentChoiceEvent and AgentChoiceReasoningEvent, and filter all streaming/message events by comparing the event's SessionID against the parent session's ID. Events from sub-sessions are now silently skipped during persistence (they are persisted separately via SubSessionCompletedEvent). Assisted-By: docker-agent
1 parent e5d75fe commit b3f900d

4 files changed

Lines changed: 34 additions & 21 deletions

File tree

pkg/runtime/event.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,29 +138,33 @@ func StreamStarted(sessionID, agentName string) Event {
138138
}
139139

140140
type AgentChoiceEvent struct {
141-
Type string `json:"type"`
142-
Content string `json:"content"`
141+
Type string `json:"type"`
142+
Content string `json:"content"`
143+
SessionID string `json:"session_id,omitempty"`
143144
AgentContext
144145
}
145146

146-
func AgentChoice(agentName, content string) Event {
147+
func AgentChoice(agentName, sessionID, content string) Event {
147148
return &AgentChoiceEvent{
148149
Type: "agent_choice",
149150
Content: content,
151+
SessionID: sessionID,
150152
AgentContext: newAgentContext(agentName),
151153
}
152154
}
153155

154156
type AgentChoiceReasoningEvent struct {
155-
Type string `json:"type"`
156-
Content string `json:"content"`
157+
Type string `json:"type"`
158+
Content string `json:"content"`
159+
SessionID string `json:"session_id,omitempty"`
157160
AgentContext
158161
}
159162

160-
func AgentChoiceReasoning(agentName, content string) Event {
163+
func AgentChoiceReasoning(agentName, sessionID, content string) Event {
161164
return &AgentChoiceReasoningEvent{
162165
Type: "agent_choice_reasoning",
163166
Content: content,
167+
SessionID: sessionID,
164168
AgentContext: newAgentContext(agentName),
165169
}
166170
}

pkg/runtime/persistent_runtime.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,19 @@ func (r *PersistentRuntime) handleEvent(ctx context.Context, sess *session.Sessi
7373

7474
switch e := event.(type) {
7575
case *AgentChoiceEvent:
76+
if e.SessionID != sess.ID {
77+
return
78+
}
7679
// Accumulate streaming content
7780
streaming.content.WriteString(e.Content)
7881
streaming.agentName = e.AgentName
7982

8083
r.persistStreamingContent(ctx, sess.ID, streaming)
8184

8285
case *AgentChoiceReasoningEvent:
86+
if e.SessionID != sess.ID {
87+
return
88+
}
8389
// Accumulate streaming reasoning content
8490
streaming.reasoningContent.WriteString(e.Content)
8591
streaming.agentName = e.AgentName
@@ -98,6 +104,9 @@ func (r *PersistentRuntime) handleEvent(ctx context.Context, sess *session.Sessi
98104
}
99105

100106
case *MessageAddedEvent:
107+
if e.SessionID != sess.ID {
108+
return
109+
}
101110
// Finalize the streaming message with complete metadata
102111
if streaming.messageID != 0 {
103112
// Update the existing streaming message with final content

pkg/runtime/runtime_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ func TestSimple(t *testing.T) {
282282
UserMessage("Hi", sess.ID, nil, 0),
283283
StreamStarted(sess.ID, "root"),
284284
ToolsetInfo(0, false, "root"),
285-
AgentChoice("root", "Hello"),
285+
AgentChoice("root", sess.ID, "Hello"),
286286
MessageAdded(sess.ID, msgAdded.Message, "root"),
287287
NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 3, OutputTokens: 2, ContextLength: 5, LastMessage: &MessageUsage{
288288
Usage: chat.Usage{InputTokens: 3, OutputTokens: 2},
@@ -321,11 +321,11 @@ func TestMultipleContentChunks(t *testing.T) {
321321
UserMessage("Please greet me", sess.ID, nil, 0),
322322
StreamStarted(sess.ID, "root"),
323323
ToolsetInfo(0, false, "root"),
324-
AgentChoice("root", "Hello "),
325-
AgentChoice("root", "there, "),
326-
AgentChoice("root", "how "),
327-
AgentChoice("root", "are "),
328-
AgentChoice("root", "you?"),
324+
AgentChoice("root", sess.ID, "Hello "),
325+
AgentChoice("root", sess.ID, "there, "),
326+
AgentChoice("root", sess.ID, "how "),
327+
AgentChoice("root", sess.ID, "are "),
328+
AgentChoice("root", sess.ID, "you?"),
329329
MessageAdded(sess.ID, msgAdded.Message, "root"),
330330
NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 8, OutputTokens: 12, ContextLength: 20, LastMessage: &MessageUsage{
331331
Usage: chat.Usage{InputTokens: 8, OutputTokens: 12},
@@ -362,9 +362,9 @@ func TestWithReasoning(t *testing.T) {
362362
UserMessage("Hi", sess.ID, nil, 0),
363363
StreamStarted(sess.ID, "root"),
364364
ToolsetInfo(0, false, "root"),
365-
AgentChoiceReasoning("root", "Let me think about this..."),
366-
AgentChoiceReasoning("root", " I should respond politely."),
367-
AgentChoice("root", "Hello, how can I help you?"),
365+
AgentChoiceReasoning("root", sess.ID, "Let me think about this..."),
366+
AgentChoiceReasoning("root", sess.ID, " I should respond politely."),
367+
AgentChoice("root", sess.ID, "Hello, how can I help you?"),
368368
MessageAdded(sess.ID, msgAdded.Message, "root"),
369369
NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 10, OutputTokens: 15, ContextLength: 25, LastMessage: &MessageUsage{
370370
Usage: chat.Usage{InputTokens: 10, OutputTokens: 15},
@@ -402,10 +402,10 @@ func TestMixedContentAndReasoning(t *testing.T) {
402402
UserMessage("Hi there", sess.ID, nil, 0),
403403
StreamStarted(sess.ID, "root"),
404404
ToolsetInfo(0, false, "root"),
405-
AgentChoiceReasoning("root", "The user wants a greeting"),
406-
AgentChoice("root", "Hello!"),
407-
AgentChoiceReasoning("root", " I should be friendly"),
408-
AgentChoice("root", " How can I help you today?"),
405+
AgentChoiceReasoning("root", sess.ID, "The user wants a greeting"),
406+
AgentChoice("root", sess.ID, "Hello!"),
407+
AgentChoiceReasoning("root", sess.ID, " I should be friendly"),
408+
AgentChoice("root", sess.ID, " How can I help you today?"),
409409
MessageAdded(sess.ID, msgAdded.Message, "root"),
410410
NewTokenUsageEvent(sess.ID, "root", &Usage{InputTokens: 15, OutputTokens: 20, ContextLength: 35, LastMessage: &MessageUsage{
411411
Usage: chat.Usage{InputTokens: 15, OutputTokens: 20},

pkg/runtime/streaming.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
164164
}
165165

166166
if choice.Delta.ReasoningContent != "" {
167-
events <- AgentChoiceReasoning(a.Name(), choice.Delta.ReasoningContent)
167+
events <- AgentChoiceReasoning(a.Name(), sess.ID, choice.Delta.ReasoningContent)
168168
fullReasoningContent.WriteString(choice.Delta.ReasoningContent)
169169
}
170170

@@ -174,7 +174,7 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
174174
}
175175

176176
if choice.Delta.Content != "" {
177-
events <- AgentChoice(a.Name(), choice.Delta.Content)
177+
events <- AgentChoice(a.Name(), sess.ID, choice.Delta.Content)
178178
fullContent.WriteString(choice.Delta.Content)
179179
}
180180
}

0 commit comments

Comments
 (0)