Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tangle-network/agent-runtime",
"version": "0.33.0",
"version": "0.35.0",
"description": "Reusable runtime lifecycle for domain-specific agents.",
"homepage": "https://github.com/tangle-network/agent-runtime#readme",
"repository": {
Expand Down
64 changes: 60 additions & 4 deletions src/loops/run-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,25 +135,33 @@ export async function runLoop<Task, Output, Decision>(
if (controller.signal.aborted) throwAbort()
const planned = await options.driver.plan(options.task, iterations)
const planDesc = options.driver.describePlan?.()
const roundIndex = round
const baseIndex = iterations.length
const remaining = maxIterations - iterations.length
const slice = planned.slice(0, remaining)
// Edge lineage: round 0 branches from root (undefined); later rounds branch
// from the best-valid (else latest) iteration so far — emitted, not guessed,
// so a viewer draws the actual topology instead of inferring it.
const parentIndex = roundIndex === 0 ? undefined : branchPoint(iterations)
const childIndices = slice.map((_, i) => baseIndex + i)
await emitTrace(options.ctx.traceEmitter, {
kind: 'loop.plan',
runId,
timestamp: now(),
payload: {
roundIndex: round,
roundIndex,
plannedCount: planned.length,
moveKind:
planDesc?.kind ??
(planned.length === 0 ? 'stop' : planned.length === 1 ? 'refine' : 'fanout'),
rationale: planDesc?.rationale,
parentIndex,
childIndices,
},
})
round += 1
if (planned.length === 0) break

const remaining = maxIterations - iterations.length
const slice = planned.slice(0, remaining)
const baseIndex = iterations.length
// Reserve slots up front so concurrent workers may mutate by index.
for (let i = 0; i < slice.length; i += 1) {
const spec = specs[(baseIndex + i) % specs.length]!
Expand Down Expand Up @@ -181,6 +189,8 @@ export async function runLoop<Task, Output, Decision>(
ctx: options.ctx,
runId,
now,
roundIndex,
parentIndex,
})

if (controller.signal.aborted) throwAbort()
Expand Down Expand Up @@ -242,6 +252,10 @@ interface RunBatchArgs<Task, Output> {
ctx: ExecCtx
runId: string
now: () => number
/** Plan round these iterations belong to — stamped as `groupId`. */
roundIndex: number
/** Iteration this round branched from — stamped as `parentIndex`. */
parentIndex?: number
}

async function runBatch<Task, Output>(args: RunBatchArgs<Task, Output>) {
Expand Down Expand Up @@ -279,6 +293,8 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
iterationIndex: args.item.index,
agentRunName: slot.agentRunName,
taskHash: hashJson(args.item.task),
groupId: args.roundIndex,
parentIndex: args.parentIndex,
},
})

Expand All @@ -296,6 +312,8 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
sandboxId: placement.sandboxId,
fleetId: placement.fleetId,
machineId: placement.machineId,
groupId: args.roundIndex,
parentIndex: args.parentIndex,
},
})
const message = spec.taskToPrompt(args.item.task)
Expand Down Expand Up @@ -337,11 +355,49 @@ async function executeIteration<Task, Output>(args: ExecuteIterationArgs<Task, O
durationMs: slot.endedAt - slot.startedAt,
tokenUsage:
slot.tokenUsage.input || slot.tokenUsage.output ? { ...slot.tokenUsage } : undefined,
groupId: args.roundIndex,
parentIndex: args.parentIndex,
outputPreview: slot.output !== undefined ? previewOutput(slot.output) : undefined,
},
})
}
}

/**
* Branch point for a new round — the iteration a later round descends from.
* Highest-valid-score iteration so far; ties + no-valid fall back to the latest
* index. Inferred (not driver-declared), so refine renders as a chain and
* fanout→refine chains off the fanout winner.
*/
function branchPoint<Task, Output>(
iterations: ReadonlyArray<Iteration<Task, Output>>,
): number | undefined {
if (iterations.length === 0) return undefined
let best = iterations.length - 1
let bestScore = -Infinity
for (const iter of iterations) {
if (iter.verdict?.valid !== true) continue
const score = iter.verdict.score ?? 0
if (score > bestScore) {
bestScore = score
best = iter.index
}
}
return best
}

/** Bounded string preview of a parsed output for a viewer's drawer — never the
* full payload. JSON when serializable, else `String()`, truncated to 280. */
function previewOutput(output: unknown): string {
let s: string
try {
s = typeof output === 'string' ? output : (JSON.stringify(output) ?? String(output))
} catch {
s = String(output)
}
return s.length > 280 ? `${s.slice(0, 280)}…` : s
}

