Skip to content

Commit 620d91a

Browse files
committed
fix: Move JSONRPC specific test cases to QuarkusA2AJSONRPCTest
1 parent e6c125e commit 620d91a

2 files changed

Lines changed: 123 additions & 126 deletions

File tree

reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/QuarkusA2AJSONRPCTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,45 @@
22

33
import static io.restassured.RestAssured.given;
44
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertNull;
56
import static org.junit.jupiter.api.Assertions.fail;
67
import static org.wildfly.common.Assert.assertNotNull;
78
import jakarta.ws.rs.core.MediaType;
89

10+
import java.net.URI;
11+
import java.net.http.HttpClient;
12+
import java.net.http.HttpRequest;
13+
import java.net.http.HttpResponse;
14+
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
import java.util.stream.Stream;
19+
20+
import com.fasterxml.jackson.core.JsonProcessingException;
21+
922
import io.a2a.server.apps.common.AbstractA2AServerTest;
1023
import io.a2a.spec.A2AClientException;
1124
import io.a2a.spec.InvalidParamsError;
1225
import io.a2a.spec.InvalidRequestError;
1326
import io.a2a.spec.JSONParseError;
1427
import io.a2a.spec.JSONRPCErrorResponse;
28+
import io.a2a.spec.Message;
29+
import io.a2a.spec.MessageSendParams;
1530
import io.a2a.spec.MethodNotFoundError;
31+
import io.a2a.spec.Part;
32+
import io.a2a.spec.SendStreamingMessageRequest;
33+
import io.a2a.spec.SendStreamingMessageResponse;
34+
import io.a2a.spec.StreamingJSONRPCRequest;
1635
import io.a2a.spec.Task;
1736
import io.a2a.spec.TaskQueryParams;
1837
import io.a2a.spec.TaskState;
38+
import io.a2a.spec.TextPart;
1939
import io.a2a.spec.TransportProtocol;
40+
import io.a2a.util.Utils;
2041
import io.quarkus.test.junit.QuarkusTest;
2142

43+
import org.junit.jupiter.api.Assertions;
2244
import org.junit.jupiter.api.Test;
2345

2446
@QuarkusTest
@@ -177,4 +199,102 @@ private void testGetTask(String mediaType) throws Exception {
177199
}
178200
}
179201

202+
@Test
203+
public void testStreamingMethodWithAcceptHeader() throws Exception {
204+
testSendStreamingMessage(MediaType.SERVER_SENT_EVENTS);
205+
}
206+
207+
@Test
208+
public void testSendMessageStreamNewMessageSuccess() throws Exception {
209+
testSendStreamingMessage(null);
210+
}
211+
212+
private void testSendStreamingMessage(String mediaType) throws Exception {
213+
Message message = new Message.Builder(MESSAGE)
214+
.taskId(MINIMAL_TASK.getId())
215+
.contextId(MINIMAL_TASK.getContextId())
216+
.build();
217+
SendStreamingMessageRequest request = new SendStreamingMessageRequest(
218+
"1", new MessageSendParams(message, null, null));
219+
220+
CompletableFuture<HttpResponse<Stream<String>>> responseFuture = initialiseStreamingRequest(request, mediaType);
221+
222+
CountDownLatch latch = new CountDownLatch(1);
223+
AtomicReference<Throwable> errorRef = new AtomicReference<>();
224+
225+
responseFuture.thenAccept(response -> {
226+
if (response.statusCode() != 200) {
227+
//errorRef.set(new IllegalStateException("Status code was " + response.statusCode()));
228+
throw new IllegalStateException("Status code was " + response.statusCode());
229+
}
230+
response.body().forEach(line -> {
231+
try {
232+
SendStreamingMessageResponse jsonResponse = extractJsonResponseFromSseLine(line);
233+
if (jsonResponse != null) {
234+
assertNull(jsonResponse.getError());
235+
Message messageResponse = (Message) jsonResponse.getResult();
236+
assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId());
237+
assertEquals(MESSAGE.getRole(), messageResponse.getRole());
238+
Part<?> part = messageResponse.getParts().get(0);
239+
assertEquals(Part.Kind.TEXT, part.getKind());
240+
assertEquals("test message", ((TextPart) part).getText());
241+
latch.countDown();
242+
}
243+
} catch (JsonProcessingException e) {
244+
throw new RuntimeException(e);
245+
}
246+
});
247+
}).exceptionally(t -> {
248+
if (!isStreamClosedError(t)) {
249+
errorRef.set(t);
250+
}
251+
latch.countDown();
252+
return null;
253+
});
254+
255+
256+
boolean dataRead = latch.await(20, TimeUnit.SECONDS);
257+
Assertions.assertTrue(dataRead);
258+
Assertions.assertNull(errorRef.get());
259+
260+
}
261+
262+
private CompletableFuture<HttpResponse<Stream<String>>> initialiseStreamingRequest(
263+
StreamingJSONRPCRequest<?> request, String mediaType) throws Exception {
264+
265+
// Create the client
266+
HttpClient client = HttpClient.newBuilder()
267+
.version(HttpClient.Version.HTTP_2)
268+
.build();
269+
270+
// Create the request
271+
HttpRequest.Builder builder = HttpRequest.newBuilder()
272+
.uri(URI.create("http://localhost:" + serverPort + "/"))
273+
.POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(request)))
274+
.header("Content-Type", APPLICATION_JSON);
275+
if (mediaType != null) {
276+
builder.header("Accept", mediaType);
277+
}
278+
HttpRequest httpRequest = builder.build();
279+
280+
281+
// Send request async and return the CompletableFuture
282+
return client.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofLines());
283+
}
284+
285+
private SendStreamingMessageResponse extractJsonResponseFromSseLine(String line) throws JsonProcessingException {
286+
line = extractSseData(line);
287+
if (line != null) {
288+
return Utils.OBJECT_MAPPER.readValue(line, SendStreamingMessageResponse.class);
289+
}
290+
return null;
291+
}
292+
293+
private static String extractSseData(String line) {
294+
if (line.startsWith("data:")) {
295+
line = line.substring(5).trim();
296+
return line;
297+
}
298+
return null;
299+
}
180300
}

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 3 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -23,44 +23,28 @@
2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.concurrent.atomic.AtomicInteger;
2525
import java.util.concurrent.atomic.AtomicReference;
26-
import java.util.stream.Stream;
2726
import java.util.function.BiConsumer;
2827
import java.util.function.Consumer;
2928

