Commit 9ac1e8a
committed
fix(logging): add OTel-safe extra modes for LoggerAdapter
1 parent da9b1a6 commit 9ac1e8a

7 files changed: 732 additions & 6 deletions

README.md

Lines changed: 20 additions & 3 deletions

````diff
@@ -1018,10 +1018,21 @@ By default the sandbox completely reloads non-standard-library and non-Temporal
 the sandbox quicker and use less memory when importing known-side-effect-free modules, they can be marked
 as passthrough modules.
 
-**For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be
+**Passthrough modules are about import-time behavior and determinism, not just about whether a module is third-party.**
+Any module that is side-effect-free on import and makes only deterministic calls can be passed through, including
+first-party application modules, third-party libraries, and non-workflow-specific code. The key criteria are:
+1. The module does not have import-time side effects (e.g., file I/O, network calls, random values)
+2. Calls made from the module are deterministic within workflow code
+
+**For performance and behavior reasons, users are encouraged to pass through all non-workflow imports whose calls will be
 deterministic.** In particular, this advice extends to modules containing the activities to be referenced in workflows,
 and modules containing dataclasses and Pydantic models, which can be particularly expensive to import.
 
+**Note on datetime and similar stdlib modules:** If you need to use non-deterministic functions like `datetime.date.today()`,
+do **not** mark `datetime` as passthrough. Instead, use `with_child_unrestricted()` to allow specific invalid members
+(see [Invalid Module Members](#invalid-module-members) below). Passthrough affects import reloading, while
+`invalid_module_members` controls which calls are allowed at runtime.
+
 One way to pass through a module is at import time in the workflow file using the `imports_passed_through` context
 manager like so:
 
@@ -1071,7 +1082,7 @@ Note, some calls from the module may still be checked for invalid calls at runti
 
 `SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already
 has a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To
-remove this restriction:
+remove this restriction and allow a specific call like `datetime.date.today()`:
 
 ```python
 my_restrictions = dataclasses.replace(
@@ -1083,6 +1094,12 @@ my_restrictions = dataclasses.replace(
 my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
 ```
 
+**This is the correct approach for allowing non-deterministic stdlib calls.** Do not use passthrough modules for this
+purpose—passthrough controls import reloading, while `invalid_module_members` controls runtime call restrictions.
+
+For a complete example showing both passthrough modules and unrestricted invalid members, see the
+[pydantic_converter sample worker configuration](https://github.com/temporalio/samples-python/blob/4303a9b15f4ddc4cd770bc0ba33afef90a25d3ae/pydantic_converter/worker.py#L45-L65).
+
 Restrictions can also be added by `|`'ing together matchers, for example to restrict the `datetime.date` class from
 being used altogether:
 
@@ -1129,7 +1146,7 @@ To mitigate this, users should:
 
 * Define workflows in files that have as few non-standard-library imports as possible
 * Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
-* Set third-party libraries as passthrough modules if they are known to be side-effect free
+* Set non-workflow imports as passthrough modules if they are known to be side-effect free on import and deterministic
 
 ###### Extending Restricted Classes
````

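The first criterion in the README change above can be illustrated without any Temporal imports. In this stdlib-only sketch (the module name `sideeffect_mod` and variable `TOKEN` are made up for the example), a module whose top level does non-deterministic work ends up with different state every time it is loaded or reloaded, which is exactly why such a module must not be marked passthrough:

```python
import importlib.util
import os
import tempfile

# A throwaway module standing in for one with an import-time side effect
# (criterion 1 above): its top level reads the RNG, so every (re)load of
# the module produces different state.
code = "import random\nTOKEN = random.random()  # import-time side effect\n"

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "sideeffect_mod.py")
    with open(path, "w") as f:
        f.write(code)
    spec = importlib.util.spec_from_file_location("sideeffect_mod", path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    first = mod.TOKEN
    spec.loader.exec_module(mod)  # a reload re-runs the top-level code
    second = mod.TOKEN

print(first != second)  # True: reloading produced different module state
```

A side-effect-free module, by contrast, would yield identical state on every reload, so skipping the sandbox reload (passthrough) cannot change its behavior.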
temporalio/_log_utils.py

Lines changed: 108 additions & 0 deletions (new file)

```python
"""Internal utilities for Temporal logging.

This module is internal and may change at any time.
"""

from __future__ import annotations

import json
from collections.abc import Mapping, MutableMapping
from typing import Any, Literal

TemporalLogExtraMode = Literal["dict", "flatten", "json"]
"""Mode controlling how Temporal context is added to log record extra.

Values:
    dict: (default) Add context as a nested dictionary under a single key.
        This is the original behavior. Suitable for logging handlers that
        support nested structures.
    flatten: Add each context field as a separate top-level key with a
        namespaced prefix. Values that are not primitives (str/int/float/bool)
        are converted to strings. This mode is recommended for OpenTelemetry
        and other logging pipelines that require flat, scalar attributes.
    json: Add context as a JSON string under a single key. Useful when
        downstream systems expect string values but you want structured data.
"""


def _apply_temporal_context_to_extra(
    extra: MutableMapping[str, Any],
    *,
    key: str,
    prefix: str,
    ctx: Mapping[str, Any],
    mode: TemporalLogExtraMode,
) -> None:
    """Apply temporal context to log record extra based on the configured mode.

    Args:
        extra: The mutable extra dict to update.
        key: The key to use for dict/json modes (e.g., "temporal_workflow").
        prefix: The prefix to use for flatten mode keys (e.g., "temporal.workflow").
        ctx: The context mapping containing temporal fields.
        mode: The mode controlling how context is added.
    """
    if mode == "dict":
        extra[key] = dict(ctx)
    elif mode == "json":
        extra[key] = json.dumps(ctx, separators=(",", ":"), default=str)
    elif mode == "flatten":
        for k, v in ctx.items():
            # Ensure value is a primitive type safe for OTel attributes
            if not isinstance(v, (str, int, float, bool, type(None))):
                v = str(v)
            extra[f"{prefix}.{k}"] = v
    else:
        # Fallback to dict for any unknown mode (shouldn't happen with typing)
        extra[key] = dict(ctx)


def _update_temporal_context_in_extra(
    extra: MutableMapping[str, Any],
    *,
    key: str,
    prefix: str,
    update_ctx: Mapping[str, Any],
    mode: TemporalLogExtraMode,
) -> None:
    """Update existing temporal context in extra with additional fields.

    This is used when adding update info to existing workflow context.

    Args:
        extra: The mutable extra dict to update.
        key: The key used for dict/json modes (e.g., "temporal_workflow").
        prefix: The prefix used for flatten mode keys (e.g., "temporal.workflow").
        update_ctx: Additional context fields to add/update.
        mode: The mode controlling how context is added.
    """
    if mode == "dict":
        extra.setdefault(key, {}).update(update_ctx)
    elif mode == "json":
        # For JSON mode, we need to parse, update, and re-serialize
        existing = extra.get(key)
        if existing is not None:
            try:
                existing_dict = json.loads(existing)
                existing_dict.update(update_ctx)
                extra[key] = json.dumps(
                    existing_dict, separators=(",", ":"), default=str
                )
            except (json.JSONDecodeError, TypeError):
                # If parsing fails, just create a new JSON object with update_ctx
                extra[key] = json.dumps(
                    dict(update_ctx), separators=(",", ":"), default=str
                )
        else:
            extra[key] = json.dumps(
                dict(update_ctx), separators=(",", ":"), default=str
            )
    elif mode == "flatten":
        for k, v in update_ctx.items():
            # Ensure value is a primitive type safe for OTel attributes
            if not isinstance(v, (str, int, float, bool, type(None))):
                v = str(v)
            extra[f"{prefix}.{k}"] = v
    else:
        # Fallback to dict for any unknown mode
        extra.setdefault(key, {}).update(update_ctx)
```

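What each mode writes into a log call's `extra` can be seen with a stdlib-only mirror of `_apply_temporal_context_to_extra` (same branch logic as the file above; the context field names `workflow_id`, `attempt`, and `memo` are illustrative, not the SDK's exact keys):

```python
import json

# Mirror of the mode dispatch in _apply_temporal_context_to_extra.
def apply_ctx(extra, *, key, prefix, ctx, mode):
    if mode == "dict":
        extra[key] = dict(ctx)
    elif mode == "json":
        extra[key] = json.dumps(ctx, separators=(",", ":"), default=str)
    elif mode == "flatten":
        for k, v in ctx.items():
            # Non-scalar values are stringified so every attribute stays flat
            if not isinstance(v, (str, int, float, bool, type(None))):
                v = str(v)
            extra[f"{prefix}.{k}"] = v

ctx = {"workflow_id": "wf-1", "attempt": 2, "memo": {"k": "v"}}
results = {}
for mode in ("dict", "flatten", "json"):
    extra = {}
    apply_ctx(extra, key="temporal_workflow", prefix="temporal.workflow", ctx=ctx, mode=mode)
    results[mode] = extra

print(results["flatten"]["temporal.workflow.memo"])  # "{'k': 'v'}" (stringified)
```

Note how only `flatten` produces one key per field: `dict` nests the whole mapping under `temporal_workflow`, `json` stores one string, and `flatten` emits `temporal.workflow.workflow_id`, `temporal.workflow.attempt`, and a stringified `temporal.workflow.memo`.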
temporalio/activity.py

Lines changed: 14 additions & 1 deletion

```diff
@@ -32,6 +32,7 @@
 import temporalio.common
 import temporalio.converter
 
+from ._log_utils import TemporalLogExtraMode, _apply_temporal_context_to_extra
 from .types import CallableType
 
 if TYPE_CHECKING:
@@ -500,6 +501,11 @@ class LoggerAdapter(logging.LoggerAdapter):
            value will be added to the ``extra`` dictionary with the entire
            activity info, making it present on the ``LogRecord.__dict__`` for
            use by others. Default is False.
+        temporal_extra_mode: Controls how activity context is added to log
+            ``extra``. Default is ``"dict"`` (current behavior). Set to
+            ``"flatten"`` for OpenTelemetry compatibility (scalar attributes
+            with ``temporal.activity.`` prefix), or ``"json"`` for a single JSON
+            string value.
    """
 
    def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
@@ -508,6 +514,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
        self.activity_info_on_message = True
        self.activity_info_on_extra = True
        self.full_activity_info_on_extra = False
+        self.temporal_extra_mode: TemporalLogExtraMode = "dict"
 
    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
@@ -525,7 +532,13 @@ def process(
        if self.activity_info_on_extra:
            # Extra can be absent or None, this handles both
            extra = kwargs.get("extra", None) or {}
-            extra["temporal_activity"] = context.logger_details
+            _apply_temporal_context_to_extra(
+                extra,
+                key="temporal_activity",
+                prefix="temporal.activity",
+                ctx=context.logger_details,
+                mode=self.temporal_extra_mode,
+            )
            kwargs["extra"] = extra
        if self.full_activity_info_on_extra:
            # Extra can be absent or None, this handles both
```

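The reason `"flatten"` helps OTel pipelines is that each flattened key lands directly on `LogRecord.__dict__`, where exporter handlers look for scalar attributes. A stdlib-only sketch (the adapter is simulated by passing the flattened keys as `extra` ourselves; key names are illustrative):

```python
import logging

captured = []

class CaptureHandler(logging.Handler):
    """Keeps emitted records so we can inspect their attributes."""
    def emit(self, record):
        captured.append(record)

logger = logging.getLogger("demo.activity")
logger.setLevel(logging.INFO)
logger.addHandler(CaptureHandler())
logger.propagate = False

# What the adapter would produce in flatten mode: one scalar per field.
logger.info(
    "activity started",
    extra={"temporal.activity.activity_id": "a-1", "temporal.activity.attempt": 1},
)

record = captured[0]
print(record.__dict__["temporal.activity.activity_id"])  # a-1
```

Because the keys contain dots, they are reached via `record.__dict__` rather than attribute access, which is also how structured-logging handlers typically enumerate custom record attributes.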
temporalio/workflow.py

Lines changed: 25 additions & 2 deletions

```diff
@@ -61,6 +61,11 @@
 import temporalio.workflow
 from temporalio.nexus._util import ServiceHandlerT
 
+from ._log_utils import (
+    TemporalLogExtraMode,
+    _apply_temporal_context_to_extra,
+    _update_temporal_context_in_extra,
+)
 from .types import (
     AnyType,
     CallableAsyncNoParam,
@@ -1569,6 +1574,11 @@ class LoggerAdapter(logging.LoggerAdapter):
            use by others. Default is False.
        log_during_replay: Boolean for whether logs should occur during replay.
            Default is False.
+        temporal_extra_mode: Controls how workflow context is added to log
+            ``extra``. Default is ``"dict"`` (current behavior). Set to
+            ``"flatten"`` for OpenTelemetry compatibility (scalar attributes
+            with ``temporal.workflow.`` prefix), or ``"json"`` for a single JSON
+            string value.
 
    Values added to ``extra`` are merged with the ``extra`` dictionary from a
    logging call, with values from the logging call taking precedence. I.e. the
@@ -1582,6 +1592,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
        self.workflow_info_on_extra = True
        self.full_workflow_info_on_extra = False
        self.log_during_replay = False
+        self.temporal_extra_mode: TemporalLogExtraMode = "dict"
        self.disable_sandbox = False
 
    def process(
@@ -1602,7 +1613,13 @@ def process(
        if self.workflow_info_on_message:
            msg_extra.update(workflow_details)
        if self.workflow_info_on_extra:
-            extra["temporal_workflow"] = workflow_details
+            _apply_temporal_context_to_extra(
+                extra,
+                key="temporal_workflow",
+                prefix="temporal.workflow",
+                ctx=workflow_details,
+                mode=self.temporal_extra_mode,
+            )
        if self.full_workflow_info_on_extra:
            extra["workflow_info"] = runtime.workflow_info()
        update_info = current_update_info()
@@ -1611,7 +1628,13 @@ def process(
        if self.workflow_info_on_message:
            msg_extra.update(update_details)
        if self.workflow_info_on_extra:
-            extra.setdefault("temporal_workflow", {}).update(update_details)
+            _update_temporal_context_in_extra(
+                extra,
+                key="temporal_workflow",
+                prefix="temporal.workflow",
+                update_ctx=update_details,
+                mode=self.temporal_extra_mode,
+            )
 
        kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
        if msg_extra:
```

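The only subtle branch in the second call above is `"json"` mode, where the workflow context is already a serialized string by the time update details arrive, so it must be parsed, merged, and re-serialized. A stdlib walkthrough of that branch (field names such as `update_id` are illustrative):

```python
import json

# Existing extra as the adapter would have written it in "json" mode.
extra = {"temporal_workflow": json.dumps({"workflow_id": "wf-1"}, separators=(",", ":"))}
update_ctx = {"update_id": "u-1", "update_name": "my_update"}

# Parse, merge, re-serialize -- mirroring the json branch of
# _update_temporal_context_in_extra.
existing = extra.get("temporal_workflow")
try:
    merged = json.loads(existing)
    merged.update(update_ctx)
except (json.JSONDecodeError, TypeError):
    # Unparseable or non-string existing value: fall back to just the update
    merged = dict(update_ctx)
extra["temporal_workflow"] = json.dumps(merged, separators=(",", ":"), default=str)

print(extra["temporal_workflow"])
# {"workflow_id":"wf-1","update_id":"u-1","update_name":"my_update"}
```

The `except` fallback matches the committed code's behavior: a corrupted or foreign value under the key is replaced by a fresh JSON object containing only the update fields rather than raising from inside a logging call.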