-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathobserver.py
More file actions
144 lines (117 loc) · 4.82 KB
/
observer.py
File metadata and controls
144 lines (117 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Checkpoint processors can notify the Execution of notable event state changes. Observer pattern."""
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from aws_durable_execution_sdk_python_testing.token import CallbackToken
if TYPE_CHECKING:
from collections.abc import Callable
from aws_durable_execution_sdk_python.lambda_service import (
ErrorObject,
CallbackOptions,
)
class ExecutionObserver(ABC):
    """Interface for objects that react to execution lifecycle events.

    Every hook is abstract, so a concrete observer must implement all of
    them before it can be instantiated. Each hook receives the ARN of the
    execution the event belongs to as ``execution_arn``.
    """

    @abstractmethod
    def on_completed(
        self,
        execution_arn: str,
        result: str | None = None,
    ) -> None:
        """Handle successful completion of an execution.

        Args:
            execution_arn: Identifier of the execution that completed.
            result: Result of the execution, or ``None`` if not supplied.
        """

    @abstractmethod
    def on_failed(
        self,
        execution_arn: str,
        error: ErrorObject,
    ) -> None:
        """Handle an execution that failed.

        Args:
            execution_arn: Identifier of the execution that failed.
            error: Error describing the failure.
        """

    @abstractmethod
    def on_timed_out(
        self,
        execution_arn: str,
        error: ErrorObject,
    ) -> None:
        """Handle an execution that timed out.

        Args:
            execution_arn: Identifier of the execution that timed out.
            error: Error describing the timeout.
        """

    @abstractmethod
    def on_stopped(
        self,
        execution_arn: str,
        error: ErrorObject,
    ) -> None:
        """Handle an execution that was stopped.

        Args:
            execution_arn: Identifier of the execution that was stopped.
            error: Error supplied with the stop event.
        """

    @abstractmethod
    def on_wait_timer_scheduled(
        self,
        execution_arn: str,
        operation_id: str,
        delay: float,
    ) -> None:
        """Handle the scheduling of a wait timer.

        Args:
            execution_arn: Identifier of the execution owning the timer.
            operation_id: Identifier of the operation the timer belongs to.
            delay: Delay associated with the timer.
                NOTE(review): presumably seconds -- confirm with callers.
        """

    @abstractmethod
    def on_step_retry_scheduled(
        self,
        execution_arn: str,
        operation_id: str,
        delay: float,
    ) -> None:
        """Handle the scheduling of a step retry.

        Args:
            execution_arn: Identifier of the execution owning the step.
            operation_id: Identifier of the operation being retried.
            delay: Delay associated with the retry.
                NOTE(review): presumably seconds -- confirm with callers.
        """

    @abstractmethod
    def on_callback_created(
        self,
        execution_arn: str,
        operation_id: str,
        callback_options: CallbackOptions | None,
        callback_token: CallbackToken,
    ) -> None:
        """Handle the creation of a callback.

        Args:
            execution_arn: Identifier of the execution owning the callback.
            operation_id: Identifier of the operation that created it.
            callback_options: Options for the callback, or ``None``.
            callback_token: Token associated with the new callback.
        """
class ExecutionNotifier:
    """Fan execution events out to every registered observer.

    The observer list is guarded by a re-entrant lock. Dispatch runs over a
    snapshot of the list taken while holding that lock.
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._observers: list[ExecutionObserver] = []

    def add_observer(self, observer: ExecutionObserver) -> None:
        """Register ``observer`` to receive subsequent notifications.

        No de-duplication is performed: an observer added twice is
        notified twice per event.
        """
        with self._lock:
            self._observers.append(observer)

    def _notify_observers(self, method: Callable, *args, **kwargs) -> None:
        """Call the hook named after ``method`` on each registered observer.

        Args:
            method: Callable whose ``__name__`` selects the attribute looked
                up on every observer (the emitters in this class pass an
                ``ExecutionObserver`` method).
            *args: Positional arguments forwarded unchanged to each hook.
            **kwargs: Keyword arguments forwarded unchanged to each hook.

        Observers are called in registration order. There is no exception
        handling here, so an exception raised by one observer propagates to
        the caller and the remaining observers are not called.
        """
        # Snapshot under the lock, then dispatch from the snapshot so that
        # the observer list itself is never iterated while it may change.
        with self._lock:
            snapshot = list(self._observers)

        for target in snapshot:
            hook = getattr(target, method.__name__)
            hook(*args, **kwargs)

    # region event emitters
    def notify_completed(self, execution_arn: str, result: str | None = None) -> None:
        """Emit ``on_completed`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_completed,
            execution_arn=execution_arn,
            result=result,
        )

    def notify_failed(self, execution_arn: str, error: ErrorObject) -> None:
        """Emit ``on_failed`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_failed,
            execution_arn=execution_arn,
            error=error,
        )

    def notify_timed_out(self, execution_arn: str, error: ErrorObject) -> None:
        """Emit ``on_timed_out`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_timed_out,
            execution_arn=execution_arn,
            error=error,
        )

    def notify_stopped(self, execution_arn: str, error: ErrorObject) -> None:
        """Emit ``on_stopped`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_stopped,
            execution_arn=execution_arn,
            error=error,
        )

    def notify_wait_timer_scheduled(
        self,
        execution_arn: str,
        operation_id: str,
        delay: float,
    ) -> None:
        """Emit ``on_wait_timer_scheduled`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_wait_timer_scheduled,
            execution_arn=execution_arn,
            operation_id=operation_id,
            delay=delay,
        )

    def notify_step_retry_scheduled(
        self,
        execution_arn: str,
        operation_id: str,
        delay: float,
    ) -> None:
        """Emit ``on_step_retry_scheduled`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_step_retry_scheduled,
            execution_arn=execution_arn,
            operation_id=operation_id,
            delay=delay,
        )

    def notify_callback_created(
        self,
        execution_arn: str,
        operation_id: str,
        callback_options: CallbackOptions | None,
        callback_token: CallbackToken,
    ) -> None:
        """Emit ``on_callback_created`` to all observers."""
        self._notify_observers(
            ExecutionObserver.on_callback_created,
            execution_arn=execution_arn,
            operation_id=operation_id,
            callback_options=callback_options,
            callback_token=callback_token,
        )

    # endregion event emitters