Skip to content

Commit 15f08ca

Browse files
harsh543claude
andcommitted
test: parameterize workflow logging tests
- Merge test_workflow_logging and test_workflow_logging_flatten_mode into a single pytest-parameterized test covering dict and flatten modes - Extract replay/full_workflow_info_on_extra testing into separate test_workflow_logging_replay test for clarity Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3e29635 commit 15f08ca

1 file changed

Lines changed: 65 additions & 81 deletions

File tree

tests/worker/test_workflow.py

Lines changed: 65 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1991,7 +1991,67 @@ def last_signal(self) -> str:
19911991
return self._last_signal
19921992

19931993

1994-
async def test_workflow_logging(client: Client):
1994+
@pytest.mark.parametrize("temporal_extra_mode", ["dict", "flatten"])
1995+
async def test_workflow_logging(client: Client, temporal_extra_mode: str):
1996+
"""Test that workflow logger produces correct log records for each extra mode."""
1997+
original_mode = workflow.logger.temporal_extra_mode
1998+
workflow.logger.temporal_extra_mode = temporal_extra_mode
1999+
2000+
try:
2001+
with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer:
2002+
async with new_worker(
2003+
client, LoggingWorkflow, max_cached_workflows=0
2004+
) as worker:
2005+
handle = await client.start_workflow(
2006+
LoggingWorkflow.run,
2007+
id=f"workflow-{uuid.uuid4()}",
2008+
task_queue=worker.task_queue,
2009+
)
2010+
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2011+
await handle.execute_update(
2012+
LoggingWorkflow.my_update, "update 1", id="update-1"
2013+
)
2014+
await handle.signal(LoggingWorkflow.my_signal, "finish")
2015+
await handle.result()
2016+
2017+
record = capturer.find_log("Signal: signal 1")
2018+
assert record is not None
2019+
assert record.funcName == "my_signal"
2020+
2021+
update_record = capturer.find_log("Update: update 1")
2022+
assert update_record is not None
2023+
2024+
if temporal_extra_mode == "dict":
2025+
# Dict mode appends context to message and uses nested dict
2026+
assert "({'attempt':" in record.message
2027+
assert record.__dict__["temporal_workflow"]["workflow_type"] == "LoggingWorkflow"
2028+
assert update_record.__dict__["temporal_workflow"]["update_id"] == "update-1"
2029+
assert update_record.__dict__["temporal_workflow"]["update_name"] == "my_update"
2030+
assert "'update_id': 'update-1'" in update_record.message
2031+
else:
2032+
# Flatten mode uses OTel-safe scalar attributes
2033+
assert "temporal_workflow" not in record.__dict__
2034+
assert record.__dict__["temporal.workflow.workflow_type"] == "LoggingWorkflow"
2035+
assert "temporal.workflow.workflow_id" in record.__dict__
2036+
assert "temporal.workflow.run_id" in record.__dict__
2037+
assert "temporal.workflow.namespace" in record.__dict__
2038+
assert "temporal.workflow.task_queue" in record.__dict__
2039+
assert record.__dict__["temporal.workflow.attempt"] == 1
2040+
assert update_record.__dict__["temporal.workflow.update_id"] == "update-1"
2041+
assert update_record.__dict__["temporal.workflow.update_name"] == "my_update"
2042+
2043+
# Verify all temporal.workflow.* values are primitives (OTel-safe)
2044+
for key, value in record.__dict__.items():
2045+
if key.startswith("temporal.workflow."):
2046+
assert isinstance(
2047+
value, (str, int, float, bool, type(None))
2048+
), f"Key {key} has non-primitive value: {type(value)}"
2049+
finally:
2050+
workflow.logger.temporal_extra_mode = original_mode
2051+
2052+
2053+
async def test_workflow_logging_replay(client: Client):
2054+
"""Test that replayed logs are suppressed and full_workflow_info_on_extra works."""
19952055
workflow.logger.full_workflow_info_on_extra = True
19962056
with LogCapturer().logs_captured(
19972057
workflow.logger.base_logger, activity.logger.base_logger
@@ -2007,43 +2067,22 @@ async def test_workflow_logging(client: Client):
20072067
id=f"workflow-{uuid.uuid4()}",
20082068
task_queue=worker.task_queue,
20092069
)
2010-
# Send some signals and updates
20112070
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
20122071
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
20132072
await handle.execute_update(
20142073
LoggingWorkflow.my_update, "update 1", id="update-1"
20152074
)
2016-
await handle.execute_update(
2017-
LoggingWorkflow.my_update, "update 2", id="update-2"
2018-
)
20192075
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
20202076

