-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy pathTaskOrchestrationExecutor.py
More file actions
352 lines (308 loc) · 15.4 KB
/
TaskOrchestrationExecutor.py
File metadata and controls
352 lines (308 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
from azure.durable_functions.models.Task import TaskBase, TaskState, AtomicTask, CompoundTask
from azure.durable_functions.models.OrchestratorState import OrchestratorState
from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext
from typing import Any, List, Optional, Union
from azure.durable_functions.models.history.HistoryEventType import HistoryEventType
from azure.durable_functions.models.history.HistoryEvent import HistoryEvent
from types import GeneratorType
import warnings
from collections import namedtuple
import json
from ..models.entities.ResponseMessage import ResponseMessage
from azure.functions._durable_functions import _deserialize_custom_object, _serialize_custom_object
class TaskOrchestrationExecutor:
    """Manages the execution and replay of user-defined orchestrations."""

    def __init__(self):
        """Initialize TaskOrchestrationExecutor."""
        # A mapping of event types to a tuple of
        # (1) whether the event type represents a task success
        # (2) the attribute in the corresponding event object that identifies the Task
        # This mapping is used for processing events that transition a Task from its
        # running state to a terminal one.
        SetTaskValuePayload = namedtuple("SetTaskValuePayload", ("is_success", "task_id_key"))
        self.event_to_SetTaskValuePayload = {
            HistoryEventType.TASK_COMPLETED: SetTaskValuePayload(True, "TaskScheduledId"),
            HistoryEventType.TIMER_FIRED: SetTaskValuePayload(True, "TimerId"),
            HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED:
                SetTaskValuePayload(True, "TaskScheduledId"),
            # external events are looked up by their "Name" attribute rather than an id
            HistoryEventType.EVENT_RAISED: SetTaskValuePayload(True, "Name"),
            HistoryEventType.TASK_FAILED: SetTaskValuePayload(False, "TaskScheduledId"),
            HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED:
                SetTaskValuePayload(False, "TaskScheduledId"),
        }
        # Event types that move a Task to a terminal state; see `is_task_completion_event`.
        self.task_completion_events = set(self.event_to_SetTaskValuePayload.keys())
        self.initialize()

    def initialize(self):
        """Initialize the TaskOrchestrationExecutor for a new orchestration invocation.

        Also called when a CONTINUE_AS_NEW history event is processed, to reset
        the per-invocation state.
        """
        # The first task is just a placeholder to kickstart the generator,
        # so it is already resolved with the value `None`: the first resume
        # therefore sends `None`, the only value a just-started generator accepts.
        self.current_task: TaskBase = AtomicTask(-1, [])
        self.current_task.set_value(is_error=False, value=None)
        self.output: Any = None
        self.exception: Optional[Exception] = None
        self.orchestrator_returned: bool = False

    def execute(self, context: DurableOrchestrationContext,
                history: List[HistoryEvent], fn) -> str:
        """Execute an orchestration via its history to evaluate Tasks and replay events.

        Parameters
        ----------
        context : DurableOrchestrationContext
            The user's orchestration context, to interact with the user code.
        history : List[HistoryEvent]
            The orchestration history, to evaluate tasks and replay events.
        fn : function
            The user's orchestration function.

        Returns
        -------
        str
            A JSON-formatted string of the user's orchestration state, payload for the extension.

        Raises
        ------
        Exception
            If `history` has fewer than 2 events, or (via `get_orchestrator_state_str`)
            if the orchestration ended in an error.
        """
        self.context = context

        # The minimum History size is 2, in the shape: [OrchestratorStarted, ExecutionStarted].
        # Validate this before invoking the user function, so a malformed payload never
        # runs (eager, non-generator) orchestrator code.
        if len(history) < 2:
            err_message = "Internal Durable Functions error: "\
                + f"received History array of size {len(history)} "\
                + "when a minimum size of 2 is expected. "\
                + "Please report this issue at "\
                + "https://github.com/Azure/azure-functions-durable-python/issues."
            raise Exception(err_message)

        evaluated_user_code = fn(context)

        # Set the initial is_replaying state. At the start of replay it is determined
        # from the ExecutionStarted event, because OrchestratorStarted does not update
        # its `isPlayed` field.
        execution_started_event = history[1]
        self.current_task.is_played = execution_started_event.is_played

        # If user code is a generator, then it uses `yield` statements (the DF API)
        # and so we iterate through the DF history, generating tasks and populating
        # them with values when the history provides them
        if isinstance(evaluated_user_code, GeneratorType):
            self.generator = evaluated_user_code
            for event in history:
                self.process_event(event)
                if self.has_execution_completed:
                    break

        # Due to backwards compatibility reasons, it's possible
        # for the `continue_as_new` API to be called without `yield` statements.
        # Therefore, we explicitly check if `continue_as_new` was used before
        # flagging the orchestration as returned/completed
        elif not self.context.will_continue_as_new:
            self.orchestrator_returned = True
            self.output = evaluated_user_code
        return self.get_orchestrator_state_str()

    def process_event(self, event: HistoryEvent):
        """Evaluate a history event.

        Depending on the event type, this deterministically updates orchestration
        state, resolves some Task and resumes user code, or has no side effects.

        Parameters
        ----------
        event : HistoryEvent
            The history event to process
        """
        event_type = event.event_type
        if event_type == HistoryEventType.ORCHESTRATOR_STARTED:
            # update orchestration's deterministic timestamp (it only moves forward)
            timestamp = event.timestamp
            if timestamp > self.context.current_utc_datetime:
                self.context.current_utc_datetime = timestamp
        elif event_type == HistoryEventType.CONTINUE_AS_NEW:
            # re-initialize the orchestration state
            self.initialize()
        elif event_type == HistoryEventType.EXECUTION_STARTED:
            # begin replaying user code
            self.resume_user_code()
        elif event_type == HistoryEventType.EVENT_SENT:
            # we want to differentiate between a "proper" event sent, and a call entity
            key = event.event_id
            if key in self.context.open_tasks:
                task = self.context.open_tasks[key]
                if task._api_name == "CallEntityAction":
                    # In the call-entity case the response is correlated by the "id"
                    # (a GUID) carried in the sent event's Input, not by the sequential
                    # integer event_id, so re-key the open Task under that id.
                    self.context.open_tasks.pop(key)
                    event_id = json.loads(event.Input)["id"]
                    self.context.open_tasks[event_id] = task
        elif self.is_task_completion_event(event_type):
            # transition a task to a success or failure state
            (is_success, id_key) = self.event_to_SetTaskValuePayload[event_type]
            self.set_task_value(event, is_success, id_key)
            self.resume_user_code()

    def set_task_value(self, event: HistoryEvent, is_success: bool, id_key: str):
        """Set a running task to either a success or failed state, and sets its value.

        If no open Task matches the event's ID, a warning is emitted and the
        completion is stored in `context.deferred_tasks` as a callable that
        retries this method later.

        Parameters
        ----------
        event : HistoryEvent
            The history event containing the value for the Task
        is_success : bool
            Whether the Task succeeded or failed (throws exception)
        id_key : str
            The attribute in the event object containing the ID of the Task to target
        """
        def parse_history_event(directive_result):
            """Based on the type of event, parse the JSON-serializable portion of the event."""
            event_type = directive_result.event_type
            if event_type is None:
                raise ValueError("EventType is not found in task object")

            # We provide the ability to deserialize custom objects, because the output of this
            # will be passed directly to the orchestrator as the output of some activity
            if (event_type in (HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED,
                               HistoryEventType.TASK_COMPLETED)
                    and directive_result.Result is not None):
                return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
            if (event_type == HistoryEventType.EVENT_RAISED
                    and directive_result.Input is not None):
                # TODO: Investigate why the payload is in "Input" instead of "Result"
                return json.loads(directive_result.Input, object_hook=_deserialize_custom_object)
            return None

        # get target task; only the lookup itself can raise KeyError
        key = getattr(event, id_key)
        try:
            task: Union[TaskBase, List[TaskBase]] = self.context.open_tasks.pop(key)
        except KeyError:
            warning = f"Potential duplicate Task completion for TaskId: {key}"
            warnings.warn(warning)
            self.context.deferred_tasks[key] = lambda: self.set_task_value(
                event, is_success, id_key)
            return

        if isinstance(task, list):
            # Several open Tasks share this key: resolve the last one in the list
            # and keep any remaining ones open under the same key.
            task_list = task
            task = task_list.pop()
            if len(task_list) > 0:
                self.context.open_tasks[key] = task_list

        if is_success:
            # retrieve result
            new_value = parse_history_event(event)
            if task._api_name == "CallEntityAction":
                # entity responses arrive wrapped in a ResponseMessage; unwrap it and
                # surface an entity-side exception as a Task failure
                event_payload = ResponseMessage.from_dict(new_value)
                new_value = json.loads(event_payload.result)
                if event_payload.is_exception:
                    new_value = Exception(new_value)
                    is_success = False
        else:
            # generate exception
            new_value = Exception(f"{event.Reason} \n {event.Details}")

        # Resolve the Task; resuming the user code is left to the caller.
        task.set_is_played(event._is_played)
        task.set_value(is_error=not is_success, value=new_value)

    def resume_user_code(self):
        """Attempt to continue executing user code.

        We can only continue executing if the active/current task has resolved to a value.
        A StopIteration from the generator records the orchestration output; any other
        exception is recorded in `self.exception`.
        """
        current_task = self.current_task
        self.context._set_is_replaying(current_task.is_played)
        if current_task.state is TaskState.RUNNING:
            # if the current task hasn't been resolved, we can't
            # continue executing the user code.
            return

        new_task = None
        try:
            # resume orchestration with a resolved task's value
            task_value = current_task.result
            task_succeeded = current_task.state is TaskState.SUCCEEDED
            if task_succeeded:
                new_task = self.generator.send(task_value)
            else:
                new_task = self.generator.throw(task_value)
            if isinstance(new_task, TaskBase) and not new_task._is_scheduled:
                self.context._add_to_open_tasks(new_task)
        except StopIteration as stop_exception:
            # the orchestration returned,
            # flag it as such and capture its output
            self.orchestrator_returned = True
            self.output = stop_exception.value
        except Exception as exception:
            # the orchestration threw an exception
            self.exception = exception

        self.current_task = new_task
        if new_task is not None:
            if not new_task._is_scheduled:
                # new task is received. it needs to be resolved to a value
                self.context._add_to_actions(new_task.action_repr)
                self._mark_as_scheduled(new_task)
            if new_task.state is not TaskState.RUNNING:
                # user yielded the same task multiple times, continue executing code
                # until a new/not-previously-yielded task is encountered
                self.resume_user_code()

    def _mark_as_scheduled(self, task: TaskBase):
        """Flag `task` as scheduled; for a CompoundTask, recursively flag every leaf child."""
        if isinstance(task, CompoundTask):
            for child in task.children:
                self._mark_as_scheduled(child)
        else:
            task._set_is_scheduled(True)

    def get_orchestrator_state_str(self) -> str:
        """Obtain a JSON-formatted string representing the orchestration's state.

        Returns
        -------
        str
            String represented orchestration's state, payload to the extension

        Raises
        ------
        Exception
            When the orchestration's state represents an error. The exception's
            message contains in it the string representation of the orchestration's
            state
        """
        if self.output is not None:
            try:
                # Attempt to serialize the output. If serialization fails, raise an
                # error indicating that the orchestration output is not serializable,
                # which is not permitted in durable Python functions.
                json.dumps(self.output, default=_serialize_custom_object)
            except Exception as e:
                self.output = None
                self.exception = e

        exception_str = None
        if self.exception is not None:
            exception_str = str(self.exception)
            if not exception_str:
                # some exceptions stringify to "", fall back to the exception's type
                exception_str = str(type(self.exception))

        state = OrchestratorState(
            is_done=self.orchestration_invocation_succeeded,
            actions=self.context._actions,
            output=self.output,
            replay_schema=self.context._replay_schema,
            error=exception_str,
            custom_status=self.context.custom_status
        )

        if self.exception is not None:
            # Create formatted error, using out-of-proc error schema
            error_label = "\n\n$OutOfProcData$:"
            state_str = state.to_json_string()
            formatted_error = f"{self.exception}{error_label}{state_str}"

            # Raise exception, re-set stack to original location
            raise Exception(formatted_error) from self.exception
        return state.to_json_string()

    def is_task_completion_event(self, event_type: HistoryEventType) -> bool:
        """Determine if some event_type corresponds to a Task-resolution event.

        Parameters
        ----------
        event_type : HistoryEventType
            The event_type to analyze.

        Returns
        -------
        bool
            True if the event corresponds to a Task being resolved. False otherwise.
        """
        return event_type in self.task_completion_events

    @property
    def has_execution_completed(self) -> bool:
        """Determine if the orchestration invocation is completed.

        An orchestration can complete either by returning,
        continuing-as-new, or through an exception.

        Returns
        -------
        bool
            Whether the orchestration invocation is completed.
        """
        return self.orchestration_invocation_succeeded or self.exception is not None

    @property
    def orchestration_invocation_succeeded(self) -> bool:
        """Whether the orchestration returned or continued-as-new.

        Returns
        -------
        bool
            Whether the orchestration returned or continued-as-new
        """
        return self.orchestrator_returned or self.context.will_continue_as_new