Skip to content

Commit 7f50a90

Browse files
kabirclaude
andauthored
fix: synchronize ClientTaskManager to prevent race condition in concu… (#742)
…rrent SSE event processing ClientTaskManager was not thread-safe, causing intermittent test failures when SSE events were delivered concurrently by the HTTP client. The race condition occurred when multiple events (startWork, addArtifact, complete) were emitted rapidly in streaming mode. Without synchronization, concurrent threads could interleave read-modify-write operations on currentTask, resulting in lost updates. Specifically, the artifact update could be overwritten by a status update, leaving the final COMPLETED task with no artifacts. The fast time to failure of the test 0.017s, indicates that the COMPLETED status update was received, and that the latch did not time out. This manifested as intermittent failures in agent-to-agent tests where the test received a COMPLETED task but extractTextFromTask() returned an empty string because the artifact list was empty. Fix: Added synchronized keyword to all ClientTaskManager methods (getCurrentTask, saveTaskEvent variants, updateWithMessage) to ensure atomic processing of events regardless of HTTP client threading behavior. Impact: All client transports (JSON-RPC, gRPC, REST) in streaming mode Validated: 100 consecutive test iterations passed on CI Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent ab77fd0 commit 7f50a90

1 file changed

Lines changed: 5 additions & 5 deletions

File tree

client/base/src/main/java/io/a2a/client/ClientTaskManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,22 @@ public ClientTaskManager() {
3535
this.contextId = null;
3636
}
3737

38-
public Task getCurrentTask() throws A2AClientInvalidStateError {
38+
public synchronized Task getCurrentTask() throws A2AClientInvalidStateError {
3939
if (currentTask == null) {
4040
throw new A2AClientInvalidStateError("No current task");
4141
}
4242
return currentTask;
4343
}
4444

45-
public Task saveTaskEvent(Task task) throws A2AClientInvalidArgsError {
45+
public synchronized Task saveTaskEvent(Task task) throws A2AClientInvalidArgsError {
4646
if (currentTask != null) {
4747
throw new A2AClientInvalidArgsError("Task is already set, create new manager for new tasks.");
4848
}
4949
saveTask(task);
5050
return task;
5151
}
5252

53-
public Task saveTaskEvent(TaskStatusUpdateEvent taskStatusUpdateEvent) throws A2AClientError {
53+
public synchronized Task saveTaskEvent(TaskStatusUpdateEvent taskStatusUpdateEvent) throws A2AClientError {
5454
if (taskId == null) {
5555
taskId = taskStatusUpdateEvent.taskId();
5656
}
@@ -86,7 +86,7 @@ public Task saveTaskEvent(TaskStatusUpdateEvent taskStatusUpdateEvent) throws A2
8686
return currentTask;
8787
}
8888

89-
public Task saveTaskEvent(TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
89+
public synchronized Task saveTaskEvent(TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
9090
if (taskId == null) {
9191
taskId = taskArtifactUpdateEvent.taskId();
9292
}
@@ -113,7 +113,7 @@ public Task saveTaskEvent(TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
113113
* @param task the task to update
114114
* @return the updated task
115115
*/
116-
public Task updateWithMessage(Message message, Task task) {
116+
public synchronized Task updateWithMessage(Message message, Task task) {
117117
Task.Builder taskBuilder = Task.builder(task);
118118
List<Message> history = new ArrayList<>(task.history());
119119
if (task.status().message() != null) {

0 commit comments

Comments
 (0)