Skip to content

Commit f850798

Browse files
committed
Add Workflow.get_logger()
1 parent 5fa7114 commit f850798

7 files changed

Lines changed: 261 additions & 0 deletions

cadence/decision_loop.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,12 @@ def record_marker(self, marker_name: str, header: Header, details: bytes):
515515
def get_version(self, change_id: str, min_supported: int, max_supported: int) -> int:
516516
return self.workflow_clock.get_version(change_id, min_supported, max_supported)
517517

518+
def get_logger(self, name) -> logging.Logger:
519+
replay_aware_logger = logging.getLogger(name)
520+
make_replay_aware(replay_aware_logger)
521+
return replay_aware_logger
522+
523+
518524
@dataclass
519525
class ReplayDecider:
520526
execution_id: str
@@ -939,3 +945,4 @@ def respond_decisions(self, task_token: bytes, decisions: List[Decision]):
939945

940946

941947
from cadence.clock_decision_context import ClockDecisionContext, TimerCancellationHandler, LOCAL_ACTIVITY_MARKER_NAME
948+
from cadence.replay_interceptor import make_replay_aware

cadence/replay_interceptor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import inspect
2+
from typing import Callable
3+
4+
5+
def get_replay_aware_interceptor(fn: Callable):
6+
def interceptor(*args, **kwargs):
7+
from cadence.decision_loop import ITask
8+
task: ITask = ITask.current()
9+
if not task.decider.decision_context.is_replaying():
10+
return fn(*args, **kwargs)
11+
12+
return interceptor
13+
14+
15+
def make_replay_aware(target: object):
16+
# TODO: Consider using metaclasses instead
17+
if hasattr(target, "_cadence_python_intercepted"):
18+
return target
19+
for name, fn in inspect.getmembers(target):
20+
if inspect.ismethod(fn):
21+
setattr(target, name, get_replay_aware_interceptor(fn))
22+
target._cadence_python_intercepted = True
23+
return target
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
import logging.config
3+
4+
counter_filter_counter = 0
5+
6+
7+
class CounterFilter(logging.Filter):
8+
9+
def filter(self, record):
10+
global counter_filter_counter
11+
counter_filter_counter += 1
12+
return True
13+
14+
15+
def reset_counter_filter_counter():
16+
global counter_filter_counter
17+
counter_filter_counter = 0
18+
19+
20+
def get_counter_filter_counter():
21+
return counter_filter_counter
22+
23+
24+
LOGGING = {
25+
'version': 1,
26+
'filters': {
27+
'counter-filter': {
28+
'()': CounterFilter,
29+
}
30+
},
31+
'handlers': {
32+
'console': {
33+
'class': 'logging.StreamHandler',
34+
'filters': ['counter-filter']
35+
}
36+
},
37+
'loggers': {
38+
'test-logger': {
39+
'level': 'DEBUG',
40+
'handlers': ['console']
41+
}
42+
},
43+
}

