Skip to content

Commit ddf65aa

Browse files
kabirclaude
andauthored
feat: improve TaskStore error reporting with domain-specific exceptio… (a2aproject#682)
…ns (a2aproject#516) Replace generic RuntimeException with TaskSerializationException and TaskPersistenceException throughout the TaskStore persistence layer. Wire execptions into MainEventQueueBusProcessor. Fixes a2aproject#516 🦕 --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent c9df260 commit ddf65aa

14 files changed

Lines changed: 1753 additions & 115 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,4 @@ nbproject/
5555
.serena/
5656
.bob/
5757
claudedocs
58+
backlog/

extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java

Lines changed: 146 additions & 114 deletions
Large diffs are not rendered by default.

server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import io.a2a.server.tasks.PushNotificationSender;
1212
import io.a2a.server.tasks.TaskManager;
13+
import io.a2a.server.tasks.TaskPersistenceException;
14+
import io.a2a.server.tasks.TaskSerializationException;
1315
import io.a2a.server.tasks.TaskStore;
1416
import io.a2a.spec.A2AError;
1517
import io.a2a.spec.A2AServerException;
@@ -41,6 +43,18 @@
4143
* <b>Note:</b> This bean is eagerly initialized by {@link MainEventBusProcessorInitializer}
4244
* to ensure the background thread starts automatically when the application starts.
4345
* </p>
46+
*
47+
* <h2>Exception Handling</h2>
48+
* TaskStore persistence failures are caught and handled gracefully:
49+
* <ul>
50+
* <li>{@link TaskSerializationException} - Data corruption or schema mismatch.
51+
* Logged at ERROR level, distributed as {@link InternalError} to clients.</li>
52+
* <li>{@link TaskPersistenceException} - Database/storage system failure.
53+
* Logged at ERROR level, distributed as {@link InternalError} to clients.</li>
54+
* </ul>
55+
*
56+
* <p>Processing continues after errors - the failed event is distributed as InternalError
57+
* to all ChildQueues, and the MainEventBusProcessor continues consuming subsequent events.</p>
4458
*/
4559
@ApplicationScoped
4660
public class MainEventBusProcessor implements Runnable {
@@ -293,11 +307,26 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated
293307
LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})",
294308
taskId, event.getClass().getSimpleName(), isFinal, isReplicated);
295309
return isFinal;
310+
311+
} catch (TaskSerializationException e) {
312+
// Data corruption or schema mismatch - ALWAYS permanent
313+
LOGGER.error("Task {} event serialization failed - data corruption detected: {}",
314+
taskId, e.getMessage(), e);
315+
throw new InternalError("Failed to serialize task " + taskId + ": " + e.getMessage());
316+
317+
} catch (TaskPersistenceException e) {
318+
// Database/storage failure
319+
LOGGER.error("Task {} event persistence failed: {}", taskId, e.getMessage(), e);
320+
throw new InternalError("Storage failure for task " + taskId + ": " + e.getMessage());
321+
296322
} catch (InternalError e) {
323+
// Already an InternalError from TaskManager validation - pass through
297324
LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e);
298325
// Rethrow to prevent distributing unpersisted event to clients
299326
throw e;
327+
300328
} catch (Exception e) {
329+
// Unexpected exception type - treat as permanent failure
301330
LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e);
302331
// Rethrow to prevent distributing unpersisted event to clients
303332
throw new InternalError("TaskStore persistence failed: " + e.getMessage());

