Skip to content

Commit e2f14da

Browse files
authored
Merge pull request #371 from rumpl/tui-throttle
Throttle and merge incoming events before sending them to the TUI
2 parents 145eac2 + 8d5744e commit e2f14da

1 file changed

Lines changed: 167 additions & 15 deletions

File tree

pkg/app/app.go

Lines changed: 167 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"os/exec"
66
"strings"
7+
"time"
78

89
tea "github.com/charmbracelet/bubbletea/v2"
910

@@ -13,24 +14,26 @@ import (
1314
)
1415

1516
type App struct {
16-
title string
17-
agentFilename string
18-
runtime runtime.Runtime
19-
team *team.Team
20-
session *session.Session
21-
firstMessage *string
22-
events chan tea.Msg
17+
title string
18+
agentFilename string
19+
runtime runtime.Runtime
20+
team *team.Team
21+
session *session.Session
22+
firstMessage *string
23+
events chan tea.Msg
24+
throttleDuration time.Duration
2325
}
2426

2527
func New(title, agentFilename string, rt runtime.Runtime, agents *team.Team, sess *session.Session, firstMessage *string) *App {
2628
return &App{
27-
title: title,
28-
agentFilename: agentFilename,
29-
runtime: rt,
30-
team: agents,
31-
session: sess,
32-
firstMessage: firstMessage,
33-
events: make(chan tea.Msg, 128),
29+
title: title,
30+
agentFilename: agentFilename,
31+
runtime: rt,
32+
team: agents,
33+
session: sess,
34+
firstMessage: firstMessage,
35+
events: make(chan tea.Msg, 128),
36+
throttleDuration: 50 * time.Millisecond, // Throttle rapid events
3437
}
3538
}
3639

@@ -65,11 +68,12 @@ func (a *App) Run(ctx context.Context, message string) {
6568
}
6669

6770
func (a *App) Subscribe(ctx context.Context, program *tea.Program) {
71+
throttledChan := a.throttleEvents(ctx, a.events)
6872
for {
6973
select {
7074
case <-ctx.Done():
7175
return
72-
case msg, ok := <-a.events:
76+
case msg, ok := <-throttledChan:
7377
if !ok {
7478
return
7579
}
@@ -91,3 +95,151 @@ func (a *App) ResumeStartOAuth(confirmation bool) {
9195
a.runtime.ResumeStartAuthorizationFlow(context.Background(), confirmation)
9296
}
9397
}
98+
99+
// throttleEvents buffers and merges rapid events to prevent UI flooding
100+
func (a *App) throttleEvents(ctx context.Context, in <-chan tea.Msg) <-chan tea.Msg {
101+
out := make(chan tea.Msg, 128)
102+
103+
go func() {
104+
defer close(out)
105+
106+
var buffer []tea.Msg
107+
ticker := time.NewTicker(a.throttleDuration)
108+
defer ticker.Stop()
109+
110+
flush := func() {
111+
if len(buffer) == 0 {
112+
return
113+
}
114+
115+
// Merge events if possible
116+
merged := a.mergeEvents(buffer)
117+
for _, msg := range merged {
118+
select {
119+
case out <- msg:
120+
case <-ctx.Done():
121+
return
122+
}
123+
}
124+
buffer = buffer[:0]
125+
}
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
flush()
131+
return
132+
133+
case msg, ok := <-in:
134+
if !ok {
135+
flush()
136+
return
137+
}
138+
139+
// Check if this event type should be throttled
140+
if a.shouldThrottle(msg) {
141+
buffer = append(buffer, msg)
142+
} else {
143+
// Pass through immediately for important events
144+
flush() // Flush any buffered events first
145+
select {
146+
case out <- msg:
147+
case <-ctx.Done():
148+
return
149+
}
150+
}
151+
152+
case <-ticker.C:
153+
flush()
154+
}
155+
}
156+
}()
157+
158+
return out
159+
}
160+
161+
// shouldThrottle determines if an event should be buffered/throttled
162+
func (a *App) shouldThrottle(msg tea.Msg) bool {
163+
switch msg.(type) {
164+
case *runtime.AgentChoiceEvent:
165+
return true
166+
case *runtime.AgentChoiceReasoningEvent:
167+
return true
168+
case *runtime.PartialToolCallEvent:
169+
return true
170+
default:
171+
return false
172+
}
173+
}
174+
175+
// mergeEvents merges consecutive similar events to reduce UI updates
176+
func (a *App) mergeEvents(events []tea.Msg) []tea.Msg {
177+
if len(events) == 0 {
178+
return events
179+
}
180+
181+
var result []tea.Msg
182+
183+
// Group events by type and merge
184+
for i := 0; i < len(events); i++ {
185+
current := events[i]
186+
187+
switch ev := current.(type) {
188+
case *runtime.AgentChoiceEvent:
189+
// Merge consecutive AgentChoiceEvents with same agent
190+
merged := ev
191+
for i+1 < len(events) {
192+
if next, ok := events[i+1].(*runtime.AgentChoiceEvent); ok && next.AgentName == ev.AgentName {
193+
// Concatenate content
194+
merged = &runtime.AgentChoiceEvent{
195+
Type: ev.Type,
196+
Content: merged.Content + next.Content,
197+
AgentContext: ev.AgentContext,
198+
}
199+
i++
200+
} else {
201+
break
202+
}
203+
}
204+
result = append(result, merged)
205+
206+
case *runtime.AgentChoiceReasoningEvent:
207+
// Merge consecutive AgentChoiceReasoningEvents with same agent
208+
merged := ev
209+
for i+1 < len(events) {
210+
if next, ok := events[i+1].(*runtime.AgentChoiceReasoningEvent); ok && next.AgentName == ev.AgentName {
211+
// Concatenate content
212+
merged = &runtime.AgentChoiceReasoningEvent{
213+
Type: ev.Type,
214+
Content: merged.Content + next.Content,
215+
AgentContext: ev.AgentContext,
216+
}
217+
i++
218+
} else {
219+
break
220+
}
221+
}
222+
result = append(result, merged)
223+
224+
case *runtime.PartialToolCallEvent:
225+
// For PartialToolCallEvent, keep only the latest one per tool call ID
226+
// Check if there's a newer one in the buffer
227+
var latest *runtime.PartialToolCallEvent = ev
228+
for j := i + 1; j < len(events); j++ {
229+
if next, ok := events[j].(*runtime.PartialToolCallEvent); ok {
230+
if next.ToolCall.ID == ev.ToolCall.ID {
231+
latest = next
232+
i = j // Skip to this position
233+
}
234+
}
235+
}
236+
result = append(result, latest)
237+
238+
default:
239+
// Pass through other events as-is
240+
result = append(result, current)
241+
}
242+
}
243+
244+
return result
245+
}

0 commit comments

Comments
 (0)