This repository was archived by the owner on Sep 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathactivity_loop.py
More file actions
87 lines (80 loc) · 4.12 KB
/
activity_loop.py
File metadata and controls
87 lines (80 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import datetime
import logging
import json
from cadence.activity import ActivityContext, ActivityTask, complete_exceptionally, complete
from cadence.cadence_types import PollForActivityTaskRequest, TaskListMetadata, TaskList, PollForActivityTaskResponse
from cadence.conversions import json_to_args
from cadence.workflowservice import WorkflowService
from cadence.worker import Worker, StopRequestedException
logger = logging.getLogger(__name__)
def activity_task_loop(worker: Worker):
service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout())
worker.manage_service(service)
logger.info(f"Activity task worker started: {WorkflowService.get_identity()}")
try:
while True:
if worker.is_stop_requested():
return
try:
service.set_next_timeout_cb(worker.raise_if_stop_requested)
polling_start = datetime.datetime.now()
polling_request = PollForActivityTaskRequest()
polling_request.task_list_metadata = TaskListMetadata()
polling_request.task_list_metadata.max_tasks_per_second = 200000
polling_request.domain = worker.domain
polling_request.identity = WorkflowService.get_identity()
polling_request.task_list = TaskList()
polling_request.task_list.name = worker.task_list
task: PollForActivityTaskResponse
task, err = service.poll_for_activity_task(polling_request)
polling_end = datetime.datetime.now()
logger.debug("PollForActivityTask: %dms", (polling_end - polling_start).total_seconds() * 1000)
except StopRequestedException:
return
except Exception as ex:
logger.error("PollForActivityTask error: %s", ex)
logger.error(f"Exiting")
break
if err:
logger.error("PollForActivityTask failed: %s", err)
continue
task_token = task.task_token
if not task_token:
logger.debug("PollForActivityTask has no task_token (expected): %s", task)
continue
args = json_to_args(task.input)
logger.info(f"Request for activity: {task.activity_type.name}")
fn = worker.activities.get(task.activity_type.name)
if not fn:
logger.error("Activity type not found: " + task.activity_type.name)
continue
process_start = datetime.datetime.now()
activity_context = ActivityContext()
activity_context.service = service
activity_context.activity_task = ActivityTask.from_poll_for_activity_task_response(task)
activity_context.domain = worker.domain
try:
ActivityContext.set(activity_context)
return_value = fn(*args)
if activity_context.do_not_complete:
logger.info(f"Not completing activity {task.activity_type.name}({str(args)[1:-1]})")
continue
error = complete(service, task_token, return_value)
if error:
logger.error("Error invoking RespondActivityTaskCompleted: %s", error)
logger.info(f"Activity {task.activity_type.name}({str(args)[1:-1]}) returned {json.dumps(return_value)}")
except Exception as ex:
logger.error(f"Activity {task.activity_type.name} failed: {type(ex).__name__}({ex})", exc_info=1)
error = complete_exceptionally(service, task_token, ex)
if error:
logger.error("Error invoking RespondActivityTaskFailed: %s", error)
finally:
ActivityContext.set(None)
process_end = datetime.datetime.now()
logger.info("Process ActivityTask: %dms", (process_end - process_start).total_seconds() * 1000)
finally:
try:
service.close()
except:
logger.warning("service.close() failed", exc_info=1)
worker.notify_thread_stopped()