Skip to content

Commit d6e073c

Browse files
author
Rares Polenciuc
committed
feat: add execution history event generation and pagination
- Add event_conversion module to transform operations into history events - Implement operation_to_started_event and operation_to_finished_event - Add generate_execution_events for complete execution history - Update executor get_execution_history with proper cursor-based pagination - Support reverse_order and include_execution_data options - Add comprehensive test coverage for event conversion and pagination
1 parent 48eae39 commit d6e073c

4 files changed

Lines changed: 1031 additions & 19 deletions

File tree

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
"""Event conversion utilities for durable execution history.
2+
3+
This module provides functions to convert operation objects into history events
4+
that can be consumed by the execution history API.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from datetime import UTC, datetime
10+
from typing import TYPE_CHECKING
11+
12+
from aws_durable_execution_sdk_python.execution import InvocationStatus
13+
from aws_durable_execution_sdk_python.lambda_service import (
14+
OperationStatus,
15+
OperationType,
16+
)
17+
18+
from aws_durable_execution_sdk_python_testing.model import Event as HistoryEvent
19+
20+
21+
if TYPE_CHECKING:
22+
from aws_durable_execution_sdk_python.lambda_service import Operation
23+
24+
from aws_durable_execution_sdk_python_testing.execution import Execution
25+
26+
27+
def _iso(ts, *, default_now: bool = False) -> str:
28+
"""Convert timestamp to ISO format string.
29+
30+
Args:
31+
ts: Timestamp to convert
32+
default_now: If True, use current time when ts is None
33+
34+
Returns:
35+
ISO format timestamp string
36+
37+
Raises:
38+
ValueError: If ts is None and default_now is False
39+
"""
40+
if ts is None:
41+
if default_now:
42+
return datetime.now(UTC).isoformat()
43+
msg = "timestamp is required"
44+
raise ValueError(msg)
45+
if ts.tzinfo is None:
46+
ts = ts.replace(tzinfo=UTC)
47+
return ts.isoformat()
48+
49+
50+
def _payload_envelope(payload, include: bool): # noqa: FBT001
51+
"""Create payload envelope for event data.
52+
53+
Args:
54+
payload: Payload data to wrap
55+
include: Whether to include the actual payload data
56+
57+
Returns:
58+
Payload envelope dict or None if payload is None
59+
"""
60+
if payload is None:
61+
return None
62+
return {"Payload": payload if include else None, "Truncated": False}
63+
64+
65+
def _error_envelope(err):
66+
"""Create error envelope for event data.
67+
68+
Args:
69+
err: Error object to wrap
70+
71+
Returns:
72+
Error envelope dict or None if err is None
73+
"""
74+
if err is None:
75+
return None
76+
return {
77+
"Payload": err.to_dict() if hasattr(err, "to_dict") else err,
78+
"Truncated": False,
79+
}
80+
81+
82+
def _wait_seconds_from(op: Operation) -> int | None:
83+
"""Extract wait duration from operation.
84+
85+
Args:
86+
op: Operation to extract wait duration from
87+
88+
Returns:
89+
Wait duration in seconds or None if not available
90+
"""
91+
if hasattr(op, "wait_options") and op.wait_options:
92+
v = op.wait_options.get("wait_seconds")
93+
if v is not None:
94+
return int(v)
95+
if op.wait_details and hasattr(op.wait_details, "wait_seconds"):
96+
v = op.wait_details.wait_seconds
97+
if v is not None:
98+
return int(v)
99+
return None
100+
101+
102+
def _scheduled_end_iso(op: Operation) -> str | None:
103+
"""Extract scheduled end timestamp from operation.
104+
105+
Args:
106+
op: Operation to extract timestamp from
107+
108+
Returns:
109+
ISO format timestamp string or None if not available
110+
"""
111+
wd = op.wait_details
112+
ts = getattr(wd, "scheduled_timestamp", None) if wd else None
113+
return _iso(ts) if ts else None
114+
115+
116+
def _with_event_id(d: dict, eid: int) -> dict:
117+
"""Add event ID to event dictionary.
118+
119+
Args:
120+
d: Event dictionary
121+
eid: Event ID to add
122+
123+
Returns:
124+
Updated dictionary with event ID
125+
"""
126+
out = dict(d)
127+
out["EventId"] = eid
128+
return out
129+
130+
131+
def operation_to_started_event(
132+
operation: Operation,
133+
execution=None,
134+
include_execution_data: bool = False, # noqa: FBT001, FBT002
135+
) -> HistoryEvent:
136+
"""Convert operation to started event.
137+
138+
Args:
139+
operation: Operation to convert
140+
execution: Optional execution context
141+
include_execution_data: Whether to include execution data in the event
142+
143+
Returns:
144+
History event for operation start
145+
146+
Raises:
147+
ValueError: If operation type is unknown
148+
"""
149+
base = {
150+
"EventTimestamp": _iso(operation.start_timestamp, default_now=True),
151+
"Id": operation.operation_id,
152+
"Name": operation.name,
153+
"ParentId": operation.parent_id,
154+
}
155+
156+
match operation.operation_type:
157+
case OperationType.EXECUTION:
158+
input_payload = (
159+
operation.execution_details.input_payload
160+
if operation.execution_details
161+
else None
162+
)
163+
details = {
164+
"Input": _payload_envelope(input_payload, include_execution_data),
165+
"ExecutionTimeout": (
166+
getattr(
167+
getattr(execution, "start_input", None),
168+
"execution_timeout_seconds",
169+
None,
170+
)
171+
if execution
172+
else None
173+
),
174+
}
175+
return HistoryEvent.from_dict(
176+
{
177+
**base,
178+
"EventType": "ExecutionStarted",
179+
"ExecutionStartedDetails": details,
180+
}
181+
)
182+
183+
case OperationType.STEP:
184+
return HistoryEvent.from_dict(
185+
{
186+
**base,
187+
"EventType": "StepStarted",
188+
"StepStartedDetails": {"Present": True},
189+
}
190+
)
191+
192+
case OperationType.WAIT:
193+
details = {
194+
"Duration": _wait_seconds_from(operation),
195+
"ScheduledEndTimestamp": _scheduled_end_iso(operation),
196+
}
197+
return HistoryEvent.from_dict(
198+
{
199+
**base,
200+
"EventType": "WaitStarted",
201+
"WaitStartedDetails": details,
202+
}
203+
)
204+
205+
case OperationType.CALLBACK:
206+
cb = getattr(operation, "callback_details", None)
207+
details = {
208+
"CallbackId": getattr(cb, "callback_id", None),
209+
}
210+
return HistoryEvent.from_dict(
211+
{
212+
**base,
213+
"EventType": "CallbackStarted",
214+
"CallbackStartedDetails": details,
215+
}
216+
)
217+
218+
case OperationType.INVOKE:
219+
return HistoryEvent.from_dict(
220+
{
221+
**base,
222+
"EventType": "InvokeStarted",
223+
"InvokeStartedDetails": {
224+
"Input": {"Payload": None, "Truncated": False}
225+
},
226+
}
227+
)
228+
229+
case _:
230+
msg = f"Unknown operation type: {operation.operation_type}"
231+
raise ValueError(msg)
232+
233+
234+
def operation_to_finished_event(
235+
operation: Operation,
236+
execution=None,
237+
include_execution_data: bool = False, # noqa: FBT001, FBT002
238+
) -> HistoryEvent | None:
239+
"""Convert operation to finished event.
240+
241+
Args:
242+
operation: Operation to convert
243+
execution: Optional execution context
244+
include_execution_data: Whether to include execution data in the event
245+
246+
Returns:
247+
History event for operation completion or None if operation not finished
248+
"""
249+
if not operation.end_timestamp:
250+
return None
251+
252+
base = {
253+
"EventTimestamp": _iso(operation.end_timestamp),
254+
"Id": operation.operation_id,
255+
"Name": operation.name,
256+
"ParentId": operation.parent_id,
257+
}
258+
259+
match operation.operation_type:
260+
case OperationType.EXECUTION if (
261+
execution
262+
and getattr(execution, "is_complete", False)
263+
and getattr(execution, "result", None)
264+
):
265+
res = execution.result
266+
if res.status == InvocationStatus.SUCCEEDED:
267+
details = {
268+
"Result": _payload_envelope(
269+
getattr(res, "result", None), include_execution_data
270+
)
271+
}
272+
return HistoryEvent.from_dict(
273+
{
274+
**base,
275+
"EventType": "ExecutionSucceeded",
276+
"ExecutionSucceededDetails": details,
277+
}
278+
)
279+
details = {"Error": _error_envelope(getattr(res, "error", None))}
280+
return HistoryEvent.from_dict(
281+
{
282+
**base,
283+
"EventType": "ExecutionFailed",
284+
"ExecutionFailedDetails": details,
285+
}
286+
)
287+
288+
case OperationType.WAIT if operation.status == OperationStatus.SUCCEEDED:
289+
details = {"Duration": _wait_seconds_from(operation)}
290+
return HistoryEvent.from_dict(
291+
{**base, "EventType": "WaitSucceeded", "WaitSucceededDetails": details}
292+
)
293+
294+
case OperationType.STEP:
295+
sd = operation.step_details
296+
if operation.status == OperationStatus.SUCCEEDED:
297+
result = getattr(sd, "result", None) if sd else None
298+
details = (
299+
{"Result": _payload_envelope(result, include_execution_data)}
300+
if result is not None
301+
else {"Result": None}
302+
)
303+
return HistoryEvent.from_dict(
304+
{
305+
**base,
306+
"EventType": "StepSucceeded",
307+
"StepSucceededDetails": details,
308+
}
309+
)
310+
if operation.status == OperationStatus.FAILED:
311+
err = getattr(sd, "error", None) if sd else None
312+
details = (
313+
{"Error": _error_envelope(err)}
314+
if err is not None
315+
else {"Error": None}
316+
)
317+
return HistoryEvent.from_dict(
318+
{**base, "EventType": "StepFailed", "StepFailedDetails": details}
319+
)
320+
return None
321+
322+
case OperationType.CALLBACK:
323+
return None
324+
case OperationType.INVOKE:
325+
return None
326+
327+
case _:
328+
return None
329+
330+
331+
def generate_execution_events(
332+
execution: Execution,
333+
include_execution_data: bool = False, # noqa: FBT001, FBT002
334+
) -> list[HistoryEvent]:
335+
"""Generate history events for an execution.
336+
337+
Walk all operations on an execution and emit start + (if present) finish events.
338+
EventIds are strictly increasing in emission order.
339+
340+
Args:
341+
execution: Execution to generate events for
342+
include_execution_data: Whether to include execution data in events
343+
344+
Returns:
345+
List of history events in chronological order
346+
"""
347+
events: list[HistoryEvent] = []
348+
eid = 1
349+
for op in getattr(execution, "operations", []):
350+
started = operation_to_started_event(op, execution, include_execution_data)
351+
started = HistoryEvent.from_dict(_with_event_id(started.to_dict(), eid))
352+
events.append(started)
353+
eid += 1
354+
355+
finished = operation_to_finished_event(op, execution, include_execution_data)
356+
if finished is not None:
357+
finished = HistoryEvent.from_dict(_with_event_id(finished.to_dict(), eid))
358+
events.append(finished)
359+
eid += 1
360+
return events

0 commit comments

Comments
 (0)