Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export type {
} from './otel-export'
// ── OTEL export + trace propagation + eval-run provenance ────────────
export {
buildLoopOtelSpans,
createOtelExporter,
exportEvalRuns,
INTELLIGENCE_WIRE_VERSION,
Expand Down
9 changes: 9 additions & 0 deletions src/loops/drivers/dynamic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ export function createDynamicDriver<Task, Output>(
// plan() — and thus the planner — runs again next round.
return pending?.kind === 'stop' ? 'done' : 'continue'
},
describePlan() {
  // Reports the move chosen by the preceding plan() call (held in `pending`)
  // so the kernel's loop.plan trace event can carry the planner's intent
  // (kind + rationale) rather than only the inferred fan-width.
  if (!pending) return undefined
  const { kind, rationale } = pending
  // Leave the `rationale` key off entirely when the move carries none.
  return rationale === undefined ? { kind } : { kind, rationale }
},
}
}

Expand Down
12 changes: 7 additions & 5 deletions src/loops/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ export type {
TopologyMoveEnvelope,
} from './drivers/sandbox-planner'
export { createSandboxPlanner } from './drivers/sandbox-planner'
export type { RunLoopOptions } from './run-loop'
export { runLoop } from './run-loop'
export { reportLoopUsage, type UsageSink } from './report-usage'
export {
loopCampaignDispatch,
loopDispatch,
type LoopDispatchOptions,
type LoopOptionsForDispatch,
loopCampaignDispatch,
loopDispatch,
} from './loop-dispatch'
export { reportLoopUsage, type UsageSink } from './report-usage'
export type { RunLoopOptions } from './run-loop'
export { runLoop } from './run-loop'
export type {
AgentRunSpec,
DefaultVerdict,
Expand All @@ -58,6 +58,8 @@ export type {
LoopIterationDispatchPayload,
LoopIterationEndedPayload,
LoopIterationStartedPayload,
LoopPlanDescription,
LoopPlanPayload,
LoopResult,
LoopSandboxClient,
LoopSandboxPlacement,
Expand Down
18 changes: 18 additions & 0 deletions src/loops/run-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export async function runLoop<Task, Output, Decision>(
const loopStart = now()
const driverName = options.driver.name ?? 'driver'
const iterations: Iteration<Task, Output>[] = []
let round = 0

await emitTrace(options.ctx.traceEmitter, {
kind: 'loop.started',
Expand All @@ -133,6 +134,21 @@ export async function runLoop<Task, Output, Decision>(
while (iterations.length < maxIterations) {
if (controller.signal.aborted) throwAbort()
const planned = await options.driver.plan(options.task, iterations)
const planDesc = options.driver.describePlan?.()
await emitTrace(options.ctx.traceEmitter, {
kind: 'loop.plan',
runId,
timestamp: now(),
payload: {
roundIndex: round,
plannedCount: planned.length,
moveKind:
planDesc?.kind ??
(planned.length === 0 ? 'stop' : planned.length === 1 ? 'refine' : 'fanout'),
rationale: planDesc?.rationale,
},
})
round += 1
if (planned.length === 0) break

const remaining = maxIterations - iterations.length
Expand Down Expand Up @@ -319,6 +335,8 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
error: slot.error?.message,
costUsd: slot.costUsd,
durationMs: slot.endedAt - slot.startedAt,
tokenUsage:
slot.tokenUsage.input || slot.tokenUsage.output ? { ...slot.tokenUsage } : undefined,
},
})
}
Expand Down
41 changes: 41 additions & 0 deletions src/loops/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,24 @@ export interface Driver<Task, Output, Decision> {
* is hit, or when the abort signal fires.
*/
decide(history: ReadonlyArray<Iteration<Task, Output>>): Decision | Promise<Decision>
/**
* Optional: describe the move `plan()` just produced, for trace emission.
* The kernel calls this immediately after `plan()` and emits the result in
* the `loop.plan` event so a topology viewer can render the agent's chosen
* move + rationale (not just the inferred fan-width). Drivers whose topology
* is a pure function of count (refine/fanout-vote) omit it — the kernel
* infers `moveKind` from the planned-task count. Agent-authored drivers
* (`createDynamicDriver`) return their chosen move's kind + rationale.
*/
describePlan?(): LoopPlanDescription | undefined
}

/**
 * Driver-supplied description of the just-planned move, as returned by the
 * optional `Driver.describePlan()` hook. `runLoop` copies `kind` into the
 * `loop.plan` trace payload as `moveKind` and forwards `rationale` unchanged.
 *
 * @experimental
 */
export interface LoopPlanDescription {
/**
 * Topology move this round — e.g. `'refine' | 'fanout' | 'verify' | 'stop'`.
 * Typed as a free-form `string`, so values outside these examples are allowed.
 */
kind: string
/** Why the driver chose this move (the agent's rationale), when available. */
rationale?: string
}

/** @experimental */
Expand Down Expand Up @@ -195,6 +213,7 @@ export interface LoopTraceEmitter {
/** @experimental */
export type LoopTraceEvent =
| { kind: 'loop.started'; runId: string; timestamp: number; payload: LoopStartedPayload }
| { kind: 'loop.plan'; runId: string; timestamp: number; payload: LoopPlanPayload }
| {
kind: 'loop.iteration.started'
runId: string
Expand Down Expand Up @@ -224,6 +243,25 @@ export interface LoopStartedPayload {
maxConcurrency: number
}

/**
 * Payload of the `loop.plan` trace event. Emitted once per `plan()` round,
 * immediately after the driver plans — including a round that plans zero
 * tasks, since the event is emitted before the loop checks for an empty plan.
 * Carries the topology move so a viewer renders WHAT the agent decided + WHY,
 * not just the inferred fan-width. `moveKind` is the driver's
 * `describePlan().kind` when provided, else inferred from `plannedCount`
 * (0→stop, 1→refine, N→fanout).
 *
 * @experimental
 */
export interface LoopPlanPayload {
/** 0-based plan round (one per `plan()` call); incremented after each emission. */
roundIndex: number
/** Tasks the driver issued this round — the length of the array `plan()` returned. */
plannedCount: number
/** Topology move — `'refine' | 'fanout' | 'verify' | 'stop'` etc. */
moveKind: string
/**
 * Driver rationale for the move, when available. Undefined when the driver
 * has no `describePlan()` or its description carries no rationale.
 */
rationale?: string
}

/** @experimental */
export interface LoopIterationStartedPayload {
iterationIndex: number
Expand Down Expand Up @@ -260,6 +298,9 @@ export interface LoopIterationEndedPayload {
error?: string
costUsd: number
durationMs: number
/** Summed LLM token usage for this iteration — maps to gen_ai.usage.* on the
* branch span. Omitted when no `llm_call` events carried token counts. */
tokenUsage?: LoopTokenUsage
}

/** @experimental */
Expand Down
21 changes: 18 additions & 3 deletions src/mcp/trace-propagation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import type { LoopTraceEmitter, LoopTraceEvent } from '../loops/types'
import type { OtelExporter } from '../otel-export'
import { createOtelExporter, loopEventToOtelSpan } from '../otel-export'
import { buildLoopOtelSpans, createOtelExporter } from '../otel-export'

export interface TraceContext {
/** Trace id inherited from the parent process, or a fresh one. */
Expand Down Expand Up @@ -52,11 +52,26 @@ export function createPropagatingTraceEmitter(ctx: TraceContext): {
} {
const exporter = createOtelExporter()

// Buffer events per loop run, then emit the full nested span tree on
// `loop.ended` so the topology hierarchy (loop → round → branch) reaches the
// OTLP collector — not a flat list of zero-duration point spans. A run that
// never reaches `loop.ended` (hard abort) drops its buffer; acceptable for
// the short-lived MCP subprocess.
const buffers = new Map<string, LoopTraceEvent[]>()

// Trace emitter that accumulates each run's events and exports the run's
// spans in one batch when its `loop.ended` event arrives.
const emitter: LoopTraceEmitter = {
  emit(event: LoopTraceEvent) {
    // No exporter configured: nothing is buffered or exported.
    if (!exporter) return

    const { runId } = event
    const queued = buffers.get(runId)
    if (queued) {
      queued.push(event)
    } else {
      buffers.set(runId, [event])
    }

    // Every event other than the terminal one just accumulates.
    if (event.kind !== 'loop.ended') return

    // Terminal event: take the run's buffer out of the map, then export the
    // spans built from the whole event sequence.
    const events = buffers.get(runId) ?? [event]
    buffers.delete(runId)
    const spans = buildLoopOtelSpans(events, ctx.traceId, ctx.parentSpanId)
    for (const span of spans) {
      exporter.exportSpan(span)
    }
  },
}

Expand Down
Loading
Loading