Skip to content

Commit 4b1ddab

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: refactor OpenTelemetry (OTel) instrumentation within the ADK core, moving from manual span management to structured helper classes
### Key Changes * **Structured Instrumentation:** Replaces manual `Tracing.trace` calls and explicit `Scope` management with `Flowable.using` and `Maybe.using` patterns. It introduces helper classes like `AgentInvocation` and `ToolExecution` to encapsulate telemetry logic. * **Metrics Integration:** Adds support for tracking new metrics during agent execution: * `gen_ai.agent.invocation.duration` * `gen_ai.agent.request.size` / `gen_ai.agent.response.size` * `gen_ai.agent.workflow.steps` * **Reactive API Improvements:** Leverages `Tracing.withContext()` and `doOnNext`/`doOnError` hooks within `AgentInvocation` and `ToolExecution` to automatically capture events and errors without polluting the core logic. * **Trace Hierarchy Refinement:** Updates how spans are nested. For example, in `ContextPropagationTest`, the child agent span is now correctly parented to the specific LLM call span that triggered it, rather than the parent agent span. * **Testing:** Significantly enhances `BaseAgentTest` to verify metric collection using `InMemoryMetricReader`. Updates various tests to match the new span naming convention (e.g., removing brackets from tool execution spans). ### Impact This refactor simplifies the core agent and flow logic by removing boilerplate telemetry code, making the instrumentation more robust and easier to maintain while expanding the observability of the ADK through new metrics. PiperOrigin-RevId: 918116236
1 parent f1155ec commit 4b1ddab

9 files changed

Lines changed: 169 additions & 58 deletions

File tree

core/src/main/java/com/google/adk/agents/BaseAgent.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
import com.google.adk.agents.Callbacks.BeforeAgentCallback;
2525
import com.google.adk.events.Event;
2626
import com.google.adk.plugins.Plugin;
27-
import com.google.adk.telemetry.Tracing;
27+
import com.google.adk.telemetry.Instrumentation;
28+
import com.google.adk.telemetry.Instrumentation.AgentInvocation;
2829
import com.google.adk.utils.AgentEnums.AgentOrigin;
2930
import com.google.common.collect.ImmutableList;
3031
import com.google.errorprone.annotations.CanIgnoreReturnValue;
@@ -322,11 +323,13 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
322323
private Flowable<Event> run(
323324
InvocationContext parentContext,
324325
Function<InvocationContext, Flowable<Event>> runImplementation) {
325-
Context parentSpanContext = Context.current();
326-
return Flowable.defer(
327-
() -> {
328-
InvocationContext invocationContext = createInvocationContext(parentContext);
329-
326+
Context otelContext = Context.current();
327+
return Flowable.using(
328+
() ->
329+
Instrumentation.recordAgentInvocation(
330+
createInvocationContext(parentContext), this, otelContext),
331+
agentInvocation -> {
332+
InvocationContext invocationContext = agentInvocation.getCtx();
330333
Flowable<Event> mainAndAfterEvents =
331334
Flowable.defer(() -> runImplementation.apply(invocationContext))
332335
.concatWith(
@@ -350,14 +353,10 @@ private Flowable<Event> run(
350353
return Flowable.just(beforeEvent).concatWith(mainAndAfterEvents);
351354
})
352355
.switchIfEmpty(mainAndAfterEvents)
353-
.compose(
354-
Tracing.<Event>trace("invoke_agent " + name())
355-
.setParent(parentSpanContext)
356-
.configure(
357-
span ->
358-
Tracing.traceAgentInvocation(
359-
span, name(), description(), invocationContext)));
360-
});
356+
.doOnNext(agentInvocation::addEvent)
357+
.doOnError(agentInvocation::setError);
358+
},
359+
AgentInvocation::close);
361360
}
362361

363362
/**

core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -479,12 +479,10 @@ private Flowable<Event> runOneStep(Context spanContext, InvocationContext contex
479479
"Agent not found: " + agentToTransfer)));
480480
}
481481
return postProcessedEvents.concatWith(
482-
Flowable.defer(
483-
() -> {
484-
try (Scope s = spanContext.makeCurrent()) {
485-
return nextAgent.get().runAsync(context);
486-
}
487-
}));
482+
nextAgent
483+
.get()
484+
.runAsync(context)
485+
.compose(Tracing.withContext(spanContext)));
488486
}
489487
return postProcessedEvents;
490488
});
@@ -666,12 +664,10 @@ public void onError(Throwable e) {
666664
"Agent not found: " + event.actions().transferToAgent().get());
667665
}
668666
Flowable<Event> nextAgentEvents =
669-
Flowable.defer(
670-
() -> {
671-
try (Scope s = spanContext.makeCurrent()) {
672-
return nextAgent.get().runLive(invocationContext);
673-
}
674-
});
667+
nextAgent
668+
.get()
669+
.runLive(invocationContext)
670+
.compose(Tracing.withContext(spanContext));
675671
events = Flowable.concat(events, nextAgentEvents);
676672
}
677673
return events;

core/src/main/java/com/google/adk/flows/llmflows/Functions.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.google.adk.events.Event;
3030
import com.google.adk.events.EventActions;
3131
import com.google.adk.events.ToolConfirmation;
32+
import com.google.adk.telemetry.Instrumentation;
33+
import com.google.adk.telemetry.Instrumentation.ToolExecution;
3234
import com.google.adk.telemetry.Tracing;
3335
import com.google.adk.tools.BaseTool;
3436
import com.google.adk.tools.FunctionTool;
@@ -430,6 +432,25 @@ private static Maybe<Event> postProcessFunctionResult(
430432
ToolContext toolContext,
431433
boolean isLive,
432434
Context parentContext) {
435+
return Maybe.using(
436+
() ->
437+
Instrumentation.recordToolExecution(
438+
tool, invocationContext.agent(), functionArgs, parentContext),
439+
toolExecution ->
440+
processFunctionResult(
441+
maybeFunctionResult, invocationContext, tool, functionArgs, toolContext, isLive)
442+
.doOnSuccess(event -> toolExecution.context().setFunctionResponseEvent(event))
443+
.doOnError(toolExecution::setError),
444+
ToolExecution::close);
445+
}
446+
447+
private static Maybe<Event> processFunctionResult(
448+
Maybe<Map<String, Object>> maybeFunctionResult,
449+
InvocationContext invocationContext,
450+
BaseTool tool,
451+
Map<String, Object> functionArgs,
452+
ToolContext toolContext,
453+
boolean isLive) {
433454
return maybeFunctionResult
434455
.map(Optional::of)
435456
.defaultIfEmpty(Optional.empty())
@@ -467,20 +488,7 @@ private static Maybe<Event> postProcessFunctionResult(
467488
tool, finalFunctionResult, toolContext, invocationContext);
468489
return Maybe.just(event);
469490
});
470-
})
471-
.compose(
472-
Tracing.<Event>trace("execute_tool [" + tool.name() + "]")
473-
.setParent(parentContext)
474-
.onSuccess(
475-
(span, event) ->
476-
Tracing.traceToolExecution(
477-
span,
478-
tool.name(),
479-
tool.description(),
480-
tool.getClass().getSimpleName(),
481-
functionArgs,
482-
event,
483-
null)));
491+
});
484492
}
485493

486494
private static Optional<Event> mergeParallelFunctionResponseEvents(

core/src/main/java/com/google/adk/telemetry/Instrumentation.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,12 @@ public static final class AgentInvocation extends ClosableTelemetryScope {
125125
private final InvocationContext ctx;
126126
private final List<Event> events = Collections.synchronizedList(new ArrayList<>());
127127

128-
public AgentInvocation(InvocationContext ctx, BaseAgent agent) {
129-
super(Tracing.getTracer().spanBuilder("invoke_agent " + agent.name()).startSpan());
128+
public AgentInvocation(InvocationContext ctx, BaseAgent agent, Context parentContext) {
129+
super(
130+
Tracing.getTracer()
131+
.spanBuilder("invoke_agent " + agent.name())
132+
.setParent(parentContext)
133+
.startSpan());
130134
this.agent = agent;
131135
this.ctx = ctx;
132136
Tracing.traceAgentInvocation(span, agent.name(), agent.description(), ctx);
@@ -160,8 +164,13 @@ public static final class ToolExecution extends ClosableTelemetryScope {
160164
private final BaseAgent agent;
161165
private final Map<String, Object> functionArgs;
162166

163-
public ToolExecution(BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs) {
164-
super(Tracing.getTracer().spanBuilder("execute_tool " + tool.name()).startSpan());
167+
public ToolExecution(
168+
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
169+
super(
170+
Tracing.getTracer()
171+
.spanBuilder("execute_tool " + tool.name())
172+
.setParent(parentContext)
173+
.startSpan());
165174
this.tool = tool;
166175
this.agent = agent;
167176
this.functionArgs = functionArgs;
@@ -196,12 +205,22 @@ protected void handleMetricsError(RuntimeException e) {
196205

197206
/** Creates an AgentInvocation context to record agent invocation telemetry. */
198207
public static AgentInvocation recordAgentInvocation(InvocationContext ctx, BaseAgent agent) {
199-
return new AgentInvocation(ctx, agent);
208+
return recordAgentInvocation(ctx, agent, Context.current());
209+
}
210+
211+
public static AgentInvocation recordAgentInvocation(
212+
InvocationContext ctx, BaseAgent agent, Context parentContext) {
213+
return new AgentInvocation(ctx, agent, parentContext);
200214
}
201215

