diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index b9e5452855ff1..c2bf05cd2036b 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -19,6 +19,7 @@ import contextlib import datetime +import json import operator import os import pathlib @@ -2873,6 +2874,30 @@ def test_defer_task(create_task_instance): assert ti.trigger_timeout is None +def test_defer_task_serializes_non_json_next_kwargs(create_task_instance): + from airflow.sdk.serde import deserialize + from airflow.triggers.base import StartTriggerArgs + + session = mock.Mock(spec=["add", "flush"]) + delay = datetime.timedelta(minutes=5) + start_at = timezone.utcnow() + ti = create_task_instance( + dag_id="test_defer_task_serializes_non_json_next_kwargs", + task_id="test_defer_task_serializes_non_json_next_kwargs_op", + start_from_trigger=True, + start_trigger_args=StartTriggerArgs( + trigger_cls="trigger_cls", + next_method="next_method", + trigger_kwargs={"key": "value"}, + next_kwargs={"start_at": start_at, "delay": delay}, + ), + ) + + assert ti.defer_task(session=session) + json.dumps(ti.next_kwargs) + assert deserialize(ti.next_kwargs) == {"start_at": start_at, "delay": delay} + + def test_defer_task_with_trigger_timeout(create_task_instance): from airflow.models.trigger import Trigger from airflow.triggers.base import StartTriggerArgs