Skip to content

Commit 9f8b3fa

Browse files
authored
Merge pull request #521 from krissetto/fix-anthropic-msg-sequencing-edge-cases
Add msg sequencing guards for anthropic
2 parents 7b74cda + 36d9842 commit 9f8b3fa

3 files changed

Lines changed: 419 additions & 5 deletions

File tree

pkg/model/provider/anthropic/beta_client.go

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package anthropic
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
58
"log/slog"
69

710
"github.com/anthropics/anthropic-sdk-go"
@@ -25,10 +28,20 @@ func (c *Client) createBetaStream(
2528
return nil, err
2629
}
2730

31+
converted := convertBetaMessages(messages)
32+
if err := validateAnthropicSequencingBeta(converted); err != nil {
33+
slog.Warn("Invalid message sequencing for Anthropic Beta API detected, attempting self-repair", "error", err)
34+
converted = repairAnthropicSequencingBeta(converted)
35+
if err2 := validateAnthropicSequencingBeta(converted); err2 != nil {
36+
slog.Error("Failed to self-repair Anthropic Beta sequencing", "error", err2)
37+
return nil, err
38+
}
39+
}
40+
2841
params := anthropic.BetaMessageNewParams{
2942
Model: anthropic.Model(c.config.Model),
3043
MaxTokens: maxTokens,
31-
Messages: convertBetaMessages(messages),
44+
Messages: converted,
3245
Tools: allTools,
3346
Betas: []anthropic.AnthropicBeta{anthropic.AnthropicBetaInterleavedThinking2025_05_14},
3447
}
@@ -69,3 +82,107 @@ func (c *Client) createBetaStream(
6982

7083
return newBetaStreamAdapter(stream), nil
7184
}
85+
86+
// validateAnthropicSequencingBeta performs the same validation as standard API but for Beta payloads
87+
func validateAnthropicSequencingBeta(msgs []anthropic.BetaMessageParam) error {
88+
for i := range msgs {
89+
m, ok := marshalToMapBeta(msgs[i])
90+
if !ok || m["role"] != "assistant" {
91+
continue
92+
}
93+
94+
toolUseIDs := collectToolUseIDs(contentArrayBeta(m))
95+
if len(toolUseIDs) == 0 {
96+
continue
97+
}
98+
99+
if i+1 >= len(msgs) {
100+
slog.Warn("Anthropic (beta) sequencing invalid: assistant tool_use present but no next user tool_result message", "assistant_index", i)
101+
return errors.New("assistant tool_use present but no subsequent user message with tool_result blocks (beta)")
102+
}
103+
104+
next, ok := marshalToMapBeta(msgs[i+1])
105+
if !ok || next["role"] != "user" {
106+
slog.Warn("Anthropic (beta) sequencing invalid: next message after assistant tool_use is not user", "assistant_index", i, "next_role", next["role"])
107+
return errors.New("assistant tool_use must be followed by a user message containing corresponding tool_result blocks (beta)")
108+
}
109+
110+
toolResultIDs := collectToolResultIDs(contentArrayBeta(next))
111+
missing := differenceIDs(toolUseIDs, toolResultIDs)
112+
if len(missing) > 0 {
113+
slog.Warn("Anthropic (beta) sequencing invalid: missing tool_result for tool_use id in next user message", "assistant_index", i, "tool_use_id", missing[0], "missing_count", len(missing))
114+
return fmt.Errorf("missing tool_result for tool_use id %s in the next user message (beta)", missing[0])
115+
}
116+
}
117+
return nil
118+
}
119+
120+
// repairAnthropicSequencingBeta inserts a synthetic user message with tool_result blocks
121+
// for any assistant tool_use blocks that don't have corresponding tool_result blocks
122+
// in the immediate next user message.
123+
func repairAnthropicSequencingBeta(msgs []anthropic.BetaMessageParam) []anthropic.BetaMessageParam {
124+
if len(msgs) == 0 {
125+
return msgs
126+
}
127+
repaired := make([]anthropic.BetaMessageParam, 0, len(msgs)+2)
128+
for i := range msgs {
129+
repaired = append(repaired, msgs[i])
130+
131+
m, ok := marshalToMapBeta(msgs[i])
132+
if !ok || m["role"] != "assistant" {
133+
continue
134+
}
135+
136+
toolUseIDs := collectToolUseIDs(contentArrayBeta(m))
137+
if len(toolUseIDs) == 0 {
138+
continue
139+
}
140+
141+
if i+1 < len(msgs) {
142+
if next, ok := marshalToMapBeta(msgs[i+1]); ok && next["role"] == "user" {
143+
toolResultIDs := collectToolResultIDs(contentArrayBeta(next))
144+
for id := range toolResultIDs {
145+
delete(toolUseIDs, id)
146+
}
147+
}
148+
}
149+
150+
if len(toolUseIDs) > 0 {
151+
blocks := make([]anthropic.BetaContentBlockParamUnion, 0, len(toolUseIDs))
152+
for id := range toolUseIDs {
153+
blocks = append(blocks, anthropic.BetaContentBlockParamUnion{
154+
OfToolResult: &anthropic.BetaToolResultBlockParam{
155+
ToolUseID: id,
156+
Content: []anthropic.BetaToolResultBlockParamContentUnion{
157+
{OfText: &anthropic.BetaTextBlockParam{Text: "(tool execution failed)"}},
158+
},
159+
},
160+
})
161+
}
162+
repaired = append(repaired, anthropic.BetaMessageParam{
163+
Role: anthropic.BetaMessageParamRoleUser,
164+
Content: blocks,
165+
})
166+
}
167+
}
168+
return repaired
169+
}
170+
171+
func marshalToMapBeta(v any) (map[string]any, bool) {
172+
b, err := json.Marshal(v)
173+
if err != nil {
174+
return nil, false
175+
}
176+
var m map[string]any
177+
if json.Unmarshal(b, &m) != nil {
178+
return nil, false
179+
}
180+
return m, true
181+
}
182+
183+
func contentArrayBeta(m map[string]any) []any {
184+
if a, ok := m["content"].([]any); ok {
185+
return a
186+
}
187+
return nil
188+
}

