Skip to content

Commit 3e2da0f

Browse files
committed
feat: implement filesystem store
1 parent a5f594a commit 3e2da0f

14 files changed

Lines changed: 595 additions & 72 deletions

File tree

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
if TYPE_CHECKING:
2828
from aws_durable_execution_sdk_python_testing.execution import Execution
2929
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
30-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
30+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
3131

3232

3333
class CheckpointProcessor:

src/aws_durable_execution_sdk_python_testing/cli.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class CliConfig:
5050
local_runner_region: str = "us-west-2"
5151
local_runner_mode: str = "local"
5252

53+
# Store configuration
54+
store_type: str = "memory"
55+
store_path: str | None = None
56+
5357
@classmethod
5458
def from_environment(cls) -> CliConfig:
5559
"""Create configuration from environment variables with defaults."""
@@ -65,6 +69,8 @@ def from_environment(cls) -> CliConfig:
6569
),
6670
local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"),
6771
local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"),
72+
store_type=os.getenv("AWS_DEX_STORE_TYPE", "memory"),
73+
store_path=os.getenv("AWS_DEX_STORE_PATH"),
6874
)
6975

7076

@@ -175,6 +181,17 @@ def _create_start_server_parser(self, subparsers) -> None:
175181
default=self.config.local_runner_mode,
176182
help=f"Local Runner mode (default: {self.config.local_runner_mode}, env: AWS_DEX_LOCAL_RUNNER_MODE)",
177183
)
184+
start_server_parser.add_argument(
185+
"--store-type",
186+
choices=["memory", "filesystem"],
187+
default=self.config.store_type,
188+
help=f"Store type for execution persistence (default: {self.config.store_type}, env: AWS_DEX_STORE_TYPE)",
189+
)
190+
start_server_parser.add_argument(
191+
"--store-path",
192+
default=self.config.store_path,
193+
help=f"Path for filesystem store (default: {self.config.store_path or '.durable_executions'}, env: AWS_DEX_STORE_PATH)",
194+
)
178195
start_server_parser.set_defaults(func=self.start_server_command)
179196

180197
def _create_invoke_parser(self, subparsers) -> None:
@@ -245,6 +262,8 @@ def start_server_command(self, args: argparse.Namespace) -> int:
245262
local_runner_endpoint=args.local_runner_endpoint,
246263
local_runner_region=args.local_runner_region,
247264
local_runner_mode=args.local_runner_mode,
265+
store_type=args.store_type,
266+
store_path=args.store_path,
248267
)
249268