function describePlacementSafe(
client: LoopSandboxClient,
box: SandboxInstance,
Expand Down
23 changes: 23 additions & 0 deletions src/loops/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,25 @@ export interface LoopPlanPayload {
moveKind: string
/** Driver rationale for the move, when available. */
rationale?: string
/**
* Iteration index this round branched FROM (the edge source). `undefined`
* for round 0 (root). Kernel-inferred branch point — the best-valid (else
* latest) iteration so far — unless a driver later declares it explicitly.
*/
parentIndex?: number
/** Iteration indices this round dispatched (the edge targets). */
childIndices: number[]
}

/** @experimental */
export interface LoopIterationStartedPayload {
iterationIndex: number
agentRunName: string
taskHash: string
/** Plan round (== `LoopPlanPayload.roundIndex`) this iteration belongs to. */
groupId?: number
/** Iteration this one was planned from; `undefined` ⇒ root. */
parentIndex?: number
}

/**
Expand All @@ -287,6 +299,10 @@ export interface LoopIterationDispatchPayload {
fleetId?: string
/** Set only when `placement === 'fleet'`. */
machineId?: string
/** Plan round this iteration belongs to. */
groupId?: number
/** Iteration this one was planned from; `undefined` ⇒ root. */
parentIndex?: number
}

/** @experimental */
Expand All @@ -301,6 +317,13 @@ export interface LoopIterationEndedPayload {
/** Summed LLM token usage for this iteration — maps to gen_ai.usage.* on the
* branch span. Omitted when no `llm_call` events carried token counts. */
tokenUsage?: LoopTokenUsage
/** Plan round this iteration belongs to. */
groupId?: number
/** Iteration this one was planned from; `undefined` ⇒ root. */
parentIndex?: number
/** Truncated string preview of the parsed output — for a viewer's drawer.
* Bounded to ~280 chars; never the full payload. */
outputPreview?: string
}

