Skip to content

Commit 60b9261

Browse files
committed
fix: improve SSE error handling and add early task validation
- Detect SSE vs plain JSON responses in HTTP clients - Add early task validation before opening streams - Ensure JSON-RPC spec compliance for error responses - Apply validation consistently across all transports Fixes a2aproject#791 Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent 786e5fd commit 60b9261

9 files changed

Lines changed: 277 additions & 76 deletions

File tree

extras/http-client-vertx/src/main/java/org/a2aproject/sdk/client/http/VertxA2AHttpClient.java

Lines changed: 205 additions & 60 deletions
Large diffs are not rendered by default.

extras/opentelemetry/server/src/main/java/org/a2aproject/sdk/extras/opentelemetry/OpenTelemetryRequestHandlerDecorator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import jakarta.enterprise.inject.Any;
4444
import jakarta.inject.Inject;
4545
import java.util.concurrent.Flow;
46+
import org.jspecify.annotations.Nullable;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
4849

@@ -455,7 +456,11 @@ public void onDeleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigP
455456
span.end();
456457
}
457458
}
458-
459+
460+
public void validateRequestedTask(@Nullable String requestedTaskId) throws A2AError {
461+
delegate.validateRequestedTask(requestedTaskId);
462+
}
463+
459464
private boolean extractRequest() {
460465
return Boolean.getBoolean(EXTRACT_REQUEST_SYS_PROPERTY);
461466
}

