Skip to content
25 changes: 25 additions & 0 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import contextlib
import datetime
import json
import operator
import os
import pathlib
Expand Down Expand Up @@ -2873,6 +2874,30 @@ def test_defer_task(create_task_instance):
assert ti.trigger_timeout is None


def test_defer_task_serializes_non_json_next_kwargs(create_task_instance):
from airflow.sdk.serde import deserialize
from airflow.triggers.base import StartTriggerArgs

session = mock.Mock(spec=["add", "flush"])
delay = datetime.timedelta(minutes=5)
start_at = timezone.utcnow()
ti = create_task_instance(
dag_id="test_defer_task_serializes_non_json_next_kwargs",
task_id="test_defer_task_serializes_non_json_next_kwargs_op",
start_from_trigger=True,
start_trigger_args=StartTriggerArgs(
trigger_cls="trigger_cls",
next_method="next_method",
trigger_kwargs={"key": "value"},
next_kwargs={"start_at": start_at, "delay": delay},
),
)

assert ti.defer_task(session=session)
json.dumps(ti.next_kwargs)
assert deserialize(ti.next_kwargs) == {"start_at": start_at, "delay": delay}
Comment thread
dabla marked this conversation as resolved.


def test_defer_task_with_trigger_timeout(create_task_instance):
from airflow.models.trigger import Trigger
from airflow.triggers.base import StartTriggerArgs
Expand Down
Loading