202216
/** Creates a ToolExecution context to record tool execution telemetry. */
203217
public static ToolExecution recordToolExecution(
204218
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs) {
205-
return new ToolExecution(tool, agent, functionArgs);
219+
return recordToolExecution(tool, agent, functionArgs, Context.current());
220+
}
221+
222+
public static ToolExecution recordToolExecution(
223+
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
224+
return new ToolExecution(tool, agent, functionArgs, parentContext);
206225
}
207226
}

core/src/test/java/com/google/adk/agents/BaseAgentTest.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,42 @@
2222
import com.google.adk.agents.Callbacks.AfterAgentCallback;
2323
import com.google.adk.agents.Callbacks.BeforeAgentCallback;
2424
import com.google.adk.events.Event;
25+
import com.google.adk.telemetry.Metrics;
2526
import com.google.adk.testing.TestBaseAgent;
2627
import com.google.adk.testing.TestCallback;
2728
import com.google.adk.testing.TestUtils;
2829
import com.google.common.collect.ImmutableList;
2930
import com.google.genai.types.Content;
3031
import com.google.genai.types.Part;
32+
import io.opentelemetry.api.GlobalOpenTelemetry;
33+
import io.opentelemetry.api.common.AttributeKey;
34+
import io.opentelemetry.api.metrics.Meter;
35+
import io.opentelemetry.sdk.OpenTelemetrySdk;
36+
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
37+
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
38+
import io.opentelemetry.sdk.metrics.data.MetricData;
39+
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
40+
import io.opentelemetry.sdk.testing.time.TestClock;
41+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
3142
import io.reactivex.rxjava3.core.Completable;
3243
import io.reactivex.rxjava3.core.Maybe;
3344
import java.util.List;
3445
import java.util.concurrent.atomic.AtomicBoolean;
46+
import org.junit.After;
47+
import org.junit.Before;
3548
import org.junit.Test;
3649
import org.junit.runner.RunWith;
3750
import org.junit.runners.JUnit4;
3851

