11package io .a2a .server .apps .common ;
22
3- import static io .restassured .RestAssured .given ;
4- import static org .hamcrest .Matchers .equalTo ;
53import static org .junit .jupiter .api .Assertions .assertEquals ;
64import static org .junit .jupiter .api .Assertions .assertFalse ;
75import static org .junit .jupiter .api .Assertions .assertInstanceOf ;
2119import java .util .List ;
2220import java .util .concurrent .CompletableFuture ;
2321import java .util .concurrent .CountDownLatch ;
24- import java .util .concurrent .ExecutorService ;
25- import java .util .concurrent .Executors ;
2622import java .util .concurrent .TimeUnit ;
2723import java .util .concurrent .atomic .AtomicBoolean ;
2824import java .util .concurrent .atomic .AtomicInteger ;
4541import io .a2a .client .transport .jsonrpc .JSONRPCTransportConfig ;
4642import io .a2a .client .transport .grpc .GrpcTransportConfig ;
4743import io .a2a .client .http .JdkA2AHttpClient ;
48- import io .a2a .spec .AuthenticatedExtendedCardNotConfiguredError ;
49- import io .a2a .spec .JSONRPCError ;
5044import io .grpc .ManagedChannelBuilder ;
5145import io .a2a .spec .A2AClientException ;
5246import io .a2a .spec .AgentCard ;
@@ -474,7 +468,9 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
474468
475469 // Create error handler
476470 Consumer <Throwable > errorHandler = error -> {
477- errorRef .set (error );
471+ if (!isStreamClosedError (error )) {
472+ errorRef .set (error );
473+ }
478474 eventLatch .countDown ();
479475 };
480476
@@ -484,8 +480,7 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
484480 .whenComplete ((unused , throwable ) -> subscriptionLatch .countDown ());
485481
486482 // Resubscribe to the task with specific consumer and error handler
487- List <BiConsumer <ClientEvent , AgentCard >> consumers = consumer != null ? List .of (consumer ) : List .of ();
488- getClient ().resubscribe (new TaskIdParams (MINIMAL_TASK .getId ()), consumers , errorHandler , null );
483+ getClient ().resubscribe (new TaskIdParams (MINIMAL_TASK .getId ()), List .of (consumer ), errorHandler , null );
489484
490485 // Wait for subscription to be established
491486 assertTrue (subscriptionLatch .await (15 , TimeUnit .SECONDS ));
@@ -544,7 +539,9 @@ public void testResubscribeNoExistingTaskError() throws Exception {
544539
545540 // Create error handler to capture the TaskNotFoundError
546541 Consumer <Throwable > errorHandler = error -> {
547- errorRef .set (error );
542+ if (!isStreamClosedError (error )) {
543+ errorRef .set (error );
544+ }
548545 errorLatch .countDown ();
549546 };
550547
@@ -578,8 +575,7 @@ public void testResubscribeNoExistingTaskError() throws Exception {
578575 fail ("Expected error for non-existent task resubscription" );
579576 }
580577 } catch (A2AClientException e ) {
581- // Also acceptable - the client might throw an exception immediately
582- assertInstanceOf (TaskNotFoundError .class , e .getCause ());
578+ fail ("Expected error for non-existent task resubscription" );
583579 }
584580 }
585581
0 commit comments