Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ classDiagram
}

class InvokeConfig~P,R~ {
+int timeout_seconds
+SerDes~P~ serdes_payload
+SerDes~R~ serdes_result
}
Expand Down
96 changes: 8 additions & 88 deletions docs/core/invoke.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

**Payload** - The input data sent to the invoked function. Can be any JSON-serializable value or use custom serialization.

**Timeout** - The maximum time to wait for an invoked function to complete. If exceeded, the invoke operation fails with a timeout error.

[↑ Back to top](#table-of-contents)

## What are invoke operations?
Expand Down Expand Up @@ -59,7 +57,6 @@ When you invoke a function, the SDK:
- **Asynchronous execution** - Invoked functions run independently without blocking resources
- **Result handling** - Results are automatically deserialized and returned
- **Error propagation** - Errors from invoked functions propagate to the caller
- **Timeout support** - Configure maximum wait time for invoked functions
- **Custom serialization** - Control how payloads and results are serialized
- **Named operations** - Identify invoke operations by name for debugging

Expand Down Expand Up @@ -131,7 +128,7 @@ def invoke(
- `function_name` - The name of the Lambda function to invoke. This should be the function name, not the ARN.
- `payload` - The input data to send to the invoked function. Can be any JSON-serializable value.
- `name` (optional) - A name for the invoke operation, useful for debugging and testing.
- `config` (optional) - An `InvokeConfig` object to configure timeout and serialization.
- `config` (optional) - An `InvokeConfig` object to configure serialization and tenant isolation.

**Returns:** The result returned by the invoked function.

Expand Down Expand Up @@ -291,52 +288,33 @@ from aws_durable_execution_sdk_python import (
DurableContext,
durable_execution,
)
from aws_durable_execution_sdk_python.config import Duration, InvokeConfig
from aws_durable_execution_sdk_python.config import InvokeConfig

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
# Configure invoke with timeout
invoke_config = InvokeConfig(
timeout=Duration.from_minutes(5),
serdes_payload=my_payload_serdes,
serdes_result=my_result_serdes,
)

result = context.invoke(
function_name="long-running-function",
function_name="my-function",
payload=event,
name="long_running",
name="my_invoke",
config=invoke_config,
)

return result
```

### InvokeConfig parameters

**timeout** - Maximum duration to wait for the invoked function to complete. Default is no timeout. Use this to prevent long-running invocations from blocking execution indefinitely.

**serdes_payload** - Custom serialization/deserialization for the payload sent to the invoked function. If None, uses default JSON serialization.

**serdes_result** - Custom serialization/deserialization for the result returned from the invoked function. If None, uses default JSON serialization.

**tenant_id** - Optional tenant identifier for multi-tenant isolation. If provided, the invocation will be scoped to this tenant.

### Setting timeouts

Use the `Duration` class to set timeouts:

```python
from aws_durable_execution_sdk_python.config import Duration, InvokeConfig

# Timeout after 30 seconds
config = InvokeConfig(timeout=Duration.from_seconds(30))

# Timeout after 5 minutes
config = InvokeConfig(timeout=Duration.from_minutes(5))

# Timeout after 2 hours
config = InvokeConfig(timeout=Duration.from_hours(2))
```

[↑ Back to top](#table-of-contents)

## Error handling
Expand Down Expand Up @@ -372,33 +350,6 @@ def handler(event: dict, context: DurableContext) -> dict:
}
```

### Timeout handling

Handle timeout errors specifically:

```python
from aws_durable_execution_sdk_python.config import Duration, InvokeConfig

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
"""Handle timeout errors."""
config = InvokeConfig(timeout=Duration.from_seconds(30))

try:
result = context.invoke(
function_name="slow-function",
payload=event,
config=config,
)
return {"status": "success", "result": result}

except CallableRuntimeError as e:
if "timed out" in str(e).lower():
context.logger.warning("Function timed out, using fallback")
return {"status": "timeout", "fallback": True}
raise
```

### Retry patterns

Implement retry logic for failed invocations:
Expand Down Expand Up @@ -551,8 +502,6 @@ def handler(event: dict, context: DurableContext) -> dict:

**Name invoke operations** - Use the `name` parameter to identify invoke operations in logs and tests.

**Set appropriate timeouts** - Configure timeouts based on expected execution time. Don't set them too short or too long.

**Handle errors explicitly** - Catch and handle errors from invoked functions. Don't let them propagate unexpectedly.

**Keep payloads small** - Large payloads increase serialization overhead. Consider passing references instead of large data.
Expand Down Expand Up @@ -603,10 +552,6 @@ A: Yes, you can invoke the same function multiple times with different payloads

A: The `function_name` parameter accepts function names in the same account. For cross-account invocations, you need appropriate IAM permissions and may need to use function ARNs (check AWS documentation for cross-account Lambda invocations).

**Q: What's the maximum timeout I can set?**

A: The timeout is limited by Lambda's maximum execution time (15 minutes). However, durable functions can run longer by suspending and resuming.

**Q: Can I invoke functions in parallel?**

A: Not directly with `context.invoke()`. For parallel execution, consider using `context.parallel()` with steps that perform invocations, or invoke multiple functions sequentially.
Expand All @@ -615,10 +560,6 @@ A: Not directly with `context.invoke()`. For parallel execution, consider using

A: Use the `name` parameter to identify operations in logs. Check CloudWatch logs for both the calling and invoked functions.

**Q: What happens if I don't set a timeout?**

A: The invoke operation waits indefinitely for the invoked function to complete. It's recommended to set timeouts for better error handling.

**Q: What's the difference between context.invoke() and using boto3's Lambda client to invoke functions?**

A: When you use `context.invoke()`, the SDK suspends your durable function's execution while waiting for the result. This means you don't pay for Lambda compute time while waiting. With boto3's Lambda client, your function stays active and consumes billable compute time while waiting for the response. Additionally, `context.invoke()` automatically checkpoints the operation, handles errors durably, and integrates with the durable execution lifecycle.
Expand Down Expand Up @@ -703,27 +644,6 @@ def test_invoke_error_handling(durable_runner):
assert "error" in result.result
```

### Testing timeouts

Test that timeouts are handled correctly:

```python
from aws_durable_execution_sdk_python.config import Duration, InvokeConfig

@pytest.mark.durable_execution(
handler=handler_with_timeout,
lambda_function_name="timeout_function",
)
def test_invoke_timeout(durable_runner):
"""Test invoke timeout handling."""
with durable_runner:
result = durable_runner.run(input={}, timeout=60)

# Check that timeout was handled
assert result.status is InvocationStatus.SUCCEEDED
assert result.result["status"] == "timeout"
```

### Mocking invoked functions

When testing, you can mock the invoked functions to control their behavior:
Expand Down
12 changes: 1 addition & 11 deletions src/aws_durable_execution_sdk_python/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,9 @@ class InvokeConfig(Generic[P, R]):
Configuration for invoke operations.

This class configures how function invocations are executed, including
timeout behavior, serialization, and tenant isolation.
serialization and tenant isolation.

Args:
timeout: Maximum duration to wait for the invoked function to complete.
Default is no timeout. Use this to prevent long-running invocations
from blocking execution indefinitely.

serdes_payload: Custom serialization/deserialization for the payload
sent to the invoked function. Defaults to DEFAULT_JSON_SERDES when
not set.
Expand All @@ -404,16 +400,10 @@ class InvokeConfig(Generic[P, R]):
"""

# retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
timeout: Duration = field(default_factory=Duration)
serdes_payload: SerDes[P] | None = None
serdes_result: SerDes[R] | None = None
tenant_id: str | None = None

@property
def timeout_seconds(self) -> int:
"""Get timeout in seconds."""
return self.timeout.to_seconds()


@dataclass(frozen=True)
class CallbackConfig:
Expand Down
2 changes: 1 addition & 1 deletion src/aws_durable_execution_sdk_python/operation/invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def execute(self, _checkpointed_result: CheckpointedResult) -> R:
ExecutionError: If suspend doesn't raise (should never happen)
"""
msg: str = f"Invoke {self.operation_identifier.operation_id} started, suspending for completion"
suspend_with_optional_resume_delay(msg, self.config.timeout_seconds)
suspend_with_optional_resume_delay(msg)
# This line should never be reached since suspend_with_optional_resume_delay always raises
error_msg: str = "suspend_with_optional_resume_delay should have raised an exception, but did not."
raise ExecutionError(error_msg) from None
1 change: 0 additions & 1 deletion tests/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ def test_invoke_config_defaults():
"""Test InvokeConfig defaults."""
config = InvokeConfig()
assert config.tenant_id is None
assert config.timeout_seconds == 0


def test_invoke_config_with_tenant_id():
Expand Down
3 changes: 1 addition & 2 deletions tests/context_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ def test_invoke_with_name_and_config(mock_executor_class):
mock_state.durable_execution_arn = (
"arn:aws:durable:us-east-1:123456789012:execution/test"
)
config = InvokeConfig[str, str](timeout=Duration.from_seconds(30))
config = InvokeConfig[str, str]()

context = create_test_context(state=mock_state)
[context._create_step_id() for _ in range(5)] # Set counter to 5 # noqa: SLF001
Expand Down Expand Up @@ -756,7 +756,6 @@ def test_invoke_with_custom_serdes(mock_executor_class):
config = InvokeConfig[dict, dict](
serdes_payload=payload_serdes,
serdes_result=result_serdes,
timeout=Duration.from_minutes(1),
)

context = create_test_context(state=mock_state)
Expand Down
Loading
Loading