/** @experimental */
Expand Down
19 changes: 18 additions & 1 deletion src/otel-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ export function buildLoopOtelSpans(
if (win !== undefined) rootAttrs['tangle.loop.winner.iteration_index'] = win
const cost = num(ep.totalCostUsd)
if (cost !== undefined) rootAttrs['tangle.cost.usd'] = cost
const dur = num(ep.durationMs)
if (dur !== undefined) rootAttrs['tangle.loop.duration_ms'] = dur
const iters = num(ep.iterations)
if (iters !== undefined) rootAttrs['tangle.loop.iterations'] = iters
}
Expand All @@ -293,14 +295,21 @@ export function buildLoopOtelSpans(
case 'loop.plan': {
flushRound(e.timestamp)
const id = generateSpanId()
const roundIdx = num(p.roundIndex) ?? 0
const attrs: Record<string, string | number | boolean> = {
[GEN_AI.operation]: 'invoke_workflow',
'tangle.loop.round.index': num(p.roundIndex) ?? 0,
'tangle.loop.round.index': roundIdx,
'tangle.loop.move.kind': str(p.moveKind) ?? 'unknown',
'tangle.loop.move.round': roundIdx,
'tangle.loop.move.width': num(p.plannedCount) ?? 0,
}
const r = str(p.rationale)
if (r) attrs['tangle.loop.move.rationale'] = r
const parent = num(p.parentIndex)
if (parent !== undefined) attrs['tangle.loop.move.parent_index'] = parent
if (Array.isArray(p.childIndices) && p.childIndices.length > 0) {
attrs['tangle.loop.move.child_indices'] = p.childIndices.map(String).join(',')
}
pendingRound = { id, start: e.timestamp, attrs }
currentRoundId = id
break
Expand Down Expand Up @@ -347,6 +356,14 @@ export function buildLoopOtelSpans(
const score = num(verdict.score)
if (score !== undefined) attrs['tangle.loop.verdict.score'] = score
if (err) attrs['tangle.loop.error'] = err
const gid = num(p.groupId)
if (gid !== undefined) attrs['tangle.loop.iteration.group_id'] = gid
const par = num(p.parentIndex)
if (par !== undefined) attrs['tangle.loop.iteration.parent_index'] = par
const dur = num(p.durationMs)
if (dur !== undefined) attrs['tangle.loop.iteration.duration_ms'] = dur
const preview = str(p.outputPreview)
if (preview) attrs['tangle.loop.iteration.output_preview'] = preview
Object.assign(attrs, placementByIdx.get(idx) ?? {})
out.push(
make(
Expand Down
5 changes: 5 additions & 0 deletions tests/loops/dynamic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,16 @@ describe('runLoop dynamic driver — trace emission for topology viewers', () =>
expect(planPayloads.map((p) => p.moveKind)).toEqual(['refine', 'stop'])
expect(planPayloads[0]?.rationale).toBe('first pass, refine')
expect(planPayloads[1]?.rationale).toBe('valid result exists')
// edge lineage: round 0 dispatches iteration 0 from root (no parent)
expect(planPayloads[0]?.childIndices).toEqual([0])
expect(planPayloads[0]?.parentIndex).toBeUndefined()

const ended = all.find((e) => e.kind === 'loop.iteration.ended')
expect(ended?.kind).toBe('loop.iteration.ended')
if (ended?.kind === 'loop.iteration.ended') {
expect(ended.payload.tokenUsage).toEqual({ input: 800, output: 200 })
expect(ended.payload.groupId).toBe(0)
expect(typeof ended.payload.outputPreview).toBe('string')
}
})
})
118 changes: 118 additions & 0 deletions tests/otel-export.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,124 @@ describe('buildLoopOtelSpans — nested GenAI topology tree', () => {
it('returns [] for an empty event stream', () => {
expect(buildLoopOtelSpans([], 'trace-abc')).toEqual([])
})

it('emits edge lineage (move child_indices/parent_index + iteration group_id/parent_index/output_preview)', () => {
// fanout(0,1) at round 0 → refine(2) at round 1 branching off iteration 0.
const lineageEvents = [
{
kind: 'loop.started',
runId: 'r2',
timestamp: 0,
payload: { driver: 'dynamic', agentRunNames: ['a'] },
},
{
kind: 'loop.plan',
runId: 'r2',
timestamp: 1,
payload: { roundIndex: 0, plannedCount: 2, moveKind: 'fanout', childIndices: [0, 1] },
},
{
kind: 'loop.iteration.started',
runId: 'r2',
timestamp: 2,
payload: { iterationIndex: 0, agentRunName: 'a', groupId: 0 },
},
{
kind: 'loop.iteration.started',
runId: 'r2',
timestamp: 2,
payload: { iterationIndex: 1, agentRunName: 'a', groupId: 0 },
},
{
kind: 'loop.iteration.ended',
runId: 'r2',
timestamp: 10,
payload: {
iterationIndex: 0,
agentRunName: 'a',
costUsd: 0.01,
durationMs: 8,
groupId: 0,
verdict: { valid: true, score: 0.8 },
outputPreview: '{"answer":"alpha"}',
},
},
{
kind: 'loop.iteration.ended',
runId: 'r2',
timestamp: 11,
payload: { iterationIndex: 1, agentRunName: 'a', costUsd: 0.01, durationMs: 9, groupId: 0 },
},
{
kind: 'loop.decision',
runId: 'r2',
timestamp: 12,
payload: { decision: 'continue', historyLength: 2 },
},
{
kind: 'loop.plan',
runId: 'r2',
timestamp: 13,
payload: {
roundIndex: 1,
plannedCount: 1,
moveKind: 'refine',
parentIndex: 0,
childIndices: [2],
},
},
{
kind: 'loop.iteration.started',
runId: 'r2',
timestamp: 14,
payload: { iterationIndex: 2, agentRunName: 'a', groupId: 1, parentIndex: 0 },
},
{
kind: 'loop.iteration.ended',
runId: 'r2',
timestamp: 20,
payload: {
iterationIndex: 2,
agentRunName: 'a',
costUsd: 0.01,
durationMs: 6,
groupId: 1,
parentIndex: 0,
outputPreview: '{"answer":"beta"}',
},
},
{
kind: 'loop.decision',
runId: 'r2',
timestamp: 21,
payload: { decision: 'done', historyLength: 3 },
},
{
kind: 'loop.ended',
runId: 'r2',
timestamp: 25,
payload: { winnerIterationIndex: 2, totalCostUsd: 0.03, durationMs: 25, iterations: 3 },
},
]
const spans = buildLoopOtelSpans(lineageEvents, 'trace-xyz')
const moves = spans.filter((s) => s.name === 'loop.round').map(attrMap)
const fanout = moves.find((m) => m['tangle.loop.move.kind'] === 'fanout')!
expect(fanout['tangle.loop.move.child_indices']).toBe('0,1')
expect(fanout['tangle.loop.move.parent_index']).toBeUndefined()
const refine = moves.find((m) => m['tangle.loop.move.kind'] === 'refine')!
expect(refine['tangle.loop.move.parent_index']).toBe(0)
expect(refine['tangle.loop.move.child_indices']).toBe('2')

const iters = spans.filter((s) => s.name === 'loop.iteration').map(attrMap)
const iter2 = iters.find((i) => i['tangle.loop.iteration.index'] === 2)!
expect(iter2['tangle.loop.iteration.group_id']).toBe(1)
expect(iter2['tangle.loop.iteration.parent_index']).toBe(0)
expect(iter2['tangle.loop.iteration.duration_ms']).toBe(6)
expect(iter2['tangle.loop.iteration.output_preview']).toBe('{"answer":"beta"}')

const root = attrMap(spans.find((s) => s.name === 'loop')!)
expect(root['tangle.loop.duration_ms']).toBe(25)
})
})

describe('otel-export', () => {
Expand Down
Loading