250269
logger.info(
@@ -260,6 +279,10 @@ def start_server_command(self, args: argparse.Namespace) -> int:
260279
logger.info(" Local Runner Endpoint: %s", args.local_runner_endpoint)
261280
logger.info(" Local Runner Region: %s", args.local_runner_region)
262281
logger.info(" Local Runner Mode: %s", args.local_runner_mode)
282+
logger.info(" Store Type: %s", args.store_type)
283+
if args.store_type == "filesystem":
284+
store_path = args.store_path or ".durable_executions"
285+
logger.info(" Store Path: %s", store_path)
263286

264287
# Use runner as context manager for proper lifecycle
265288
with WebRunner(runner_config) as runner:

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,27 @@
33
import json
44
from dataclasses import replace
55
from datetime import UTC, datetime
6-
from typing import TYPE_CHECKING
6+
from typing import TYPE_CHECKING, Any
77
from uuid import uuid4
88

99
from aws_durable_execution_sdk_python.execution import (
1010
DurableExecutionInvocationOutput,
1111
InvocationStatus,
1212
)
1313
from aws_durable_execution_sdk_python.lambda_service import (
14+
CallbackOptions,
15+
ContextOptions,
1416
ErrorObject,
1517
ExecutionDetails,
18+
InvokeOptions,
1619
Operation,
20+
OperationAction,
1721
OperationStatus,
22+
OperationSubType,
1823
OperationType,
1924
OperationUpdate,
25+
StepOptions,
26+
WaitOptions,
2027
)
2128

2229
# Import AWS exceptions
@@ -27,6 +34,57 @@
2734
from aws_durable_execution_sdk_python_testing.token import CheckpointToken
2835

2936

37+
def _operation_update_from_dict(data: dict[str, Any]) -> OperationUpdate:
38+
"""Create OperationUpdate from dictionary data."""
39+
error = ErrorObject.from_dict(data["Error"]) if data.get("Error") else None
40+
41+
context_options = None
42+
if context_data := data.get("ContextOptions"):
43+
context_options = ContextOptions(
44+
replay_children=context_data.get("ReplayChildren", False)
45+
)
46+
47+
step_options = None
48+
if step_data := data.get("StepOptions"):
49+
step_options = StepOptions(
50+
next_attempt_delay_seconds=step_data.get("NextAttemptDelaySeconds")
51+
)
52+
53+
wait_options = None
54+
if wait_data := data.get("WaitOptions"):
55+
wait_options = WaitOptions.from_dict(wait_data)
56+
57+
callback_options = None
58+
if callback_data := data.get("CallbackOptions"):
59+
callback_options = CallbackOptions(
60+
timeout_seconds=callback_data.get("TimeoutSeconds"),
61+
heartbeat_timeout_seconds=callback_data.get("HeartbeatTimeoutSeconds"),
62+
)
63+
64+
invoke_options = None
65+
if invoke_data := data.get("InvokeOptions"):
66+
invoke_options = InvokeOptions(
67+
function_name=invoke_data.get("FunctionName", ""),
68+
timeout_seconds=invoke_data.get("TimeoutSeconds"),
69+
)
70+
71+
return OperationUpdate(
72+
operation_id=data["Id"],
73+
operation_type=OperationType(data["Type"]),
74+
action=OperationAction(data["Action"]),
75+
parent_id=data.get("ParentId"),
76+
name=data.get("Name"),
77+
sub_type=OperationSubType(data["SubType"]) if data.get("SubType") else None,
78+
payload=data.get("Payload"),
79+
error=error,
80+
context_options=context_options,
81+
step_options=step_options,
82+
wait_options=wait_options,
83+
callback_options=callback_options,
84+
invoke_options=invoke_options,
85+
)
86+
87+
3088
if TYPE_CHECKING:
3189
from aws_durable_execution_sdk_python_testing.model import (
3290
StartDurableExecutionInput,
@@ -51,7 +109,7 @@ def __init__(
51109
# TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store
52110
self.token_sequence: int = 0
53111
self.is_complete: bool = False
54-
self.result: DurableExecutionInvocationOutput | None
112+
self.result: DurableExecutionInvocationOutput | None = None
55113
self.consecutive_failed_invocation_attempts: int = 0
56114

57115
@staticmethod
@@ -63,6 +121,58 @@ def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
63121
durable_execution_arn=str(uuid4()), start_input=input, operations=[]
64122
)
65123

124+
def to_dict(self) -> dict[str, Any]:
125+
"""Serialize execution to dictionary."""
126+
return {
127+
"DurableExecutionArn": self.durable_execution_arn,
128+
"StartInput": self.start_input.to_dict(),
129+
"Operations": [op.to_dict() for op in self.operations],
130+
"Updates": [update.to_dict() for update in self.updates],
131+
"UsedTokens": list(self.used_tokens),
132+
"TokenSequence": self.token_sequence,
133+
"IsComplete": self.is_complete,
134+
"Result": self.result.to_dict() if self.result else None,
135+
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
136+
}
137+
138+
@classmethod
139+
def from_dict(cls, data: dict[str, Any]) -> Execution:
140+
"""Deserialize execution from dictionary."""
141+
from aws_durable_execution_sdk_python_testing.model import (
142+
StartDurableExecutionInput,
143+
)
144+
145+
# Reconstruct start_input
146+
start_input = StartDurableExecutionInput.from_dict(data["StartInput"])
147+
148+
# Reconstruct operations
149+
operations = [Operation.from_dict(op_data) for op_data in data["Operations"]]
150+
151+
# Create execution
152+
execution = cls(
153+
durable_execution_arn=data["DurableExecutionArn"],
154+
start_input=start_input,
155+
operations=operations,
156+
)
157+
158+
# Set additional fields
159+
execution.updates = [
160+
_operation_update_from_dict(update_data) for update_data in data["Updates"]
161+
]
162+
execution.used_tokens = set(data["UsedTokens"])
163+
execution.token_sequence = data["TokenSequence"]
164+
execution.is_complete = data["IsComplete"]
165+
execution.result = (
166+
DurableExecutionInvocationOutput.from_dict(data["Result"])
167+
if data["Result"]
168+
else None
169+
)
170+
execution.consecutive_failed_invocation_attempts = data[
171+
"ConsecutiveFailedInvocationAttempts"
172+
]
173+
174+
return execution
175+
66176
def start(self) -> None:
67177
# not thread safe, prob should be
68178
if self.start_input.invocation_id is None:

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5050
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
5252

5353
logger = logging.getLogger(__name__)
5454

@@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
142142
durable_execution_name=execution.start_input.execution_name,
143143
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
144144
status=status,
145-
start_timestamp=execution_op.start_timestamp.isoformat()
145+
start_timestamp=execution_op.start_timestamp.timestamp()
146146
if execution_op.start_timestamp
147-
else datetime.now(UTC).isoformat(),
147+
else datetime.now(UTC).timestamp(),
148148
input_payload=execution_op.execution_details.input_payload
149149
if execution_op.execution_details
150150
else None,
151151
result=result,
152152
error=error,
153-
end_timestamp=execution_op.end_timestamp.isoformat()
153+
end_timestamp=execution_op.end_timestamp.timestamp()
154154
if execution_op.end_timestamp
155155
else None,
156156
version="1.0",
@@ -223,10 +223,10 @@ def list_executions(
223223
durable_execution_name=execution.start_input.execution_name,
224224
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225225
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.isoformat()
226+
start_timestamp=execution_op.start_timestamp.timestamp()
227227
if execution_op.start_timestamp
228-
else datetime.now(UTC).isoformat(),
229-
end_timestamp=execution_op.end_timestamp.isoformat()
228+
else datetime.now(UTC).timestamp(),
229+
end_timestamp=execution_op.end_timestamp.timestamp()
230230
if execution_op.end_timestamp
231231
else None,
232232
)
@@ -333,7 +333,7 @@ def stop_execution(
333333
# Stop the execution
334334
self.fail_execution(execution_arn, stop_error)
335335

336-
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).isoformat())
336+
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).timestamp())
337337

