-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtransformer.py
More file actions
104 lines (86 loc) · 3.65 KB
/
transformer.py
File metadata and controls
104 lines (86 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Operation transformer for converting OperationUpdates to Operations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from aws_durable_execution_sdk_python.lambda_service import (
Operation,
OperationType,
OperationUpdate,
)
from aws_durable_execution_sdk_python_testing.checkpoint.processors.callback import (
CallbackProcessor,
)
from aws_durable_execution_sdk_python_testing.checkpoint.processors.context import (
ContextProcessor,
)
from aws_durable_execution_sdk_python_testing.checkpoint.processors.execution import (
ExecutionProcessor,
)
from aws_durable_execution_sdk_python_testing.checkpoint.processors.step import (
StepProcessor,
)
from aws_durable_execution_sdk_python_testing.checkpoint.processors.wait import (
WaitProcessor,
)
from aws_durable_execution_sdk_python_testing.exceptions import (
InvalidParameterValueException,
)
if TYPE_CHECKING:
from collections.abc import MutableMapping
from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import (
OperationProcessor,
)
from typing import ClassVar
class OperationTransformer:
"""Transforms OperationUpdates to Operations while maintaining order and triggering scheduler actions."""
_DEFAULT_PROCESSORS: ClassVar[dict[OperationType, OperationProcessor]] = {
OperationType.STEP: StepProcessor(),
OperationType.WAIT: WaitProcessor(),
OperationType.CONTEXT: ContextProcessor(),
OperationType.CALLBACK: CallbackProcessor(),
OperationType.EXECUTION: ExecutionProcessor(),
}
def __init__(
self,
processors: MutableMapping[OperationType, OperationProcessor] | None = None,
):
self.processors = processors or self._DEFAULT_PROCESSORS
def process_updates(
self,
updates: list[OperationUpdate],
current_operations: list[Operation],
notifier,
execution_arn: str,
) -> tuple[list[Operation], list[OperationUpdate]]:
"""Transform updates maintaining operation order and return (operations, updates)."""
op_map = {op.operation_id: op for op in current_operations}
# Start with copy of current operations list
result_operations = current_operations.copy()
for update in updates:
processor = self.processors.get(update.operation_type)
if processor:
current_op = op_map.get(update.operation_id)
updated_op = processor.process(
update=update,
current_op=current_op,
notifier=notifier,
execution_arn=execution_arn,
)
if updated_op is not None:
if update.operation_id in op_map:
# Update existing operation in-place
for i, op in enumerate(result_operations): # pragma: no branch
# no branch coverage because result_operation empty not reachable here
if op.operation_id == update.operation_id:
result_operations[i] = updated_op
break
else:
# Append new operation to end
result_operations.append(updated_op)
# Update map for future lookups
op_map[update.operation_id] = updated_op
else:
msg: str = (
f"Checkpoint for {update.operation_type} is not implemented yet."
)
raise InvalidParameterValueException(msg)
return result_operations, updates