-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_wait.py
More file actions
27 lines (21 loc) · 892 Bytes
/
test_wait.py
File metadata and controls
27 lines (21 loc) · 892 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"""Tests for wait example."""
import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from src.wait import wait
from test.conftest import deserialize_operation_payload
@pytest.mark.example
@pytest.mark.durable_execution(
handler=wait.handler,
lambda_function_name="Wait State",
)
def test_wait(durable_runner):
"""Test wait example."""
with durable_runner:
result = durable_runner.run(input="test", timeout=10, skip_time=True)
assert result.status is InvocationStatus.SUCCEEDED
assert deserialize_operation_payload(result.result) == "Wait completed"
# Find the wait operation (it should be the only non-execution operation)
wait_ops = [op for op in result.operations if op.operation_type.value == "WAIT"]
assert len(wait_ops) == 1
wait_op = wait_ops[0]
assert wait_op.scheduled_end_timestamp is not None