3952
@RunWith(JUnit4.class)
4053
public final class BaseAgentTest {
41-
4254
private static final String TEST_AGENT_NAME = "testAgent";
4355
private static final String TEST_AGENT_DESCRIPTION = "A test agent";
4456

57+
private InMemoryMetricReader inMemoryMetricReader;
58+
private TestClock testClock;
59+
private Meter originalMeter;
60+
4561
private static class ClosableTestAgent extends TestBaseAgent {
4662
final AtomicBoolean closed = new AtomicBoolean(false);
4763

@@ -56,6 +72,35 @@ public Completable close() {
5672
}
5773
}
5874

75+
@Before
76+
public void setUp() {
77+
GlobalOpenTelemetry.resetForTest();
78+
testClock = TestClock.create();
79+
inMemoryMetricReader = InMemoryMetricReader.create();
80+
SdkMeterProvider sdkMeterProvider =
81+
SdkMeterProvider.builder()
82+
.registerMetricReader(inMemoryMetricReader)
83+
.setClock(testClock)
84+
.build();
85+
86+
OpenTelemetrySdk openTelemetrySdk =
87+
OpenTelemetrySdk.builder()
88+
.setTracerProvider(SdkTracerProvider.builder().build())
89+
.setMeterProvider(sdkMeterProvider)
90+
.build();
91+
92+
GlobalOpenTelemetry.set(openTelemetrySdk);
93+
originalMeter = GlobalOpenTelemetry.getMeter("gcp.vertex.agent");
94+
Metrics.setMeterForTesting(openTelemetrySdk.getMeter("gcp.vertex.agent"));
95+
}
96+
97+
@After
98+
public void tearDown() {
99+
if (originalMeter != null) {
100+
Metrics.setMeterForTesting(originalMeter);
101+
}
102+
}
103+
59104
@Test
60105
public void constructor_setsNameAndDescription() {
61106
String name = "testName";
@@ -173,6 +218,36 @@ public void runAsync_noCallbacks_invokesRunAsyncImpl() {
173218
assertThat(results).hasSize(1);
174219
assertThat(results.get(0).content()).hasValue(runAsyncImplContent);
175220
assertThat(runAsyncImpl.wasCalled()).isTrue();
221+
MetricData durationMetric = findMetricByName("gen_ai.agent.invocation.duration");
222+
assertThat(durationMetric.getUnit()).isEqualTo("ms");
223+
HistogramPointData durationPoint =
224+
durationMetric.getHistogramData().getPoints().iterator().next();
225+
assertThat(durationPoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
226+
.isEqualTo("testAgent");
227+
228+
MetricData reqSizeMetric = findMetricByName("gen_ai.agent.request.size");
229+
assertThat(reqSizeMetric.getUnit()).isEqualTo("By");
230+
HistogramPointData reqSizePoint =
231+
reqSizeMetric.getHistogramData().getPoints().iterator().next();
232+
assertThat(reqSizePoint.getSum()).isEqualTo(12.0);
233+
assertThat(reqSizePoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
234+
.isEqualTo("testAgent");
235+
236+
MetricData respSizeMetric = findMetricByName("gen_ai.agent.response.size");
237+
assertThat(respSizeMetric.getUnit()).isEqualTo("By");
238+
HistogramPointData respSizePoint =
239+
respSizeMetric.getHistogramData().getPoints().iterator().next();
240+
assertThat(respSizePoint.getSum()).isEqualTo(11.0);
241+
assertThat(respSizePoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
242+
.isEqualTo("testAgent");
243+
244+
MetricData workflowStepsMetric = findMetricByName("gen_ai.agent.workflow.steps");
245+
assertThat(workflowStepsMetric.getUnit()).isEqualTo("1");
246+
HistogramPointData workflowStepsPoint =
247+
workflowStepsMetric.getHistogramData().getPoints().iterator().next();
248+
assertThat(workflowStepsPoint.getSum()).isEqualTo(1.0);
249+
assertThat(workflowStepsPoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
250+
.isEqualTo("testAgent");
176251
}
177252

178253
@Test
@@ -627,4 +702,11 @@ public void close_twoLevelsSubAgents_closesAllSubAgents() {
627702
assertThat(subAgent.closed.get()).isTrue();
628703
assertThat(subSubAgent.closed.get()).isTrue();
629704
}
705+
706+
private MetricData findMetricByName(String name) {
707+
return inMemoryMetricReader.collectAllMetrics().stream()
708+
.filter(m -> m.getName().equals(name))
709+
.findFirst()
710+
.orElseThrow(() -> new AssertionError("Metric not found: " + name));
711+
}
630712
}

core/src/test/java/com/google/adk/agents/LlmAgentTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ public void runAsync_withTools_createsToolSpans() throws InterruptedException {
494494
List<SpanData> spans = openTelemetryRule.getSpans();
495495
SpanData agentSpan = findSpanByName(spans, "invoke_agent test agent");
496496
List<SpanData> llmSpans = findSpansByName(spans, "call_llm");
497-
List<SpanData> toolSpans = findSpansByName(spans, "execute_tool [echo_tool]");
497+
List<SpanData> toolSpans = findSpansByName(spans, "execute_tool echo_tool");
498498

499499
assertThat(llmSpans).hasSize(2);
500500
assertThat(toolSpans).hasSize(1);

core/src/test/java/com/google/adk/runner/RunnerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,7 +1366,7 @@ public void runAsync_createsToolSpansWithCorrectParent() {
13661366
List<SpanData> spans = openTelemetryRule.getSpans();
13671367
List<SpanData> llmSpans = spans.stream().filter(s -> s.getName().equals("call_llm")).toList();
13681368
List<SpanData> toolSpans =
1369-
spans.stream().filter(s -> s.getName().equals("execute_tool [echo_tool]")).toList();
1369+
spans.stream().filter(s -> s.getName().equals("execute_tool echo_tool")).toList();
13701370

13711371
assertThat(llmSpans).hasSize(2);
13721372
assertThat(toolSpans).hasSize(1);
@@ -1401,7 +1401,7 @@ public void runLive_createsToolSpansWithCorrectParent() throws Exception {
14011401
List<SpanData> spans = openTelemetryRule.getSpans();
14021402
List<SpanData> llmSpans = spans.stream().filter(s -> s.getName().equals("call_llm")).toList();
14031403
List<SpanData> toolSpans =
1404-
spans.stream().filter(s -> s.getName().equals("execute_tool [echo_tool]")).toList();
1404+
spans.stream().filter(s -> s.getName().equals("execute_tool echo_tool")).toList();
14051405

14061406
// In runLive, there is one call_llm span for the execution
14071407
assertThat(llmSpans).hasSize(1);

0 commit comments

Comments
 (0)