cadence/tests/test_interceptor.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import inspect
2+
import logging
3+
import logging.config
4+
5+
from cadence.tests.interceptor_testing_utils import LOGGING, get_counter_filter_counter, reset_counter_filter_counter
6+
7+
a_captured = None
8+
b_captured = None
9+
args_captured = None
10+
kwargs_captured = None
11+
12+
13+
class Target:
14+
def do_stuff(self, a, b=2):
15+
global a_captured, b_captured
16+
a_captured = a
17+
b_captured = b
18+
19+
20+
def interceptor(fn):
21+
def intercept(*args, **kwargs):
22+
global args_captured, kwargs_captured
23+
args_captured = args
24+
kwargs_captured = kwargs
25+
fn(*args, **kwargs)
26+
27+
return intercept
28+
29+
30+
def test_interceptor():
31+
target = Target()
32+
for name, fn in inspect.getmembers(target):
33+
if inspect.ismethod(fn):
34+
setattr(target, name, interceptor(fn))
35+
target.do_stuff(1, b=20)
36+
assert args_captured == (1,)
37+
assert kwargs_captured == {"b": 20}
38+
assert a_captured == 1
39+
assert b_captured == 20
40+
41+
target2 = Target()
42+
target2.do_stuff(99, b=100)
43+
assert args_captured == (1,)
44+
assert kwargs_captured == {"b": 20}
45+
46+
47+
def test_logger():
48+
reset_counter_filter_counter()
49+
50+
logging.config.dictConfig(LOGGING)
51+
logger = logging.getLogger("test-logger")
52+
logger.info("1")
53+
logger.info("2")
54+
logger.info("3")
55+
56+
logger = logging.getLogger("something-else")
57+
logger.info("1")
58+
logger.info("2")
59+
logger.info("3")
60+
61+
assert get_counter_filter_counter() == 3
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from unittest.mock import MagicMock, Mock
2+
3+
import pytest
4+
5+
from cadence.decision_loop import DecisionContext, ITask
6+
from cadence.replay_interceptor import make_replay_aware
7+
import cadence.decision_loop
8+
9+
a_captured = None
10+
b_captured = None
11+
12+
13+
@pytest.fixture()
14+
def task_decision_context_replaying():
15+
task: ITask = Mock()
16+
task.decider = Mock()
17+
decision_context = MagicMock()
18+
decision_context.is_replaying = Mock(return_value=True)
19+
task.decider.decision_context = decision_context
20+
return task
21+
22+
23+
@pytest.fixture()
24+
def task_decision_context_not_replaying():
25+
task: ITask = Mock()
26+
task.decider = Mock()
27+
decision_context = MagicMock()
28+
decision_context.is_replaying = Mock(return_value=False)
29+
task.decider.decision_context = decision_context
30+
return task
31+
32+
33+
@pytest.fixture()
34+
def target():
35+
return Target()
36+
37+
class Target:
38+
def do_stuff(self, a, b=1):
39+
global a_captured, b_captured
40+
a_captured = a
41+
b_captured = b
42+
43+
44+
@pytest.mark.asyncio
45+
async def test_get_replay_aware_interceptor_not_replaying(task_decision_context_not_replaying, target: Target):
46+
cadence.decision_loop.current_task.set(task_decision_context_not_replaying)
47+
global a_captured, b_captured
48+
a_captured = None
49+
b_captured = None
50+
target = Target()
51+
original_fn = target.do_stuff
52+
make_replay_aware(target)
53+
assert target.do_stuff != original_fn
54+
target.do_stuff(20, b=30)
55+
assert a_captured == 20
56+
assert b_captured == 30
57+
58+
59+
@pytest.mark.asyncio
60+
async def test_get_replay_aware_interceptor_replaying(task_decision_context_replaying, target: Target):
61+
cadence.decision_loop.current_task.set(task_decision_context_replaying)
62+
global a_captured, b_captured
63+
a_captured = None
64+
b_captured = None
65+
original_fn = target.do_stuff
66+
make_replay_aware(target)
67+
assert target.do_stuff != original_fn
68+
target.do_stuff(20, b=30)
69+
assert a_captured is None
70+
assert b_captured is None
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import logging
2+
3+
from cadence.tests.interceptor_testing_utils import reset_counter_filter_counter, LOGGING, get_counter_filter_counter
4+
from cadence.workerfactory import WorkerFactory
5+
from cadence.workflow import workflow_method, Workflow, WorkflowClient
6+
7+
TASK_LIST = "TestWorkflowLogger"
8+
DOMAIN = "sample"
9+
10+
11+
class TestWorkflowLogger:
12+
@workflow_method(task_list=TASK_LIST)
13+
async def get_greetings(self) -> list:
14+
raise NotImplementedError
15+
16+
17+
class TestWorkflowLoggerImpl(TestWorkflowLogger):
18+
19+
def __init__(self):
20+
pass
21+
22+
async def get_greetings(self):
23+
logger = Workflow.get_logger("test-logger")
24+
logger.info("********Test %d", 1)
25+
await Workflow.sleep(10)
26+
logger.info("********Test %d", 2)
27+
await Workflow.sleep(10)
28+
logger.info("********Test %d", 3)
29+
await Workflow.sleep(10)
30+
logger.info("********Test %d", 4)
31+
await Workflow.sleep(10)
32+
logger.info("********Test %d", 5)
33+
34+
35+
def test_workflow_logger():
36+
reset_counter_filter_counter()
37+
logging.config.dictConfig(LOGGING)
38+
39+
factory = WorkerFactory("localhost", 7933, DOMAIN)
40+
worker = factory.new_worker(TASK_LIST)
41+
worker.register_workflow_implementation_type(TestWorkflowLoggerImpl)
42+
factory.start()
43+
44+
client = WorkflowClient.new_client(domain=DOMAIN)
45+
workflow: TestWorkflowLogger = client.new_workflow_stub(TestWorkflowLogger)
46+
47+
workflow.get_greetings()
48+
assert get_counter_filter_counter() == 5
49+
50+
print("Stopping workers")
51+
worker.stop()

cadence/workflow.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ def get_version(change_id: str, min_supported: int, max_supported: int):
8282
decision_context: DecisionContext = task.decider.decision_context
8383
return decision_context.get_version(change_id, min_supported, max_supported)
8484

85+
@staticmethod
86+
def get_logger(name):
87+
from cadence.decision_loop import ITask
88+
task: ITask = ITask.current()
89+
return task.decider.decision_context.get_logger(name)
90+
8591

8692
class WorkflowStub:
8793
pass

0 commit comments

Comments
 (0)