Skip to content

Commit 2e4c217

Browse files
committed
Fixed: get_version() should always return the same result per workflow execution.
1 parent 54e7ab7 commit 2e4c217

6 files changed

Lines changed: 88 additions & 49 deletions

File tree

cadence/clock_decision_context.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from cadence.conversions import args_to_json
1010
from cadence.decision_loop import ReplayDecider, DecisionContext
1111
from cadence.exceptions import CancellationException
12-
from cadence.marker import MarkerHandler
12+
from cadence.marker import MarkerHandler, MarkerInterface, MarkerResult
1313
from cadence.util import OpenRequestInfo
1414

1515
logger = logging.getLogger(__name__)
@@ -86,15 +86,14 @@ def handle_timer_canceled(self, event: HistoryEvent):
8686
self.timer_cancelled(started_event_id, None)
8787

8888
def get_version(self, change_id: str, min_supported: int, max_supported) -> int:
89-
def func(stored):
90-
if stored:
91-
return None
92-
else:
93-
return json.dumps(max_supported)
89+
def func():
90+
return json.dumps(max_supported)
9491

9592
result: bytes = self.version_handler.handle(change_id, func)
96-
if not result:
97-
return DEFAULT_VERSION
93+
if result is None:
94+
result = json.dumps(DEFAULT_VERSION)
95+
self.version_handler.set_data(change_id, result)
96+
self.version_handler.mark_replayed(change_id) # so that we don't ever emit a MarkerRecorded for this
9897

9998
version: int = json.loads(result)
10099
self.validate_version(change_id, version, min_supported, max_supported)
@@ -106,6 +105,9 @@ def validate_version(self, change_id: str, version: int, min_supported: int, max
106105
f"Supported version is between {min_supported} and {max_supported}.")
107106

108107
def handle_marker_recorded(self, event: HistoryEvent):
108+
"""
109+
Will be executed more than once for the same event.
110+
"""
109111
attributes = event.marker_recorded_event_attributes
110112
name: str = attributes.marker_name
111113
if SIDE_EFFECT_MARKER_NAME == name:
@@ -116,7 +118,12 @@ def handle_marker_recorded(self, event: HistoryEvent):
116118
# TODO
117119
# handleLocalActivityMarker(attributes);
118120
pass
119-
elif MUTABLE_SIDE_EFFECT_MARKER_NAME != name and VERSION_MARKER_NAME != name:
121+
elif VERSION_MARKER_NAME == name:
122+
marker_data = MarkerInterface.from_event_attributes(attributes)
123+
change_id: str = marker_data.get_id()
124+
data: bytes = marker_data.get_data()
125+
self.version_handler.mutable_marker_results[change_id] = MarkerResult(data=data)
126+
elif MUTABLE_SIDE_EFFECT_MARKER_NAME != name:
120127
# TODO
121128
# if (log.isWarnEnabled()) {
122129
# log.warn("Unexpected marker: " + event);

cadence/marker.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def get_id(self) -> str:
7070
class MarkerResult:
7171
data: bytes = None
7272
access_count: int = 0
73+
replayed = False
7374

7475

7576
@dataclass
@@ -80,29 +81,42 @@ class MarkerHandler:
8081

8182
def record_mutable_marker(self, id: str, event_id: int, data: bytes, access_count: int):
8283
marker = MarkerData.create(id=id, event_id=event_id, data=data, access_count=access_count)
83-
self.mutable_marker_results[id] = MarkerResult(data=data)
84+
if id in self.mutable_marker_results:
85+
self.mutable_marker_results[id].replayed = True
86+
else:
87+
self.mutable_marker_results[id] = MarkerResult(data=data)
8488
self.decision_context.record_marker(self.marker_name, marker.get_header(), data)
8589

86-
def handle(self, id: str, func) -> bytes:
87-
result: MarkerResult = self.mutable_marker_results.get(id)
88-
stored: bytes = None
89-
if result:
90-
stored = result.data
91-
event_id = self.decision_context.decider.next_decision_event_id
92-
access_count = 0 if result is None else result.access_count
93-
if self.decision_context.is_replaying():
94-
data: bytes = self.get_marker_data_from_history(event_id, id, access_count)
95-
if data:
96-
self.record_mutable_marker(id, event_id, data, access_count)
97-
return data
98-
return stored
99-
to_store = func(stored)
100-
if to_store:
101-
data = to_store
102-
self.record_mutable_marker(id, event_id, data, access_count)
103-
return to_store
104-
return stored
90+
# Sets data without creating a decision - used when DEFAULT_VERSION is the implicit current version
91+
def set_data(self, id, data: bytes):
92+
self.mutable_marker_results[id] = MarkerResult(data=data)
10593

94+
def mark_replayed(self, id):
95+
self.mutable_marker_results[id].replayed = True
96+
97+
def handle(self, id: str, func) -> Optional[bytes]:
98+
event_id = self.decision_context.decider.next_decision_event_id
99+
result: MarkerResult = self.mutable_marker_results.get(id)
100+
if result or self.decision_context.is_replaying():
101+
if result:
102+
if self.decision_context.is_replaying() and not result.replayed:
103+
# Need to insert marker to ensure that event_id is incremented
104+
self.record_mutable_marker(id, event_id, result.data, 0)
105+
return result.data
106+
else:
107+
return None
108+
else:
109+
to_store = func()
110+
if to_store:
111+
data = to_store
112+
self.record_mutable_marker(id, event_id, data, 0)
113+
return to_store
114+
else:
115+
# TODO: Should this ever happen? - at least for version it will never happen
116+
pass
117+
118+
# This method is currently not being used - after adopting the version logic from the
119+
# Golang client
106120
def get_marker_data_from_history(self, event_id: int, marker_id: str, expected_access_count: int) -> \
107121
Optional[bytes]:
108122
event: HistoryEvent = self.decision_context.decider.get_optional_decision_event(event_id)