30-
import jakarta.ws.rs.core.MediaType;
31-
32-
import com.fasterxml.jackson.core.JsonProcessingException;
33-
3429
import io.a2a.client.Client;
3530
import io.a2a.client.ClientEvent;
3631
import io.a2a.client.ClientFactory;
3732
import io.a2a.client.MessageEvent;
3833
import io.a2a.client.TaskUpdateEvent;
3934
import io.a2a.client.config.ClientConfig;
4035
import io.a2a.client.config.ClientTransportConfig;
41-
import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfig;
42-
import io.a2a.client.transport.grpc.GrpcTransportConfig;
43-
import io.a2a.client.http.JdkA2AHttpClient;
44-
import io.grpc.ManagedChannelBuilder;
4536
import io.a2a.spec.A2AClientException;
4637
import io.a2a.spec.AgentCard;
4738
import io.a2a.spec.AgentCapabilities;
4839
import io.a2a.spec.AgentInterface;
49-
import io.a2a.spec.TransportProtocol;
5040
import io.a2a.spec.Artifact;
5141
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
5242
import io.a2a.spec.Event;
53-
import io.a2a.spec.GetAuthenticatedExtendedCardRequest;
54-
import io.a2a.spec.GetAuthenticatedExtendedCardResponse;
5543
import io.a2a.spec.GetTaskPushNotificationConfigParams;
5644
import io.a2a.spec.ListTaskPushNotificationConfigParams;
5745
import io.a2a.spec.Message;
58-
import io.a2a.spec.MessageSendParams;
5946
import io.a2a.spec.Part;
6047
import io.a2a.spec.PushNotificationConfig;
61-
import io.a2a.spec.SendStreamingMessageRequest;
62-
import io.a2a.spec.SendStreamingMessageResponse;
63-
import io.a2a.spec.StreamingJSONRPCRequest;
6448
import io.a2a.spec.Task;
6549
import io.a2a.spec.TaskArtifactUpdateEvent;
6650
import io.a2a.spec.TaskIdParams;
@@ -74,8 +58,6 @@
7458
import io.a2a.spec.UnsupportedOperationError;
7559
import io.a2a.util.Utils;
7660

77-
import org.junit.jupiter.api.Assertions;
78-
import org.junit.jupiter.api.Assumptions;
7961
import org.junit.jupiter.api.Test;
8062
import org.junit.jupiter.api.Timeout;
8163

@@ -109,14 +91,14 @@ public abstract class AbstractA2AServerTest {
10991
.status(new TaskStatus(TaskState.SUBMITTED))
11092
.build();
11193

112-
private static final Message MESSAGE = new Message.Builder()
94+
protected static final Message MESSAGE = new Message.Builder()
11395
.messageId("111")
11496
.role(Message.Role.AGENT)
11597
.parts(new TextPart("test message"))
11698
.build();
11799
public static final String APPLICATION_JSON = "application/json";
118100

119-
private final int serverPort;
101+
protected final int serverPort;
120102
private Client client;
121103
private Client nonStreamingClient;
122104

@@ -586,72 +568,6 @@ public void testResubscribeNoExistingTaskError() throws Exception {
586568
}
587569
}
588570

