-
Notifications
You must be signed in to change notification settings - Fork 147
Expand file tree
/
Copy pathMainEventBusProcessor.java
More file actions
491 lines (440 loc) · 20.8 KB
/
MainEventBusProcessor.java
File metadata and controls
491 lines (440 loc) · 20.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
package io.a2a.server.events;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskPersistenceException;
import io.a2a.server.tasks.TaskSerializationException;
import io.a2a.server.tasks.TaskStore;
import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
import io.a2a.spec.Message;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Background processor for the MainEventBus.
* <p>
* This processor runs in a dedicated background thread, consuming events from the MainEventBus
* and performing the following operations in order:
* </p>
* <ol>
* <li>Update TaskStore with event data (persistence FIRST)</li>
* <li>Send push notifications for non-replicated streaming events</li>
* <li>Distribute event to ChildQueues (clients see it AFTER persistence)</li>
* </ol>
* <p>
* This architecture ensures clients never receive events before they're persisted,
* eliminating race conditions and enabling reliable event replay.
* </p>
* <p>
* <b>Note:</b> This bean is eagerly initialized by {@link MainEventBusProcessorInitializer}
* to ensure the background thread starts automatically when the application starts.
* </p>
*
* <h2>Exception Handling</h2>
* TaskStore persistence failures are caught and handled gracefully:
* <ul>
* <li>{@link TaskSerializationException} - Data corruption or schema mismatch.
* Logged at ERROR level, distributed as {@link InternalError} to clients.</li>
* <li>{@link TaskPersistenceException} - Database/storage system failure.
* Logged at ERROR level, distributed as {@link InternalError} to clients.</li>
* </ul>
*
* <p>Processing continues after errors - the failed event is distributed as InternalError
* to all ChildQueues, and the MainEventBusProcessor continues consuming subsequent events.</p>
*/
@ApplicationScoped
public class MainEventBusProcessor implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class);

/** Capacity of the bounded event queue owned by each per-task push notifier. */
private static final int PUSH_NOTIFICATION_QUEUE_CAPACITY = 50;

/** Idle time in milliseconds after which a per-task push notifier is considered expired. */
private static final long NOTIFIER_EXPIRY_MS = 60_000;

/**
 * Hook that lets tests synchronize with the asynchronous event processing.
 * Starts out as NOOP so production code never needs a null check;
 * tests replace it through setCallback().
 */
private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;

/**
 * Optional executor used to deliver push notifications.
 * While it is null, delivery goes through per-task queues with dedicated consumer threads.
 * Tests can install a synchronous executor to get deterministic ordering.
 */
private volatile @Nullable Executor pushNotificationExecutor;

/**
 * Push notifiers keyed by taskId, one per task, for ordered and isolated delivery.
 * A notifier that has been inactive for longer than NOTIFIER_EXPIRY_MS is replaced lazily.
 */
private final ConcurrentHashMap<String, TaskPushNotifier> notifiers = new ConcurrentHashMap<>();

// Collaborators; assigned by the @Inject constructor.
private MainEventBus eventBus;
private TaskStore taskStore;
private PushNotificationSender pushSender;
private QueueManager queueManager;

/** Loop condition of run(); cleared by stop(). */
private volatile boolean running = true;

/** Thread created by start(); null until then. */
private @Nullable Thread processorThread;
/**
 * Creates an instance without assigning any collaborator.
 * <p>
 * CDI needs a no-arg constructor to proxy {@code @ApplicationScoped} beans.
 * The fields are populated by the {@code @Inject} constructor instead, which is
 * why the NullAway warning is suppressed here.
 * </p>
 */
@SuppressWarnings("NullAway")
protected MainEventBusProcessor() {
    // Intentionally empty.
}
/**
 * Creates the processor and stores its collaborators.
 *
 * @param eventBus the bus that {@link #run()} takes events from
 * @param taskStore the store handed to the per-event {@link TaskManager}
 * @param pushSender the sender used to deliver push notifications
 * @param queueManager the queue manager; this constructor only stores it
 */