pkg/model/provider/anthropic/client.go

Lines changed: 184 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"log/slog"
89
"strings"
910

@@ -167,10 +168,21 @@ func (c *Client) CreateChatCompletionStream(
167168
return nil, err
168169
}
169170

171+
converted := convertMessages(messages)
172+
// Preflight validation to ensure tool_use/tool_result sequencing is valid
173+
if err := validateAnthropicSequencing(converted); err != nil {
174+
slog.Warn("Invalid message sequencing for Anthropic detected, attempting self-repair", "error", err)
175+
converted = repairAnthropicSequencing(converted)
176+
if err2 := validateAnthropicSequencing(converted); err2 != nil {
177+
slog.Error("Failed to self-repair Anthropic sequencing", "error", err2)
178+
return nil, err
179+
}
180+
}
181+
170182
params := anthropic.MessageNewParams{
171183
Model: anthropic.Model(c.config.Model),
172184
MaxTokens: maxTokens,
173-
Messages: convertMessages(messages),
185+
Messages: converted,
174186
Tools: allTools,
175187
}
176188

@@ -219,8 +231,11 @@ func (c *Client) CreateChatCompletionStream(
219231

220232
func convertMessages(messages []chat.Message) []anthropic.MessageParam {
221233
var anthropicMessages []anthropic.MessageParam
234+
// Track whether the last appended assistant message included tool_use blocks
235+
// so we can ensure the immediate next message is the grouped tool_result user message.
236+
pendingAssistantToolUse := false
222237

223-
for i := range messages {
238+
for i := 0; i < len(messages); i++ {
224239
msg := &messages[i]
225240
if msg.Role == chat.MessageRoleSystem {
226241
// System messages are handled via the top-level params.System
@@ -333,19 +348,43 @@ func convertMessages(messages []chat.Message) []anthropic.MessageParam {
333348
}
334349
}
335350
anthropicMessages = append(anthropicMessages, anthropic.NewAssistantMessage(toolUseBlocks...))
351+
// Mark that we expect the very next message to be the grouped tool_result blocks.
352+
pendingAssistantToolUse = true
336353
} else {
337354
if txt := strings.TrimSpace(msg.Content); txt != "" {
338355
contentBlocks = append(contentBlocks, anthropic.NewTextBlock(txt))
339356
}
340357
if len(contentBlocks) > 0 {
341358
anthropicMessages = append(anthropicMessages, anthropic.NewAssistantMessage(contentBlocks...))
342359
}
360+
// No tool_use in this assistant message
361+
pendingAssistantToolUse = false
343362
}
344363
continue
345364
}
346365
if msg.Role == chat.MessageRoleTool {
347-
toolResult := anthropic.NewToolResultBlock(msg.ToolCallID, strings.TrimSpace(msg.Content), false)
348-
anthropicMessages = append(anthropicMessages, anthropic.NewUserMessage(toolResult))
366+
// Group consecutive tool results into a single user message.
367+
//
368+
// This is to satisfy Anthropic's requirement that tool_use blocks are immediately followed
369+
// by a single user message containing all corresponding tool_result blocks.
370+
var blocks []anthropic.ContentBlockParamUnion
371+
j := i
372+
for j < len(messages) && messages[j].Role == chat.MessageRoleTool {
373+
tr := anthropic.NewToolResultBlock(messages[j].ToolCallID, strings.TrimSpace(messages[j].Content), false)
374+
blocks = append(blocks, tr)
375+
j++
376+
}
377+
if len(blocks) > 0 {
378+
// Only include tool_result blocks if they immediately follow an assistant
379+
// message that contained tool_use. Otherwise, drop them to avoid invalid
380+
// sequencing errors.
381+
if pendingAssistantToolUse {
382+
anthropicMessages = append(anthropicMessages, anthropic.NewUserMessage(blocks...))
383+
}
384+
// Whether we used them or not, we've now handled the expected tool_result slot.
385+
pendingAssistantToolUse = false
386+
}
387+
i = j - 1
349388
continue
350389
}
351390
}
@@ -417,3 +456,144 @@ func (c *Client) ID() string {
417456
func (c *Client) Options() options.ModelOptions {
418457
return c.modelOptions
419458
}
459+
460+
// validateAnthropicSequencing verifies that for every assistant message that includes
461+
// one or more tool_use blocks, the immediately following message is a user message
462+
// that includes tool_result blocks for all those tool_use IDs (grouped into that single message).
463+
func validateAnthropicSequencing(msgs []anthropic.MessageParam) error {
464+
// Marshal-based inspection to avoid depending on SDK internals of union types
465+
for i := range msgs {
466+
m, ok := marshalToMap(msgs[i])
467+
if !ok || m["role"] != "assistant" {
468+
continue
469+
}
470+
471+
toolUseIDs := collectToolUseIDs(contentArray(m))
472+
if len(toolUseIDs) == 0 {
473+
continue
474+
}
475+
476+
if i+1 >= len(msgs) {
477+
slog.Warn("Anthropic sequencing invalid: assistant tool_use present but no next user tool_result message", "assistant_index", i)
478+
return errors.New("assistant tool_use present but no subsequent user message with tool_result blocks")
479+
}
480+
481+
next, ok := marshalToMap(msgs[i+1])
482+
if !ok || next["role"] != "user" {
483+
slog.Warn("Anthropic sequencing invalid: next message after assistant tool_use is not user", "assistant_index", i, "next_role", next["role"])
484+
return errors.New("assistant tool_use must be followed by a user message containing corresponding tool_result blocks")
485+
}
486+
487+
toolResultIDs := collectToolResultIDs(contentArray(next))
488+
missing := differenceIDs(toolUseIDs, toolResultIDs)
489+
if len(missing) > 0 {
490+
slog.Warn("Anthropic sequencing invalid: missing tool_result for tool_use id in next user message", "assistant_index", i, "tool_use_id", missing[0], "missing_count", len(missing))
491+
return fmt.Errorf("missing tool_result for tool_use id %s in the next user message", missing[0])
492+
}
493+
}
494+
return nil
495+
}
496+
497+
// repairAnthropicSequencing inserts a synthetic user message containing tool_result blocks
498+
// immediately after any assistant message that has tool_use blocks missing a corresponding
499+
// tool_result in the next user message. This is a best-effort local repair to keep the
500+
// conversation valid for Anthropic while preserving original messages, to keep the agent loop running.
501+
func repairAnthropicSequencing(msgs []anthropic.MessageParam) []anthropic.MessageParam {
502+
if len(msgs) == 0 {
503+
return msgs
504+
}
505+
repaired := make([]anthropic.MessageParam, 0, len(msgs)+2)
506+
for i := range msgs {
507+
repaired = append(repaired, msgs[i])
508+
509+
m, ok := marshalToMap(msgs[i])
510+
if !ok || m["role"] != "assistant" {
511+
continue
512+
}
513+
514+
toolUseIDs := collectToolUseIDs(contentArray(m))
515+
if len(toolUseIDs) == 0 {
516+
continue
517+
}
518+
519+
// Remove any IDs that already have results in the next user message
520+
if i+1 < len(msgs) {
521+
if next, ok := marshalToMap(msgs[i+1]); ok && next["role"] == "user" {
522+
toolResultIDs := collectToolResultIDs(contentArray(next))
523+
for id := range toolResultIDs {
524+
delete(toolUseIDs, id)
525+
}
526+
}
527+
}
528+
529+
if len(toolUseIDs) > 0 {
530+
blocks := make([]anthropic.ContentBlockParamUnion, 0, len(toolUseIDs))
531+
for id := range toolUseIDs {
532+
blocks = append(blocks, anthropic.NewToolResultBlock(id, "(tool execution failed)", false))
533+
}
534+
repaired = append(repaired, anthropic.NewUserMessage(blocks...))
535+
}
536+
}
537+
return repaired
538+
}
539+
540+
// Helpers for map-based inspection
541+
func marshalToMap(v any) (map[string]any, bool) {
542+
b, err := json.Marshal(v)
543+
if err != nil {
544+
return nil, false
545+
}
546+
var m map[string]any
547+
if json.Unmarshal(b, &m) != nil {
548+
return nil, false
549+
}
550+
return m, true
551+
}
552+
553+
func contentArray(m map[string]any) []any {
554+
if a, ok := m["content"].([]any); ok {
555+
return a
556+
}
557+
return nil
558+
}
559+
560+
func collectToolUseIDs(content []any) map[string]struct{} {
561+
ids := make(map[string]struct{})
562+
for _, c := range content {
563+
if cb, ok := c.(map[string]any); ok {
564+
if t, _ := cb["type"].(string); t == "tool_use" {
565+
if id, _ := cb["id"].(string); id != "" {
566+
ids[id] = struct{}{}
567+
}
568+
}
569+
}
570+
}
571+
return ids
572+
}
573+
574+
func collectToolResultIDs(content []any) map[string]struct{} {
575+
ids := make(map[string]struct{})
576+
for _, c := range content {
577+
if cb, ok := c.(map[string]any); ok {
578+
if t, _ := cb["type"].(string); t == "tool_result" {
579+
if id, _ := cb["tool_use_id"].(string); id != "" {
580+
ids[id] = struct{}{}
581+
}
582+
}
583+
}
584+
}
585+
return ids
586+
}
587+
588+
func differenceIDs(a, b map[string]struct{}) []string {
589+
if len(a) == 0 {
590+
return nil
591+
}
592+
var missing []string
593+
for id := range a {
594+
if _, ok := b[id]; !ok {
595+
missing = append(missing, id)
596+
}
597+
}
598+
return missing
599+
}

0 commit comments

Comments
 (0)