Skip to content

Commit 984b357

Browse files
author
Rares Polenciuc
committed
feat: add execution history event generation and pagination
- Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination
1 parent 48eae39 commit 984b357

4 files changed

Lines changed: 990 additions & 45 deletions

File tree

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
"""Event conversion utilities for durable execution history.
2+
3+
This module provides functions to convert operation objects into history events
4+
that can be consumed by the execution history API.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from datetime import UTC, datetime
10+
from typing import TYPE_CHECKING
11+
12+
from aws_durable_execution_sdk_python.execution import InvocationStatus
13+
from aws_durable_execution_sdk_python.lambda_service import (
14+
OperationStatus,
15+
OperationType,
16+
)
17+
18+
from aws_durable_execution_sdk_python_testing.model import Event as HistoryEvent
19+
20+
21+
if TYPE_CHECKING:
22+
from aws_durable_execution_sdk_python.lambda_service import Operation
23+
24+
from aws_durable_execution_sdk_python_testing.execution import Execution
25+
26+
27+
def _iso(ts, *, default_now: bool = False) -> str:
28+
"""Convert timestamp to ISO format string.
29+
30+
Args:
31+
ts: Timestamp to convert
32+
default_now: If True, use current time when ts is None
33+
34+
Returns:
35+
ISO format timestamp string
36+
37+
Raises:
38+
ValueError: If ts is None and default_now is False
39+
"""
40+
if ts is None:
41+
if default_now:
42+
return datetime.now(UTC).isoformat()
43+
msg = "timestamp is required"
44+
raise ValueError(msg)
45+
if ts.tzinfo is None:
46+
ts = ts.replace(tzinfo=UTC)
47+
return ts.isoformat()
48+
49+
50+
def _payload_envelope(payload, include: bool): # noqa: FBT001
51+
"""Create payload envelope for event data.
52+
53+
Args:
54+
payload: Payload data to wrap
55+
include: Whether to include the actual payload data
56+
57+
Returns:
58+
Payload envelope dict or None if payload is None
59+
"""
60+
if payload is None:
61+
return None
62+
return {"Payload": payload if include else None, "Truncated": False}
63+
64+
65+
def _error_envelope(err):
66+
"""Create error envelope for event data.
67+
68+
Args:
69+
err: Error object to wrap
70+
71+
Returns:
72+
Error envelope dict or None if err is None
73+
"""
74+
if err is None:
75+
return None
76+
return {"Payload": err.to_dict() if hasattr(err, "to_dict") else err, "Truncated": False}
77+
78+
79+
def _wait_seconds_from(op: Operation) -> int | None:
80+
"""Extract wait duration from operation.
81+
82+
Args:
83+
op: Operation to extract wait duration from
84+
85+
Returns:
86+
Wait duration in seconds or None if not available
87+
"""
88+
if hasattr(op, "wait_options") and op.wait_options:
89+
v = op.wait_options.get("wait_seconds")
90+
if v is not None:
91+
return int(v)
92+
if op.wait_details and hasattr(op.wait_details, "wait_seconds"):
93+
v = op.wait_details.wait_seconds
94+
if v is not None:
95+
return int(v)
96+
return None
97+
98+
99+
def _scheduled_end_iso(op: Operation) -> str | None:
100+
"""Extract scheduled end timestamp from operation.
101+
102+
Args:
103+
op: Operation to extract timestamp from
104+
105+
Returns:
106+
ISO format timestamp string or None if not available
107+
"""
108+
wd = op.wait_details
109+
ts = getattr(wd, "scheduled_timestamp", None) if wd else None
110+
return _iso(ts) if ts else None
111+
112+
113+
def _with_event_id(d: dict, eid: int) -> dict:
114+
"""Add event ID to event dictionary.
115+
116+
Args:
117+
d: Event dictionary
118+
eid: Event ID to add
119+
120+
Returns:
121+
Updated dictionary with event ID
122+
"""
123+
out = dict(d)
124+
out["EventId"] = eid
125+
return out
126+
127+
128+
def operation_to_started_event(operation: Operation, execution=None, include_execution_data: bool = False) -> HistoryEvent: # noqa: FBT001, FBT002
129+
"""Convert operation to started event.
130+
131+
Args:
132+
operation: Operation to convert
133+
execution: Optional execution context
134+
include_execution_data: Whether to include execution data in the event
135+
136+
Returns:
137+
History event for operation start
138+
139+
Raises:
140+
ValueError: If operation type is unknown
141+
"""
142+
base = {
143+
"EventTimestamp": _iso(operation.start_timestamp, default_now=True),
144+
"Id": operation.operation_id,
145+
"Name": operation.name,
146+
"ParentId": operation.parent_id,
147+
}
148+
149+
match operation.operation_type:
150+
case OperationType.EXECUTION:
151+
input_payload = operation.execution_details.input_payload if operation.execution_details else None
152+
details = {
153+
"Input": _payload_envelope(input_payload, include_execution_data),
154+
"ExecutionTimeout": (
155+
getattr(getattr(execution, "start_input", None), "execution_timeout_seconds", None)
156+
if execution else None
157+
),
158+
}
159+
return HistoryEvent.from_dict({
160+
**base,
161+
"EventType": "ExecutionStarted",
162+
"ExecutionStartedDetails": details,
163+
})
164+
165+
case OperationType.STEP:
166+
return HistoryEvent.from_dict({
167+
**base,
168+
"EventType": "StepStarted",
169+
"StepStartedDetails": {"Present": True},
170+
})
171+
172+
case OperationType.WAIT:
173+
details = {
174+
"Duration": _wait_seconds_from(operation),
175+
"ScheduledEndTimestamp": _scheduled_end_iso(operation),
176+
}
177+
return HistoryEvent.from_dict({
178+
**base,
179+
"EventType": "WaitStarted",
180+
"WaitStartedDetails": details,
181+
})
182+
183+
case OperationType.CALLBACK:
184+
cb = getattr(operation, "callback_details", None)
185+
details = {
186+
"CallbackId": getattr(cb, "callback_id", None),
187+
}
188+
return HistoryEvent.from_dict({
189+
**base,
190+
"EventType": "CallbackStarted",
191+
"CallbackStartedDetails": details,
192+
})
193+
194+
case OperationType.INVOKE:
195+
return HistoryEvent.from_dict({
196+
**base,
197+
"EventType": "InvokeStarted",
198+
"InvokeStartedDetails": {"Input": {"Payload": None, "Truncated": False}},
199+
})
200+
201+
case _:
202+
msg = f"Unknown operation type: {operation.operation_type}"
203+
raise ValueError(msg)
204+
205+
206+
def operation_to_finished_event(operation: Operation, execution=None, include_execution_data: bool = False) -> HistoryEvent | None: # noqa: FBT001, FBT002
207+
"""Convert operation to finished event.
208+
209+
Args:
210+
operation: Operation to convert
211+
execution: Optional execution context
212+
include_execution_data: Whether to include execution data in the event
213+
214+
Returns:
215+
History event for operation completion or None if operation not finished
216+
"""
217+
if not operation.end_timestamp:
218+
return None
219+
220+
base = {
221+
"EventTimestamp": _iso(operation.end_timestamp),
222+
"Id": operation.operation_id,
223+
"Name": operation.name,
224+
"ParentId": operation.parent_id,
225+
}
226+
227+
match operation.operation_type:
228+
case OperationType.EXECUTION if execution and getattr(execution, "is_complete", False) and getattr(execution, "result", None):
229+
res = execution.result
230+
if res.status == InvocationStatus.SUCCEEDED:
231+
details = {
232+
"Result": _payload_envelope(getattr(res, "result", None), include_execution_data)
233+
}
234+
return HistoryEvent.from_dict({
235+
**base, "EventType": "ExecutionSucceeded", "ExecutionSucceededDetails": details
236+
})
237+
details = {
238+
"Error": _error_envelope(getattr(res, "error", None))
239+
}
240+
return HistoryEvent.from_dict({
241+
**base, "EventType": "ExecutionFailed", "ExecutionFailedDetails": details
242+
})
243+
244+
case OperationType.WAIT if operation.status == OperationStatus.SUCCEEDED:
245+
details = {"Duration": _wait_seconds_from(operation)}
246+
return HistoryEvent.from_dict({
247+
**base, "EventType": "WaitSucceeded", "WaitSucceededDetails": details
248+
})
249+
250+
case OperationType.STEP:
251+
sd = operation.step_details
252+
if operation.status == OperationStatus.SUCCEEDED:
253+
result = getattr(sd, "result", None) if sd else None
254+
details = {"Result": _payload_envelope(result, include_execution_data)} if result is not None else {"Result": None}
255+
return HistoryEvent.from_dict({
256+
**base, "EventType": "StepSucceeded", "StepSucceededDetails": details
257+
})
258+
if operation.status == OperationStatus.FAILED:
259+
err = getattr(sd, "error", None) if sd else None
260+
details = {"Error": _error_envelope(err)} if err is not None else {"Error": None}
261+
return HistoryEvent.from_dict({
262+
**base, "EventType": "StepFailed", "StepFailedDetails": details
263+
})
264+
return None
265+
266+
case OperationType.CALLBACK:
267+
return None
268+
case OperationType.INVOKE:
269+
return None
270+
271+
case _:
272+
return None
273+
274+
275+
def generate_execution_events(
276+
execution: Execution,
277+
include_execution_data: bool = False, # noqa: FBT001, FBT002
278+
) -> list[HistoryEvent]:
279+
"""Generate history events for an execution.
280+
281+
Walk all operations on an execution and emit start + (if present) finish events.
282+
EventIds are strictly increasing in emission order.
283+
284+
Args:
285+
execution: Execution to generate events for
286+
include_execution_data: Whether to include execution data in events
287+
288+
Returns:
289+
List of history events in chronological order
290+
"""
291+
events: list[HistoryEvent] = []
292+
eid = 1
293+
for op in getattr(execution, "operations", []):
294+
started = operation_to_started_event(op, execution, include_execution_data)
295+
started = HistoryEvent.from_dict(_with_event_id(started.to_dict(), eid))
296+
events.append(started)
297+
eid += 1
298+
299+
finished = operation_to_finished_event(op, execution, include_execution_data)
300+
if finished is not None:
301+
finished = HistoryEvent.from_dict(_with_event_id(finished.to_dict(), eid))
302+
events.append(finished)
303+
eid += 1
304+
return events

0 commit comments

Comments
 (0)