@Inject
public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNotificationSender pushSender, QueueManager queueManager) {
    this.queueManager = queueManager;
    this.pushSender = pushSender;
    this.taskStore = taskStore;
    this.eventBus = eventBus;
}
/**
 * Set a callback for testing synchronization with async event processing.
 * <p>
 * This is primarily intended for tests that need to wait for event processing to complete.
 * Pass null to reset to the default NOOP callback.
 * </p>
 * <p>
 * The parameter is annotated {@code @Nullable} so that the documented "pass null to reset"
 * usage is also visible to nullness checkers.
 * </p>
 *
 * @param callback the callback to invoke during event processing, or null for NOOP
 */
public void setCallback(@Nullable MainEventBusProcessorCallback callback) {
    // The field never holds null: a null argument is mapped to the NOOP instance.
    this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
}
/**
 * Set a custom executor for push notifications (primarily for testing).
 * <p>
 * By default, push notifications are delivered via per-task queues with dedicated
 * consumer threads, guaranteeing FIFO ordering per task and isolation between tasks.
 * For tests that need deterministic synchronous delivery, inject a synchronous
 * executor that runs tasks immediately on the calling thread.
 * </p>
 * Example synchronous executor for tests:
 * <pre>{@code
 * Executor syncExecutor = Runnable::run;
 * mainEventBusProcessor.setPushNotificationExecutor(syncExecutor);
 * }</pre>
 *
 * @param executor the executor to use for push notifications, or null to use per-task queues
 */
public void setPushNotificationExecutor(@Nullable Executor executor) {
    // The backing field is declared @Nullable; null switches back to the per-task queues.
    this.pushNotificationExecutor = executor;
}
/**
 * Creates and starts the background thread that executes {@link #run()}.
 * <p>
 * The thread is named {@code "MainEventBusProcessor"} and is marked as a daemon.
 * </p>
 */
@PostConstruct
void start() {
    Thread worker = new Thread(this, "MainEventBusProcessor");
    // Daemon thread: allow the JVM to exit even if this thread is still running.
    worker.setDaemon(true);
    processorThread = worker;
    worker.start();
    LOGGER.info("MainEventBusProcessor started");
}
/**
 * Deliberately does nothing.
 * <p>
 * Invoking any method on the CDI proxy forces the proxy to be resolved, which in turn
 * ensures that {@code @PostConstruct} has run. MainEventBusProcessorInitializer calls
 * this during application startup for exactly that purpose.
 * </p>
 */
public void ensureStarted() {
    // Empty on purpose: the call itself is what triggers proxy resolution.
}
/**
 * Stops the processing loop and shuts down all per-task push notifiers.
 * <p>
 * Clears the {@code running} flag, interrupts the processor thread and waits up to
 * five seconds for it to end. {@link Thread#join(long)} also returns when the timeout
 * elapses, so the thread's liveness is checked afterwards and a warning is logged if it
 * is still running. The notifiers are shut down and removed in every case.
 * </p>
 */