589-
@Test
590-
public void testStreamingMethodWithAcceptHeader() throws Exception {
591-
// Skip this test for gRPC transport as it uses HTTP SSE which is JSONRPC-specific
592-
Assumptions.assumeTrue(TransportProtocol.JSONRPC.asString().equals(getTransportProtocol()),
593-
"HTTP streaming tests are only supported for JSONRPC transport");
594-
testSendStreamingMessage(MediaType.SERVER_SENT_EVENTS);
595-
}
596-
597-
@Test
598-
public void testSendMessageStreamNewMessageSuccess() throws Exception {
599-
// Skip this test for gRPC transport as it uses HTTP SSE which is JSONRPC-specific
600-
Assumptions.assumeTrue(TransportProtocol.JSONRPC.asString().equals(getTransportProtocol()),
601-
"HTTP streaming tests are only supported for JSONRPC transport");
602-
testSendStreamingMessage(null);
603-
}
604-
605-
private void testSendStreamingMessage(String mediaType) throws Exception {
606-
Message message = new Message.Builder(MESSAGE)
607-
.taskId(MINIMAL_TASK.getId())
608-
.contextId(MINIMAL_TASK.getContextId())
609-
.build();
610-
SendStreamingMessageRequest request = new SendStreamingMessageRequest(
611-
"1", new MessageSendParams(message, null, null));
612-
613-
CompletableFuture<HttpResponse<Stream<String>>> responseFuture = initialiseStreamingRequest(request, mediaType);
614-
615-
CountDownLatch latch = new CountDownLatch(1);
616-
AtomicReference<Throwable> errorRef = new AtomicReference<>();
617-
618-
responseFuture.thenAccept(response -> {
619-
if (response.statusCode() != 200) {
620-
//errorRef.set(new IllegalStateException("Status code was " + response.statusCode()));
621-
throw new IllegalStateException("Status code was " + response.statusCode());
622-
}
623-
response.body().forEach(line -> {
624-
try {
625-
SendStreamingMessageResponse jsonResponse = extractJsonResponseFromSseLine(line);
626-
if (jsonResponse != null) {
627-
assertNull(jsonResponse.getError());
628-
Message messageResponse = (Message) jsonResponse.getResult();
629-
assertEquals(MESSAGE.getMessageId(), messageResponse.getMessageId());
630-
assertEquals(MESSAGE.getRole(), messageResponse.getRole());
631-
Part<?> part = messageResponse.getParts().get(0);
632-
assertEquals(Part.Kind.TEXT, part.getKind());
633-
assertEquals("test message", ((TextPart) part).getText());
634-
latch.countDown();
635-
}
636-
} catch (JsonProcessingException e) {
637-
throw new RuntimeException(e);
638-
}
639-
});
640-
}).exceptionally(t -> {
641-
if (!isStreamClosedError(t)) {
642-
errorRef.set(t);
643-
}
644-
latch.countDown();
645-
return null;
646-
});
647-
648-
649-
boolean dataRead = latch.await(20, TimeUnit.SECONDS);
650-
Assertions.assertTrue(dataRead);
651-
Assertions.assertNull(errorRef.get());
652-
653-
}
654-
655571
@Test
656572
public void testListPushNotificationConfigWithConfigId() throws Exception {
657573
saveTaskInTaskStore(MINIMAL_TASK);
@@ -871,23 +787,7 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio
871787
}
872788
}
873789

874-
private SendStreamingMessageResponse extractJsonResponseFromSseLine(String line) throws JsonProcessingException {
875-
line = extractSseData(line);
876-
if (line != null) {
877-
return Utils.OBJECT_MAPPER.readValue(line, SendStreamingMessageResponse.class);
878-
}
879-
return null;
880-
}
881-
882-
private static String extractSseData(String line) {
883-
if (line.startsWith("data:")) {
884-
line = line.substring(5).trim();
885-
return line;
886-
}
887-
return null;
888-
}
889-
890-
private boolean isStreamClosedError(Throwable throwable) {
790+
protected boolean isStreamClosedError(Throwable throwable) {
891791
// Unwrap the CompletionException
892792
Throwable cause = throwable;
893793

@@ -900,29 +800,6 @@ private boolean isStreamClosedError(Throwable throwable) {
900800
return false;
901801
}
902802

903-
private CompletableFuture<HttpResponse<Stream<String>>> initialiseStreamingRequest(
904-
StreamingJSONRPCRequest<?> request, String mediaType) throws Exception {
905-
906-
// Create the client
907-
HttpClient client = HttpClient.newBuilder()
908-
.version(HttpClient.Version.HTTP_2)
909-
.build();
910-
911-
// Create the request
912-
HttpRequest.Builder builder = HttpRequest.newBuilder()
913-
.uri(URI.create("http://localhost:" + serverPort + "/"))
914-
.POST(HttpRequest.BodyPublishers.ofString(Utils.OBJECT_MAPPER.writeValueAsString(request)))
915-
.header("Content-Type", APPLICATION_JSON);
916-
if (mediaType != null) {
917-
builder.header("Accept", mediaType);
918-
}
919-
HttpRequest httpRequest = builder.build();
920-
921-
922-
// Send request async and return the CompletableFuture
923-
return client.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofLines());
924-
}
925-
926803
protected void saveTaskInTaskStore(Task task) throws Exception {
927804
HttpClient client = HttpClient.newBuilder()
928805
.version(HttpClient.Version.HTTP_2)

0 commit comments

Comments
 (0)