-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcallback_failure.py
More file actions
35 lines (30 loc) · 1.28 KB
/
callback_failure.py
File metadata and controls
35 lines (30 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"""Demonstrates callback failure scenarios where the error propagates and is handled by framework."""
from typing import Any
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
@durable_execution
def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]:
    """Show two ways a callback failure can surface from this handler.

    The ``shouldCatchError`` entry of ``event`` (treated as ``False`` when
    absent) selects the mode:

    * falsy -- a callback named ``"failing-operation"`` is created and its
      result is returned directly; nothing is caught here, so any exception
      leaves this function.
    * truthy -- the same two steps run inside a ``try`` block and any
      ``Exception`` is converted into
      ``{"success": False, "error": <str(exception)>}``.

    Both modes create the callback with a 60-second timeout configuration.
    """
    catch_errors = event.get("shouldCatchError", False)
    timeout_config = CallbackConfig(timeout=Duration.from_seconds(60))

    if not catch_errors:
        # Propagating mode: no local handling, errors escape to the caller
        # (the basic failure case).
        pending = context.create_callback(
            name="failing-operation",
            config=timeout_config,
        )
        return pending.result()

    # Catching mode: both creating the callback and waiting on its result sit
    # inside the try, so a failure in either step is reported in the payload.
    try:
        pending = context.create_callback(
            name="failing-operation",
            config=timeout_config,
        )
        return pending.result()
    except Exception as exc:
        return {"success": False, "error": str(exc)}