@PreDestroy
void stop() {
    LOGGER.info("MainEventBusProcessor stopping...");
    running = false;
    // Work on a local copy so the null check and the calls below see the same reference.
    Thread worker = processorThread;
    if (worker != null) {
        worker.interrupt();
        try {
            // nanoTime is monotonic, unlike the wall clock, so the elapsed value cannot go negative.
            long startNanos = System.nanoTime();
            worker.join(5000);
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            if (worker.isAlive()) {
                // join() timed out: do not claim the thread has stopped.
                LOGGER.warn("MainEventBusProcessor thread did not stop within {}ms, continuing shutdown", elapsed);
            } else {
                LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop");
        }
    }
    notifiers.values().forEach(TaskPushNotifier::shutdown);
    notifiers.clear();
    LOGGER.info("MainEventBusProcessor stopped");
}
/**
 * Processing loop executed on the background thread.
 * <p>
 * Repeatedly takes the next context from the event bus and hands it to
 * {@code processEvent}. The loop ends when {@code running} has been cleared or the
 * thread is interrupted while taking an event; any other exception is logged and the
 * loop carries on with the next event.
 * </p>
 */
@Override
public void run() {
    LOGGER.info("MainEventBusProcessor processing loop started");
    boolean interrupted = false;
    while (running && !interrupted) {
        try {
            LOGGER.debug("MainEventBusProcessor: Waiting for event from MainEventBus...");
            MainEventBusContext next = eventBus.take();
            LOGGER.debug("MainEventBusProcessor: Retrieved event for task {} from MainEventBus",
                    next.taskId());
            processEvent(next);
        } catch (InterruptedException e) {
            // Restore the flag for whoever inspects it next, then leave the loop.
            Thread.currentThread().interrupt();
            LOGGER.info("MainEventBusProcessor interrupted, shutting down");
            interrupted = true;
        } catch (Exception e) {
            // A single failing event must not stop the processor.
            LOGGER.error("Error processing event from MainEventBus", e);
        }
    }
    LOGGER.info("MainEventBusProcessor processing loop ended");
}
/**
 * Processes a single event taken from the MainEventBus.
 * <ol>
 * <li>Persists the event via {@code updateTaskStore}. On success, and unless the task
 *     reached a final state, the main queue's enqueue hook (if any) is triggered.</li>
 * <li>Sends a push notification, but only when persistence succeeded, the event is not
 *     replicated and it is a {@link StreamingEventKind}.</li>
 * <li>Distributes the original event to the child queues, or an {@link InternalError}
 *     if step 1 failed.</li>
 * <li>Notifies the callback with the distributed event and, for a successfully persisted
 *     final event, with the task finalization.</li>
 * </ol>
 * The main queue's semaphore is released in all cases.
 *
 * @param context the bus entry holding the task id, the queue item and the owning queue
 */
private void processEvent(MainEventBusContext context) {
    String taskId = context.taskId();
    Event event = context.eventQueueItem().getEvent();
    // MainEventBus.submit() guarantees this is always a MainQueue
    EventQueue.MainQueue mainQueue = (EventQueue.MainQueue) context.eventQueue();
    LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {}",
            taskId, event.getClass().getSimpleName());
    Event eventToDistribute = null;
    boolean isReplicated = context.eventQueueItem().isReplicated();
    try {
        // Step 1: Update TaskStore FIRST (persistence before clients see it)
        // If this throws, we distribute an error to ensure "persist before client visibility"
        try {
            boolean isFinal = updateTaskStore(taskId, event, isReplicated);
            eventToDistribute = event; // Success - distribute original event
            // Trigger replication AFTER successful persistence
            // SKIP replication if task is final - ReplicatedQueueManager handles this via TaskFinalizedEvent
            // to ensure final Task is sent before poison pill (QueueClosedEvent)
            // NOTE(review): an exception thrown by the hook is handled by the catch blocks below and is
            // therefore reported to clients as a failure although the event is already persisted - confirm.
            if (!isFinal) {
                EventEnqueueHook hook = mainQueue.getEnqueueHook();
                if (hook != null) {
                    LOGGER.debug("Triggering replication hook for task {} after successful persistence", taskId);
                    hook.onEnqueue(context.eventQueueItem());
                }
            } else {
                LOGGER.debug("Task {} is final - skipping replication hook (handled by ReplicatedQueueManager)", taskId);
            }
        } catch (InternalError e) {
            // Persistence failed - distribute the InternalError itself instead of the event
            LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
            eventToDistribute = e;
        } catch (Exception e) {
            // Any other failure is wrapped so that clients still receive an InternalError
            LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
            eventToDistribute = new InternalError("Failed to persist event: " + e.getMessage());
        }

        // Step 2: Send push notification AFTER successful persistence (only from active node)
        // Skip push notifications for replicated events to avoid duplicate notifications in multi-instance deployments
        // Push notifications are sent for all StreamingEventKind events (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
        // per A2A spec section 4.3.3
        // The identity check is the same "persistence succeeded" test used for finalization below:
        // on failure eventToDistribute is an error, and no push must go out for an unpersisted event.
        boolean persisted = eventToDistribute == event;
        if (persisted && !isReplicated && event instanceof StreamingEventKind streamingEvent) {
            // Send the streaming event directly - it will be wrapped in StreamResponse format by PushNotificationSender
            sendPushNotification(taskId, streamingEvent);
        }

        // Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
        int childCount = mainQueue.getChildCount();
        LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}",
                eventToDistribute.getClass().getSimpleName(), childCount, taskId);
        // Create new EventQueueItem with the event to distribute (original or error)
        EventQueueItem itemToDistribute = new LocalEventQueueItem(eventToDistribute);
        mainQueue.distributeToChildren(itemToDistribute);
        LOGGER.debug("MainEventBusProcessor: Distributed {} to {} children for task {}",
                eventToDistribute.getClass().getSimpleName(), childCount, taskId);
        LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
    } finally {
        try {
            // Step 4: Notify callback after all processing is complete
            // Call callback with the distributed event (original or error)
            if (eventToDistribute != null) {
                callback.onEventProcessed(taskId, eventToDistribute);
                // Step 5: If this is a final event, notify task finalization
                // Only for successful persistence (not for errors)
                if (eventToDistribute == event && isFinalEvent(event)) {
                    callback.onTaskFinalized(taskId);
                }
            }
        } finally {
            // ALWAYS release semaphore, even if processing fails
            // Balances the acquire() in MainQueue.enqueueEvent()
            mainQueue.releaseSemaphore();
        }
    }
}
/**
 * Persists an event by delegating to {@code TaskManager.process(event, isReplicated)}.
 * <p>
 * A new TaskManager is created for every call, built from the task id, the context id
 * extracted from the event, and this processor's TaskStore.
 * </p>
 * <p>
 * Every failure leaves this method as an {@link InternalError}, so that processEvent()
 * distributes an error to clients instead of an event that was never persisted
 * ("persist before visibility").
 * See Gemini's comment: https://github.com/a2aproject/a2a-java/pull/515#discussion_r2604621833
 * </p>
 *
 * @param taskId the task ID
 * @param event the event to persist
 * @param isReplicated the replicated flag of the queue item, passed through to TaskManager.process()
 * @return the value returned by TaskManager.process(): true if the task reached a final state
 * @throws InternalError if persistence fails for any reason
 */
