Skip to content

Commit 8d5744e

Browse files
committed
Throttle and merge incoming events before sending them to the TUI
Sometimes the LLMs just send too many tokens per second, we don't want to re-render completely for each token in that case. So we throttle and merge partial tool call, reasoning and agent choice events. This change brings the CPU usage on fast LLMs down from ~130% CPU to ~25-30%. Signed-off-by: Djordje Lukic <djordje.lukic@docker.com>
1 parent 145eac2 commit 8d5744e

1 file changed

Lines changed: 167 additions & 15 deletions

File tree

pkg/app/app.go

Lines changed: 167 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"os/exec"
66
"strings"
7+
"time"
78

89
tea "github.com/charmbracelet/bubbletea/v2"
910

@@ -13,24 +14,26 @@ import (
1314
)
1415

1516
type App struct {
16-
title string
17-
agentFilename string
18-
runtime runtime.Runtime
19-
team *team.Team
20-
session *session.Session
21-
firstMessage *string
22-
events chan tea.Msg
17+
title string
18+
agentFilename string
19+
runtime runtime.Runtime
20+
team *team.Team
21+
session *session.Session
22+
firstMessage *string
23+
events chan tea.Msg
24+
throttleDuration time.Duration
2325
}
2426

2527
func New(title, agentFilename string, rt runtime.Runtime, agents *team.Team, sess *session.Session, firstMessage *string) *App {
2628
return &App{
27-
title: title,
28-
agentFilename: agentFilename,
29-
runtime: rt,
30-
team: agents,
31-
session: sess,
32-
firstMessage: firstMessage,
33-
events: make(chan tea.Msg, 128),
29+
title: title,
30+
agentFilename: agentFilename,
31+
runtime: rt,
32+
team: agents,
33+
session: sess,
34+
firstMessage: firstMessage,
35+
events: make(chan tea.Msg, 128),
36+
throttleDuration: 50 * time.Millisecond, // Throttle rapid events
3437
}
3538
}
3639

@@ -65,11 +68,12 @@ func (a *App) Run(ctx context.Context, message string) {
6568
}
6669

6770
func (a *App) Subscribe(ctx context.Context, program *tea.Program) {
71+
throttledChan := a.throttleEvents(ctx, a.events)
6872
for {
6973
select {
7074
case <-ctx.Done():
7175
return
72-
case msg, ok := <-a.events:
76+
case msg, ok := <-throttledChan:
7377
if !ok {
7478
return
7579
}
@@ -91,3 +95,151 @@ func (a *App) ResumeStartOAuth(confirmation bool) {
9195
a.runtime.ResumeStartAuthorizationFlow(context.Background(), confirmation)
9296
}
9397
}
98+
99+
// throttleEvents buffers and merges rapid events to prevent UI flooding
100+
func (a *App) throttleEvents(ctx context.Context, in <-chan tea.Msg) <-chan tea.Msg {
101+
out := make(chan tea.Msg, 128)
102+
103+
go func() {
104+
defer close(out)
105+
106+
var buffer []tea.Msg
107+
ticker := time.NewTicker(a.throttleDuration)
108+
defer ticker.Stop()
109+
110+
flush := func() {
111+
if len(buffer) == 0 {
112+
return
113+
}
114+
115+
// Merge events if possible
116+
merged := a.mergeEvents(buffer)
117+
for _, msg := range merged {
118+
select {
119+
case out <- msg:
120+
case <-ctx.Done():
121+
return
122+
}
123+
}
124+
buffer = buffer[:0]
125+
}
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
flush()
131+
return
132+
133+
case msg, ok := <-in:
134+
if !ok {
135+
flush()
136+
return
137+
}
138+
139+
// Check if this event type should be throttled
140+
if a.shouldThrottle(msg) {
141+
buffer = append(buffer, msg)
142+
} else {
143+
// Pass through immediately for important events
144+
flush() // Flush any buffered events first
145+
select {
146+
case out <- msg:
147+
case <-ctx.Done():
148+
return
149+
}
150+
}
151+
152+
case <-ticker.C:
153+
flush()
154+
}
155+
}
156+
}()
157+
158+
return out
159+
}
160+
161+
// shouldThrottle determines if an event should be buffered/throttled
162+
func (a *App) shouldThrottle(msg tea.Msg) bool {
163+
switch msg.(type) {
164+
case *runtime.AgentChoiceEvent:
165+
return true
166+
case *runtime.AgentChoiceReasoningEvent:
167+
return true
168+
case *runtime.PartialToolCallEvent:
169+
return true
170+
default:
171+
return false
172+
}
173+
}
174+
175+
// mergeEvents merges consecutive similar events to reduce UI updates
176+
func (a *App) mergeEvents(events []tea.Msg) []tea.Msg {
177+
if len(events) == 0 {
178+
return events
179+
}
180+
181+
var result []tea.Msg
182+
183+
// Group events by type and merge
184+
for i := 0; i < len(events); i++ {
185+
current := events[i]
186+
187+
switch ev := current.(type) {
188+
case *runtime.AgentChoiceEvent:
189+
// Merge consecutive AgentChoiceEvents with same agent
190+
merged := ev
191+
for i+1 < len(events) {
192+
if next, ok := events[i+1].(*runtime.AgentChoiceEvent); ok && next.AgentName == ev.AgentName {
193+
// Concatenate content
194+
merged = &runtime.AgentChoiceEvent{
195+
Type: ev.Type,
196+
Content: merged.Content + next.Content,
197+
AgentContext: ev.AgentContext,
198+
}
199+
i++
200+
} else {
201+
break
202+
}
203+
}
204+
result = append(result, merged)
205+
206+
case *runtime.AgentChoiceReasoningEvent:
207+
// Merge consecutive AgentChoiceReasoningEvents with same agent
208+
merged := ev
209+
for i+1 < len(events) {
210+
if next, ok := events[i+1].(*runtime.AgentChoiceReasoningEvent); ok && next.AgentName == ev.AgentName {
211+
// Concatenate content
212+
merged = &runtime.AgentChoiceReasoningEvent{
213+
Type: ev.Type,
214+
Content: merged.Content + next.Content,
215+
AgentContext: ev.AgentContext,
216+
}
217+
i++
218+
} else {
219+
break
220+
}
221+
}
222+
result = append(result, merged)
223+
224+
case *runtime.PartialToolCallEvent:
225+
// For PartialToolCallEvent, keep only the latest one per tool call ID
226+
// Check if there's a newer one in the buffer
227+
var latest *runtime.PartialToolCallEvent = ev
228+
for j := i + 1; j < len(events); j++ {
229+
if next, ok := events[j].(*runtime.PartialToolCallEvent); ok {
230+
if next.ToolCall.ID == ev.ToolCall.ID {
231+
latest = next
232+
i = j // Skip to this position
233+
}
234+
}
235+
}
236+
result = append(result, latest)
237+
238+
default:
239+
// Pass through other events as-is
240+
result = append(result, current)
241+
}
242+
}
243+
244+
return result
245+
}

0 commit comments

Comments
 (0)