Skip to content

Commit e014308

Browse files
committed
Add Workflow.getVersion().
1 parent e03e504 commit e014308

4 files changed

Lines changed: 142 additions & 1 deletion

File tree

cadence/decision_loop.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,9 @@ def record_marker(self, marker_name: str, header: Header, details: bytes):
463463
decision_id = DecisionId(DecisionTarget.MARKER, next_decision_event_id)
464464
self.decider.add_decision(decision_id, MarkerDecisionStateMachine(id=decision_id, decision=decision))
465465

466+
def get_version(self, change_id: str, min_supported: int, max_supported: int) -> int:
467+
return self.workflow_clock.get_version(change_id, min_supported, max_supported)
468+
466469
@dataclass
467470
class ReplayDecider:
468471
execution_id: str
@@ -715,6 +718,7 @@ def handle_marker_recorded(self, event: HistoryEvent):
715718
def get_optional_decision_event(self, event_id: int) -> HistoryEvent:
716719
return self.decision_events.get_optional_decision_event(event_id)
717720

721+
718722
# noinspection PyUnusedLocal
719723
def noop(*args):
720724
pass
@@ -740,7 +744,7 @@ def on_timer_canceled(self: ReplayDecider, event: HistoryEvent):
740744
EventType.TimerStarted: ReplayDecider.handle_timer_started,
741745
EventType.TimerCanceled: on_timer_canceled,
742746
EventType.CancelTimerFailed: ReplayDecider.handle_cancel_timer_failed,
743-
EventType.MarkerRecorded: ReplayDecider.handler_marker_recorded
747+
EventType.MarkerRecorded: ReplayDecider.handle_marker_recorded
744748
}
745749

746750

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import sys
2+
from time import sleep
3+
4+
from cadence.clock_decision_context import DEFAULT_VERSION
5+
from cadence.workerfactory import WorkerFactory
6+
from cadence.workflow import workflow_method, Workflow, WorkflowClient
7+
8+
TASK_LIST = "TestWorkflowGetVersion"
9+
DOMAIN = "sample"
10+
11+
v1_hits = 0
12+
v2_hits = 0
13+
14+
version_found_in_v2_step_1 = None
15+
version_found_in_v2_step_2 = None
16+
17+
v2_done = False
18+
19+
20+
class TestWorkflowGetVersion:
21+
@workflow_method(task_list=TASK_LIST)
22+
async def get_greetings(self) -> list:
23+
raise NotImplementedError
24+
25+
26+
class TestWorkflowGetVersionImplV1(TestWorkflowGetVersion):
27+
28+
def __init__(self):
29+
pass
30+
31+
async def get_greetings(self):
32+
global v1_hits
33+
v1_hits += 1
34+
await Workflow.sleep(60)
35+
36+
37+
class TestWorkflowGetVersionImplV2(TestWorkflowGetVersion):
38+
39+
def __init__(self):
40+
pass
41+
42+
async def get_greetings(self):
43+
global v2_hits
44+
global version_found_in_v2_step_1, version_found_in_v2_step_2
45+
global v2_done
46+
v2_hits += 1
47+
48+
version_found_in_v2_step_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
49+
await Workflow.sleep(60)
50+
version_found_in_v2_step_2 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
51+
v2_done = True
52+
53+
54+
def test_workflow_workflow_get_version():
55+
global v1_hits, v2_hits
56+
factory = WorkerFactory("localhost", 7933, DOMAIN)
57+
worker = factory.new_worker(TASK_LIST)
58+
worker.register_workflow_implementation_type(TestWorkflowGetVersionImplV1)
59+
factory.start()
60+
61+
client = WorkflowClient.new_client(domain=DOMAIN)
62+
workflow: TestWorkflowGetVersion = client.new_workflow_stub(TestWorkflowGetVersion)
63+
64+
client.start(workflow.get_greetings)
65+
while v1_hits == 0:
66+
print(".", end="")
67+
sleep(2)
68+
69+
worker.register_workflow_implementation_type(TestWorkflowGetVersionImplV2)
70+
71+
while not v2_done:
72+
print(".", end="")
73+
sleep(2)
74+
75+
assert v1_hits == 1
76+
assert v2_hits == 1
77+
assert version_found_in_v2_step_1 == DEFAULT_VERSION
78+
assert version_found_in_v2_step_2 == 2
79+
80+
print("Stopping workers")
81+
worker.stop(background=True)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import sys
2+
from time import sleep
3+
4+
from cadence.clock_decision_context import DEFAULT_VERSION
5+
from cadence.workerfactory import WorkerFactory
6+
from cadence.workflow import workflow_method, Workflow, WorkflowClient
7+
8+
TASK_LIST = "TestWorkflowGetVersionSingle"
9+
DOMAIN = "sample"
10+
11+
version_found_in_step_1 = None
12+
version_found_in_step_2 = None
13+
14+
15+
class TestWorkflowGetVersionSingle:
16+
@workflow_method(task_list=TASK_LIST)
17+
async def get_greetings(self) -> list:
18+
raise NotImplementedError
19+
20+
21+
class TestWorkflowGetVersionSingleImpl(TestWorkflowGetVersionSingle):
22+
23+
def __init__(self):
24+
pass
25+
26+
async def get_greetings(self):
27+
global version_found_in_step_1, version_found_in_step_2
28+
version_found_in_step_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
29+
await Workflow.sleep(60)
30+
version_found_in_step_2 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
31+
32+
33+
def test_workflow_workflow_get_version_single():
34+
factory = WorkerFactory("localhost", 7933, DOMAIN)
35+
worker = factory.new_worker(TASK_LIST)
36+
worker.register_workflow_implementation_type(TestWorkflowGetVersionSingleImpl)
37+
factory.start()
38+
39+
client = WorkflowClient.new_client(domain=DOMAIN)
40+
workflow: TestWorkflowGetVersionSingle = client.new_workflow_stub(TestWorkflowGetVersionSingle)
41+
workflow.get_greetings()
42+
43+
assert version_found_in_step_1 == 2
44+
assert version_found_in_step_2 == 2
45+
46+
print("Stopping workers")
47+
worker.stop(background=True)

cadence/workflow.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,15 @@ def new_random() -> random.Random:
7171
task: ITask = ITask.current()
7272
return task.decider.decision_context.new_random()
7373

74+
@staticmethod
75+
def get_version(change_id: str, min_supported: int, max_supported: int):
76+
from cadence.decision_loop import ITask
77+
from cadence.decision_loop import DecisionContext
78+
task: ITask = ITask.current()
79+
decision_context: DecisionContext = task.decider.decision_context
80+
return decision_context.get_version(change_id, min_supported, max_supported)
81+
82+
7483
class WorkflowStub:
7584
pass
7685

0 commit comments

Comments
 (0)