private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError {
    try {
        // One short-lived TaskManager per event; the context id comes from the event itself.
        TaskManager perEventManager = new TaskManager(taskId, extractContextId(event), taskStore, null);
        boolean reachedFinalState = perEventManager.process(event, isReplicated);
        LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})",
                taskId, event.getClass().getSimpleName(), reachedFinalState, isReplicated);
        return reachedFinalState;
    } catch (TaskSerializationException serializationFailure) {
        // Data corruption or schema mismatch - ALWAYS permanent
        LOGGER.error("Task {} event serialization failed - data corruption detected: {}",
                taskId, serializationFailure.getMessage(), serializationFailure);
        String detail = "Failed to serialize task " + taskId + ": " + serializationFailure.getMessage();
        throw new InternalError(detail);
    } catch (TaskPersistenceException storageFailure) {
        // Database/storage failure
        LOGGER.error("Task {} event persistence failed: {}", taskId, storageFailure.getMessage(), storageFailure);
        String detail = "Storage failure for task " + taskId + ": " + storageFailure.getMessage();
        throw new InternalError(detail);
    } catch (InternalError alreadyInternal) {
        // Already the right type: log it and pass the same instance on,
        // so the unpersisted event is not distributed to clients.
        LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, alreadyInternal);
        throw alreadyInternal;
    } catch (Exception unexpected) {
        // Anything else is treated as a permanent failure and converted,
        // so the unpersisted event is not distributed to clients.
        LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, unexpected);
        String detail = "TaskStore persistence failed: " + unexpected.getMessage();
        throw new InternalError(detail);
    }
}
/**
 * Sends a push notification for the given streaming event.
 * <p>
 * If a push notification executor has been set, delivery is handed to it and nothing else
 * happens. Otherwise the event is offered to the notifier of its task, which is created
 * on first use and replaced when it has expired.
 * </p>
 * <p>
 * Expired notifiers of other tasks are shut down and removed on the same occasion.
 * Without this, the notifier (and its consumer thread) of a task that never receives
 * another event would stay alive until {@code stop()}.
 * </p>
 *
 * @param taskId the task ID
 * @param event the streaming event to send
 */
