-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathinvoke.py
More file actions
119 lines (101 loc) · 4.38 KB
/
invoke.py
File metadata and controls
119 lines (101 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Implement the Durable invoke operation."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, TypeVar
from aws_durable_execution_sdk_python.config import InvokeConfig
from aws_durable_execution_sdk_python.exceptions import ExecutionError
from aws_durable_execution_sdk_python.lambda_service import (
ChainedInvokeOptions,
OperationUpdate,
)
from aws_durable_execution_sdk_python.serdes import (
DEFAULT_JSON_SERDES,
deserialize,
serialize,
)
from aws_durable_execution_sdk_python.suspend import suspend_with_optional_resume_delay
if TYPE_CHECKING:
from aws_durable_execution_sdk_python.identifier import OperationIdentifier
from aws_durable_execution_sdk_python.state import ExecutionState
P = TypeVar("P") # Payload type
R = TypeVar("R") # Result type
logger = logging.getLogger(__name__)
def invoke_handler(
    function_name: str,
    payload: P,
    state: ExecutionState,
    operation_identifier: OperationIdentifier,
    config: InvokeConfig[P, R] | None,
) -> R:
    """Invoke another Durable Function.

    Replay-aware: consults the checkpointed state for this operation first.
    A succeeded checkpoint short-circuits to the persisted result, a
    failed/timed-out/stopped checkpoint re-raises the recorded error, and an
    in-flight checkpoint suspends. Only on first execution is a START
    checkpoint written and the execution suspended for async completion.

    Note: this function either returns a deserialized result or raises —
    `suspend_with_optional_resume_delay` always raises, so the tail
    `ExecutionError` is an unreachable-state guard.
    """
    logger.debug(
        "🔗 Invoke %s (%s)",
        operation_identifier.name or function_name,
        operation_identifier.operation_id,
    )

    cfg = config if config else InvokeConfig[P, R]()
    op_id = operation_identifier.operation_id

    # Replay path: look up any prior checkpoint for this operation id.
    checkpoint = state.get_checkpoint_result(op_id)

    if checkpoint.is_succeeded():
        # Return the persisted result; successful operations carry no error.
        details = (
            checkpoint.operation.chained_invoke_details
            if checkpoint.operation
            else None
        )
        if details and details.result:
            return deserialize(
                serdes=cfg.serdes_result or DEFAULT_JSON_SERDES,
                data=details.result,
                operation_id=op_id,
                durable_execution_arn=state.durable_execution_arn,
            )
        # No recorded result payload (NOTE(review): a falsy serialized result
        # also lands here — presumably the serdes never emits one; confirm).
        return None  # type: ignore

    if checkpoint.is_failed() or checkpoint.is_timed_out() or checkpoint.is_stopped():
        # Deterministic replay: surface the exact checkpointed failure.
        checkpoint.raise_callable_error()

    if checkpoint.is_started():
        # Operation is still running remotely — suspend until it completes.
        logger.debug(
            "⏳ Invoke %s still in progress, suspending",
            operation_identifier.name or function_name,
        )
        suspend_with_optional_resume_delay(
            f"Invoke {op_id} still in progress",
            cfg.timeout_seconds,
        )

    # First execution: serialize the payload for the START checkpoint.
    serialized: str = serialize(
        serdes=cfg.serdes_payload or DEFAULT_JSON_SERDES,
        value=payload,
        operation_id=op_id,
        durable_execution_arn=state.durable_execution_arn,
    )

    # The backend performs the actual invoke once it receives this checkpoint.
    start_update: OperationUpdate = OperationUpdate.create_invoke_start(
        identifier=operation_identifier,
        payload=serialized,
        chained_invoke_options=ChainedInvokeOptions(
            function_name=function_name,
            tenant_id=cfg.tenant_id,
        ),
    )

    # Blocking checkpoint (is_sync=True, the default): the chained invocation
    # must be durably recorded before this execution suspends, so the backend
    # is guaranteed to track it.
    state.create_checkpoint(operation_update=start_update)

    logger.debug(
        "🚀 Invoke %s started, suspending for async execution",
        operation_identifier.name or function_name,
    )
    # Suspend so the invoke executes asynchronously without consuming cpu here.
    suspend_with_optional_resume_delay(
        f"Invoke {op_id} started, suspending for completion",
        cfg.timeout_seconds,
    )

    # Unreachable: suspend_with_optional_resume_delay always raises. If we get
    # here anyway, crash non-retryably via ExecutionError.
    msg = "suspend_with_optional_resume_delay should have raised an exception, but did not."
    raise ExecutionError(msg) from None