1- import type { Context } from '@opentelemetry/api'
1+ import { type Context , SpanStatusCode } from '@opentelemetry/api'
22import { createLogger } from '@sim/logger'
33import { ORCHESTRATION_TIMEOUT_MS } from '@/lib/copilot/constants'
4- import { MothershipStreamV1SpanLifecycleEvent } from '@/lib/copilot/generated/mothership-stream-v1'
4+ import {
5+ type MothershipStreamV1EventType ,
6+ MothershipStreamV1SpanLifecycleEvent ,
7+ } from '@/lib/copilot/generated/mothership-stream-v1'
8+ import { CopilotSseCloseReason } from '@/lib/copilot/generated/trace-attribute-values-v1'
59import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1'
10+ import { TraceEvent } from '@/lib/copilot/generated/trace-events-v1'
11+ import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1'
612import { fetchGo } from '@/lib/copilot/request/go/fetch'
713import {
814 buildPreviewContentUpdate ,
@@ -17,6 +23,7 @@ import {
1723 sseHandlers ,
1824 subAgentHandlers ,
1925} from '@/lib/copilot/request/handlers'
26+ import { getCopilotTracer } from '@/lib/copilot/request/otel'
2027import {
2128 eventToStreamEvent ,
2229 isSubagentSpanStreamEvent ,
@@ -165,27 +172,74 @@ export async function runStreamLoop(
165172 url : fetchUrl ,
166173 method : fetchOptions . method ?? 'GET' ,
167174 } )
168- const bodyStart = performance . now ( )
169- let firstEventMs : number | undefined
170- let eventsReceived = 0
171- let endedOn = 'terminal'
172175
173- const reader = response . body . getReader ( )
176+ // Aggregate counters populated inline by the reader wrapper + onEvent
177+ // dispatcher below and flushed to both the legacy TraceCollector span
178+ // and the OTel read-loop span when the loop terminates. Kept as plain
179+ // JS variables (not span attrs) so incrementing them is free — we
180+ // only pay OTel cost once at span End().
181+ const counters = {
182+ bytes : 0 ,
183+ chunks : 0 ,
184+ events : 0 ,
185+ eventsByType : {
186+ session : 0 ,
187+ text : 0 ,
188+ tool : 0 ,
189+ span : 0 ,
190+ resource : 0 ,
191+ run : 0 ,
192+ error : 0 ,
193+ complete : 0 ,
194+ } as Record < MothershipStreamV1EventType , number > ,
195+ firstEventMs : undefined as number | undefined ,
196+ lastChunkMs : performance . now ( ) ,
197+ longestIdleGapMs : 0 ,
198+ }
199+ const bodyStart = performance . now ( )
200+ let endedOn : string = CopilotSseCloseReason . Terminal
201+
202+ // Wrap the body's reader so we can track per-chunk bytes and the gap
203+ // between chunks. `processSSEStream` consumes this reader exactly as
204+ // it would the raw one — no API changes there. The longest idle gap
205+ // is the diagnostic signal for "silent TTFT" perceived hangs: a gap
206+ // larger than the threshold means the user saw nothing for that long.
207+ const IDLE_GAP_EVENT_THRESHOLD_MS = 10000
208+ const rawReader = response . body . getReader ( )
209+ const reader : ReadableStreamDefaultReader < Uint8Array > = {
210+ async read ( ) {
211+ const result = await rawReader . read ( )
212+ if ( ! result . done && result . value ) {
213+ const now = performance . now ( )
214+ const gap = now - counters . lastChunkMs
215+ if ( gap > counters . longestIdleGapMs ) counters . longestIdleGapMs = gap
216+ counters . lastChunkMs = now
217+ counters . chunks += 1
218+ counters . bytes += result . value . byteLength
219+ }
220+ return result
221+ } ,
222+ cancel : ( reason ) => rawReader . cancel ( reason ) ,
223+ releaseLock : ( ) => rawReader . releaseLock ( ) ,
224+ get closed ( ) {
225+ return rawReader . closed
226+ } ,
227+ }
174228 const decoder = new TextDecoder ( )
175229
176230 const timeoutId = setTimeout ( ( ) => {
177231 context . errors . push ( 'Request timed out' )
178232 context . streamComplete = true
179- endedOn = 'timeout'
233+ endedOn = CopilotSseCloseReason . Timeout
180234 reader . cancel ( ) . catch ( ( ) => { } )
181235 } , timeout )
182236
183237 try {
184238 await processSSEStream ( reader , decoder , abortSignal , async ( raw ) => {
185- if ( eventsReceived === 0 ) {
186- firstEventMs = Math . round ( performance . now ( ) - bodyStart )
239+ if ( counters . events === 0 ) {
240+ counters . firstEventMs = Math . round ( performance . now ( ) - bodyStart )
187241 }
188- eventsReceived += 1
242+ counters . events += 1
189243 if ( abortSignal ?. aborted ) {
190244 context . wasAborted = true
191245 return true
@@ -214,6 +268,12 @@ export async function runStreamLoop(
214268 options . onGoTraceId ?.( goTraceId )
215269 }
216270
271+ // Per-type counters for the copilot.sse.read_loop span. Bound set
272+ // (8 types) so this can never blow up into high cardinality.
273+ if ( streamEvent . type in counters . eventsByType ) {
274+ counters . eventsByType [ streamEvent . type as MothershipStreamV1EventType ] += 1
275+ }
276+
217277 if ( shouldSkipToolCallEvent ( streamEvent ) || shouldSkipToolResultEvent ( streamEvent ) ) {
218278 return
219279 }
@@ -319,44 +379,58 @@ export async function runStreamLoop(
319379 requestId : context . requestId ,
320380 messageId : context . messageId ,
321381 } )
322- endedOn = 'closed_no_terminal'
382+ endedOn = CopilotSseCloseReason . ClosedNoTerminal
323383 throw new CopilotBackendError ( message , { status : 503 } )
324384 }
325385 } catch ( error ) {
326386 if ( error instanceof FatalSseEventError && ! context . errors . includes ( error . message ) ) {
327387 context . errors . push ( error . message )
328388 }
329- if ( endedOn === 'terminal' ) {
389+ if ( endedOn === CopilotSseCloseReason . Terminal ) {
330390 endedOn =
331391 error instanceof CopilotBackendError
332- ? 'backend_error'
392+ ? CopilotSseCloseReason . BackendError
333393 : error instanceof BillingLimitError
334- ? 'billing_limit'
335- : 'error'
394+ ? CopilotSseCloseReason . BillingLimit
395+ : CopilotSseCloseReason . Error
336396 }
337397 throw error
338398 } finally {
339399 if ( abortSignal ?. aborted ) {
340400 context . wasAborted = true
341401 await reader . cancel ( ) . catch ( ( ) => { } )
342- if ( endedOn === 'terminal' ) {
343- endedOn = 'aborted'
402+ if ( endedOn === CopilotSseCloseReason . Terminal ) {
403+ endedOn = CopilotSseCloseReason . Aborted
344404 }
345405 }
346406 clearTimeout ( timeoutId )
347407
408+ // Legacy TraceCollector span (consumed by the in-memory trace
409+ // collector, kept for backwards compatibility with existing
410+ // tooling). The real OTel span is stamped below.
348411 const bodyDurationMs = Math . round ( performance . now ( ) - bodyStart )
349412 bodySpan . attributes = {
350413 ...( bodySpan . attributes ?? { } ) ,
351- eventsReceived,
352- firstEventMs,
414+ eventsReceived : counters . events ,
415+ firstEventMs : counters . firstEventMs ,
353416 endedOn,
354417 durationMs : bodyDurationMs ,
355418 }
356419 context . trace . endSpan (
357420 bodySpan ,
358- endedOn === 'terminal' ? 'ok' : endedOn === 'aborted' ? 'cancelled' : 'error'
421+ endedOn === CopilotSseCloseReason . Terminal
422+ ? 'ok'
423+ : endedOn === CopilotSseCloseReason . Aborted
424+ ? 'cancelled'
425+ : 'error'
359426 )
427+
428+ // Real OTel span for Tempo/Grafana. Stamped aggregate-only so
429+ // there is no per-chunk OTel cost — one span per read loop with
430+ // integer counters, plus a bounded set of events.
431+ stampSseReadLoopSpan ( bodyStart , counters , endedOn , fetchUrl , pathname , {
432+ idleGapEventThresholdMs : IDLE_GAP_EVENT_THRESHOLD_MS ,
433+ } )
360434 }
361435}
362436
@@ -375,3 +449,91 @@ function estimateBodyBytes(body: BodyInit | null | undefined): number {
375449 }
376450 return 0
377451}
452+
/**
 * Aggregate counters gathered while the SSE read loop runs and handed to
 * `stampSseReadLoopSpan` once the loop has finished.
 */
type SseReadLoopCounters = {
  /** Sum of `byteLength` over every non-empty chunk returned by the reader. */
  bytes: number
  /** Number of non-empty chunks returned by the reader. */
  chunks: number
  /** Number of raw SSE events delivered to the stream callback. */
  events: number
  /** Per-event-type tallies, keyed by the stream event's `type`. */
  eventsByType: Record<MothershipStreamV1EventType, number>
  /**
   * Rounded milliseconds between the start of body reading and the first
   * event; `undefined` when no event was ever received.
   */
  firstEventMs: number | undefined
  /** Largest observed gap, in milliseconds, between consecutive chunks. */
  longestIdleGapMs: number
}
461+
/**
 * Emit one `copilot.sse.read_loop` OTel span that summarises a finished SSE
 * read loop.
 *
 * The span is created after the fact: its start time is back-dated to when
 * the loop began and it is ended immediately, so its duration matches the
 * loop's wall-clock time. All tracer work happens in this single call, which
 * writes a fixed set of attributes and at most three span events no matter
 * how many chunks were read.
 *
 * @param startPerfMs - `performance.now()` reading taken when the loop started.
 * @param counters - Aggregate counters collected during the loop.
 * @param closeReason - Why the loop ended; recorded as an attribute and used
 *   to decide whether the span is marked as an error.
 * @param fetchUrl - Request URL, recorded as the HTTP URL attribute.
 * @param pathname - Request path, recorded as the HTTP path attribute.
 * @param opts - `idleGapEventThresholdMs`: minimum longest idle gap (ms) at
 *   which an "idle gap exceeded" event is attached to the span.
 */
function stampSseReadLoopSpan(
  startPerfMs: number,
  counters: SseReadLoopCounters,
  closeReason: string,
  fetchUrl: string,
  pathname: string,
  opts: { idleGapEventThresholdMs: number }
): void {
  // Sample the monotonic and wall clocks back to back, then subtract the
  // elapsed monotonic time from the wall-clock "now" to get a wall-clock
  // start that pairs with the end timestamp passed to `span.end()`.
  const perfNowMs = performance.now()
  const endWallMs = Date.now()
  const elapsedMs = perfNowMs - startPerfMs
  const startWallMs = endWallMs - elapsedMs

  // Derived values that are used both as attributes and as event triggers.
  const { eventsByType, firstEventMs, longestIdleGapMs } = counters
  const roundedIdleGapMs = Math.round(longestIdleGapMs)
  const terminalSeen = eventsByType.complete > 0

  const span = getCopilotTracer().startSpan(TraceSpan.CopilotSseReadLoop, {
    startTime: startWallMs,
    attributes: {
      [TraceAttr.HttpUrl]: fetchUrl,
      [TraceAttr.HttpPath]: pathname,
      [TraceAttr.CopilotSseBytesReceived]: counters.bytes,
      [TraceAttr.CopilotSseChunksReceived]: counters.chunks,
      [TraceAttr.CopilotSseEventsReceived]: counters.events,
      [TraceAttr.CopilotSseEventsSession]: eventsByType.session,
      [TraceAttr.CopilotSseEventsText]: eventsByType.text,
      [TraceAttr.CopilotSseEventsTool]: eventsByType.tool,
      [TraceAttr.CopilotSseEventsSpan]: eventsByType.span,
      [TraceAttr.CopilotSseEventsResource]: eventsByType.resource,
      [TraceAttr.CopilotSseEventsRun]: eventsByType.run,
      [TraceAttr.CopilotSseEventsError]: eventsByType.error,
      [TraceAttr.CopilotSseEventsComplete]: eventsByType.complete,
      [TraceAttr.CopilotSseLongestIdleGapMs]: roundedIdleGapMs,
      [TraceAttr.CopilotSseCloseReason]: closeReason,
      [TraceAttr.CopilotSseTerminalEventSeen]: terminalSeen,
    },
  })

  // Time-to-first-event is only known when at least one event arrived.
  if (firstEventMs !== undefined) {
    span.setAttribute(TraceAttr.CopilotSseFirstEventMs, firstEventMs)
    span.addEvent(TraceEvent.CopilotSseFirstEvent, {
      [TraceAttr.CopilotSseFirstEventMs]: firstEventMs,
    })
  }

  // The threshold is compared against the unrounded gap; the event carries
  // the rounded value.
  if (longestIdleGapMs >= opts.idleGapEventThresholdMs) {
    span.addEvent(TraceEvent.CopilotSseIdleGapExceeded, {
      [TraceAttr.CopilotSseLongestIdleGapMs]: roundedIdleGapMs,
    })
  }

  if (terminalSeen) {
    span.addEvent(TraceEvent.CopilotSseTerminalEventReceived)
  }

  // Clean terminals and user aborts leave the status unset so that
  // `status=error` dashboards only surface genuine failures.
  const endedBenignly =
    closeReason === CopilotSseCloseReason.Terminal ||
    closeReason === CopilotSseCloseReason.Aborted
  if (!endedBenignly) {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: `SSE read loop ended with reason: ${closeReason}`,
    })
  }

  span.end(endWallMs)
}
0 commit comments