void sendPushNotification(String taskId, StreamingEventKind event) {
    // Read the volatile field once: a concurrent setPushNotificationExecutor(null) must not
    // be able to turn the null check below into a NullPointerException.
    Executor executor = pushNotificationExecutor;
    if (executor != null) {
        // Use injected executor if set (synchronous mode for tests)
        executor.execute(() -> {
            try {
                pushSender.sendNotification(event);
            } catch (Exception e) {
                LOGGER.error("Error sending push notification for task {}", taskId, e);
            }
        });
        return;
    }

    evictExpiredNotifiers(taskId);

    // Lazy cleanup + get-or-create: if existing notifier is expired, replace it.
    // The expired instance is only recorded inside compute(); its blocking shutdown()
    // runs afterwards so that other updates of the map are not held up by the join.
    TaskPushNotifier[] replaced = new TaskPushNotifier[1];
    TaskPushNotifier notifier = notifiers.compute(taskId, (id, existing) -> {
        if (existing != null) {
            if (!existing.isExpired()) {
                existing.touch();
                return existing;
            }
            replaced[0] = existing;
        }
        return new TaskPushNotifier(id);
    });
    if (replaced[0] != null) {
        // Stop the old consumer before the new notifier receives its first event.
        replaced[0].shutdown();
    }
    notifier.offer(event);
}

/**
 * Shuts down and removes the expired notifiers of all tasks except the given one.
 * <p>
 * The notifier of {@code activeTaskId} is skipped because the caller handles it itself.
 * {@code remove(key, value)} only succeeds if the mapping is still the instance that was
 * found expired, so a notifier replaced in the meantime is left alone.
 * </p>
 *
 * @param activeTaskId the task whose notifier must not be touched here
 */
private void evictExpiredNotifiers(String activeTaskId) {
    notifiers.forEach((id, candidate) -> {
        if (!id.equals(activeTaskId) && candidate.isExpired() && notifiers.remove(id, candidate)) {
            candidate.shutdown();
        }
    });
}
/**
 * Per-task push notifier with a bounded queue and a dedicated consumer thread.
 * <p>
 * Events offered to a notifier are taken from its queue and sent one at a time by that
 * notifier's own daemon thread, so delivery for one task is ordered and does not wait on
 * other tasks.
 * </p>
 */
private class TaskPushNotifier {
    /**
     * Upper bound in milliseconds for one blocking poll of the queue. shutdown()
     * interrupts the consumer thread, so the loop does not need a short timeout to
     * notice the shutdown flag and can avoid waking up many times per second.
     */
    private static final long POLL_TIMEOUT_MS = 1_000;

    private final String taskId;
    private final LinkedBlockingQueue<StreamingEventKind> queue;
    private final Thread consumerThread;
    private volatile long lastAccessTime;
    private volatile boolean shutdown = false;