cadence/tests/test_marker.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ def callback(stored):
151151
handler = MarkerHandler(decision_context=decision_context, marker_name="the-marker-name")
152152
handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35)
153153
ret = handler.handle("the-id", callback)
154-
assert ret == b'blah-blah'
155-
assert len(decision_context.decider.decisions) == 1
154+
assert ret == b'123'
155+
assert len(decision_context.decider.decisions) == 0
156156

157157

158158
def test_handle_replaying_no_history(decision_context):
@@ -175,8 +175,8 @@ def callback(stored):
175175
handler = MarkerHandler(decision_context=decision_context, marker_name="the-marker-name")
176176
handler.mutable_marker_results["the-id"] = MarkerResult(data=b'123', access_count=35)
177177
ret = handler.handle("the-id", callback)
178-
assert ret == b'456'
179-
assert len(decision_context.decider.decisions) == 1
178+
assert ret == b'123'
179+
assert len(decision_context.decider.decisions) == 0
180180

181181

182182
def test_handle_not_replaying_callback_returns_none(decision_context):

cadence/tests/test_version.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ def version_decision_context(version_marker_recorded_event):
8888
def test_clock_decision_context_from_replay(version_decision_context):
8989
version_decision_context.workflow_clock.set_replaying(True)
9090
version = version_decision_context.workflow_clock.get_version("abc", 1, 5)
91-
assert version == 4
92-
assert len(version_decision_context.decider.decisions) == 1
91+
assert version == -1
92+
assert len(version_decision_context.decider.decisions) == 0
9393

9494

9595
def test_validate_version(version_decision_context):

cadence/tests/test_workflow_get_version.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
v1_hits = 0
1212
v2_hits = 0
1313

14-
version_found_in_v2_step_1 = None
15-
version_found_in_v2_step_2 = None
14+
version_found_in_v2_step_1_0 = None
15+
version_found_in_v2_step_1_1 = None
16+
version_found_in_v2_step_2_0 = None
17+
version_found_in_v2_step_2_1 = None
1618

1719
v2_done = False
1820

@@ -41,13 +43,16 @@ def __init__(self):
4143

4244
async def get_greetings(self):
4345
global v2_hits
44-
global version_found_in_v2_step_1, version_found_in_v2_step_2
46+
global version_found_in_v2_step_1_0, version_found_in_v2_step_1_1
47+
global version_found_in_v2_step_2_0, version_found_in_v2_step_2_1
4548
global v2_done
4649
v2_hits += 1
4750

48-
version_found_in_v2_step_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
51+
version_found_in_v2_step_1_0 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
52+
version_found_in_v2_step_1_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
4953
await Workflow.sleep(60)
50-
version_found_in_v2_step_2 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
54+
version_found_in_v2_step_2_0 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
55+
version_found_in_v2_step_2_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
5156
v2_done = True
5257

5358

@@ -74,8 +79,12 @@ def test_workflow_workflow_get_version():
7479

7580
assert v1_hits == 1
7681
assert v2_hits == 1
77-
assert version_found_in_v2_step_1 == DEFAULT_VERSION
78-
assert version_found_in_v2_step_2 == 2
82+
assert version_found_in_v2_step_1_0 == DEFAULT_VERSION
83+
assert version_found_in_v2_step_1_1 == DEFAULT_VERSION
84+
assert version_found_in_v2_step_2_0 == DEFAULT_VERSION
85+
assert version_found_in_v2_step_2_1 == DEFAULT_VERSION
86+
87+
# TODO: Assert that there are no markers recorded
7988

8089
print("Stopping workers")
8190
worker.stop(background=True)

cadence/tests/test_workflow_get_version_single.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
TASK_LIST = "TestWorkflowGetVersionSingle"
99
DOMAIN = "sample"
1010

11-
version_found_in_step_1 = None
12-
version_found_in_step_2 = None
11+
version_found_in_step_1_0 = None
12+
version_found_in_step_1_1 = None
13+
version_found_in_step_2_0 = None
14+
version_found_in_step_2_1 = None
1315

1416

1517
class TestWorkflowGetVersionSingle:
@@ -24,10 +26,13 @@ def __init__(self):
2426
pass
2527

2628
async def get_greetings(self):
27-
global version_found_in_step_1, version_found_in_step_2
28-
version_found_in_step_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
29+
global version_found_in_step_1_0, version_found_in_step_1_1
30+
global version_found_in_step_2_0, version_found_in_step_2_1
31+
version_found_in_step_1_0 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
32+
version_found_in_step_1_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
2933
await Workflow.sleep(60)
30-
version_found_in_step_2 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
34+
version_found_in_step_2_0 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
35+
version_found_in_step_2_1 = Workflow.get_version("first-item", DEFAULT_VERSION, 2)
3136

3237

3338
def test_workflow_workflow_get_version_single():
@@ -40,8 +45,12 @@ def test_workflow_workflow_get_version_single():
4045
workflow: TestWorkflowGetVersionSingle = client.new_workflow_stub(TestWorkflowGetVersionSingle)
4146
workflow.get_greetings()
4247

43-
assert version_found_in_step_1 == 2
44-
assert version_found_in_step_2 == 2
48+
assert version_found_in_step_1_0 == 2
49+
assert version_found_in_step_1_1 == 2
50+
assert version_found_in_step_2_0 == 2
51+
assert version_found_in_step_2_1 == 2
52+
53+
# TODO: Assert that there is only a single marker recorded
4554

4655
print("Stopping workers")
4756
worker.stop(background=True)

0 commit comments

Comments
 (0)