Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66564.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix deserialization of operators that store ``NOTSET`` (an ``ArgNotSet`` sentinel) on a serialized field. ``OperatorSerialization._deserialize_field_value`` now restores the ``NOTSET`` singleton for any field encoded as ``{__type: ARG_NOT_SET}`` instead of returning the raw encoding dict (or raising for ``_date``-suffixed fields). The user-visible repro is ``TriggerDagRunOperator(logical_date=NOTSET)`` which previously failed to round-trip through ``SerializedDAG``.
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,20 @@ def _deserialize_field_value(cls, field_name: str, value: Any) -> Any:
:param value: The value to deserialize
:return: The deserialized value
"""
# Restore the ``NOTSET`` singleton for any field whose value was serialized as
# the generic ARG_NOT_SET encoding. This is checked before the field-name
# dispatch so the behavior is symmetric with how ARG_NOT_SET is emitted by
# the serializer (a generic encoding, not tied to any field type) and so
# any future operator that stores ``NOTSET`` on a non-date field also
# round-trips correctly. Without this, deserializing operators such as
# ``TriggerDagRunOperator`` (whose ``logical_date`` defaults to ``NOTSET``)
# would either fail in ``_deserialize_datetime`` or return the raw encoding
# dict from the catch-all branch.
if isinstance(value, dict) and value.get(Encoding.TYPE) == DAT.ARG_NOT_SET:
from airflow.serialization.definitions.notset import NOTSET

return NOTSET

if field_name == "downstream_task_ids":
return set(value) if value is not None else set()
elif field_name in _HAS_CALLBACK_FIELDS:
Expand Down
45 changes: 45 additions & 0 deletions airflow-core/tests/unit/serialization/test_serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,51 @@ def test_deserialize_datetime_with_timestamp(self):
assert isinstance(result, datetime)
assert result.timestamp() == timestamp

@pytest.mark.parametrize(
"field_name",
["logical_date", "start_date", "end_date", "trigger_dag_id", "any_other_field"],
)
def test_deserialize_field_value_with_arg_not_set(self, field_name):
"""``_deserialize_field_value`` returns ``NOTSET`` for any ARG_NOT_SET-encoded field.

Operators may store ``NOTSET`` (an ``ArgNotSet`` instance) on a field to mean
"use the default at runtime". When the field is round-tripped through
serialization the value is encoded as ``{Encoding.TYPE: DAT.ARG_NOT_SET}``
regardless of field type, so ``_deserialize_field_value`` must restore the
``NOTSET`` singleton irrespective of the field name (covers the date-typed
case demonstrated by ``TriggerDagRunOperator.logical_date`` and the
non-date case for any future operator that uses ``NOTSET`` as a sentinel).
"""
from airflow.serialization.definitions.notset import NOTSET
from airflow.serialization.serialized_objects import OperatorSerialization

value = {Encoding.TYPE: DAT.ARG_NOT_SET}
assert OperatorSerialization._deserialize_field_value(field_name, value) is NOTSET

def test_serialize_deserialize_trigger_dag_run_with_notset_logical_date(self):
"""End-to-end round trip: serialize ``TriggerDagRunOperator(logical_date=NOTSET)``
and confirm the deserialized task carries the ``NOTSET`` singleton on
``logical_date``.

This pins the symmetry between the serializer (which emits
``{Encoding.TYPE: DAT.ARG_NOT_SET}`` for ``NOTSET`` values) and the
deserializer fix in ``_deserialize_field_value``. Without the fix, the
deserializer would raise (or return the raw encoding dict) and this test
would fail.
"""
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.serialization.definitions.notset import NOTSET
from airflow.serialization.serialized_objects import DagSerialization

with DAG(DAG_ID, start_date=DEFAULT_DATE) as dag:
TriggerDagRunOperator(task_id="test_trigger", trigger_dag_id="downstream_dag")

serialized = DagSerialization.serialize_dag(dag)
deserialized = DagSerialization.deserialize_dag(serialized)

task = deserialized.task_dict["test_trigger"]
assert task.logical_date is NOTSET


class TestRetryPolicySerialization:
"""Test that retry_policy is serialized as a boolean flag (has_retry_policy)."""
Expand Down
Loading