Skip to content

Commit 717c63a

Browse files
committed
Extract SteerQueue interface for pluggable storage
Introduce a SteerQueue interface (Enqueue/Drain) so that callers can provide their own storage implementation for steered messages. The default InMemorySteerQueue uses a buffered channel and is created automatically. Custom implementations can be injected via the WithSteerQueue option on LocalRuntime.
1 parent 2bb7953 commit 717c63a

1 file changed

Lines changed: 68 additions & 26 deletions

File tree

pkg/runtime/runtime.go

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,64 @@ func ResumeReject(reason string) ResumeRequest {
8181
}
8282

8383
// SteeredMessage is a user message injected mid-turn while the agent loop is
84-
// running. It is enqueued via Steer() and drained inside the loop between
84+
// running. It is enqueued via a SteerQueue and drained inside the loop between
8585
// tool execution and the stop-condition check.
8686
type SteeredMessage struct {
8787
Content string
8888
MultiContent []chat.MessagePart
8989
}
9090

91-
// maxSteeredMessages is the maximum number of steered messages that can be
92-
// buffered before Steer() starts rejecting new messages.
93-
const maxSteeredMessages = 5
91+
// SteerQueue is the interface for storing steered messages that are injected
92+
// into a running agent loop mid-turn. Implementations must be safe for
93+
// concurrent use: Enqueue is called from API handlers while Drain is called
94+
// from the agent loop goroutine.
95+
//
96+
// The default implementation is InMemorySteerQueue. Callers that need
97+
// durable or distributed storage can provide their own implementation
98+
// via the WithSteerQueue option.
99+
type SteerQueue interface {
100+
// Enqueue adds a message to the queue. Returns false if the queue is
101+
// full and the message was not accepted.
102+
Enqueue(msg SteeredMessage) bool
103+
// Drain returns all pending messages and removes them from the queue.
104+
// It must not block — if the queue is empty it returns nil.
105+
Drain() []SteeredMessage
106+
}
107+
108+
// inMemorySteerQueue is the default SteerQueue backed by a buffered channel.
109+
type inMemorySteerQueue struct {
110+
ch chan SteeredMessage
111+
}
112+
113+
// defaultSteerQueueCapacity is the buffer size for the default in-memory queue.
114+
const defaultSteerQueueCapacity = 5
115+
116+
// NewInMemorySteerQueue creates a SteerQueue backed by a buffered channel
117+
// with the given capacity.
118+
func NewInMemorySteerQueue(capacity int) SteerQueue {
119+
return &inMemorySteerQueue{ch: make(chan SteeredMessage, capacity)}
120+
}
121+
122+
func (q *inMemorySteerQueue) Enqueue(msg SteeredMessage) bool {
123+
select {
124+
case q.ch <- msg:
125+
return true
126+
default:
127+
return false
128+
}
129+
}
130+
131+
func (q *inMemorySteerQueue) Drain() []SteeredMessage {
132+
var msgs []SteeredMessage
133+
for {
134+
select {
135+
case m := <-q.ch:
136+
msgs = append(msgs, m)
137+
default:
138+
return msgs
139+
}
140+
}
141+
}
94142

95143
// ToolHandlerFunc is a function type for handling tool calls
96144
type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error)
@@ -213,11 +261,10 @@ type LocalRuntime struct {
213261

214262
currentAgentMu sync.RWMutex
215263

216-
// steerCh receives user messages injected mid-turn via Steer().
217-
// The agent loop drains this channel after tool execution, before
218-
// checking the stop condition, so the LLM sees the new message on
219-
// its next iteration.
220-
steerCh chan SteeredMessage
264+
// steerQueue stores user messages injected mid-turn. The agent loop
265+
// drains this queue after tool execution, before checking the stop
266+
// condition, so the LLM sees the new messages on its next iteration.
267+
steerQueue SteerQueue
221268

222269
// onToolsChanged is called when an MCP toolset reports a tool list change.
223270
onToolsChanged func(Event)
@@ -246,6 +293,14 @@ func WithTracer(t trace.Tracer) Opt {
246293
}
247294
}
248295

296+
// WithSteerQueue sets a custom SteerQueue implementation for mid-turn message
297+
// injection. If not provided, an in-memory buffered queue is used.
298+
func WithSteerQueue(q SteerQueue) Opt {
299+
return func(r *LocalRuntime) {
300+
r.steerQueue = q
301+
}
302+
}
303+
249304
func WithSessionCompaction(sessionCompaction bool) Opt {
250305
return func(r *LocalRuntime) {
251306
r.sessionCompaction = sessionCompaction
@@ -309,7 +364,7 @@ func NewLocalRuntime(agents *team.Team, opts ...Opt) (*LocalRuntime, error) {
309364
currentAgent: defaultAgent.Name(),
310365
resumeChan: make(chan ResumeRequest),
311366
elicitationRequestCh: make(chan ElicitationResult),
312-
steerCh: make(chan SteeredMessage, maxSteeredMessages),
367+
steerQueue: NewInMemorySteerQueue(defaultSteerQueueCapacity),
313368
sessionCompaction: true,
314369
managedOAuth: true,
315370
sessionStore: session.NewInMemorySessionStore(),
@@ -1037,29 +1092,16 @@ func (r *LocalRuntime) ResumeElicitation(ctx context.Context, action tools.Elici
10371092
// Steer enqueues a user message for mid-turn injection into the running
10381093
// agent loop. The message will be picked up after the current batch of tool
10391094
// calls finishes but before the loop checks whether to stop. Returns false
1040-
// if the steer buffer is full and the message was not enqueued.
1095+
// if the queue is full and the message was not enqueued.
10411096
func (r *LocalRuntime) Steer(msg SteeredMessage) bool {
1042-
select {
1043-
case r.steerCh <- msg:
1044-
return true
1045-
default:
1046-
return false
1047-
}
1097+
return r.steerQueue.Enqueue(msg)
10481098
}
10491099

10501100
// DrainSteeredMessages returns all pending steered messages without blocking.
10511101
// It is called inside the agent loop to batch-inject any messages that arrived
10521102
// while the current iteration was in progress.
10531103
func (r *LocalRuntime) DrainSteeredMessages() []SteeredMessage {
1054-
var msgs []SteeredMessage
1055-
for {
1056-
select {
1057-
case m := <-r.steerCh:
1058-
msgs = append(msgs, m)
1059-
default:
1060-
return msgs
1061-
}
1062-
}
1104+
return r.steerQueue.Drain()
10631105
}
10641106

10651107
// Run starts the agent's interaction loop

0 commit comments

Comments
 (0)