20212077
# Confirm logs were produced
2022-
assert capturer.find_log("Signal: signal 1 ({'attempt':")
2078+
assert capturer.find_log("Signal: signal 1")
20232079
assert capturer.find_log("Signal: signal 2")
20242080
assert capturer.find_log("Update: update 1")
2025-
assert capturer.find_log("Update: update 2")
20262081
assert capturer.find_log("Query called")
2027-
assert not capturer.find_log("Signal: signal 3")
2028-
# Also make sure it has some workflow info and correct funcName
2029-
record = capturer.find_log("Signal: signal 1")
2030-
assert (
2031-
record
2032-
and record.__dict__["temporal_workflow"]["workflow_type"]
2033-
== "LoggingWorkflow"
2034-
and record.funcName == "my_signal"
2035-
)
2082+
20362083
# Since we enabled full info, make sure it's there
2037-
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
2038-
# Check the log emitted by the update execution.
2039-
record = capturer.find_log("Update: update 1")
2040-
assert (
2041-
record
2042-
and record.__dict__["temporal_workflow"]["update_id"] == "update-1"
2043-
and record.__dict__["temporal_workflow"]["update_name"] == "my_update"
2044-
and "'update_id': 'update-1'" in record.message
2045-
and "'update_name': 'my_update'" in record.message
2046-
)
2084+
record = capturer.find_log("Signal: signal 1")
2085+
assert record and isinstance(record.__dict__["workflow_info"], workflow.Info)
20472086

20482087
# Clear queue and start a new one with more signals
20492088
capturer.log_queue.queue.clear()
@@ -2053,7 +2092,6 @@ async def test_workflow_logging(client: Client):
20532092
task_queue=worker.task_queue,
20542093
max_cached_workflows=0,
20552094
) as worker:
2056-
# Send signals and updates
20572095
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
20582096
await handle.signal(LoggingWorkflow.my_signal, "finish")
20592097
await handle.result()
@@ -2065,60 +2103,6 @@ async def test_workflow_logging(client: Client):
20652103
assert capturer.find_log("Signal: finish")
20662104

20672105

2068-
async def test_workflow_logging_flatten_mode(client: Client):
2069-
"""Test that flatten mode produces OTel-safe scalar attributes."""
2070-
# Save original mode and set to flatten
2071-
original_mode = workflow.logger.temporal_extra_mode
2072-
workflow.logger.temporal_extra_mode = "flatten"
2073-
2074-
try:
2075-
with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer:
2076-
async with new_worker(
2077-
client, LoggingWorkflow, max_cached_workflows=0
2078-
) as worker:
2079-
handle = await client.start_workflow(
2080-
LoggingWorkflow.run,
2081-
id=f"workflow-{uuid.uuid4()}",
2082-
task_queue=worker.task_queue,
2083-
)
2084-
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2085-
await handle.execute_update(
2086-
LoggingWorkflow.my_update, "update 1", id="update-1"
2087-
)
2088-
await handle.signal(LoggingWorkflow.my_signal, "finish")
2089-
await handle.result()
2090-
2091-
# Check signal log record
2092-
record = capturer.find_log("Signal: signal 1")
2093-
assert record is not None
2094-
2095-
# Should NOT have nested dict
2096-
assert "temporal_workflow" not in record.__dict__
2097-
2098-
# Should have flattened keys with temporal.workflow prefix
2099-
assert record.__dict__["temporal.workflow.workflow_type"] == "LoggingWorkflow"
2100-
assert "temporal.workflow.workflow_id" in record.__dict__
2101-
assert "temporal.workflow.run_id" in record.__dict__
2102-
assert "temporal.workflow.namespace" in record.__dict__
2103-
assert "temporal.workflow.task_queue" in record.__dict__
2104-
assert record.__dict__["temporal.workflow.attempt"] == 1
2105-
2106-
# Verify all temporal.workflow.* values are primitives (OTel-safe)
2107-
for key, value in record.__dict__.items():
2108-
if key.startswith("temporal.workflow."):
2109-
assert isinstance(
2110-
value, (str, int, float, bool, type(None))
2111-
), f"Key {key} has non-primitive value: {type(value)}"
2112-
2113-
# Check update log record has flattened update fields
2114-
update_record = capturer.find_log("Update: update 1")
2115-
assert update_record is not None
2116-
assert update_record.__dict__["temporal.workflow.update_id"] == "update-1"
2117-
assert update_record.__dict__["temporal.workflow.update_name"] == "my_update"
2118-
finally:
2119-
workflow.logger.temporal_extra_mode = original_mode
2120-
2121-
21222106
@activity.defn
21232107
async def task_fail_once_activity() -> None:
21242108
if activity.info().attempt == 1:

0 commit comments

Comments
 (0)