    /**
     * Creates the notifier and immediately starts its consumer thread.
     *
     * @param taskId the task this notifier delivers events for; also used in the thread name
     */
    TaskPushNotifier(String taskId) {
        this.taskId = taskId;
        this.queue = new LinkedBlockingQueue<>(PUSH_NOTIFICATION_QUEUE_CAPACITY);
        this.lastAccessTime = System.currentTimeMillis();
        this.consumerThread = new Thread(this::consumeLoop, "PushNotifier-" + taskId);
        this.consumerThread.setDaemon(true);
        this.consumerThread.start();
    }

    /**
     * Enqueues the event for ordered delivery.
     * If the queue is full, the oldest pending event is dropped to make room for the newest;
     * each drop is logged. Events offered after shutdown are ignored.
     *
     * @param event the event to deliver
     */
    void offer(StreamingEventKind event) {
        if (shutdown) {
            return;
        }
        touch();
        while (!queue.offer(event)) {
            // Queue is full - drop oldest to make room for newest.
            // poll() returns null if the consumer emptied the queue in the meantime.
            if (queue.poll() != null) {
                LOGGER.warn("Push notification queue full for task {} (capacity {}), dropped oldest pending event",
                        taskId, PUSH_NOTIFICATION_QUEUE_CAPACITY);
            }
        }
    }

    /** Sends queued events until shutdown is requested or the thread is interrupted. */
    private void consumeLoop() {
        while (!shutdown) {
            try {
                StreamingEventKind event = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (event != null) {
                    sendNotification(event);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt status visible and end the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /** Sends one event through the push sender; failures are logged and do not stop the consumer. */
    private void sendNotification(StreamingEventKind event) {
        try {
            LOGGER.debug("Sending push notification for task {}", taskId);
            pushSender.sendNotification(event);
        } catch (Exception e) {
            LOGGER.error("Error sending push notification for task {}", taskId, e);
        }
    }

    /**
     * Requests shutdown, interrupts the consumer thread and waits up to two seconds for it to end.
     * Events still in the queue are not delivered.
     */
    void shutdown() {
        shutdown = true;
        consumerThread.interrupt();
        try {
            consumerThread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if the last access is more than NOTIFIER_EXPIRY_MS in the past. */
    boolean isExpired() {
        return System.currentTimeMillis() - lastAccessTime > NOTIFIER_EXPIRY_MS;
    }

    /** Records the current time as the last access time. */
    void touch() {
        lastAccessTime = System.currentTimeMillis();
    }
}
/**
 * Reads the contextId carried by an event.
 * <p>
 * Only Task, TaskStatusUpdateEvent and TaskArtifactUpdateEvent are inspected;
 * for every other event type (e.g., Message) the result is null.
 * </p>
 *
 * @param event the event to inspect
 * @return the event's contextId, or null if the event is of another type
 */
@Nullable
private String extractContextId(Event event) {
    if (event instanceof Task taskEvent) {
        return taskEvent.contextId();
    }
    if (event instanceof TaskStatusUpdateEvent statusEvent) {
        return statusEvent.contextId();
    }
    if (event instanceof TaskArtifactUpdateEvent artifactEvent) {
        return artifactEvent.contextId();
    }
    // No other event type is queried for a contextId.
    return null;
}
/**
 * Tells whether an event marks the end of a task.
 * <ul>
 * <li>Task: final if it has a status with a state and that state reports isFinal().</li>
 * <li>TaskStatusUpdateEvent: final if the event itself reports isFinal().</li>
 * <li>A2AError: always final.</li>
 * <li>Anything else: never final.</li>
 * </ul>
 *
 * @param event the event to check
 * @return true if the event represents a final state, false otherwise
 */
private boolean isFinalEvent(Event event) {
    if (event instanceof Task taskEvent) {
        // A missing status or state counts as "not final".
        if (taskEvent.status() == null || taskEvent.status().state() == null) {
            return false;
        }
        return taskEvent.status().state().isFinal();
    }
    if (event instanceof TaskStatusUpdateEvent statusEvent) {
        return statusEvent.isFinal();
    }
    // A2AError events are terminal - they trigger FAILED state transition
    return event instanceof A2AError;
}
}