server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,62 @@
2525
* <p>
2626
* This is the default TaskStore used when no other implementation is provided.
2727
* </p>
28+
*
29+
* <h2>Exception Behavior</h2>
30+
* InMemoryTaskStore has minimal exception scenarios compared to database-backed implementations:
31+
* <ul>
32+
* <li><b>No TaskSerializationException:</b> Task objects are stored directly in memory without
33+
* serialization. No JSON parsing or schema compatibility issues can occur.</li>
34+
* <li><b>No TaskPersistenceException:</b> ConcurrentHashMap operations do not involve I/O,
35+
* network, or transactional concerns. Standard put/get/remove operations are guaranteed
36+
* to succeed under normal JVM operation.</li>
37+
* <li><b>OutOfMemoryError (potential):</b> The only failure scenario is JVM heap exhaustion if
38+
* too many tasks are stored. This is an {@link Error} (not Exception) and indicates a fatal
39+
* system condition requiring JVM restart and capacity planning.</li>
40+
* </ul>
41+
*
42+
* <h3>Design Rationale</h3>
43+
* This implementation intentionally does NOT throw {@link TaskStoreException} or its subclasses
44+
* because:
45+
* <ul>
46+
* <li>No serialization step exists - tasks stored as Java objects</li>
47+
* <li>No I/O or network operations that can fail</li>
48+
* <li>ConcurrentHashMap guarantees thread-safe operations without checked exceptions</li>
49+
* <li>Memory exhaustion (OutOfMemoryError) is an unrecoverable system failure</li>
50+
* </ul>
51+
*
52+
* <h3>Comparison to Database Implementations</h3>
53+
* Database-backed implementations (e.g., JpaDatabaseTaskStore) throw exceptions for:
54+
* <ul>
55+
* <li>Serialization errors (JSON parsing, schema mismatches)</li>
56+
* <li>Connection failures (network, timeouts)</li>
57+
* <li>Transaction failures (deadlocks, constraint violations)</li>
58+
* <li>Capacity issues (disk full, quota exceeded)</li>
59+
* </ul>
60+
* InMemoryTaskStore avoids all of these by operating entirely in-process.
61+
*
62+
* <h3>Memory Management Considerations</h3>
63+
* Callers should monitor memory usage and implement task cleanup policies:
64+
* <pre>{@code
65+
* // Example: Delete finalized tasks older than 48 hours
66+
* ListTasksParams params = new ListTasksParams.Builder()
67+
* .statusTimestampBefore(Instant.now().minus(Duration.ofHours(48)))
68+
* .build();
69+
*
70+
* List<Task> oldTasks = taskStore.list(params).tasks();
71+
* oldTasks.stream()
72+
* .filter(task -> task.status().state().isFinal())
73+
* .forEach(task -> taskStore.delete(task.id()));
74+
* }</pre>
75+
*
76+
* <h3>Thread Safety</h3>
77+
* All operations are thread-safe via {@link ConcurrentHashMap}. Multiple threads can
78+
* concurrently save, get, list, and delete tasks without synchronization. Last-write-wins
79+
* semantics apply for concurrent {@code save()} calls to the same task ID.
80+
*
81+
* @see TaskStore for interface contract and exception documentation
82+
* @see TaskStoreException for exception hierarchy (not thrown by this implementation)
83+
* @see TaskStateProvider for queue lifecycle integration
2884
*/
2985
@ApplicationScoped
3086
public class InMemoryTaskStore implements TaskStore, TaskStateProvider {

server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
* Implementations should handle errors gracefully:
6464
* <ul>
6565
* <li>Log failures but don't throw exceptions (notifications are best-effort)</li>
66-
* <li>Consider retry logic for transient failures</li>
6766
* <li>Don't block on network I/O - execute asynchronously if needed</li>
6867
* <li>Circuit breaker patterns for repeatedly failing endpoints</li>
6968
* </ul>
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.a2a.server.tasks;
2+
3+
import org.jspecify.annotations.Nullable;
4+
5+
/**
6+
* Exception for database/storage system failures during task persistence operations.
7+
* <p>
8+
* Indicates failures in the underlying storage system (database, filesystem, etc.) rather
9+
* than data format issues.
10+
*
11+
* <h2>Common Scenarios</h2>
12+
* <ul>
13+
* <li>Database connection timeout or network partition</li>
14+
* <li>Transaction deadlock or lock wait timeout</li>
15+
* <li>Connection pool exhausted</li>
16+
* <li>Disk full / storage quota exceeded</li>
17+
* <li>Database constraint violations (unique key, foreign key)</li>
18+
* <li>Insufficient permissions or authentication failures</li>
19+
* <li>Database schema incompatibilities</li>
20+
* </ul>
21+
*
22+
* <h2>Usage Example</h2>
23+
* <pre>{@code
24+
* try {
25+
* em.merge(jpaTask);
26+
* } catch (PersistenceException e) {
27+
* throw new TaskPersistenceException(taskId, "Database save failed", e);
28+
* }
29+
* }</pre>
30+
*
31+
* @see TaskStoreException
32+
* @see TaskSerializationException for data format errors
33+
*/
34+
public class TaskPersistenceException extends TaskStoreException {
35+
36+
/**
37+
* Creates a new TaskPersistenceException with no message or cause.
38+
*/
39+
public TaskPersistenceException() {
40+
super();
41+
}
42+
43+
/**
44+
* Creates a new TaskPersistenceException with the specified message.
45+
*
46+
* @param msg the exception message
47+
*/
48+
public TaskPersistenceException(final String msg) {
49+
super(msg);
50+
}
51+
52+
/**
53+
* Creates a new TaskPersistenceException with the specified cause.
54+
*
55+
* @param cause the underlying cause
56+
*/
57+
public TaskPersistenceException(final Throwable cause) {
58+
super(cause);
59+
}
60+
61+
/**
62+
* Creates a new TaskPersistenceException with the specified message and cause.
63+
*
64+
* @param msg the exception message
65+
* @param cause the underlying cause
66+
*/
67+
public TaskPersistenceException(final String msg, final Throwable cause) {
68+
super(msg, cause);
69+
}
70+
71+
/**
72+
* Creates a new TaskPersistenceException with the specified task ID and message.
73+
*
74+
* @param taskId the task identifier (may be null for operations not tied to a specific task)
75+
* @param msg the exception message
76+
*/
77+
public TaskPersistenceException(@Nullable final String taskId, final String msg) {
78+
super(taskId, msg);
79+
}
80+
81+
/**
82+
* Creates a new TaskPersistenceException with the specified task ID, message, and cause.
83+
*
84+
* @param taskId the task identifier (may be null for operations not tied to a specific task)
85+
* @param msg the exception message
86+
* @param cause the underlying cause
87+
*/
88+
public TaskPersistenceException(@Nullable final String taskId, final String msg, final Throwable cause) {
89+
super(taskId, msg, cause);
90+
}
91+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package io.a2a.server.tasks;
2+
3+
import org.jspecify.annotations.Nullable;
4+
5+
/**
6+
* Exception for task serialization/deserialization failures.
7+
* <p>
8+
* Indicates failures converting between Task domain objects and persistent storage format (JSON).
9+
*
10+
* <h2>Common Scenarios</h2>
11+
* <ul>
12+
* <li>JSON parsing errors during {@code get()} operations</li>
13+
* <li>JSON serialization errors during {@code save()} operations</li>
14+
* <li>Invalid enum values or missing required fields</li>
15+
* <li>Data format version mismatches after upgrades</li>
16+
* </ul>
17+
*
18+
* <h2>Usage Example</h2>
19+
* <pre>{@code
20+
* try {
21+
* Task task = jsonMapper.readValue(json, Task.class);
22+
* } catch (JsonProcessingException e) {
23+
* throw new TaskSerializationException(taskId, "Failed to deserialize task", e);
24+
* }
25+
* }</pre>
26+
*
27+
* @see TaskStoreException
28+
* @see TaskPersistenceException for database failures
29+
*/
30+
public class TaskSerializationException extends TaskStoreException {
31+
32+
/**
33+
* Creates a new TaskSerializationException with no message or cause.
34+
*/
35+
public TaskSerializationException() {
36+
super();
37+
}
38+
39+
/**
40+
* Creates a new TaskSerializationException with the specified message.
41+
*
42+
* @param msg the exception message
43+
*/
44+
public TaskSerializationException(final String msg) {
45+
super(msg);
46+
}
47+
48+
/**
49+
* Creates a new TaskSerializationException with the specified cause.
50+
*
51+
* @param cause the underlying cause
52+
*/
53+
public TaskSerializationException(final Throwable cause) {
54+
super(cause);
55+
}
56+
57+
/**
58+
* Creates a new TaskSerializationException with the specified message and cause.
59+
*
60+
* @param msg the exception message
61+
* @param cause the underlying cause
62+
*/
63+
public TaskSerializationException(final String msg, final Throwable cause) {
64+
super(msg, cause);
65+
}
66+
67+
/**
68+
* Creates a new TaskSerializationException with the specified task ID and message.
69+
*
70+
* @param taskId the task identifier (may be null for operations not tied to a specific task)
71+
* @param msg the exception message
72+
*/
73+
public TaskSerializationException(@Nullable final String taskId, final String msg) {
74+
super(taskId, msg);
75+
}
76+
77+
/**
78+
* Creates a new TaskSerializationException with the specified task ID, message, and cause.
79+
*
80+
* @param taskId the task identifier (may be null for operations not tied to a specific task)
81+
* @param msg the exception message
82+
* @param cause the underlying cause
83+
*/
84+
public TaskSerializationException(@Nullable final String taskId, final String msg, final Throwable cause) {
85+
super(taskId, msg, cause);
86+
}
87+
}

0 commit comments

Comments
 (0)