2323from aws_durable_execution_sdk_python_testing .execution import Execution
2424from aws_durable_execution_sdk_python_testing .model import (
2525 CheckpointDurableExecutionResponse ,
26+ CheckpointUpdatedExecutionState ,
2627 GetDurableExecutionHistoryResponse ,
2728 GetDurableExecutionResponse ,
2829 GetDurableExecutionStateResponse ,
4748if TYPE_CHECKING :
4849 from collections .abc import Awaitable , Callable
4950
51+ from aws_durable_execution_sdk_python_testing .checkpoint .processor import (
52+ CheckpointProcessor ,
53+ )
5054 from aws_durable_execution_sdk_python_testing .invoker import Invoker
5155 from aws_durable_execution_sdk_python_testing .scheduler import Event , Scheduler
5256 from aws_durable_execution_sdk_python_testing .stores .base import ExecutionStore
@@ -58,10 +62,17 @@ class Executor(ExecutionObserver):
5862 MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5
5963 RETRY_BACKOFF_SECONDS = 5
6064
61- def __init__ (self , store : ExecutionStore , scheduler : Scheduler , invoker : Invoker ):
65+ def __init__ (
66+ self ,
67+ store : ExecutionStore ,
68+ scheduler : Scheduler ,
69+ invoker : Invoker ,
70+ checkpoint_processor : CheckpointProcessor ,
71+ ):
6272 self ._store = store
6373 self ._scheduler = scheduler
6474 self ._invoker = invoker
75+ self ._checkpoint_processor = checkpoint_processor
6576 self ._completion_events : dict [str , Event ] = {}
6677
6778 def start_execution (
@@ -464,8 +475,8 @@ def checkpoint_execution(
464475 self ,
465476 execution_arn : str ,
466477 checkpoint_token : str ,
467- updates : list [OperationUpdate ] | None = None , # noqa: ARG002
468- client_token : str | None = None , # noqa: ARG002
478+ updates : list [OperationUpdate ] | None = None ,
479+ client_token : str | None = None ,
469480 ) -> CheckpointDurableExecutionResponse :
470481 """Process checkpoint for an execution.
471482
@@ -489,19 +500,33 @@ def checkpoint_execution(
489500 msg : str = f"Invalid checkpoint token: { checkpoint_token } "
490501 raise InvalidParameterValueException (msg )
491502
492- # TODO: Process operation updates using the checkpoint processor
493- # This would integrate with the existing checkpoint processing pipeline
503+ # Process operation updates using the checkpoint processor
504+ if updates :
505+ checkpoint_output = self ._checkpoint_processor .process_checkpoint (
506+ checkpoint_token = checkpoint_token ,
507+ updates = updates ,
508+ client_token = client_token ,
509+ )
494510
495- # Generate new checkpoint token
496- new_checkpoint_token = execution .get_new_checkpoint_token ()
511+ # Convert SDK CheckpointUpdatedExecutionState to testing library version
512+ new_execution_state = None
513+ if checkpoint_output .new_execution_state :
514+ new_execution_state = CheckpointUpdatedExecutionState (
515+ operations = checkpoint_output .new_execution_state .operations ,
516+ next_marker = checkpoint_output .new_execution_state .next_marker ,
517+ )
518+
519+ return CheckpointDurableExecutionResponse (
520+ checkpoint_token = checkpoint_output .checkpoint_token ,
521+ new_execution_state = new_execution_state ,
522+ )
497523
498- # Get current execution state - for now return None (simplified implementation)
499- # In a full implementation, this would return CheckpointUpdatedExecutionState with operations
500- new_execution_state = None
524+ # Generate new checkpoint token for case with no updates
525+ new_checkpoint_token = execution .get_new_checkpoint_token ()
501526
502527 return CheckpointDurableExecutionResponse (
503528 checkpoint_token = new_checkpoint_token ,
504- new_execution_state = new_execution_state ,
529+ new_execution_state = None ,
505530 )
506531
507532 def send_callback_success (
0 commit comments