338338
def get_execution_state(
339339
self,

src/aws_durable_execution_sdk_python_testing/model.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ class GetDurableExecutionResponse:
131131
durable_execution_name: str
132132
function_arn: str
133133
status: str
134-
start_timestamp: str
134+
start_timestamp: float
135135
input_payload: str | None = None
136136
result: str | None = None
137137
error: ErrorObject | None = None
138-
end_timestamp: str | None = None
138+
end_timestamp: float | None = None
139139
version: str | None = None
140140

141141
@classmethod
@@ -188,8 +188,8 @@ class Execution:
188188
durable_execution_name: str
189189
function_arn: str
190190
status: str
191-
start_timestamp: str
192-
end_timestamp: str | None = None
191+
start_timestamp: float
192+
end_timestamp: float | None = None
193193

194194
@classmethod
195195
def from_dict(cls, data: dict) -> Execution:
@@ -325,7 +325,7 @@ def to_dict(self) -> dict[str, Any]:
325325
class StopDurableExecutionResponse:
326326
"""Response from stopping a durable execution."""
327327

328-
stop_date: str
328+
stop_date: float
329329

330330
@classmethod
331331
def from_dict(cls, data: dict) -> StopDurableExecutionResponse:

src/aws_durable_execution_sdk_python_testing/runner.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@
4848
StartDurableExecutionOutput,
4949
)
5050
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import (
52+
ExecutionStore,
53+
FileSystemExecutionStore,
54+
InMemoryExecutionStore,
55+
)
5256
from aws_durable_execution_sdk_python_testing.web.server import WebServer
5357

5458

@@ -83,6 +87,10 @@ class WebRunnerConfig:
8387
local_runner_region: str = "us-west-2"
8488
local_runner_mode: str = "local"
8589

90+
# Store configuration
91+
store_type: str = "memory" # "memory" or "filesystem"
92+
store_path: str | None = None # Path for filesystem store
93+
8694

8795
@dataclass(frozen=True)
8896
class Operation:
@@ -543,7 +551,7 @@ def __init__(self, config: WebRunnerConfig) -> None:
543551
self._config = config
544552
self._server: WebServer | None = None
545553
self._scheduler: Scheduler | None = None
546-
self._store: InMemoryExecutionStore | None = None
554+
self._store: ExecutionStore | None = None
547555
self._invoker: LambdaInvoker | None = None
548556
self._executor: Executor | None = None
549557

@@ -581,7 +589,11 @@ def start(self) -> None:
581589
raise DurableFunctionsLocalRunnerError(msg)
582590

583591
# Create dependencies and server
584-
self._store = InMemoryExecutionStore()
592+
if self._config.store_type == "filesystem":
593+
store_path = self._config.store_path or ".durable_executions"
594+
self._store = FileSystemExecutionStore(store_path)
595+
else:
596+
self._store = InMemoryExecutionStore()
585597
self._scheduler = Scheduler()
586598
self._invoker = LambdaInvoker(self._create_boto3_client())
587599

0 commit comments

Comments
 (0)