From c18e4b042471a078c51eb592425f6c846ad6bb73 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sat, 4 Apr 2026 14:17:42 +0200 Subject: [PATCH 1/2] refactor: Fix task_defer method with non-JSON next_kwargs in TaskInstance --- .../src/airflow/models/taskinstance.py | 5 +++- .../tests/unit/models/test_taskinstance.py | 23 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 964c345a3bf6c..c1a4ef49a7234 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1625,7 +1625,10 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: assert isinstance(self.task, Operator) if start_trigger_args := self.start_trigger_args: + from airflow.sdk.serde import serialize as serde_serialize + trigger_kwargs = start_trigger_args.trigger_kwargs or {} + next_kwargs = serde_serialize(start_trigger_args.next_kwargs or {}) timeout = start_trigger_args.timeout # Calculate timeout too if it was passed @@ -1650,7 +1653,7 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: self.state = TaskInstanceState.DEFERRED self.trigger_id = trigger_row.id self.next_method = start_trigger_args.next_method - self.next_kwargs = start_trigger_args.next_kwargs or {} + self.next_kwargs = next_kwargs self.start_date = timezone.utcnow() # If an execution_timeout is set, set the timeout to the minimum of diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 9eba07daaa130..3ec3ba88b2eb1 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2729,6 +2729,29 @@ def test_defer_task(create_task_instance): assert ti.trigger_timeout is None +def test_defer_task_serializes_non_json_next_kwargs(create_task_instance): + from airflow.sdk.serde import deserialize + from airflow.triggers.base import StartTriggerArgs + + session = mock.Mock(spec=["add", "flush"]) + delay = datetime.timedelta(minutes=5) + start_at = timezone.utcnow() + ti = create_task_instance( + dag_id="test_defer_task_serializes_non_json_next_kwargs", + task_id="test_defer_task_serializes_non_json_next_kwargs_op", + start_from_trigger=True, + start_trigger_args=StartTriggerArgs( + trigger_cls="trigger_cls", + next_method="next_method", + trigger_kwargs={"key": "value"}, + next_kwargs={"start_at": start_at, "delay": delay}, + ), + ) + + assert ti.defer_task(session=session) + assert deserialize(ti.next_kwargs) == {"start_at": start_at, "delay": delay} + + def test_defer_task_with_trigger_timeout(create_task_instance): from airflow.models.trigger import Trigger from airflow.triggers.base import StartTriggerArgs From 4bcd9af6e982a07a1a453d49303fce1ae1d62c27 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 17 Apr 2026 14:21:03 +0200 Subject: [PATCH 2/2] refactor: Make sure to test if next_kwargs are JSON serializable --- airflow-core/tests/unit/models/test_taskinstance.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 3ec3ba88b2eb1..70fd56251d2a4 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -19,6 +19,7 @@ import contextlib import datetime +import json import operator import os import pathlib @@ -2749,6 +2750,7 @@ def test_defer_task_serializes_non_json_next_kwargs(create_task_instance): ) assert ti.defer_task(session=session) + json.dumps(ti.next_kwargs) assert deserialize(ti.next_kwargs) == {"start_at": start_at, "delay": delay}