diff --git a/airflow-core/newsfragments/66564.bugfix.rst b/airflow-core/newsfragments/66564.bugfix.rst new file mode 100644 index 0000000000000..c18c56e3c2491 --- /dev/null +++ b/airflow-core/newsfragments/66564.bugfix.rst @@ -0,0 +1 @@ +Fix deserialization of operators that store ``NOTSET`` (an ``ArgNotSet`` sentinel) on a serialized field. ``OperatorSerialization._deserialize_field_value`` now restores the ``NOTSET`` singleton for any field encoded as ``{__type: ARG_NOT_SET}`` instead of returning the raw encoding dict (or raising for ``_date``-suffixed fields). The user-visible repro is ``TriggerDagRunOperator(logical_date=NOTSET)`` which previously failed to round-trip through ``SerializedDAG``. diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index cf0e2dc149f02..fcdc6119454f4 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -1575,6 +1575,20 @@ def _deserialize_field_value(cls, field_name: str, value: Any) -> Any: :param value: The value to deserialize :return: The deserialized value """ + # Restore the ``NOTSET`` singleton for any field whose value was serialized as + # the generic ARG_NOT_SET encoding. This is checked before the field-name + # dispatch so the behavior is symmetric with how ARG_NOT_SET is emitted by + # the serializer (a generic encoding, not tied to any field type) and so + # any future operator that stores ``NOTSET`` on a non-date field also + # round-trips correctly. Without this, deserializing operators such as + # ``TriggerDagRunOperator`` (whose ``logical_date`` defaults to ``NOTSET``) + # would either fail in ``_deserialize_datetime`` or return the raw encoding + # dict from the catch-all branch. + if isinstance(value, dict) and value.get(Encoding.TYPE) == DAT.ARG_NOT_SET: + from airflow.serialization.definitions.notset import NOTSET + + return NOTSET + if field_name == "downstream_task_ids": return set(value) if value is not None else set() elif field_name in _HAS_CALLBACK_FIELDS: diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index e672e93cd2313..fbc499b2e6f72 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -1011,6 +1011,51 @@ def test_deserialize_datetime_with_timestamp(self): assert isinstance(result, datetime) assert result.timestamp() == timestamp + @pytest.mark.parametrize( + "field_name", + ["logical_date", "start_date", "end_date", "trigger_dag_id", "any_other_field"], + ) + def test_deserialize_field_value_with_arg_not_set(self, field_name): + """``_deserialize_field_value`` returns ``NOTSET`` for any ARG_NOT_SET-encoded field. + + Operators may store ``NOTSET`` (an ``ArgNotSet`` instance) on a field to mean + "use the default at runtime". When the field is round-tripped through + serialization the value is encoded as ``{Encoding.TYPE: DAT.ARG_NOT_SET}`` + regardless of field type, so ``_deserialize_field_value`` must restore the + ``NOTSET`` singleton irrespective of the field name (covers the date-typed + case demonstrated by ``TriggerDagRunOperator.logical_date`` and the + non-date case for any future operator that uses ``NOTSET`` as a sentinel). + """ + from airflow.serialization.definitions.notset import NOTSET + from airflow.serialization.serialized_objects import OperatorSerialization + + value = {Encoding.TYPE: DAT.ARG_NOT_SET} + assert OperatorSerialization._deserialize_field_value(field_name, value) is NOTSET + + def test_serialize_deserialize_trigger_dag_run_with_notset_logical_date(self): + """End-to-end round trip: serialize ``TriggerDagRunOperator(logical_date=NOTSET)`` + and confirm the deserialized task carries the ``NOTSET`` singleton on + ``logical_date``. + + This pins the symmetry between the serializer (which emits + ``{Encoding.TYPE: DAT.ARG_NOT_SET}`` for ``NOTSET`` values) and the + deserializer fix in ``_deserialize_field_value``. Without the fix, the + deserializer would raise (or return the raw encoding dict) and this test + would fail. + """ + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + from airflow.serialization.definitions.notset import NOTSET + from airflow.serialization.serialized_objects import DagSerialization + + with DAG(DAG_ID, start_date=DEFAULT_DATE) as dag: + TriggerDagRunOperator(task_id="test_trigger", trigger_dag_id="downstream_dag") + + serialized = DagSerialization.serialize_dag(dag) + deserialized = DagSerialization.deserialize_dag(serialized) + + task = deserialized.task_dict["test_trigger"] + assert task.logical_date is NOTSET + class TestRetryPolicySerialization: """Test that retry_policy is serialized as a boolean flag (has_retry_policy)."""