Skip to content

Commit 566b976

Browse files
committed
Add WorkflowClient.new_workflow_stub_from_workflow_id()
1 parent a9653fb commit 566b976

3 files changed

Lines changed: 95 additions & 5 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ version is targeted to be released in ~~September of 2019~~ ~~January 2020~~ Mar
2828
- [x] newRandom
2929
- [x] UUID
3030
- [x] Workflow Versioning
31-
- [ ] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
31+
- [x] WorkflowClient.newWorkflowStub(Class workflowInterface, String workflowId);
3232

3333
1.1
3434
- [ ] ActivityStub and Workflow.newUntypedActivityStub
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import time
2+
3+
import pytest
4+
5+
from cadence.exceptions import QueryFailureException
6+
from cadence.workerfactory import WorkerFactory
7+
from cadence.workflow import workflow_method, signal_method, Workflow, WorkflowClient, query_method
8+
9+
TASK_LIST = "TestStubWorkflowId"
10+
DOMAIN = "sample"
11+
12+
13+
class GreetingException(Exception):
14+
pass
15+
16+
17+
class TestStubWorkflowId:
18+
19+
@query_method()
20+
async def get_message(self) -> str:
21+
raise NotImplementedError
22+
23+
@query_method()
24+
async def get_message_fail(self) -> str:
25+
raise NotImplementedError
26+
27+
@signal_method()
28+
async def put_message(self, message):
29+
raise NotImplementedError
30+
31+
@workflow_method(task_list=TASK_LIST)
32+
async def get_greetings(self) -> list:
33+
raise NotImplementedError
34+
35+
36+
class TestStubWorkflowIdImpl(TestStubWorkflowId):
37+
38+
def __init__(self):
39+
self.message = ""
40+
41+
async def get_message(self) -> str:
42+
return self.message
43+
44+
async def get_message_fail(self) -> str:
45+
raise GreetingException("error from query")
46+
47+
async def put_message(self, message):
48+
self.message = message
49+
50+
async def get_greetings(self) -> list:
51+
self.message = "initial-message"
52+
await Workflow.await_till(lambda: self.message == "done")
53+
return "finished"
54+
55+
56+
def test_stub_workflow_id():
57+
factory = WorkerFactory("localhost", 7933, DOMAIN)
58+
worker = factory.new_worker(TASK_LIST)
59+
worker.register_workflow_implementation_type(TestStubWorkflowIdImpl)
60+
factory.start()
61+
62+
client = WorkflowClient.new_client(domain=DOMAIN)
63+
workflow: TestStubWorkflowId = client.new_workflow_stub(TestStubWorkflowId)
64+
context = WorkflowClient.start(workflow.get_greetings)
65+
66+
stub: TestStubWorkflowId = client.new_workflow_stub_from_workflow_id(TestStubWorkflowId,
67+
workflow_id=context.workflow_execution.workflow_id)
68+
stub.put_message("abc")
69+
assert stub.get_message() == "abc"
70+
71+
stub.put_message("done")
72+
assert client.wait_for_close_with_workflow_id(context.workflow_execution.workflow_id) == "finished"
73+
74+
print("Stopping workers")
75+
worker.stop()

cadence/workflow.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,24 @@ def new_workflow_stub(self, cls: Type, workflow_options: WorkflowOptions = None)
133133
stub_cls = type(cls.__name__, (WorkflowStub,), attrs)
134134
return stub_cls()
135135

136+
def new_workflow_stub_from_workflow_id(self, cls: Type, workflow_id: str):
137+
"""
138+
Use it to send signals or queries to a running workflow.
139+
Do not call workflow methods on it
140+
"""
141+
stub_instance = self.new_workflow_stub(cls)
142+
execution = WorkflowExecution(workflow_id=workflow_id, run_id=None)
143+
stub_instance._execution = execution
144+
return stub_instance
145+
136146
def wait_for_close(self, context: WorkflowExecutionContext) -> object:
147+
return self.wait_for_close_with_workflow_id(workflow_id=context.workflow_execution.workflow_id,
148+
run_id=context.workflow_execution.run_id,
149+
workflow_type=context.workflow_type)
150+
151+
def wait_for_close_with_workflow_id(self, workflow_id: str, run_id: str = None, workflow_type: str = None):
137152
while True:
138-
history_request = create_close_history_event_request(self, context.workflow_execution.workflow_id,
139-
context.workflow_execution.run_id)
153+
history_request = create_close_history_event_request(self, workflow_id, run_id)
140154
history_response, err = self.service.get_workflow_execution_history(history_request)
141155
if err:
142156
raise Exception(err)
@@ -152,8 +166,9 @@ def wait_for_close(self, context: WorkflowExecutionContext) -> object:
152166
exception = deserialize_exception(attributes.details)
153167
if isinstance(exception, ActivityFailureException):
154168
exception.set_cause()
155-
raise WorkflowFailureException(workflow_type=context.workflow_type,
156-
execution=context.workflow_execution) from exception
169+
workflow_execution = WorkflowExecution(workflow_id=workflow_id, run_id=run_id)
170+
raise WorkflowFailureException(workflow_type=workflow_type,
171+
execution=workflow_execution) from exception
157172
else:
158173
details: Dict = json.loads(attributes.details)
159174
detail_message = details.get("detailMessage", "")

0 commit comments

Comments
 (0)