Skip to content

Commit e490203

Browse files
authored
feat: Add agent-to-agent communication testing (#654)
Implements server-side agent-to-agent communication where an AgentExecutor can use a client to connect back to the same server, enabling agents to delegate work to other agents. Fixes #625 🦕
1 parent 6a22243 commit e490203

8 files changed

Lines changed: 449 additions & 2 deletions

File tree

reference/jsonrpc/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@
5151
<type>test-jar</type>
5252
<scope>test</scope>
5353
</dependency>
54+
<!-- JSON-RPC client transport for agent-to-agent communication tests -->
55+
<dependency>
56+
<groupId>${project.groupId}</groupId>
57+
<artifactId>a2a-java-sdk-client-transport-jsonrpc</artifactId>
58+
<scope>test</scope>
59+
</dependency>
5460
<dependency>
5561
<groupId>io.quarkus</groupId>
5662
<artifactId>quarkus-reactive-routes</artifactId>

reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.a2a.server.apps.quarkus;
22

3+
import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
34
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.HEADERS_KEY;
45
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY;
56
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.TENANT_KEY;
@@ -62,6 +63,7 @@
6263
import io.a2a.spec.A2AError;
6364
import io.a2a.spec.InternalError;
6465
import io.a2a.spec.JSONParseError;
66+
import io.a2a.spec.TransportProtocol;
6567
import io.a2a.spec.UnsupportedOperationError;
6668
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
6769
import io.quarkus.security.Authenticated;
@@ -246,6 +248,7 @@ public String getUsername() {
246248
headerNames.forEach(name -> headers.put(name, rc.request().getHeader(name)));
247249
state.put(HEADERS_KEY, headers);
248250
state.put(TENANT_KEY, extractTenant(rc));
251+
state.put(TRANSPORT_KEY, TransportProtocol.JSONRPC);
249252

250253
// Extract requested protocol version from X-A2A-Version header
251254
String requestedVersion = rc.request().getHeader(A2AHeaders.X_A2A_VERSION);

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.a2a.server.rest.quarkus;
22

3+
import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
34
import static io.a2a.spec.A2AMethods.CANCEL_TASK_METHOD;
45
import static io.a2a.spec.A2AMethods.SEND_STREAMING_MESSAGE_METHOD;
56
import static io.a2a.transport.rest.context.RestContextKeys.HEADERS_KEY;
@@ -33,6 +34,7 @@
3334
import io.a2a.spec.InternalError;
3435
import io.a2a.spec.InvalidParamsError;
3536
import io.a2a.spec.MethodNotFoundError;
37+
import io.a2a.spec.TransportProtocol;
3638
import io.a2a.transport.rest.handler.RestHandler;
3739
import io.a2a.transport.rest.handler.RestHandler.HTTPRestResponse;
3840
import io.a2a.transport.rest.handler.RestHandler.HTTPRestStreamingResponse;
@@ -430,6 +432,7 @@ public String getUsername() {
430432
state.put(HEADERS_KEY, headers);
431433
state.put(METHOD_NAME_KEY, jsonRpcMethodName);
432434
state.put(TENANT_KEY, extractTenant(rc));
435+
state.put(TRANSPORT_KEY, TransportProtocol.HTTP_JSON);
433436

434437
// Extract requested protocol version from X-A2A-Version header
435438
String requestedVersion = rc.request().getHeader(A2AHeaders.X_A2A_VERSION);

server-common/src/main/java/io/a2a/server/ServerCallContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
import org.jspecify.annotations.Nullable;
1010

1111
public class ServerCallContext {
12+
/**
13+
* Key for transport protocol type in the state map.
14+
* Value should be a {@link io.a2a.spec.TransportProtocol} instance.
15+
*/
16+
public static final String TRANSPORT_KEY = "transport";
17+
1218
// TODO Not totally sure yet about these field types
1319
private final Map<Object, Object> modelConfig = new ConcurrentHashMap<>();
1420
private final Map<String, Object> state;

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.nio.charset.StandardCharsets;
2020
import java.util.List;
2121
import java.util.Optional;
22+
import java.util.UUID;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CopyOnWriteArrayList;
2425
import java.util.concurrent.CountDownLatch;
@@ -28,6 +29,7 @@
2829
import java.util.concurrent.atomic.AtomicReference;
2930
import java.util.function.BiConsumer;
3031
import java.util.function.Consumer;
32+
import java.util.stream.Collectors;
3133
import java.util.stream.Stream;
3234

3335
import jakarta.ws.rs.core.MediaType;
@@ -617,7 +619,6 @@ public void testGetExtendedAgentCard() throws A2AClientException {
617619
.filter(i -> getTransportProtocol().equals(i.protocolBinding()))
618620
.findFirst();
619621
assertTrue(transportInterface.isPresent());
620-
System.out.println("transportInterface = " + transportInterface);
621622
assertEquals(getTransportUrl(),transportInterface.get().url());
622623
assertEquals("1.0", agentCard.version());
623624
assertEquals("http://example.com/docs", agentCard.documentationUrl());
@@ -2451,4 +2452,142 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception {
24512452
}
24522453
}
24532454

2455+
/**
2456+
* Test agent-to-agent communication with delegation pattern.
2457+
* <p>
2458+
* Verifies that an AgentExecutor can use a client to delegate work to another agent
2459+
* by using the "delegate:" prefix. The delegated request is forwarded to another agent
2460+
* on the same server, and the artifacts from the delegated task are extracted and returned.
2461+
* <p>
2462+
* This test verifies:
2463+
* <ul>
2464+
* <li>Transport type is correctly passed via ServerCallContext state</li>
2465+
* <li>AgentExecutor can create a client with matching transport</li>
2466+
* <li>Delegation pattern ("delegate:" prefix) is recognized</li>
2467+
* <li>Client successfully communicates with same server</li>
2468+
* <li>Artifacts from delegated task are extracted and returned</li>
2469+
* <li>Original task ID is preserved (not replaced by delegated task ID)</li>
2470+
* </ul>
2471+
*/
2472+
@Test
2473+
public void testAgentToAgentDelegation() throws Exception {
2474+
String delegationTaskId = "agent-to-agent-test-" + UUID.randomUUID();
2475+
2476+
Message delegationMessage = Message.builder()
2477+
.taskId(delegationTaskId)
2478+
.contextId("agent-to-agent-context")
2479+
.role(Message.Role.USER)
2480+
.parts(new TextPart("delegate:What is 2+2?"))
2481+
.build();
2482+
2483+
CountDownLatch delegationLatch = new CountDownLatch(1);
2484+
AtomicReference<Task> delegationResultRef = new AtomicReference<>();
2485+
AtomicReference<Throwable> delegationErrorRef = new AtomicReference<>();
2486+
2487+
BiConsumer<ClientEvent, AgentCard> delegationConsumer =
2488+
AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch);
2489+
2490+
getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
2491+
delegationErrorRef.set(error);
2492+
delegationLatch.countDown();
2493+
});
2494+
2495+
assertTrue(delegationLatch.await(30, TimeUnit.SECONDS), "Delegation should complete within timeout");
2496+
2497+
Task delegationResult = delegationResultRef.get();
2498+
2499+
// Only fail on errors if we didn't get a successful result
2500+
// (errors can occur after completion due to stream cleanup)
2501+
if (delegationResult == null && delegationErrorRef.get() != null) {
2502+
fail("Delegation failed: " + delegationErrorRef.get().getMessage());
2503+
}
2504+
2505+
assertNotNull(delegationResult, "Delegation task should not be null");
2506+
assertEquals(TaskState.TASK_STATE_COMPLETED, delegationResult.status().state(),
2507+
"Delegation task should be completed");
2508+
assertNotNull(delegationResult.artifacts(), "Delegation should have artifacts");
2509+
assertFalse(delegationResult.artifacts().isEmpty(), "Delegation should have at least one artifact");
2510+
2511+
// Extract text from result
2512+
String delegatedText = extractTextFromTask(delegationResult);
2513+
assertTrue(delegatedText.contains("Handled locally:"),
2514+
"Delegated content should have been handled locally by target agent. Got: " + delegatedText);
2515+
2516+
// Verify the task ID is the original one (not the delegated task's ID)
2517+
assertEquals(delegationTaskId, delegationResult.id(),
2518+
"Task ID should be the original task ID, not the delegated task's ID");
2519+
}
2520+
2521+
/**
2522+
* Test agent-to-agent communication with local handling (no delegation).
2523+
* <p>
2524+
* Verifies that requests without the "delegate:" prefix are handled locally
2525+
* by the agent without creating a client connection.
2526+
* <p>
2527+
* This test verifies:
2528+
* <ul>
2529+
* <li>Requests without "delegate:" prefix are handled locally</li>
2530+
* <li>No client-to-client communication occurs for local handling</li>
2531+
* <li>Task completes successfully with expected content</li>
2532+
* </ul>
2533+
*/
2534+
@Test
2535+
public void testAgentToAgentLocalHandling() throws Exception {
2536+
String localTaskId = "agent-to-agent-test-" + UUID.randomUUID();
2537+
2538+
Message localMessage = Message.builder()
2539+
.taskId(localTaskId)
2540+
.contextId("agent-to-agent-context")
2541+
.role(Message.Role.USER)
2542+
.parts(new TextPart("Hello directly"))
2543+
.build();
2544+
2545+
CountDownLatch localLatch = new CountDownLatch(1);
2546+
AtomicReference<Task> localResultRef = new AtomicReference<>();
2547+
AtomicReference<Throwable> localErrorRef = new AtomicReference<>();
2548+
2549+
BiConsumer<ClientEvent, AgentCard> localConsumer =
2550+
AgentToAgentClientFactory.createTaskCaptureConsumer(localResultRef, localLatch);
2551+
2552+
getClient().sendMessage(localMessage, List.of(localConsumer), error -> {
2553+
localErrorRef.set(error);
2554+
localLatch.countDown();
2555+
});
2556+
2557+
assertTrue(localLatch.await(30, TimeUnit.SECONDS), "Local handling should complete within timeout");
2558+
2559+
Task localResult = localResultRef.get();
2560+
2561+
// Only fail on errors if we didn't get a successful result
2562+
// (errors can occur after completion due to stream cleanup)
2563+
if (localResult == null && localErrorRef.get() != null) {
2564+
fail("Local handling failed: " + localErrorRef.get().getMessage());
2565+
}
2566+
2567+
assertNotNull(localResult, "Local task should not be null");
2568+
assertEquals(TaskState.TASK_STATE_COMPLETED, localResult.status().state(),
2569+
"Local task should be completed");
2570+
2571+
String localText = extractTextFromTask(localResult);
2572+
assertTrue(localText.contains("Handled locally: Hello directly"),
2573+
"Should be handled locally without delegation. Got: " + localText);
2574+
}
2575+
2576+
/**
2577+
* Extracts all text from a task's artifacts.
2578+
*
2579+
* @param task the task containing artifacts
2580+
* @return concatenated text from all TextParts in all artifacts
2581+
*/
2582+
private String extractTextFromTask(Task task) {
2583+
if (task.artifacts() == null || task.artifacts().isEmpty()) {
2584+
return "";
2585+
}
2586+
return task.artifacts().stream()
2587+
.flatMap(artifact -> artifact.parts().stream())
2588+
.filter(part -> part instanceof TextPart)
2589+
.map(part -> ((TextPart) part).text())
2590+
.collect(Collectors.joining("\n"));
2591+
}
2592+
24542593
}

0 commit comments

Comments
 (0)