http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ protected CompletableFuture<Void> asyncRequest(
142142
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
143143
private Flow.@Nullable Subscription subscription;
144144
private volatile boolean errorRaised = false;
145+
private boolean isSseStream = false;
146+
private boolean firstMeaningfulLineSeen = false;
145147

146148
@Override
147149
public void onSubscribe(Flow.Subscription subscription) {
@@ -151,10 +153,23 @@ public void onSubscribe(Flow.Subscription subscription) {
151153

152154
@Override
153155
public void onNext(String item) {
154-
// SSE messages sometimes start with "data:". Strip that off
155-
if (item != null && item.startsWith("data:")) {
156-
item = item.substring(5).trim();
157-
if (!item.isEmpty()) {
156+
if (item != null && !item.isEmpty()) {
157+
if (!firstMeaningfulLineSeen) {
158+
firstMeaningfulLineSeen = true;
159+
isSseStream = item.startsWith("data:") || item.startsWith(":")
160+
|| item.startsWith("event:") || item.startsWith("id:")
161+
|| item.startsWith("retry:");
162+
}
163+
if (isSseStream) {
164+
if (item.startsWith("data:")) {
165+
String data = item.substring(5).trim();
166+
if (!data.isEmpty()) {
167+
messageConsumer.accept(data);
168+
}
169+
}
170+
// Other SSE control lines (event:, id:, retry:, :) are ignored
171+
} else {
172+
// Plain error body: deliver so SSEEventListener can parse the typed error
158173
messageConsumer.accept(item);
159174
}
160175
}

reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,12 @@ private A2AResponse<?> processNonStreamingRequest(NonStreamingJSONRPCRequest<?>
438438
* @return a Multi stream of JSON-RPC responses
439439
*/
440440
private Multi<? extends A2AResponse<?>> processStreamingRequest(
441-
A2ARequest<?> request, ServerCallContext context) {
441+
A2ARequest<?> request, ServerCallContext context) throws A2AError {
442+
if (request instanceof SendStreamingMessageRequest req) {
443+
jsonRpcHandler.validateRequestedTask(req.getParams().message().taskId());
444+
} else if (request instanceof SubscribeToTaskRequest req) {
445+
jsonRpcHandler.validateRequestedTask(req.getParams().id());
446+
}
442447
try {
443448
Flow.Publisher<? extends A2AResponse<?>> publisher;
444449
if (request instanceof SendStreamingMessageRequest req) {
@@ -450,8 +455,6 @@ private Multi<? extends A2AResponse<?>> processStreamingRequest(
450455
}
451456
return Multi.createFrom().publisher(publisher);
452457
} catch (A2AError error) {
453-
// For streaming endpoints, wrap immediate errors (like TaskNotFoundError,
454-
// UnsupportedOperationError) in error response and send as first SSE event
455458
return Multi.createFrom().item(generateErrorResponse(request, error));
456459
}
457460
}

server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,23 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon
10611061
return new MessageSendSetup(taskManager, task, requestContext);
10621062
}
10631063

1064+
@Override
1065+
public void validateRequestedTask(@Nullable String requestedTaskId) throws A2AError {
1066+
if (requestedTaskId == null) {
1067+
return;
1068+
}
1069+
Task task = taskStore.get(requestedTaskId);
1070+
if (task == null) {
1071+
throw new TaskNotFoundError();
1072+
}
1073+
1074+
if (task.status().state().isFinal()) {
1075+
throw new UnsupportedOperationError(null, String.format(
1076+
"Cannot send message to task %s: task is in terminal state %s and cannot accept further messages",
1077+
task.id(), task.status().state()), null);
1078+
}
1079+
}
1080+
10641081
private @Nullable Task validateRequestedTask(MessageSendParams params) throws A2AError {
10651082
String requestedTaskId = params.message().taskId();
10661083
if (requestedTaskId == null) {

server-common/src/main/java/org/a2aproject/sdk/server/requesthandlers/RequestHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.a2aproject.sdk.spec.TaskIdParams;
1919
import org.a2aproject.sdk.spec.TaskPushNotificationConfig;
2020
import org.a2aproject.sdk.spec.TaskQueryParams;
21+
import org.jspecify.annotations.Nullable;
2122

2223
public interface RequestHandler {
2324
Task onGetTask(
@@ -59,4 +60,6 @@ ListTaskPushNotificationConfigsResult onListTaskPushNotificationConfigs(
5960
void onDeleteTaskPushNotificationConfig(
6061
DeleteTaskPushNotificationConfigParams params,
6162
ServerCallContext context) throws A2AError;
63+
64+
void validateRequestedTask(@Nullable String requestedTaskId) throws A2AError;
6265
}

spec-grpc/src/main/java/org/a2aproject/sdk/grpc/utils/JSONRPCUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,12 @@ public static StreamResponse parseResponseEvent(String body) throws JsonMappingE
284284
JsonElement jelement = JsonParser.parseString(body);
285285
JsonObject jsonRpc = jelement.getAsJsonObject();
286286
String version = getAndValidateJsonrpc(jsonRpc);
287-
Object id = getAndValidateId(jsonRpc);
288-
JsonElement paramsNode = jsonRpc.get("result");
287+
// Check for error before validating id: per JSON-RPC spec, error responses may have null id
289288
if (jsonRpc.has("error")) {
290289
throw processError(jsonRpc.getAsJsonObject("error"));
291290
}
291+
Object id = getAndValidateId(jsonRpc);
292+
JsonElement paramsNode = jsonRpc.get("result");
292293
StreamResponse.Builder builder = StreamResponse.newBuilder();
293294
parseRequestBody(paramsNode, builder, id);
294295
return builder.build();

transport/jsonrpc/src/main/java/org/a2aproject/sdk/transport/jsonrpc/handler/JSONRPCHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,16 +381,13 @@ public Flow.Publisher<SendStreamingMessageResponse> onSubscribeToTask(
381381
request.getId(),
382382
new InvalidRequestError("Streaming is not supported by the agent")));
383383
}
384-
384+
requestHandler.validateRequestedTask(request.getParams().id());
385385
try {
386386
Flow.Publisher<StreamingEventKind> publisher =
387387
requestHandler.onSubscribeToTask(request.getParams(), context);
388388
// We can't use the convertingProcessor convenience method since that propagates any errors as an error handled
389389
// via Subscriber.onError() rather than as part of the SendStreamingResponse payload
390390
return convertToSendStreamingMessageResponse(request.getId(), publisher);
391-
} catch (TaskNotFoundError | UnsupportedOperationError e) {
392-
// Re-throw initial validation errors for routing layer to wrap in SSE format
393-
throw e;
394391
} catch (A2AError e) {
395392
// Other A2AError types - wrap inline as part of the stream
396393
return ZeroPublisher.fromItems(new SendStreamingMessageResponse(request.getId(), e));
@@ -745,4 +742,8 @@ public void onComplete() {
745742
}, executor);
746743
});
747744
}
745+
746+
public void validateRequestedTask(String requestedTaskId) {
747+
requestHandler.validateRequestedTask(requestedTaskId);
748+
}
748749
}

transport/rest/src/main/java/org/a2aproject/sdk/transport/rest/handler/RestHandler.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsParams;
5454
import org.a2aproject.sdk.spec.ListTaskPushNotificationConfigsResult;
5555
import org.a2aproject.sdk.spec.ListTasksParams;
56+
import org.a2aproject.sdk.spec.MessageSendParams;
5657
import org.a2aproject.sdk.spec.PushNotificationNotSupportedError;
5758
import org.a2aproject.sdk.spec.StreamingEventKind;
5859
import org.a2aproject.sdk.spec.Task;
@@ -297,7 +298,13 @@ public HTTPRestResponse sendStreamingMessage(ServerCallContext context, String t
297298
org.a2aproject.sdk.grpc.SendMessageRequest.Builder request = org.a2aproject.sdk.grpc.SendMessageRequest.newBuilder();
298299
parseRequestBody(body, request);
299300
request.setTenant(tenant);
300-
Flow.Publisher<StreamingEventKind> publisher = requestHandler.onMessageSendStream(ProtoUtils.FromProto.messageSendParams(request), context);
301+
MessageSendParams params = ProtoUtils.FromProto.messageSendParams(request);
302+
try {
303+
requestHandler.validateRequestedTask(params.message().taskId());
304+
} catch (A2AError e) {
305+
return createErrorResponse(e);
306+
}
307+
Flow.Publisher<StreamingEventKind> publisher = requestHandler.onMessageSendStream(params, context);
301308
return createStreamingResponse(publisher);
302309
} catch (A2AError e) {
303310
return new HTTPRestStreamingResponse(ZeroPublisher.fromItems(new HTTPRestErrorResponse(e).toJson()));
@@ -370,7 +377,6 @@ public HTTPRestResponse createTaskPushNotificationConfiguration(ServerCallContex
370377
if (!taskIdFromBody.isEmpty() && !taskIdFromBody.equals(taskId)) {
371378
throw new InvalidParamsError("Task ID in request body (" + taskIdFromBody + ") does not match task ID in URL path (" + taskId + ").");
372379
}
373-
374380
builder.setTenant(tenant);
375381
builder.setTaskId(taskId);
376382
TaskPushNotificationConfig result = requestHandler.onCreateTaskPushNotificationConfig(ProtoUtils.FromProto.createTaskPushNotificationConfig(builder), context);
@@ -420,6 +426,11 @@ public HTTPRestResponse subscribeToTask(ServerCallContext context, String tenant
420426
return createErrorResponse(new InvalidRequestError("Streaming is not supported by the agent"));
421427
}
422428
TaskIdParams params = TaskIdParams.builder().id(taskId).tenant(tenant).build();
429+
try {
430+
requestHandler.validateRequestedTask(params.id());
431+
} catch (A2AError e) {
432+
return createErrorResponse(e);
433+
}
423434
Flow.Publisher<StreamingEventKind> publisher = requestHandler.onSubscribeToTask(params, context);
424435
return createStreamingResponse(publisher);
425436
} catch (A2AError e) {

0 commit comments

Comments
 (0)