diff --git a/docs/architecture.md b/docs/architecture.md index d6a3180..8744898 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -176,7 +176,6 @@ classDiagram } class InvokeConfig~P,R~ { - +int timeout_seconds +SerDes~P~ serdes_payload +SerDes~R~ serdes_result } diff --git a/docs/core/invoke.md b/docs/core/invoke.md index a6bac65..0b75444 100644 --- a/docs/core/invoke.md +++ b/docs/core/invoke.md @@ -28,8 +28,6 @@ **Payload** - The input data sent to the invoked function. Can be any JSON-serializable value or use custom serialization. -**Timeout** - The maximum time to wait for an invoked function to complete. If exceeded, the invoke operation fails with a timeout error. - [↑ Back to top](#table-of-contents) ## What are invoke operations? @@ -59,7 +57,6 @@ When you invoke a function, the SDK: - **Asynchronous execution** - Invoked functions run independently without blocking resources - **Result handling** - Results are automatically deserialized and returned - **Error propagation** - Errors from invoked functions propagate to the caller -- **Timeout support** - Configure maximum wait time for invoked functions - **Custom serialization** - Control how payloads and results are serialized - **Named operations** - Identify invoke operations by name for debugging @@ -131,7 +128,7 @@ def invoke( - `function_name` - The name of the Lambda function to invoke. This should be the function name, not the ARN. - `payload` - The input data to send to the invoked function. Can be any JSON-serializable value. - `name` (optional) - A name for the invoke operation, useful for debugging and testing. -- `config` (optional) - An `InvokeConfig` object to configure timeout and serialization. +- `config` (optional) - An `InvokeConfig` object to configure serialization and tenant isolation. **Returns:** The result returned by the invoked function. @@ -291,52 +288,33 @@ from aws_durable_execution_sdk_python import ( DurableContext, durable_execution, ) -from aws_durable_execution_sdk_python.config import Duration, InvokeConfig +from aws_durable_execution_sdk_python.config import InvokeConfig @durable_execution def handler(event: dict, context: DurableContext) -> dict: - # Configure invoke with timeout invoke_config = InvokeConfig( - timeout=Duration.from_minutes(5), + serdes_payload=my_payload_serdes, + serdes_result=my_result_serdes, ) - + result = context.invoke( - function_name="long-running-function", + function_name="my-function", payload=event, - name="long_running", + name="my_invoke", config=invoke_config, ) - + return result ``` ### InvokeConfig parameters -**timeout** - Maximum duration to wait for the invoked function to complete. Default is no timeout. Use this to prevent long-running invocations from blocking execution indefinitely. - **serdes_payload** - Custom serialization/deserialization for the payload sent to the invoked function. If None, uses default JSON serialization. **serdes_result** - Custom serialization/deserialization for the result returned from the invoked function. If None, uses default JSON serialization. **tenant_id** - Optional tenant identifier for multi-tenant isolation. If provided, the invocation will be scoped to this tenant. -### Setting timeouts - -Use the `Duration` class to set timeouts: - -```python -from aws_durable_execution_sdk_python.config import Duration, InvokeConfig - -# Timeout after 30 seconds -config = InvokeConfig(timeout=Duration.from_seconds(30)) - -# Timeout after 5 minutes -config = InvokeConfig(timeout=Duration.from_minutes(5)) - -# Timeout after 2 hours -config = InvokeConfig(timeout=Duration.from_hours(2)) -``` - [↑ Back to top](#table-of-contents) ## Error handling @@ -372,33 +350,6 @@ def handler(event: dict, context: DurableContext) -> dict: } ``` -### Timeout handling - -Handle timeout errors specifically: - -```python -from aws_durable_execution_sdk_python.config import Duration, InvokeConfig - -@durable_execution -def handler(event: dict, context: DurableContext) -> dict: - """Handle timeout errors.""" - config = InvokeConfig(timeout=Duration.from_seconds(30)) - - try: - result = context.invoke( - function_name="slow-function", - payload=event, - config=config, - ) - return {"status": "success", "result": result} - - except CallableRuntimeError as e: - if "timed out" in str(e).lower(): - context.logger.warning("Function timed out, using fallback") - return {"status": "timeout", "fallback": True} - raise -``` - ### Retry patterns Implement retry logic for failed invocations: @@ -551,8 +502,6 @@ def handler(event: dict, context: DurableContext) -> dict: **Name invoke operations** - Use the `name` parameter to identify invoke operations in logs and tests. -**Set appropriate timeouts** - Configure timeouts based on expected execution time. Don't set them too short or too long. - **Handle errors explicitly** - Catch and handle errors from invoked functions. Don't let them propagate unexpectedly. **Keep payloads small** - Large payloads increase serialization overhead. Consider passing references instead of large data. @@ -603,10 +552,6 @@ A: Yes, you can invoke the same function multiple times with different payloads A: The `function_name` parameter accepts function names in the same account. For cross-account invocations, you need appropriate IAM permissions and may need to use function ARNs (check AWS documentation for cross-account Lambda invocations). -**Q: What's the maximum timeout I can set?** - -A: The timeout is limited by Lambda's maximum execution time (15 minutes). However, durable functions can run longer by suspending and resuming. - **Q: Can I invoke functions in parallel?** A: Not directly with `context.invoke()`. For parallel execution, consider using `context.parallel()` with steps that perform invocations, or invoke multiple functions sequentially. @@ -615,10 +560,6 @@ A: Not directly with `context.invoke()`. For parallel execution, consider using A: Use the `name` parameter to identify operations in logs. Check CloudWatch logs for both the calling and invoked functions. -**Q: What happens if I don't set a timeout?** - -A: The invoke operation waits indefinitely for the invoked function to complete. It's recommended to set timeouts for better error handling. - **Q: What's the difference between context.invoke() and using boto3's Lambda client to invoke functions?** A: When you use `context.invoke()`, the SDK suspends your durable function's execution while waiting for the result. This means you don't pay for Lambda compute time while waiting. With boto3's Lambda client, your function stays active and consumes billable compute time while waiting for the response. Additionally, `context.invoke()` automatically checkpoints the operation, handles errors durably, and integrates with the durable execution lifecycle. @@ -703,27 +644,6 @@ def test_invoke_error_handling(durable_runner): assert "error" in result.result ``` -### Testing timeouts - -Test that timeouts are handled correctly: - -```python -from aws_durable_execution_sdk_python.config import Duration, InvokeConfig - -@pytest.mark.durable_execution( - handler=handler_with_timeout, - lambda_function_name="timeout_function", -) -def test_invoke_timeout(durable_runner): - """Test invoke timeout handling.""" - with durable_runner: - result = durable_runner.run(input={}, timeout=60) - - # Check that timeout was handled - assert result.status is InvocationStatus.SUCCEEDED - assert result.result["status"] == "timeout" -``` - ### Mocking invoked functions When testing, you can mock the invoked functions to control their behavior: diff --git a/src/aws_durable_execution_sdk_python/config.py b/src/aws_durable_execution_sdk_python/config.py index 548b6c1..1329599 100644 --- a/src/aws_durable_execution_sdk_python/config.py +++ b/src/aws_durable_execution_sdk_python/config.py @@ -384,13 +384,9 @@ class InvokeConfig(Generic[P, R]): Configuration for invoke operations. This class configures how function invocations are executed, including - timeout behavior, serialization, and tenant isolation. + serialization and tenant isolation. Args: - timeout: Maximum duration to wait for the invoked function to complete. - Default is no timeout. Use this to prevent long-running invocations - from blocking execution indefinitely. - serdes_payload: Custom serialization/deserialization for the payload sent to the invoked function. Defaults to DEFAULT_JSON_SERDES when not set. @@ -404,16 +400,10 @@ class InvokeConfig(Generic[P, R]): """ # retry_strategy: Callable[[Exception, int], RetryDecision] | None = None - timeout: Duration = field(default_factory=Duration) serdes_payload: SerDes[P] | None = None serdes_result: SerDes[R] | None = None tenant_id: str | None = None - @property - def timeout_seconds(self) -> int: - """Get timeout in seconds.""" - return self.timeout.to_seconds() - @dataclass(frozen=True) class CallbackConfig: diff --git a/src/aws_durable_execution_sdk_python/operation/invoke.py b/src/aws_durable_execution_sdk_python/operation/invoke.py index 9288c98..6d18134 100644 --- a/src/aws_durable_execution_sdk_python/operation/invoke.py +++ b/src/aws_durable_execution_sdk_python/operation/invoke.py @@ -166,7 +166,7 @@ def execute(self, _checkpointed_result: CheckpointedResult) -> R: ExecutionError: If suspend doesn't raise (should never happen) """ msg: str = f"Invoke {self.operation_identifier.operation_id} started, suspending for completion" - suspend_with_optional_resume_delay(msg, self.config.timeout_seconds) + suspend_with_optional_resume_delay(msg) # This line should never be reached since suspend_with_optional_resume_delay always raises error_msg: str = "suspend_with_optional_resume_delay should have raised an exception, but did not." raise ExecutionError(error_msg) from None diff --git a/tests/config_test.py b/tests/config_test.py index 24edf6d..a56aa8f 100644 --- a/tests/config_test.py +++ b/tests/config_test.py @@ -282,7 +282,6 @@ def test_invoke_config_defaults(): """Test InvokeConfig defaults.""" config = InvokeConfig() assert config.tenant_id is None - assert config.timeout_seconds == 0 def test_invoke_config_with_tenant_id(): diff --git a/tests/context_test.py b/tests/context_test.py index 507cfc5..7cfb102 100644 --- a/tests/context_test.py +++ b/tests/context_test.py @@ -616,7 +616,7 @@ def test_invoke_with_name_and_config(mock_executor_class): mock_state.durable_execution_arn = ( "arn:aws:durable:us-east-1:123456789012:execution/test" ) - config = InvokeConfig[str, str](timeout=Duration.from_seconds(30)) + config = InvokeConfig[str, str]() context = create_test_context(state=mock_state) [context._create_step_id() for _ in range(5)] # Set counter to 5 # noqa: SLF001 @@ -756,7 +756,6 @@ def test_invoke_with_custom_serdes(mock_executor_class): config = InvokeConfig[dict, dict]( serdes_payload=payload_serdes, serdes_result=result_serdes, - timeout=Duration.from_minutes(1), ) context = create_test_context(state=mock_state) diff --git a/tests/operation/invoke_test.py b/tests/operation/invoke_test.py index 5bb98da..7267e09 100644 --- a/tests/operation/invoke_test.py +++ b/tests/operation/invoke_test.py @@ -7,7 +7,7 @@ import pytest -from aws_durable_execution_sdk_python.config import Duration, InvokeConfig +from aws_durable_execution_sdk_python.config import InvokeConfig from aws_durable_execution_sdk_python.exceptions import ( CallableRuntimeError, ExecutionError, @@ -205,8 +205,8 @@ def test_invoke_handler_already_started(status): @pytest.mark.parametrize("status", [OperationStatus.STARTED, OperationStatus.PENDING]) -def test_invoke_handler_already_started_with_timeout(status): - """Test invoke_handler when operation is already started with timeout config.""" +def test_invoke_handler_already_started_suspends(status): + """Test invoke_handler when operation is already started suspends indefinitely.""" mock_state = Mock(spec=ExecutionState) mock_state.durable_execution_arn = "test_arn" @@ -219,9 +219,9 @@ def test_invoke_handler_already_started_with_timeout(status): mock_result = CheckpointedResult.create_from_operation(operation) mock_state.get_checkpoint_result.return_value = mock_result - config = InvokeConfig[str, str](timeout=Duration.from_seconds(30)) + config = InvokeConfig[str, str]() - with pytest.raises(TimedSuspendExecution): + with pytest.raises(SuspendExecution): invoke_handler( function_name="test_function", payload="test_input", @@ -246,7 +246,7 @@ def test_invoke_handler_new_operation(): started = CheckpointedResult.create_from_operation(started_op) mock_state.get_checkpoint_result.side_effect = [not_found, started] - config = InvokeConfig[str, str](timeout=Duration.from_minutes(1)) + config = InvokeConfig[str, str]() with pytest.raises( SuspendExecution, match="Invoke invoke8 started, suspending for completion" @@ -271,58 +271,6 @@ def test_invoke_handler_new_operation(): assert operation_update.chained_invoke_options.function_name == "test_function" -def test_invoke_handler_new_operation_with_timeout(): - """Test invoke_handler when starting a new operation with timeout.""" - mock_state = Mock(spec=ExecutionState) - mock_state.durable_execution_arn = "test_arn" - - not_found = CheckpointedResult.create_not_found() - started_op = Operation( - operation_id="invoke_test", - operation_type=OperationType.CHAINED_INVOKE, - status=OperationStatus.STARTED, - ) - started = CheckpointedResult.create_from_operation(started_op) - mock_state.get_checkpoint_result.side_effect = [not_found, started] - - config = InvokeConfig[str, str](timeout=Duration.from_seconds(30)) - - with pytest.raises(TimedSuspendExecution): - invoke_handler( - function_name="test_function", - payload="test_input", - state=mock_state, - operation_identifier=OperationIdentifier("invoke9", None, "test_invoke"), - config=config, - ) - - -def test_invoke_handler_new_operation_no_timeout(): - """Test invoke_handler when starting a new operation without timeout.""" - mock_state = Mock(spec=ExecutionState) - mock_state.durable_execution_arn = "test_arn" - - not_found = CheckpointedResult.create_not_found() - started_op = Operation( - operation_id="invoke_test", - operation_type=OperationType.CHAINED_INVOKE, - status=OperationStatus.STARTED, - ) - started = CheckpointedResult.create_from_operation(started_op) - mock_state.get_checkpoint_result.side_effect = [not_found, started] - - config = InvokeConfig[str, str](timeout=Duration.from_seconds(0)) - - with pytest.raises(SuspendExecution): - invoke_handler( - function_name="test_function", - payload="test_input", - state=mock_state, - operation_identifier=OperationIdentifier("invoke10", None, "test_invoke"), - config=config, - ) - - def test_invoke_handler_no_config(): """Test invoke_handler when no config is provided.""" mock_state = Mock(spec=ExecutionState) @@ -1008,8 +956,8 @@ def test_invoke_immediate_response_already_completed(): assert mock_state.get_checkpoint_result.call_count == 1 -def test_invoke_immediate_response_with_timeout_immediate_success(): - """Test immediate success with timeout configuration.""" +def test_invoke_immediate_response_immediate_success(): + """Test immediate success response.""" mock_state = Mock(spec=ExecutionState) mock_state.durable_execution_arn = "test_arn" @@ -1020,13 +968,13 @@ def test_invoke_immediate_response_with_timeout_immediate_success(): operation_type=OperationType.CHAINED_INVOKE, status=OperationStatus.SUCCEEDED, chained_invoke_details=ChainedInvokeDetails( - result=json.dumps("timeout_result") + result=json.dumps("immediate_result") ), ) succeeded = CheckpointedResult.create_from_operation(succeeded_op) mock_state.get_checkpoint_result.side_effect = [not_found, succeeded] - config = InvokeConfig[str, str](timeout=Duration.from_seconds(30)) + config = InvokeConfig[str, str]() result = invoke_handler( function_name="test_function", @@ -1039,15 +987,12 @@ def test_invoke_immediate_response_with_timeout_immediate_success(): ) # Verify result was returned without suspend - assert result == "timeout_result" + assert result == "immediate_result" assert mock_state.get_checkpoint_result.call_count == 2 -def test_invoke_immediate_response_with_timeout_no_immediate_response(): - """Test no immediate response with timeout configuration. - - When no immediate response, operation should suspend with timeout. - """ +def test_invoke_immediate_response_no_immediate_response(): + """Test no immediate response — operation suspends indefinitely.""" mock_state = Mock(spec=ExecutionState) mock_state.durable_execution_arn = "test_arn" @@ -1061,10 +1006,9 @@ def test_invoke_immediate_response_with_timeout_no_immediate_response(): started = CheckpointedResult.create_from_operation(started_op) mock_state.get_checkpoint_result.side_effect = [not_found, started] - config = InvokeConfig[str, str](timeout=Duration.from_seconds(30)) + config = InvokeConfig[str, str]() - # Verify operation suspends with timeout - with pytest.raises(TimedSuspendExecution): + with pytest.raises(SuspendExecution): invoke_handler( function_name="test_function", payload="test_input",