Skip to content

Commit 5e13341

Browse files
authored
feat: propagate OTel context in async ops and fix TransportProtocol config injection (a2aproject#707)
- Replace direct TransportProtocol enum injection in AgentCardProducer with String injection + TransportProtocol.fromString() to avoid SmallRye Config's HyphenateEnumConverter failing on "HTTP+JSON" values passed via -D flags - Add AsyncManagedExecutorProducer to the OpenTelemetry extras module, using MicroProfile ManagedExecutor to propagate OTel trace context across async boundaries (priority 20, overrides default AsyncExecutorProducer) - Add microprofile-context-propagation-api dependency to opentelemetry/server Fixes a2aproject#698 🦕 Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent fad1a15 commit 5e13341

6 files changed

Lines changed: 470 additions & 1 deletion

File tree

examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
public class AgentCardProducer {
1818

1919
@ConfigProperty(name = "quarkus.agentcard.protocol", defaultValue = "JSONRPC")
20-
TransportProtocol protocol;
20+
String protocolStr;
2121

2222
@Produces
2323
@PublicAgentCard
@@ -48,6 +48,7 @@ public AgentCard agentCard() {
4848
}
4949

5050
private AgentInterface getAgentInterface() {
51+
TransportProtocol protocol = TransportProtocol.fromString(protocolStr);
5152
String url = switch (protocol) {
5253
case GRPC -> "localhost:9000";
5354
case JSONRPC, HTTP_JSON -> "http://localhost:9999";

extras/opentelemetry/README.md

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
# OpenTelemetry Integration for A2A
2+
3+
This module provides OpenTelemetry observability integration for A2A servers, including distributed tracing, metrics, and context propagation across asynchronous boundaries.
4+
5+
## Features
6+
7+
- **Distributed Tracing**: Automatic span creation for all A2A protocol methods
8+
- **Context Propagation**: OpenTelemetry trace context propagation across async operations
9+
- **Request/Response Logging**: Optional extraction of request and response data into spans
10+
- **Error Tracking**: Automatic error status and error type attributes on failures
11+
12+
## Modules
13+
14+
### `opentelemetry-common`
15+
Common utilities and constants shared across OpenTelemetry modules.
16+
17+
### `opentelemetry-client`
18+
OpenTelemetry integration for A2A clients.
19+
20+
### `opentelemetry-client-propagation`
21+
Context propagation support for A2A clients.
22+
23+
### `opentelemetry-server`
24+
OpenTelemetry integration for A2A servers, including the context-aware executor.
25+
26+
### `opentelemetry-integration-tests`
27+
Integration tests for OpenTelemetry functionality.
28+
29+
## Usage
30+
31+
### Basic Setup
32+
33+
Add the OpenTelemetry server module to your dependencies:
34+
35+
```xml
36+
<dependency>
37+
<groupId>io.a2a</groupId>
38+
<artifactId>a2a-extras-opentelemetry-server</artifactId>
39+
<version>${a2a.version}</version>
40+
</dependency>
41+
```
42+
43+
### Context-Aware Async Executor
44+
45+
The `AsyncManagedExecutorProducer` provides a `ManagedExecutor` that automatically propagates OpenTelemetry trace context across asynchronous boundaries. This ensures that spans created in async tasks are properly linked to their parent spans.
46+
47+
#### How It Works
48+
49+
When the OpenTelemetry server module is included, the `AsyncManagedExecutorProducer` automatically replaces the default `AsyncExecutorProducer` using CDI alternatives:
50+
51+
- **Priority 20**: Takes precedence over the default executor producer (priority 10)
52+
- **Automatic Activation**: No configuration needed - just include the module
53+
- **Context Propagation**: Uses MicroProfile Context Propagation to maintain trace context
54+
55+
#### Configuration
56+
57+
The `ManagedExecutor` is container-managed and configured through your runtime environment:
58+
59+
**Quarkus:**
60+
```properties
61+
# Configure the managed executor pool
62+
quarkus.thread-pool.core-threads=10
63+
quarkus.thread-pool.max-threads=50
64+
quarkus.thread-pool.queue-size=100
65+
```
66+
67+
**Other Runtimes:**
68+
Consult your MicroProfile Context Propagation implementation documentation for configuration options.
69+
70+
> **Note**: Unlike the default `AsyncExecutorProducer`, the `AsyncManagedExecutorProducer` does not use the `a2a.executor.*` configuration properties. Pool sizing is controlled by the container's ManagedExecutor configuration.
71+
72+
#### Example
73+
74+
```java
75+
@ApplicationScoped
76+
public class MyAgent implements Agent {
77+
78+
@Inject
79+
@Internal
80+
Executor executor; // Automatically uses ManagedExecutor with context propagation
81+
82+
@Override
83+
public void execute(RequestContext context, AgentEmitter emitter) {
84+
// Current span context is automatically propagated
85+
executor.execute(() -> {
86+
// This code runs in a different thread but maintains the trace context
87+
Span currentSpan = Span.current();
88+
currentSpan.addEvent("Processing in async task");
89+
90+
// Do async work...
91+
});
92+
}
93+
}
94+
```
95+
96+
### Request/Response Extraction
97+
98+
Enable request and response data extraction in spans:
99+
100+
```properties
101+
# Extract request parameters into span attributes
102+
a2a.opentelemetry.extract-request=true
103+
104+
# Extract response data into span attributes
105+
a2a.opentelemetry.extract-response=true
106+
```
107+
108+
> **Warning**: Extracting request/response data may expose sensitive information in traces. Use with caution in production environments.
109+
110+
### Span Attributes
111+
112+
The following attributes are automatically added to spans:
113+
114+
- `genai.request`: Request parameters (if extraction enabled)
115+
- `genai.response`: Response data (if extraction enabled)
116+
- `error.type`: Error message (on failures)
117+
118+
## Architecture
119+
120+
### Request Handler Decoration
121+
122+
The `OpenTelemetryRequestHandlerDecorator` wraps the default request handler and creates spans for each A2A protocol method:
123+
124+
```
125+
Client Request
126+
127+
OpenTelemetryRequestHandlerDecorator
128+
↓ (creates span)
129+
Default RequestHandler
130+
131+
Agent Execution (with context propagation)
132+
133+
Response
134+
```
135+
136+
### Context Propagation Flow
137+
138+
```
139+
HTTP Request (with trace headers)
140+
141+
OpenTelemetry extracts context
142+
143+
Span created for A2A method
144+
145+
ManagedExecutor propagates context
146+
147+
Async agent execution (maintains trace context)
148+
149+
Response (with trace headers)
150+
```
151+
152+
## Testing
153+
154+
The module includes comprehensive unit tests:
155+
156+
- `AsyncManagedExecutorProducerTest`: Tests for the context-aware executor producer
157+
- `OpenTelemetryRequestHandlerDecoratorTest`: Tests for span creation and error handling
158+
159+
Run tests:
160+
```bash
161+
mvn test -pl extras/opentelemetry/server
162+
```
163+
164+
## Troubleshooting
165+
166+
### Context Not Propagating
167+
168+
**Symptom**: Spans in async tasks are not linked to parent spans.
169+
170+
**Solution**: Ensure the OpenTelemetry server module is included and the `ManagedExecutor` is being injected correctly. Check logs for:
171+
```
172+
Initializing OpenTelemetry-aware ManagedExecutor for async operations
173+
```
174+
175+
### ManagedExecutor Not Available
176+
177+
**Symptom**: `IllegalStateException: ManagedExecutor not injected - ensure MicroProfile Context Propagation is available`
178+
179+
**Solution**: Ensure your runtime provides MicroProfile Context Propagation support. For Quarkus, add:
180+
```xml
181+
<dependency>
182+
<groupId>io.quarkus</groupId>
183+
<artifactId>quarkus-smallrye-context-propagation</artifactId>
184+
</dependency>
185+
```
186+
187+
### Performance Impact
188+
189+
**Symptom**: Increased latency with OpenTelemetry enabled.
190+
191+
**Solution**:
192+
- Disable request/response extraction in production
193+
- Configure sampling rate to reduce trace volume
194+
- Ensure your OpenTelemetry collector is properly sized
195+
196+
## Best Practices
197+
198+
1. **Sampling**: Configure appropriate sampling rates for production environments
199+
2. **Sensitive Data**: Disable request/response extraction if handling sensitive data
200+
3. **Resource Attributes**: Add service name and version as resource attributes
201+
4. **Collector Configuration**: Use batch processors to reduce network overhead
202+
5. **Monitoring**: Monitor the OpenTelemetry collector's health and performance
203+
204+
## Dependencies
205+
206+
- MicroProfile Telemetry 2.0.1+
207+
- MicroProfile Context Propagation 1.3+
208+
- OpenTelemetry API
209+
- A2A Server Common
210+
211+
## See Also
212+
213+
- [OpenTelemetry Documentation](https://opentelemetry.io/docs/)
214+
- [MicroProfile Telemetry Specification](https://github.com/eclipse/microprofile-telemetry)
215+
- [MicroProfile Context Propagation](https://github.com/eclipse/microprofile-context-propagation)

extras/opentelemetry/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
<properties>
2121
<version.org.eclipse.microprofile.telemetry>2.0.1</version.org.eclipse.microprofile.telemetry>
22+
<version.org.eclipse.microprofile.context-propagation>1.3</version.org.eclipse.microprofile.context-propagation>
2223
</properties>
2324
<dependencies>
2425
<dependency>

extras/opentelemetry/server/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939
<type>pom</type>
4040
<scope>provided</scope>
4141
</dependency>
42+
<dependency>
43+
<groupId>org.eclipse.microprofile.context-propagation</groupId>
44+
<artifactId>microprofile-context-propagation-api</artifactId>
45+
<version>${version.org.eclipse.microprofile.context-propagation}</version>
46+
<scope>provided</scope>
47+
</dependency>
4248
<dependency>
4349
<groupId>jakarta.enterprise</groupId>
4450
<artifactId>jakarta.enterprise.cdi-api</artifactId>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.a2a.extras.opentelemetry;
2+
3+
import io.a2a.server.util.async.Internal;
4+
import jakarta.annotation.PostConstruct;
5+
import jakarta.annotation.Priority;
6+
import jakarta.enterprise.context.ApplicationScoped;
7+
import jakarta.enterprise.inject.Alternative;
8+
import jakarta.enterprise.inject.Produces;
9+
import jakarta.inject.Inject;
10+
import java.util.concurrent.Executor;
11+
import org.eclipse.microprofile.context.ManagedExecutor;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
/**
16+
* Alternative executor producer that provides a ManagedExecutor with OpenTelemetry context propagation.
17+
* <p>
18+
* This producer replaces the default {@code AsyncExecutorProducer} when the OpenTelemetry extras module
19+
* is included in the application. The ManagedExecutor ensures that OpenTelemetry trace context is
20+
* properly propagated across asynchronous boundaries.
21+
* <p>
22+
* Priority 20 ensures this alternative takes precedence over the default producer (priority 10).
23+
*
24+
* <h2>Configuration</h2>
25+
* The ManagedExecutor is container-managed and injected via CDI. Its configuration depends on the
26+
* runtime environment:
27+
* <ul>
28+
* <li><b>Quarkus:</b> Configure via {@code quarkus.thread-pool.*} properties</li>
29+
* <li><b>Other runtimes:</b> Consult your MicroProfile Context Propagation implementation documentation</li>
30+
* </ul>
31+
* <p>
32+
* Unlike the default {@code AsyncExecutorProducer}, this producer does not use the {@code a2a.executor.*}
33+
* configuration properties. The executor pool sizing and behavior are controlled by the container's
34+
* ManagedExecutor configuration.
35+
*
36+
* @see org.eclipse.microprofile.context.ManagedExecutor
37+
*/
38+
@ApplicationScoped
39+
@Alternative
40+
@Priority(20)
41+
public class AsyncManagedExecutorProducer {
42+
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncManagedExecutorProducer.class);
43+
44+
@Inject
45+
ManagedExecutor managedExecutor;
46+
47+
@PostConstruct
48+
public void init() {
49+
LOGGER.info("Initializing OpenTelemetry-aware ManagedExecutor for async operations");
50+
if (managedExecutor == null) {
51+
LOGGER.warn("ManagedExecutor not available - context propagation may not work correctly");
52+
}
53+
}
54+
55+
@Produces
56+
@Internal
57+
public Executor produce() {
58+
LOGGER.debug("Using ManagedExecutor for async operations with OpenTelemetry context propagation");
59+
if (managedExecutor == null) {
60+
throw new IllegalStateException("ManagedExecutor not injected - ensure MicroProfile Context Propagation is available");
61+
}
62+
return managedExecutor;
63+
}
64+
65+
}

0 commit comments

Comments
 (0)