From 9d2f424af0800cbc4856c459635a847a93f319e7 Mon Sep 17 00:00:00 2001 From: perhaps <2025680871@qq.com> Date: Sun, 29 Mar 2026 12:53:21 +0800 Subject: [PATCH] fix(server-common): preserve ordering of push notifications per task (#775) Made-with: Cursor --- .../server/events/MainEventBusProcessor.java | 163 +++++++++++----- ...BusProcessorPushNotificationOrderTest.java | 181 ++++++++++++++++++ 2 files changed, 296 insertions(+), 48 deletions(-) create mode 100644 server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorPushNotificationOrderTest.java diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java index d7eece30b..3ce47935e 100644 --- a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java @@ -1,6 +1,9 @@ package io.a2a.server.events; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; @@ -18,8 +21,8 @@ import io.a2a.spec.Event; import io.a2a.spec.InternalError; import io.a2a.spec.Message; -import io.a2a.spec.Task; import io.a2a.spec.StreamingEventKind; +import io.a2a.spec.Task; import io.a2a.spec.TaskArtifactUpdateEvent; import io.a2a.spec.TaskStatusUpdateEvent; import org.slf4j.Logger; @@ -69,10 +72,20 @@ public class MainEventBusProcessor implements Runnable { /** * Optional executor for push notifications. - * If null, uses default ForkJoinPool (async). + * If null, uses per-task queues with dedicated consumer threads. * Tests can inject a synchronous executor to ensure deterministic ordering. */ - private volatile @Nullable java.util.concurrent.Executor pushNotificationExecutor = null; + private volatile @Nullable Executor pushNotificationExecutor = null; + + /** + * Per-task push notifiers for ordered, isolated push notification delivery. + * Key: taskId, Value: TaskPushNotifier for that task. + * Notifiers are lazily cleaned up after NOTIFIER_EXPIRY_MS of inactivity. + */ + private final ConcurrentHashMap notifiers = new ConcurrentHashMap<>(); + + private static final int PUSH_NOTIFICATION_QUEUE_CAPACITY = 50; + private static final long NOTIFIER_EXPIRY_MS = 60_000; private MainEventBus eventBus; @@ -118,9 +131,10 @@ public void setCallback(MainEventBusProcessorCallback callback) { /** * Set a custom executor for push notifications (primarily for testing). *

- * By default, push notifications are sent asynchronously using CompletableFuture.runAsync() - * with the default ForkJoinPool. For tests that need deterministic ordering of push - * notifications, inject a synchronous executor that runs tasks immediately on the calling thread. + * By default, push notifications are delivered via per-task queues with dedicated + * consumer threads, guaranteeing FIFO ordering per task and isolation between tasks. + * For tests that need deterministic synchronous delivery, inject a synchronous + * executor that runs tasks immediately on the calling thread. *

* Example synchronous executor for tests: *
{@code
@@ -128,7 +142,7 @@ public void setCallback(MainEventBusProcessorCallback callback) {
      * mainEventBusProcessor.setPushNotificationExecutor(syncExecutor);
      * }
* - * @param executor the executor to use for push notifications, or null to use default ForkJoinPool + * @param executor the executor to use for push notifications, or null to use per-task queues */ public void setPushNotificationExecutor(java.util.concurrent.Executor executor) { this.pushNotificationExecutor = executor; @@ -158,7 +172,7 @@ void stop() { processorThread.interrupt(); try { long start = System.currentTimeMillis(); - processorThread.join(5000); // Wait up to 5 seconds + processorThread.join(5000); long elapsed = System.currentTimeMillis() - start; LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed); } catch (InterruptedException e) { @@ -166,6 +180,8 @@ void stop() { LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop"); } } + notifiers.values().forEach(TaskPushNotifier::shutdown); + notifiers.clear(); LOGGER.info("MainEventBusProcessor stopped"); } @@ -335,54 +351,105 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated /** * Sends push notification for the streaming event AFTER persistence. - *

- * This is called after updateTaskStore() to ensure the notification contains - * the latest persisted state, avoiding race conditions. - *

- *

- * CRITICAL: Push notifications are sent asynchronously in the background - * to avoid blocking event distribution to ChildQueues. The 83ms overhead from - * PushNotificationSender.sendNotification() was causing streaming delays. - *

- *

- * IMPORTANT: The event parameter is the actual event being processed. - * This ensures we send the event as it was when processed, not whatever state - * might exist in TaskStore when the async callback executes (subsequent events - * may have already updated the store). - *

- *

- * Supports all StreamingEventKind event types per A2A spec section 4.3.3: - * Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent. - * The event will be automatically wrapped in StreamResponse format by JsonUtil. - *

- *

- * NOTE: Tests can inject a synchronous executor via setPushNotificationExecutor() - * to ensure deterministic ordering of push notifications in the test environment. - *

* * @param taskId the task ID - * @param event the streaming event to send (Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent) + * @param event the streaming event to send */ - private void sendPushNotification(String taskId, StreamingEventKind event) { - Runnable pushTask = () -> { - try { - if (event != null) { - LOGGER.debug("Sending push notification for task {}", taskId); + void sendPushNotification(String taskId, StreamingEventKind event) { + // Use injected executor if set (synchronous mode for tests) + if (pushNotificationExecutor != null) { + pushNotificationExecutor.execute(() -> { + try { pushSender.sendNotification(event); - } else { - LOGGER.debug("Skipping push notification - event is null for task {}", taskId); + } catch (Exception e) { + LOGGER.error("Error sending push notification for task {}", taskId, e); } + }); + return; + } + + // Lazy cleanup + get-or-create: if existing notifier is expired, replace it + notifiers.compute(taskId, (id, existing) -> { + if (existing != null && !existing.isExpired()) { + existing.touch(); + return existing; + } + if (existing != null) existing.shutdown(); + return new TaskPushNotifier(taskId); + }).offer(event); + } + + /** + * Per-task push notifier with a bounded queue and a dedicated consumer thread. + * Guarantees FIFO ordering per task and isolates push delivery between tasks. + */ + private class TaskPushNotifier { + private final String taskId; + private final LinkedBlockingQueue queue; + private final Thread consumerThread; + private volatile long lastAccessTime; + private volatile boolean shutdown = false; + + TaskPushNotifier(String taskId) { + this.taskId = taskId; + this.queue = new LinkedBlockingQueue<>(PUSH_NOTIFICATION_QUEUE_CAPACITY); + this.lastAccessTime = System.currentTimeMillis(); + this.consumerThread = new Thread(this::consumeLoop, "PushNotifier-" + taskId); + this.consumerThread.setDaemon(true); + this.consumerThread.start(); + } + + /** + * Enqueues the event for ordered delivery. + * If the queue is full, the oldest pending event is dropped to make room for the newest. + */ + void offer(StreamingEventKind event) { + if (shutdown) return; + touch(); + while (!queue.offer(event)) { + // Queue is full - drop oldest to make room for newest + queue.poll(); + } + } + + private void consumeLoop() { + while (!shutdown) { + try { + StreamingEventKind event = queue.poll(10, TimeUnit.MILLISECONDS); + if (event != null) { + sendNotification(event); + } + } catch (InterruptedException e) { + break; + } + } + } + + private void sendNotification(StreamingEventKind event) { + try { + LOGGER.debug("Sending push notification for task {}", taskId); + pushSender.sendNotification(event); } catch (Exception e) { LOGGER.error("Error sending push notification for task {}", taskId, e); - // Don't rethrow - push notifications are best-effort } - }; + } - // Use custom executor if set (for tests), otherwise use default ForkJoinPool (async) - if (pushNotificationExecutor != null) { - pushNotificationExecutor.execute(pushTask); - } else { - CompletableFuture.runAsync(pushTask); + void shutdown() { + shutdown = true; + consumerThread.interrupt(); + try { + consumerThread.join(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + boolean isExpired() { + return System.currentTimeMillis() - lastAccessTime > NOTIFIER_EXPIRY_MS; + } + + void touch() { + lastAccessTime = System.currentTimeMillis(); } } diff --git a/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorPushNotificationOrderTest.java b/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorPushNotificationOrderTest.java new file mode 100644 index 000000000..8e14fcfae --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/events/MainEventBusProcessorPushNotificationOrderTest.java @@ -0,0 +1,181 @@ +package io.a2a.server.events; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.a2a.server.tasks.PushNotificationSender; +import io.a2a.server.tasks.TaskStore; +import io.a2a.spec.Message; +import io.a2a.spec.Task; +import io.a2a.spec.TaskState; +import io.a2a.spec.TaskStatus; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for per-task push notification ordering guarantees. + * + */ +public class MainEventBusProcessorPushNotificationOrderTest { + + private MainEventBus mainEventBus; + private InMemoryQueueManager queueManager; + private CopyOnWriteArrayList pushNotificationOrder; + private CopyOnWriteArrayList pushNotificationTaskIds; + private PushNotificationSender pushSender; + + @BeforeEach + public void setUp() { + mainEventBus = new MainEventBus(); + queueManager = new InMemoryQueueManager(null, mainEventBus); + pushNotificationOrder = new CopyOnWriteArrayList<>(); + pushNotificationTaskIds = new CopyOnWriteArrayList<>(); + pushSender = event -> { + pushNotificationOrder.add(event.getClass().getSimpleName()); + if (event instanceof Task task) { + pushNotificationTaskIds.add(task.id()); + } else if (event instanceof Message msg) { + pushNotificationTaskIds.add(msg.taskId()); + } + }; + } + + @AfterEach + public void tearDown() { + // no-op: no MainEventBusProcessor is started in these tests + } + + @Test + public void testSameTaskEventsOrdered() throws Exception { + String taskId = "ordered-task"; + int eventCount = 20; + + CountDownLatch latch = new CountDownLatch(eventCount); + AtomicInteger callCount = new AtomicInteger(0); + + PushNotificationSender countingSender = event -> { + pushSender.sendNotification(event); + int seq = callCount.incrementAndGet(); + pushNotificationOrder.add("seq=" + seq + "-" + event.getClass().getSimpleName()); + latch.countDown(); + }; + + MainEventBusProcessor processor = new MainEventBusProcessor( + mainEventBus, mock(TaskStore.class), countingSender, queueManager); + + // Submit events rapidly on the producer side + List threads = new ArrayList<>(); + for (int i = 0; i < eventCount; i++) { + int seq = i; + Thread t = new Thread(() -> { + Task event = Task.builder() + .id(taskId) + .contextId("ctx") + .status(new TaskStatus(seq % 2 == 0 ? TaskState.TASK_STATE_WORKING : TaskState.TASK_STATE_SUBMITTED)) + .build(); + processor.sendPushNotification(taskId, event); + }); + threads.add(t); + } + for (Thread t : threads) t.start(); + for (Thread t : threads) t.join(); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "All notifications should be sent within timeout"); + + // Verify: all seq numbers appear in increasing order + int lastSeq = -1; + for (String entry : pushNotificationOrder) { + if (entry.startsWith("seq=")) { + int seq = Integer.parseInt(entry.substring(4, entry.indexOf('-'))); + assertTrue(seq > lastSeq, "Sequence numbers should increase: " + entry); + lastSeq = seq; + } + } + + processor.stop(); + } + + @Test + public void testDifferentTasksUnaffected() throws Exception { + String taskA = "task-A"; + String taskB = "task-B"; + CountDownLatch latch = new CountDownLatch(4); + + PushNotificationSender latchSender = event -> { + pushSender.sendNotification(event); + latch.countDown(); + }; + + MainEventBusProcessor processor = new MainEventBusProcessor( + mainEventBus, mock(TaskStore.class), latchSender, queueManager); + + // Alternate events between two tasks + processor.sendPushNotification(taskA, createTask(taskA, 1)); + processor.sendPushNotification(taskB, createTask(taskB, 2)); + processor.sendPushNotification(taskA, createTask(taskA, 3)); + processor.sendPushNotification(taskB, createTask(taskB, 4)); + + assertTrue(latch.await(5, TimeUnit.SECONDS), "All notifications should complete"); + + // Both tasks appear in results + long taskACount = pushNotificationTaskIds.stream().filter(id -> id.equals(taskA)).count(); + long taskBCount = pushNotificationTaskIds.stream().filter(id -> id.equals(taskB)).count(); + assertEquals(2, taskACount, "Task A should have 2 notifications"); + assertEquals(2, taskBCount, "Task B should have 2 notifications"); + + processor.stop(); + } + + @Test + public void testQueueOverflowDropsOldest() throws Exception { + // Test that the per-task queue has a bounded capacity. + String taskId = "overflow-task"; + int capacity = 50; + + AtomicInteger sentCount = new AtomicInteger(0); + + PushNotificationSender countingSender = event -> { + sentCount.incrementAndGet(); + pushSender.sendNotification(event); + }; + + MainEventBusProcessor processor = new MainEventBusProcessor( + mainEventBus, mock(TaskStore.class), countingSender, queueManager); + + // Submit exactly the queue capacity worth of events + for (int i = 0; i < capacity; i++) { + Task event = createTask(taskId, i); + processor.sendPushNotification(taskId, event); + } + + // Wait for all to be consumed + Thread.sleep(1000); + + // All capacity events should be sent (no overflow) + assertEquals(capacity, sentCount.get(), + "All " + capacity + " events should be sent when queue is exactly full"); + + processor.stop(); + } + + private Task createTask(String taskId, int seq) { + return Task.builder() + .id(taskId) + .contextId("ctx-" + seq) + .status(new TaskStatus(TaskState.TASK_STATE_WORKING)) + .build(); + } +}