Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.a2a.server.events;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
Expand All @@ -18,8 +21,8 @@
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,10 +72,20 @@ public class MainEventBusProcessor implements Runnable {

/**
* Optional executor for push notifications.
* If null, uses default ForkJoinPool (async).
* If null, uses per-task queues with dedicated consumer threads.
* Tests can inject a synchronous executor to ensure deterministic ordering.
*/
private volatile @Nullable java.util.concurrent.Executor pushNotificationExecutor = null;
private volatile @Nullable Executor pushNotificationExecutor = null;

/**
* Per-task push notifiers for ordered, isolated push notification delivery.
* Key: taskId, Value: TaskPushNotifier for that task.
* Notifiers are lazily cleaned up after NOTIFIER_EXPIRY_MS of inactivity.
*/
private final ConcurrentHashMap<String, TaskPushNotifier> notifiers = new ConcurrentHashMap<>();

private static final int PUSH_NOTIFICATION_QUEUE_CAPACITY = 50;
private static final long NOTIFIER_EXPIRY_MS = 60_000;

private MainEventBus eventBus;

Expand Down Expand Up @@ -118,17 +131,18 @@ public void setCallback(MainEventBusProcessorCallback callback) {
/**
* Set a custom executor for push notifications (primarily for testing).
* <p>
* By default, push notifications are sent asynchronously using CompletableFuture.runAsync()
* with the default ForkJoinPool. For tests that need deterministic ordering of push
* notifications, inject a synchronous executor that runs tasks immediately on the calling thread.
* By default, push notifications are delivered via per-task queues with dedicated
* consumer threads, guaranteeing FIFO ordering per task and isolation between tasks.
* For tests that need deterministic synchronous delivery, inject a synchronous
* executor that runs tasks immediately on the calling thread.
* </p>
* Example synchronous executor for tests:
* <pre>{@code
* Executor syncExecutor = Runnable::run;
* mainEventBusProcessor.setPushNotificationExecutor(syncExecutor);
* }</pre>
*
* @param executor the executor to use for push notifications, or null to use default ForkJoinPool
* @param executor the executor to use for push notifications, or null to use per-task queues
*/
public void setPushNotificationExecutor(java.util.concurrent.Executor executor) {
this.pushNotificationExecutor = executor;
Expand Down Expand Up @@ -158,14 +172,16 @@ void stop() {
processorThread.interrupt();
try {
long start = System.currentTimeMillis();
processorThread.join(5000); // Wait up to 5 seconds
processorThread.join(5000);
long elapsed = System.currentTimeMillis() - start;
LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop");
}
}
notifiers.values().forEach(TaskPushNotifier::shutdown);
notifiers.clear();
LOGGER.info("MainEventBusProcessor stopped");
}

Expand Down Expand Up @@ -335,54 +351,105 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated

/**
* Sends push notification for the streaming event AFTER persistence.
* <p>
* This is called after updateTaskStore() to ensure the notification contains
* the latest persisted state, avoiding race conditions.
* </p>
* <p>
* <b>CRITICAL:</b> Push notifications are sent asynchronously in the background
* to avoid blocking event distribution to ChildQueues. The 83ms overhead from
* PushNotificationSender.sendNotification() was causing streaming delays.
* </p>
* <p>
* <b>IMPORTANT:</b> The event parameter is the actual event being processed.
* This ensures we send the event as it was when processed, not whatever state
* might exist in TaskStore when the async callback executes (subsequent events
* may have already updated the store).
* </p>
* <p>
* Supports all StreamingEventKind event types per A2A spec section 4.3.3:
* Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent.
* The event will be automatically wrapped in StreamResponse format by JsonUtil.
* </p>
* <p>
* <b>NOTE:</b> Tests can inject a synchronous executor via setPushNotificationExecutor()
* to ensure deterministic ordering of push notifications in the test environment.
* </p>
*
* @param taskId the task ID
* @param event the streaming event to send (Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent)
* @param event the streaming event to send
*/
private void sendPushNotification(String taskId, StreamingEventKind event) {
Runnable pushTask = () -> {
try {
if (event != null) {
LOGGER.debug("Sending push notification for task {}", taskId);
void sendPushNotification(String taskId, StreamingEventKind event) {
// Use injected executor if set (synchronous mode for tests)
if (pushNotificationExecutor != null) {
pushNotificationExecutor.execute(() -> {
try {
pushSender.sendNotification(event);
} else {
LOGGER.debug("Skipping push notification - event is null for task {}", taskId);
} catch (Exception e) {
LOGGER.error("Error sending push notification for task {}", taskId, e);
}
});
return;
}

// Lazy cleanup + get-or-create: if existing notifier is expired, replace it
notifiers.compute(taskId, (id, existing) -> {
if (existing != null && !existing.isExpired()) {
existing.touch();
return existing;
}
if (existing != null) existing.shutdown();
return new TaskPushNotifier(taskId);
}).offer(event);
Comment on lines +372 to +379
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The shutdown() method on an expired TaskPushNotifier is called synchronously within the notifiers.compute() block. This block is executed by the single MainEventBusProcessor thread, which is a critical component for processing all events. The shutdown() method contains a consumerThread.join(2000), which can block for up to 2 seconds if the consumer thread is stuck (e.g., in a non-interruptible network call within pushSender). This could severely degrade or stall the main event processing loop.

Consider making the shutdown of expired notifiers asynchronous to avoid blocking the MainEventBusProcessor thread. You could use a dedicated executor or simply a new thread for this cleanup task.

        notifiers.compute(taskId, (id, existing) -> {
            if (existing != null && !existing.isExpired()) {
                existing.touch();
                return existing;
            }
            if (existing != null) {
                new Thread(existing::shutdown, "Notifier-Shutdown-" + existing.taskId).start();
            }
            return new TaskPushNotifier(taskId);
        }).offer(event);

}

/**
* Per-task push notifier with a bounded queue and a dedicated consumer thread.
* Guarantees FIFO ordering per task and isolates push delivery between tasks.
*/
private class TaskPushNotifier {
private final String taskId;
private final LinkedBlockingQueue<StreamingEventKind> queue;
private final Thread consumerThread;
private volatile long lastAccessTime;
private volatile boolean shutdown = false;

TaskPushNotifier(String taskId) {
this.taskId = taskId;
this.queue = new LinkedBlockingQueue<>(PUSH_NOTIFICATION_QUEUE_CAPACITY);
this.lastAccessTime = System.currentTimeMillis();
this.consumerThread = new Thread(this::consumeLoop, "PushNotifier-" + taskId);
this.consumerThread.setDaemon(true);
this.consumerThread.start();
}

/**
* Enqueues the event for ordered delivery.
* If the queue is full, the oldest pending event is dropped to make room for the newest.
*/
void offer(StreamingEventKind event) {
if (shutdown) return;
touch();
while (!queue.offer(event)) {
// Queue is full - drop oldest to make room for newest
queue.poll();
}
}
Comment on lines +406 to +413
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The offer method can be called by multiple producer threads concurrently for the same task. The current implementation of the while loop to handle a full queue is not atomic. This can lead to a race condition where more than one "oldest" event is dropped to make room for new events if multiple threads attempt to offer to a full queue at the same time. To ensure that only one event is dropped per new event added, you should synchronize this method.

Suggested change
void offer(StreamingEventKind event) {
if (shutdown) return;
touch();
while (!queue.offer(event)) {
// Queue is full - drop oldest to make room for newest
queue.poll();
}
}
synchronized void offer(StreamingEventKind event) {
if (shutdown) return;
touch();
while (!queue.offer(event)) {
// Queue is full - drop oldest to make room for newest
queue.poll();
}
}
References
  1. Using AtomicBoolean or synchronization is a general recommendation for managing shared mutable state across multiple threads to ensure atomicity.


private void consumeLoop() {
while (!shutdown) {
try {
StreamingEventKind event = queue.poll(10, TimeUnit.MILLISECONDS);
if (event != null) {
sendNotification(event);
}
} catch (InterruptedException e) {
break;
}
}
}

private void sendNotification(StreamingEventKind event) {
try {
LOGGER.debug("Sending push notification for task {}", taskId);
pushSender.sendNotification(event);
} catch (Exception e) {
LOGGER.error("Error sending push notification for task {}", taskId, e);
// Don't rethrow - push notifications are best-effort
}
};
}

// Use custom executor if set (for tests), otherwise use default ForkJoinPool (async)
if (pushNotificationExecutor != null) {
pushNotificationExecutor.execute(pushTask);
} else {
CompletableFuture.runAsync(pushTask);
void shutdown() {
shutdown = true;
consumerThread.interrupt();
try {
consumerThread.join(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

boolean isExpired() {
return System.currentTimeMillis() - lastAccessTime > NOTIFIER_EXPIRY_MS;
}

void touch() {
lastAccessTime = System.currentTimeMillis();
}
}

Expand Down
Loading
Loading