From b967d44517e866b490c7efd665e189817cd854d6 Mon Sep 17 00:00:00 2001 From: hwang-cadent Date: Thu, 7 May 2026 11:46:50 -0500 Subject: [PATCH 1/2] Fix ARG_NOT_SET deserialization for ``_date`` fields ``OperatorSerialization._deserialize_field_value`` previously sent every non-``None`` ``*_date`` value to ``_deserialize_datetime`` without checking for the ARG_NOT_SET encoding. When an operator stores a date-suffixed field as ``NOTSET`` (an ``ArgNotSet`` sentinel meaning "use the default at runtime") the value is encoded as ``{__type: ARG_NOT_SET}``. Passing that encoded value to ``_deserialize_datetime`` fails because it is not a datetime payload, so deserializing such an operator (e.g. ``TriggerDagRunOperator`` whose ``logical_date`` defaults to ``NOTSET``) raised an error. This change short-circuits ARG_NOT_SET on date fields and restores the ``NOTSET`` singleton, leaving ``None`` and real datetimes unchanged. Adds a direct unit test in ``test_serialized_objects.py`` covering ``logical_date``, ``start_date`` and ``end_date``. --- .../airflow/serialization/serialized_objects.py | 8 ++++++++ .../serialization/test_serialized_objects.py | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index cf0e2dc149f02..fe9fe24622dd4 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -1587,6 +1587,14 @@ def _deserialize_field_value(cls, field_name: str, value: Any) -> Any: elif field_name == "resources": return Resources.from_dict(value) if value is not None else None elif field_name.endswith("_date"): + # Handle ARG_NOT_SET (NOTSET singleton) before trying to parse as datetime. + # Without this, deserializing operators that store a date field as ``NOTSET`` + # (e.g. 
``TriggerDagRunOperator.logical_date``) would fail because + # ``_deserialize_datetime`` does not understand the ARG_NOT_SET encoding. + if isinstance(value, dict) and value.get(Encoding.TYPE) == DAT.ARG_NOT_SET: + from airflow.serialization.definitions.notset import NOTSET + + return NOTSET return cls._deserialize_datetime(value) if value is not None else None else: # For all other fields, return as-is (strings, ints, bools, etc.) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index e672e93cd2313..be8cadf56e925 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -1011,6 +1011,23 @@ def test_deserialize_datetime_with_timestamp(self): assert isinstance(result, datetime) assert result.timestamp() == timestamp + def test_deserialize_field_value_with_arg_not_set_for_date_fields(self): + """``_deserialize_field_value`` returns ``NOTSET`` for ARG_NOT_SET date fields. + + Operators may store ``NOTSET`` (an ``ArgNotSet`` instance) on date-suffixed + fields such as ``logical_date`` to mean "use the default at runtime". When + such a field is round-tripped through serialization, the ARG_NOT_SET + encoding must be restored to ``NOTSET`` instead of being parsed as a + datetime. 
+ """ + from airflow.serialization.definitions.notset import NOTSET + from airflow.serialization.serialized_objects import OperatorSerialization + + value = {Encoding.TYPE: DAT.ARG_NOT_SET} + + for field_name in ("logical_date", "start_date", "end_date"): + assert OperatorSerialization._deserialize_field_value(field_name, value) is NOTSET + class TestRetryPolicySerialization: """Test that retry_policy is serialized as a boolean flag (has_retry_policy).""" From 6bd4aaba44333f5c7a6d41988f89f1c9b393c636 Mon Sep 17 00:00:00 2001 From: hwang-cadent Date: Mon, 11 May 2026 11:11:12 -0500 Subject: [PATCH 2/2] Address review: hoist ARG_NOT_SET handling, add round-trip test, newsfragment Per review feedback from @potiuk: * Hoist the ARG_NOT_SET check above the field-name dispatch in ``OperatorSerialization._deserialize_field_value`` so the behavior is symmetric with how the serializer emits ARG_NOT_SET (a generic encoding, not tied to any field type). Previously the fix was scoped only to ``*_date`` fields, which would silently leave the raw encoding dict on any future non-date field that uses ``NOTSET`` as a default. * Replace the ``for`` loop in the unit test with ``@pytest.mark.parametrize`` and extend the parameter set to a non-date field so a failure shows the offending case in pytest output and so the test pins the type-agnostic behavior. * Add ``test_serialize_deserialize_trigger_dag_run_with_notset_logical_date``, a round-trip test that serializes ``TriggerDagRunOperator(logical_date=NOTSET)`` through ``DagSerialization`` and asserts the deserialized task carries the ``NOTSET`` singleton. This pins the symmetry between the serializer and deserializer (the original test only proved the deserialize half). * Add ``66564.bugfix.rst`` newsfragment describing the user-visible bug and the fix. 
--- airflow-core/newsfragments/66564.bugfix.rst | 1 + .../serialization/serialized_objects.py | 22 +++++---- .../serialization/test_serialized_objects.py | 48 +++++++++++++++---- 3 files changed, 53 insertions(+), 18 deletions(-) create mode 100644 airflow-core/newsfragments/66564.bugfix.rst diff --git a/airflow-core/newsfragments/66564.bugfix.rst b/airflow-core/newsfragments/66564.bugfix.rst new file mode 100644 index 0000000000000..c18c56e3c2491 --- /dev/null +++ b/airflow-core/newsfragments/66564.bugfix.rst @@ -0,0 +1 @@ +Fix deserialization of operators that store ``NOTSET`` (an ``ArgNotSet`` sentinel) on a serialized field. ``OperatorSerialization._deserialize_field_value`` now restores the ``NOTSET`` singleton for any field encoded as ``{__type: ARG_NOT_SET}`` instead of returning the raw encoding dict (or raising for ``_date``-suffixed fields). The user-visible repro is ``TriggerDagRunOperator(logical_date=NOTSET)`` which previously failed to round-trip through ``SerializedDAG``. diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index fe9fe24622dd4..fcdc6119454f4 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -1575,6 +1575,20 @@ def _deserialize_field_value(cls, field_name: str, value: Any) -> Any: :param value: The value to deserialize :return: The deserialized value """ + # Restore the ``NOTSET`` singleton for any field whose value was serialized as + # the generic ARG_NOT_SET encoding. This is checked before the field-name + # dispatch so the behavior is symmetric with how ARG_NOT_SET is emitted by + # the serializer (a generic encoding, not tied to any field type) and so + # any future operator that stores ``NOTSET`` on a non-date field also + # round-trips correctly. 
Without this, deserializing operators such as + # ``TriggerDagRunOperator`` (whose ``logical_date`` defaults to ``NOTSET``) + # would either fail in ``_deserialize_datetime`` or return the raw encoding + # dict from the catch-all branch. + if isinstance(value, dict) and value.get(Encoding.TYPE) == DAT.ARG_NOT_SET: + from airflow.serialization.definitions.notset import NOTSET + + return NOTSET + if field_name == "downstream_task_ids": return set(value) if value is not None else set() elif field_name in _HAS_CALLBACK_FIELDS: @@ -1587,14 +1601,6 @@ def _deserialize_field_value(cls, field_name: str, value: Any) -> Any: elif field_name == "resources": return Resources.from_dict(value) if value is not None else None elif field_name.endswith("_date"): - # Handle ARG_NOT_SET (NOTSET singleton) before trying to parse as datetime. - # Without this, deserializing operators that store a date field as ``NOTSET`` - # (e.g. ``TriggerDagRunOperator.logical_date``) would fail because - # ``_deserialize_datetime`` does not understand the ARG_NOT_SET encoding. - if isinstance(value, dict) and value.get(Encoding.TYPE) == DAT.ARG_NOT_SET: - from airflow.serialization.definitions.notset import NOTSET - - return NOTSET return cls._deserialize_datetime(value) if value is not None else None else: # For all other fields, return as-is (strings, ints, bools, etc.) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index be8cadf56e925..fbc499b2e6f72 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -1011,22 +1011,50 @@ def test_deserialize_datetime_with_timestamp(self): assert isinstance(result, datetime) assert result.timestamp() == timestamp - def test_deserialize_field_value_with_arg_not_set_for_date_fields(self): - """``_deserialize_field_value`` returns ``NOTSET`` for ARG_NOT_SET date fields. 
- - Operators may store ``NOTSET`` (an ``ArgNotSet`` instance) on date-suffixed - fields such as ``logical_date`` to mean "use the default at runtime". When - such a field is round-tripped through serialization, the ARG_NOT_SET - encoding must be restored to ``NOTSET`` instead of being parsed as a - datetime. + @pytest.mark.parametrize( + "field_name", + ["logical_date", "start_date", "end_date", "trigger_dag_id", "any_other_field"], + ) + def test_deserialize_field_value_with_arg_not_set(self, field_name): + """``_deserialize_field_value`` returns ``NOTSET`` for any ARG_NOT_SET-encoded field. + + Operators may store ``NOTSET`` (an ``ArgNotSet`` instance) on a field to mean + "use the default at runtime". When the field is round-tripped through + serialization the value is encoded as ``{Encoding.TYPE: DAT.ARG_NOT_SET}`` + regardless of field type, so ``_deserialize_field_value`` must restore the + ``NOTSET`` singleton irrespective of the field name (covers the date-typed + case demonstrated by ``TriggerDagRunOperator.logical_date`` and the + non-date case for any future operator that uses ``NOTSET`` as a sentinel). """ from airflow.serialization.definitions.notset import NOTSET from airflow.serialization.serialized_objects import OperatorSerialization value = {Encoding.TYPE: DAT.ARG_NOT_SET} + assert OperatorSerialization._deserialize_field_value(field_name, value) is NOTSET + + def test_serialize_deserialize_trigger_dag_run_with_notset_logical_date(self): + """End-to-end round trip: serialize ``TriggerDagRunOperator(logical_date=NOTSET)`` + and confirm the deserialized task carries the ``NOTSET`` singleton on + ``logical_date``. + + This pins the symmetry between the serializer (which emits + ``{Encoding.TYPE: DAT.ARG_NOT_SET}`` for ``NOTSET`` values) and the + deserializer fix in ``_deserialize_field_value``. Without the fix, the + deserializer would raise (or return the raw encoding dict) and this test + would fail. 
+ """ + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + from airflow.serialization.definitions.notset import NOTSET + from airflow.serialization.serialized_objects import DagSerialization + + with DAG(DAG_ID, start_date=DEFAULT_DATE) as dag: + TriggerDagRunOperator(task_id="test_trigger", trigger_dag_id="downstream_dag") + + serialized = DagSerialization.serialize_dag(dag) + deserialized = DagSerialization.deserialize_dag(serialized) - for field_name in ("logical_date", "start_date", "end_date"): - assert OperatorSerialization._deserialize_field_value(field_name, value) is NOTSET + task = deserialized.task_dict["test_trigger"] + assert task.logical_date is NOTSET class TestRetryPolicySerialization: