From c41e48712f4d17e6cbb0dc3a4d79dcffe8853e6f Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sat, 21 Mar 2026 22:01:01 -0700 Subject: [PATCH 01/14] feat-58056: Implement AssetAndTimeSchedule for time-based scheduling with asset conditions --- .../asset-scheduling.rst | 31 ++++++- .../authoring-and-scheduling/timetable.rst | 23 +++++- .../src/airflow/jobs/scheduler_job_runner.py | 45 +++++++++++ airflow-core/src/airflow/models/dag.py | 6 ++ airflow-core/src/airflow/timetables/assets.py | 80 ++++++++++++++++++- .../unit/timetables/test_assets_timetable.py | 69 +++++++++++++++- 6 files changed, 248 insertions(+), 6 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst index 1bfe98d715ec2..e3979f3de374b 100644 --- a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst +++ b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst @@ -147,6 +147,30 @@ If one asset is updated multiple times before all consumed assets update, the do } +Gate scheduled runs on asset updates +------------------------------------ + +Use ``AssetAndTimeSchedule`` when you want a Dag to follow a normal time-based timetable but only start after specific assets have been updated. Airflow creates the DagRun at the scheduled time and keeps it queued until every required asset has queued an event. When the DagRun starts, those asset events are consumed so the next scheduled run waits for new updates. This does not create additional asset-triggered runs. + +.. code-block:: python + + from airflow.sdk import DAG, Asset + from airflow.timetables.assets import AssetAndTimeSchedule + from airflow.timetables.trigger import CronTriggerTimetable + + example_asset = Asset("s3://asset/example.csv") + + with DAG( + dag_id="gated_hourly_dag", + schedule=AssetAndTimeSchedule( + timetable=CronTriggerTimetable("0 * * * *", timezone="UTC"), + assets=[example_asset], + ), + ..., + ): + ... + + Fetching information from a triggering asset event ---------------------------------------------------- @@ -416,6 +440,9 @@ Combining asset and time-based schedules AssetTimetable Integration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You can schedule Dags based on both asset events and time-based schedules using ``AssetOrTimeSchedule``. This allows you to create workflows when a Dag needs both to be triggered by data updates and run periodically according to a fixed timetable. +Asset-aware timetables combine asset expressions with a time-based schedule: + +* Use ``AssetOrTimeSchedule`` to create runs both on a timetable and when assets update, producing scheduled runs and asset-triggered runs independently. +* Use ``AssetAndTimeSchedule`` to keep a Dag on a timetable but only start those scheduled runs once the referenced assets have been updated. -For more detailed information on ``AssetOrTimeSchedule``, refer to the corresponding section in :ref:`AssetOrTimeSchedule `. +For more detailed information on asset-aware timetables, refer to :ref:`AssetOrTimeSchedule `. diff --git a/airflow-core/docs/authoring-and-scheduling/timetable.rst b/airflow-core/docs/authoring-and-scheduling/timetable.rst index b19cc80211392..87d55a4c9054c 100644 --- a/airflow-core/docs/authoring-and-scheduling/timetable.rst +++ b/airflow-core/docs/authoring-and-scheduling/timetable.rst @@ -268,7 +268,10 @@ Asset event based scheduling with time based scheduling ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Combining conditional asset expressions with time-based schedules enhances scheduling flexibility. -The ``AssetOrTimeSchedule`` is a specialized timetable that allows for the scheduling of Dags based on both time-based schedules and asset events. It also facilitates the creation of both scheduled runs, as per traditional timetables, and asset-triggered runs, which operate independently. +Asset-aware timetables let you combine a time-based schedule with an asset expression: + +* ``AssetOrTimeSchedule`` schedules Dag runs both on the timetable and whenever the assets update. It creates traditional scheduled runs and asset-triggered runs independently. +* ``AssetAndTimeSchedule`` keeps the Dag on a time-based timetable but defers starting a queued run until all referenced assets are ready. When the run starts, the asset events are consumed so the next scheduled run waits for the next set of updates. No asset-triggered runs are created. This feature is particularly useful in scenarios where a Dag needs to run on asset updates and also at periodic intervals. It ensures that the workflow remains responsive to data changes and consistently runs regular checks or updates. @@ -290,6 +293,24 @@ Here's an example of a Dag using ``AssetOrTimeSchedule``: # Dag tasks go here pass +Here's an example of a Dag using ``AssetAndTimeSchedule`` to require both the time-based schedule and fresh assets before a run starts: + +.. code-block:: python + + from airflow.timetables.assets import AssetAndTimeSchedule + from airflow.timetables.trigger import CronTriggerTimetable + + + @dag( + schedule=AssetAndTimeSchedule( + timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), assets=(dag1_asset & dag2_asset) + ) + # Additional arguments here, replace this comment with actual arguments + ) + def example_gated_dag(): + # Dag tasks go here + pass + Timetables comparisons diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 80897213c18b5..10920ed3882b0 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -88,6 +88,7 @@ from airflow.serialization.definitions.assets import SerializedAssetUniqueKey from airflow.serialization.definitions.notset import NOTSET from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES +from airflow.timetables.assets import AssetAndTimeSchedule from airflow.timetables.simple import AssetTriggeredTimetable from airflow.utils.event_scheduler import EventScheduler from airflow.utils.log.logging_mixin import LoggingMixin @@ -2317,6 +2318,50 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): dag_run.run_id, ) continue + # For AssetAndTimeSchedule, defer starting until all required assets are queued. + if isinstance(dag.timetable, AssetAndTimeSchedule): + # Count required assets for this DAG's schedule + required_count = ( + session.scalar( + select(func.count()) + .select_from(DagScheduleAssetReference) + .where(DagScheduleAssetReference.dag_id == dag_id) + ) + or 0 + ) + + if required_count > 0: + ready_count = ( + session.scalar( + select(func.count()) + .select_from(DagScheduleAssetReference) + .join( + AssetDagRunQueue, + and_( + DagScheduleAssetReference.asset_id == AssetDagRunQueue.asset_id, + DagScheduleAssetReference.dag_id == AssetDagRunQueue.target_dag_id, + ), + ) + .where(DagScheduleAssetReference.dag_id == dag_id) + ) + or 0 + ) + + if ready_count < required_count: + # Not all assets are present; skip starting this run for now. + self.log.debug( + "Deferring DagRun until assets ready; dag_id=%s run_id=%s (ready=%s required=%s)", + dag_id, + run_id, + ready_count, + required_count, + ) + # Do not increment active run counts; we didn't start it. + continue + + # Consume queued asset events for this DAG so the next run gates on new events. + session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_id)) + active_runs_of_dags[(dag_run.dag_id, backfill_id)] += 1 _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed(msg="started") diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index ca84b7047b435..f0a4234dbd457 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -696,6 +696,12 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool] for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] + timetable = ser_dag.dag.timetable + + if not isinstance(timetable, AssetTriggeredTimetable): + del adrq_by_dag[dag_id] + continue + ready = dag_ready(dag_id, cond=ser_dag.dag.timetable.asset_condition, statuses=statuses) if not ready: log.debug("Asset condition not met for dag '%s'", dag_id) diff --git a/airflow-core/src/airflow/timetables/assets.py b/airflow-core/src/airflow/timetables/assets.py index 37c6fb4825b69..c73c94988bbb0 100644 --- a/airflow-core/src/airflow/timetables/assets.py +++ b/airflow-core/src/airflow/timetables/assets.py @@ -20,7 +20,10 @@ import typing from airflow.exceptions import AirflowTimetableInvalid -from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetBase +from airflow.sdk.definitions.asset import BaseAsset # TODO: Use serialized classes. +from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAll, SerializedAssetBase +from airflow.serialization.encoders import ensure_serialized_asset +from airflow.timetables.base import Timetable from airflow.timetables.simple import AssetTriggeredTimetable from airflow.utils.types import DagRunType @@ -29,7 +32,8 @@ import pendulum - from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable + from airflow.sdk.definitions.asset import Asset + from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction class AssetOrTimeSchedule(AssetTriggeredTimetable): @@ -90,3 +94,75 @@ def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any) -> str: if run_type != DagRunType.ASSET_TRIGGERED: return self.timetable.generate_run_id(run_type=run_type, **kwargs) return super().generate_run_id(run_type=run_type, **kwargs) + + +class AssetAndTimeSchedule(Timetable): + """ + Time-based schedule that waits for required assets before starting. + + This timetable composes a time-based timetable with an asset condition. It + schedules runs according to the provided ``timetable`` (e.g. cron), but a + queued run will only start when all required assets are present. Unlike + :class:`AssetOrTimeSchedule`, this does not create asset-triggered runs. + """ + + def __init__( + self, + *, + timetable: Timetable, + assets: Collection[Asset] | BaseAsset | SerializedAssetBase, + ) -> None: + self.timetable = timetable + + if isinstance(assets, SerializedAssetBase | BaseAsset): + self.asset_condition = ensure_serialized_asset(assets) + else: + self.asset_condition = SerializedAssetAll([ensure_serialized_asset(a) for a in assets]) + + self.description = f"Triggered by assets and {timetable.description}" + self.periodic = timetable.periodic + self.can_be_scheduled = timetable.can_be_scheduled + self.active_runs_limit = timetable.active_runs_limit + + @classmethod + def deserialize(cls, data: dict[str, typing.Any]) -> Timetable: + from airflow.serialization.decoders import decode_asset_like, decode_timetable + + return cls( + assets=decode_asset_like(data["asset_condition"]), + timetable=decode_timetable(data["timetable"]), + ) + + def serialize(self) -> dict[str, typing.Any]: + from airflow.serialization.encoders import encode_asset_like, encode_timetable + + return { + "asset_condition": encode_asset_like(self.asset_condition), + "timetable": encode_timetable(self.timetable), + } + + def validate(self) -> None: + if isinstance(self.timetable, AssetTriggeredTimetable): + raise AirflowTimetableInvalid("cannot nest asset timetables") + if not isinstance(self.asset_condition, SerializedAssetBase): + raise AirflowTimetableInvalid("all elements in 'assets' must be assets") + + @property + def summary(self) -> str: + return f"Asset and {self.timetable.summary}" + + def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) -> DataInterval: + return self.timetable.infer_manual_data_interval(run_after=run_after) + + def next_dagrun_info( + self, *, last_automated_data_interval: DataInterval | None, restriction: TimeRestriction + ) -> DagRunInfo | None: + return self.timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=restriction, + ) + + def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any) -> str: + # All run IDs are delegated to the wrapped timetable; this class + # intentionally does not create ASSET_TRIGGERED runs. + return self.timetable.generate_run_id(run_type=run_type, **kwargs) diff --git a/airflow-core/tests/unit/timetables/test_assets_timetable.py b/airflow-core/tests/unit/timetables/test_assets_timetable.py index 3c12c886f283f..0e7f538deba55 100644 --- a/airflow-core/tests/unit/timetables/test_assets_timetable.py +++ b/airflow-core/tests/unit/timetables/test_assets_timetable.py @@ -31,7 +31,7 @@ from airflow.sdk import Asset, AssetAll, AssetAny, AssetOrTimeSchedule as SdkAssetOrTimeSchedule from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAll, SerializedAssetAny from airflow.serialization.serialized_objects import DagSerialization -from airflow.timetables.assets import AssetOrTimeSchedule as CoreAssetOrTimeSchedule +from airflow.timetables.assets import AssetAndTimeSchedule, AssetOrTimeSchedule as CoreAssetOrTimeSchedule from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.simple import AssetTriggeredTimetable from airflow.utils.types import DagRunType @@ -122,6 +122,17 @@ def sdk_asset_timetable(test_timetable, test_assets) -> SdkAssetOrTimeSchedule: return SdkAssetOrTimeSchedule(timetable=test_timetable, assets=test_assets) +@pytest.fixture +def asset_and_time_timetable(test_timetable: MockTimetable, test_assets: list[Asset]) -> AssetAndTimeSchedule: + """ + Pytest fixture for creating an AssetAndTimeSchedule object. + + :param test_timetable: The test timetable instance. + :param test_assets: A list of Asset instances. + """ + return AssetAndTimeSchedule(timetable=test_timetable, assets=test_assets) + + @pytest.fixture def core_asset_timetable(test_timetable: MockTimetable) -> CoreAssetOrTimeSchedule: return CoreAssetOrTimeSchedule( @@ -160,6 +171,14 @@ def test_serialization(sdk_asset_timetable: SdkAssetOrTimeSchedule, monkeypatch: } +def test_serialization_and(asset_and_time_timetable: AssetAndTimeSchedule, monkeypatch: Any) -> None: + """Tests serialization of AssetAndTimeSchedule.""" + monkeypatch.setattr("airflow.serialization.encoders.encode_timetable", serialize_timetable) + serialized = asset_and_time_timetable.serialize() + assert serialized["timetable"] == "serialized_timetable" + assert "asset_condition" in serialized + + def test_deserialization(monkeypatch: Any, core_asset_timetable: CoreAssetOrTimeSchedule) -> None: """ Tests the deserialization method of AssetOrTimeSchedule. @@ -186,6 +205,28 @@ def test_deserialization(monkeypatch: Any, core_asset_timetable: CoreAssetOrTime assert deserialized == core_asset_timetable +def test_deserialization_and(monkeypatch: Any) -> None: + """Tests deserialization of AssetAndTimeSchedule.""" + monkeypatch.setattr("airflow.serialization.decoders.decode_timetable", lambda x: MockTimetable()) + mock_serialized_data = { + "timetable": "mock_serialized_timetable", + "asset_condition": { + "__type": "asset_all", + "objects": [ + { + "__type": "asset", + "name": "test_asset", + "uri": "test://asset/", + "group": "asset", + "extra": None, + } + ], + }, + } + deserialized = AssetAndTimeSchedule.deserialize(mock_serialized_data) + assert isinstance(deserialized, AssetAndTimeSchedule) + + def test_infer_manual_data_interval(core_asset_timetable: CoreAssetOrTimeSchedule) -> None: """ Tests the infer_manual_data_interval method of AssetOrTimeSchedule. @@ -197,6 +238,12 @@ def test_infer_manual_data_interval(core_asset_timetable: CoreAssetOrTimeSchedul assert result == DataInterval.exact(run_after) +def test_infer_manual_data_interval_and(asset_and_time_timetable: AssetAndTimeSchedule) -> None: + run_after = DateTime.now() + result = asset_and_time_timetable.infer_manual_data_interval(run_after=run_after) + assert isinstance(result, DataInterval) + + def test_next_dagrun_info(core_asset_timetable: CoreAssetOrTimeSchedule) -> None: """ Tests the next_dagrun_info method of AssetOrTimeSchedule. @@ -214,6 +261,15 @@ def test_next_dagrun_info(core_asset_timetable: CoreAssetOrTimeSchedule) -> None ) +def test_next_dagrun_info_and(asset_and_time_timetable: AssetAndTimeSchedule) -> None: + last_interval = DataInterval.exact(DateTime.now()) + restriction = TimeRestriction(earliest=DateTime.now(), latest=None, catchup=True) + result = asset_and_time_timetable.next_dagrun_info( + last_automated_data_interval=last_interval, restriction=restriction + ) + assert result is None or isinstance(result, DagRunInfo) + + def test_generate_run_id(core_asset_timetable: CoreAssetOrTimeSchedule) -> None: """ Tests the generate_run_id method of AssetOrTimeSchedule. @@ -231,6 +287,17 @@ def test_generate_run_id(core_asset_timetable: CoreAssetOrTimeSchedule) -> None: assert run_id == "manual__2025-06-07T08:09:00+00:00" +def test_generate_run_id_and(asset_and_time_timetable: AssetAndTimeSchedule) -> None: + run_id = asset_and_time_timetable.generate_run_id( + run_type=DagRunType.MANUAL, + extra_args="test", + logical_date=DateTime.now(), + run_after=DateTime.now(), + data_interval=None, + ) + assert isinstance(run_id, str) + + @pytest.fixture def asset_events(mocker) -> list[AssetEvent]: """Pytest fixture for creating mock AssetEvent objects.""" From 9db4b4404cd10918c5c712e65d133a38431468d6 Mon Sep 17 00:00:00 2001 From: Aaron Chen Date: Sun, 29 Mar 2026 10:17:11 -0700 Subject: [PATCH 02/14] Apply suggestion from @Lee-W Co-authored-by: Wei Lee --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 10920ed3882b0..767afda366edc 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2320,7 +2320,7 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): continue # For AssetAndTimeSchedule, defer starting until all required assets are queued. if isinstance(dag.timetable, AssetAndTimeSchedule): - # Count required assets for this DAG's schedule + # Count required assets for this Dag's schedule required_count = ( session.scalar( select(func.count()) From b1f65e34f74971e090c1cba645ff03fb34e1273b Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 29 Mar 2026 16:00:13 -0700 Subject: [PATCH 03/14] refactor - using AssetEvaluator & add task sdk interface --- .../src/airflow/jobs/scheduler_job_runner.py | 61 ++++----- .../src/airflow/serialization/encoders.py | 9 ++ .../tests/unit/jobs/test_scheduler_job.py | 118 ++++++++++++++++++ .../unit/timetables/test_assets_timetable.py | 80 +++++++++--- task-sdk/src/airflow/sdk/__init__.py | 3 + .../sdk/definitions/timetables/assets.py | 19 +++ 6 files changed, 235 insertions(+), 55 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 767afda366edc..fb110a3c6e76e 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2278,6 +2278,7 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): cached_get_dag: Callable[[DagRun], SerializedDAG | None] = lru_cache()( partial(self.scheduler_dag_bag.get_dag_for_run, session=session) ) + asset_evaluator = AssetEvaluator(session) for dag_run in dag_runs: dag_id = dag_run.dag_id @@ -2320,48 +2321,34 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): continue # For AssetAndTimeSchedule, defer starting until all required assets are queued. if isinstance(dag.timetable, AssetAndTimeSchedule): - # Count required assets for this Dag's schedule - required_count = ( - session.scalar( - select(func.count()) - .select_from(DagScheduleAssetReference) - .where(DagScheduleAssetReference.dag_id == dag_id) + queued_adrqs = session.scalars( + with_row_locks( + select(AssetDagRunQueue) + .where(AssetDagRunQueue.target_dag_id == dag_id) + .options(joinedload(AssetDagRunQueue.asset)), + of=AssetDagRunQueue, + session=session, + skip_locked=True, ) - or 0 - ) + ).all() + statuses = { + SerializedAssetUniqueKey.from_asset(record.asset): True for record in queued_adrqs + } + + if not asset_evaluator.run(dag.timetable.asset_condition, statuses=statuses): + self.log.debug("Deferring DagRun until assets ready; dag_id=%s run_id=%s", dag_id, run_id) + # Do not increment active run counts; we didn't start it. + continue - if required_count > 0: - ready_count = ( - session.scalar( - select(func.count()) - .select_from(DagScheduleAssetReference) - .join( - AssetDagRunQueue, - and_( - DagScheduleAssetReference.asset_id == AssetDagRunQueue.asset_id, - DagScheduleAssetReference.dag_id == AssetDagRunQueue.target_dag_id, - ), - ) - .where(DagScheduleAssetReference.dag_id == dag_id) + if queued_adrqs: + # Consume only the rows selected for this DagRun to avoid races with new asset events. + adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs] + session.execute( + delete(AssetDagRunQueue).where( + tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks) ) - or 0 ) - if ready_count < required_count: - # Not all assets are present; skip starting this run for now. - self.log.debug( - "Deferring DagRun until assets ready; dag_id=%s run_id=%s (ready=%s required=%s)", - dag_id, - run_id, - ready_count, - required_count, - ) - # Do not increment active run counts; we didn't start it. - continue - - # Consume queued asset events for this DAG so the next run gates on new events. - session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_id)) - active_runs_of_dags[(dag_run.dag_id, backfill_id)] += 1 _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed(msg="started") diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py index 87d7ef1f137e9..708ff349e026f 100644 --- a/airflow-core/src/airflow/serialization/encoders.py +++ b/airflow-core/src/airflow/serialization/encoders.py @@ -32,6 +32,7 @@ Asset, AssetAlias, AssetAll, + AssetAndTimeSchedule, AssetAny, AssetOrTimeSchedule, ChainMapper, @@ -281,6 +282,7 @@ class _Serializer: """Serialization logic.""" BUILTIN_TIMETABLES: dict[type, str] = { + AssetAndTimeSchedule: "airflow.timetables.assets.AssetAndTimeSchedule", AssetOrTimeSchedule: "airflow.timetables.assets.AssetOrTimeSchedule", AssetTriggeredTimetable: "airflow.timetables.simple.AssetTriggeredTimetable", ContinuousTimetable: "airflow.timetables.simple.ContinuousTimetable", @@ -379,6 +381,13 @@ def _(self, timetable: MultipleCronTriggerTimetable) -> dict[str, Any]: "run_immediately": encode_run_immediately(representitive.run_immediately), } + @serialize_timetable.register + def _(self, timetable: AssetAndTimeSchedule) -> dict[str, Any]: + return { + "asset_condition": encode_asset_like(timetable.asset_condition), + "timetable": encode_timetable(timetable.timetable), + } + @serialize_timetable.register def _(self, timetable: AssetOrTimeSchedule) -> dict[str, Any]: return { diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 16c42e8acd029..f47adeb16f500 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -100,7 +100,9 @@ from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable from airflow.serialization.definitions.dag import SerializedDAG from airflow.serialization.serialized_objects import LazyDeserializedDAG +from airflow.timetables.assets import AssetAndTimeSchedule from airflow.timetables.base import DagRunInfo, DataInterval +from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils.session import create_session, provide_session from airflow.utils.sqlalchemy import with_row_locks from airflow.utils.state import CallbackState, DagRunState, State, TaskInstanceState @@ -9000,6 +9002,122 @@ def test_start_queued_dagruns_uses_latest_max_active_runs_from_dag_model(self, d assert running_count == 2 +@pytest.mark.usefixtures("disable_load_example") +@pytest.mark.need_serialized_dag +def test_start_queued_dagruns_asset_and_time_uses_asset_evaluator(session: Session, dag_maker): + asset_1 = Asset(uri="test://asset-and-time-or-1", name="asset-and-time-or-1") + asset_2 = Asset(uri="test://asset-and-time-or-2", name="asset-and-time-or-2") + with dag_maker( + dag_id="asset-and-time-uses-evaluator", + schedule=AssetAndTimeSchedule( + timetable=CronTriggerTimetable("* * * * *", timezone="UTC"), + assets=asset_1 | asset_2, + ), + session=session, + ): + EmptyOperator(task_id="dummy_task") + + logical_date = timezone.utcnow() - datetime.timedelta(minutes=1) + dag_run = dag_maker.create_dagrun( + run_id="asset_and_time_queued", + state=DagRunState.QUEUED, + run_type=DagRunType.SCHEDULED, + logical_date=logical_date, + data_interval=DataInterval.exact(logical_date), + session=session, + ) + asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.uri)) + session.add( + AssetDagRunQueue( + asset_id=asset_1_id, + target_dag_id=dag_run.dag_id, + created_at=timezone.utcnow(), + ) + ) + session.flush() + + job_runner = SchedulerJobRunner(job=Job(), executors=[MockExecutor()]) + job_runner._start_queued_dagruns(session) + session.flush() + + dag_run = session.get(DagRun, dag_run.id) + assert dag_run is not None + assert dag_run.state == DagRunState.RUNNING + + +@pytest.mark.usefixtures("disable_load_example") +@pytest.mark.need_serialized_dag +def test_start_queued_dagruns_asset_and_time_deletes_only_selected_adrq_rows(session: Session, dag_maker): + asset_1 = Asset(uri="test://asset-and-time-selected-1", name="asset-and-time-selected-1") + asset_2 = Asset(uri="test://asset-and-time-selected-2", name="asset-and-time-selected-2") + with dag_maker( + dag_id="asset-and-time-delete-selected", + schedule=AssetAndTimeSchedule( + timetable=CronTriggerTimetable("* * * * *", timezone="UTC"), + assets=asset_1 | asset_2, + ), + session=session, + ): + EmptyOperator(task_id="dummy_task") + + logical_date = timezone.utcnow() - datetime.timedelta(minutes=1) + dag_run = dag_maker.create_dagrun( + run_id="asset_and_time_selected_rows", + state=DagRunState.QUEUED, + run_type=DagRunType.SCHEDULED, + logical_date=logical_date, + data_interval=DataInterval.exact(logical_date), + session=session, + ) + asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_1.uri)) + asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset_2.uri)) + session.add_all( + [ + AssetDagRunQueue( + asset_id=asset_1_id, + target_dag_id=dag_run.dag_id, + created_at=timezone.utcnow(), + ), + AssetDagRunQueue( + asset_id=asset_2_id, + target_dag_id=dag_run.dag_id, + created_at=timezone.utcnow(), + ), + ] + ) + session.flush() + + job_runner = SchedulerJobRunner(job=Job(), executors=[MockExecutor()]) + + def _lock_only_selected_row(query, **_): + if query.column_descriptions and query.column_descriptions[0].get("entity") is AssetDagRunQueue: + return query.where(AssetDagRunQueue.asset_id == asset_1_id) + return query + + with patch("airflow.jobs.scheduler_job_runner.with_row_locks", side_effect=_lock_only_selected_row): + job_runner._start_queued_dagruns(session) + + dag_run = session.get(DagRun, dag_run.id) + assert dag_run is not None + assert dag_run.state == DagRunState.RUNNING + + adrq_1 = session.scalars( + select(AssetDagRunQueue).where( + AssetDagRunQueue.target_dag_id == dag_run.dag_id, + AssetDagRunQueue.asset_id == asset_1_id, + ) + ).one_or_none() + assert adrq_1 is None + + adrq_2 = session.scalars( + select(AssetDagRunQueue).where( + AssetDagRunQueue.target_dag_id == dag_run.dag_id, + AssetDagRunQueue.asset_id == asset_2_id, + ) + ).one_or_none() + assert adrq_2 is not None + + class TestSchedulerJobQueriesCount: """ These tests are designed to detect changes in the number of queries for diff --git a/airflow-core/tests/unit/timetables/test_assets_timetable.py b/airflow-core/tests/unit/timetables/test_assets_timetable.py index 0e7f538deba55..50c67c162abbf 100644 --- a/airflow-core/tests/unit/timetables/test_assets_timetable.py +++ b/airflow-core/tests/unit/timetables/test_assets_timetable.py @@ -28,10 +28,19 @@ from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.sdk import Asset, AssetAll, AssetAny, AssetOrTimeSchedule as SdkAssetOrTimeSchedule +from airflow.sdk import ( + Asset, + AssetAll, + AssetAndTimeSchedule as SdkAssetAndTimeSchedule, + AssetAny, + AssetOrTimeSchedule as SdkAssetOrTimeSchedule, +) from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAll, SerializedAssetAny from airflow.serialization.serialized_objects import DagSerialization -from airflow.timetables.assets import AssetAndTimeSchedule, AssetOrTimeSchedule as CoreAssetOrTimeSchedule +from airflow.timetables.assets import ( + AssetAndTimeSchedule as CoreAssetAndTimeSchedule, + AssetOrTimeSchedule as CoreAssetOrTimeSchedule, +) from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.simple import AssetTriggeredTimetable from airflow.utils.types import DagRunType @@ -123,14 +132,23 @@ def sdk_asset_timetable(test_timetable, test_assets) -> SdkAssetOrTimeSchedule: @pytest.fixture -def asset_and_time_timetable(test_timetable: MockTimetable, test_assets: list[Asset]) -> AssetAndTimeSchedule: +def sdk_asset_and_time_timetable( + test_timetable: MockTimetable, test_assets: list[Asset] +) -> SdkAssetAndTimeSchedule: + return SdkAssetAndTimeSchedule(timetable=test_timetable, assets=test_assets) + + +@pytest.fixture +def core_asset_and_time_timetable( + test_timetable: MockTimetable, test_assets: list[Asset] +) -> CoreAssetAndTimeSchedule: """ Pytest fixture for creating an AssetAndTimeSchedule object. :param test_timetable: The test timetable instance. :param test_assets: A list of Asset instances. """ - return AssetAndTimeSchedule(timetable=test_timetable, assets=test_assets) + return CoreAssetAndTimeSchedule(timetable=test_timetable, assets=test_assets) @pytest.fixture @@ -171,12 +189,29 @@ def test_serialization(sdk_asset_timetable: SdkAssetOrTimeSchedule, monkeypatch: } -def test_serialization_and(asset_and_time_timetable: AssetAndTimeSchedule, monkeypatch: Any) -> None: +def test_serialization_and(sdk_asset_and_time_timetable: SdkAssetAndTimeSchedule, monkeypatch: Any) -> None: """Tests serialization of AssetAndTimeSchedule.""" - monkeypatch.setattr("airflow.serialization.encoders.encode_timetable", serialize_timetable) - serialized = asset_and_time_timetable.serialize() - assert serialized["timetable"] == "serialized_timetable" - assert "asset_condition" in serialized + from airflow.serialization.encoders import _serializer + + monkeypatch.setattr( + "airflow.serialization.encoders.encode_timetable", lambda x: "mock_serialized_timetable" + ) + serialized = _serializer.serialize_timetable(sdk_asset_and_time_timetable) + assert serialized == { + "timetable": "mock_serialized_timetable", + "asset_condition": { + "__type": "asset_all", + "objects": [ + { + "__type": "asset", + "name": "test_asset", + "uri": "test://asset/", + "group": "asset", + "extra": {}, + } + ], + }, + } def test_deserialization(monkeypatch: Any, core_asset_timetable: CoreAssetOrTimeSchedule) -> None: @@ -205,7 +240,9 @@ def test_deserialization(monkeypatch: Any, core_asset_timetable: CoreAssetOrTime assert deserialized == core_asset_timetable -def test_deserialization_and(monkeypatch: Any) -> None: +def test_deserialization_and( + monkeypatch: Any, core_asset_and_time_timetable: CoreAssetAndTimeSchedule +) -> None: """Tests deserialization of AssetAndTimeSchedule.""" monkeypatch.setattr("airflow.serialization.decoders.decode_timetable", lambda x: MockTimetable()) mock_serialized_data = { @@ -223,8 +260,15 @@ def test_deserialization_and(monkeypatch: Any) -> None: ], }, } - deserialized = AssetAndTimeSchedule.deserialize(mock_serialized_data) - assert isinstance(deserialized, AssetAndTimeSchedule) + deserialized = CoreAssetAndTimeSchedule.deserialize(mock_serialized_data) + assert isinstance(deserialized, CoreAssetAndTimeSchedule) + assert isinstance(deserialized.timetable, MockTimetable) + assert isinstance(deserialized.asset_condition, SerializedAssetAll) + assert len(deserialized.asset_condition.objects) == 1 + asset = deserialized.asset_condition.objects[0] + assert isinstance(asset, SerializedAsset) + assert asset.name == "test_asset" + assert asset.uri == "test://asset/" def test_infer_manual_data_interval(core_asset_timetable: CoreAssetOrTimeSchedule) -> None: @@ -238,9 +282,9 @@ def test_infer_manual_data_interval(core_asset_timetable: CoreAssetOrTimeSchedul assert result == DataInterval.exact(run_after) -def test_infer_manual_data_interval_and(asset_and_time_timetable: AssetAndTimeSchedule) -> None: +def test_infer_manual_data_interval_and(core_asset_and_time_timetable: CoreAssetAndTimeSchedule) -> None: run_after = DateTime.now() - result = asset_and_time_timetable.infer_manual_data_interval(run_after=run_after) + result = core_asset_and_time_timetable.infer_manual_data_interval(run_after=run_after) assert isinstance(result, DataInterval) @@ -261,10 +305,10 @@ def test_next_dagrun_info(core_asset_timetable: CoreAssetOrTimeSchedule) -> None ) -def test_next_dagrun_info_and(asset_and_time_timetable: AssetAndTimeSchedule) -> None: +def test_next_dagrun_info_and(core_asset_and_time_timetable: CoreAssetAndTimeSchedule) -> None: last_interval = DataInterval.exact(DateTime.now()) restriction = TimeRestriction(earliest=DateTime.now(), latest=None, catchup=True) - result = asset_and_time_timetable.next_dagrun_info( + result = core_asset_and_time_timetable.next_dagrun_info( last_automated_data_interval=last_interval, restriction=restriction ) assert result is None or isinstance(result, DagRunInfo) @@ -287,8 +331,8 @@ def test_generate_run_id(core_asset_timetable: CoreAssetOrTimeSchedule) -> None: assert run_id == "manual__2025-06-07T08:09:00+00:00" -def test_generate_run_id_and(asset_and_time_timetable: AssetAndTimeSchedule) -> None: - run_id = asset_and_time_timetable.generate_run_id( +def test_generate_run_id_and(core_asset_and_time_timetable: CoreAssetAndTimeSchedule) -> None: + run_id = core_asset_and_time_timetable.generate_run_id( run_type=DagRunType.MANUAL, extra_args="test", logical_date=DateTime.now(), diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py index f304b068237b3..63e7131c403e5 100644 --- a/task-sdk/src/airflow/sdk/__init__.py +++ b/task-sdk/src/airflow/sdk/__init__.py @@ -25,6 +25,7 @@ "AssetAlias", "AssetAll", "AssetAny", + "AssetAndTimeSchedule", "AssetOrTimeSchedule", "AssetWatcher", "AsyncCallback", @@ -153,6 +154,7 @@ from airflow.sdk.definitions.taskgroup import TaskGroup from airflow.sdk.definitions.template import literal from airflow.sdk.definitions.timetables.assets import ( + AssetAndTimeSchedule, AssetOrTimeSchedule, PartitionedAssetTimetable, ) @@ -181,6 +183,7 @@ "AssetAlias": ".definitions.asset", "AssetAll": ".definitions.asset", "AssetAny": ".definitions.asset", + "AssetAndTimeSchedule": ".definitions.timetables.assets", "AssetOrTimeSchedule": ".definitions.timetables.assets", "AssetWatcher": ".definitions.asset", "AsyncCallback": ".definitions.callback", diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/assets.py b/task-sdk/src/airflow/sdk/definitions/timetables/assets.py index e6bb683ebcadb..47109ae691fa2 100644 --- a/task-sdk/src/airflow/sdk/definitions/timetables/assets.py +++ b/task-sdk/src/airflow/sdk/definitions/timetables/assets.py @@ -77,3 +77,22 @@ class AssetOrTimeSchedule(AssetTriggeredTimetable): def __attrs_post_init__(self) -> None: self.active_runs_limit = self.timetable.active_runs_limit self.can_be_scheduled = self.timetable.can_be_scheduled + + +@attrs.define(kw_only=True) +class AssetAndTimeSchedule(BaseTimetable): + """ + Combine time-based scheduling with asset gating. + + :param assets: An asset or list of assets, in the same format as + ``DAG(schedule=...)`` when using event-driven scheduling. This is used + to evaluate whether a queued scheduled run can start. + :param timetable: A timetable instance to evaluate time-based scheduling. + """ + + asset_condition: BaseAsset = attrs.field(alias="assets", converter=_coerce_assets) + timetable: BaseTimetable + + def __attrs_post_init__(self) -> None: + self.active_runs_limit = self.timetable.active_runs_limit + self.can_be_scheduled = self.timetable.can_be_scheduled From 68f9768ecc397ff39afd6d2e88636cdf378e4237 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 29 Mar 2026 21:23:24 -0700 Subject: [PATCH 04/14] Handle asset wait timeout for AssetAndTimeSchedule --- .../src/airflow/jobs/scheduler_job_runner.py | 26 ++++ .../tests/unit/jobs/test_scheduler_job.py | 113 ++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index fb110a3c6e76e..f4a0919df944c 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2321,6 +2321,32 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): continue # For AssetAndTimeSchedule, defer starting until all required assets are queued. if isinstance(dag.timetable, AssetAndTimeSchedule): + # Reuse dagrun_timeout to fail runs that wait in QUEUED for assets for too long. + if ( + dag.dagrun_timeout + and dag_run.queued_at + and dag_run.queued_at < timezone.utcnow() - dag.dagrun_timeout + ): + dag_run.set_state(DagRunState.FAILED) + session.flush() + self.log.info( + "Run %s of %s has timed-out while waiting for assets", + dag_run.run_id, + dag_run.dag_id, + ) + if ( + dag_run.run_type + in ( + DagRunType.SCHEDULED, + DagRunType.MANUAL, + DagRunType.ASSET_TRIGGERED, + ) + and dag_run.dag_model is not None + ): + self._set_exceeds_max_active_runs(dag_model=dag_run.dag_model, session=session) + dag_run.notify_dagrun_state_changed(msg="timed_out") + continue + queued_adrqs = session.scalars( with_row_locks( select(AssetDagRunQueue) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f47adeb16f500..3774862ee1b3e 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -9118,6 +9118,119 @@ def _lock_only_selected_row(query, **_): assert adrq_2 is not None +@time_machine.travel("2026-03-29 22:40:00+00:00") +@pytest.mark.usefixtures("disable_load_example") +@pytest.mark.need_serialized_dag +def test_start_queued_dagruns_asset_and_time_times_out_while_waiting_for_assets(session: Session, dag_maker): + asset = Asset(uri="test://asset-and-time-timeout", name="asset-and-time-timeout") + with dag_maker( + dag_id="asset-and-time-times-out-while-waiting", + schedule=AssetAndTimeSchedule( + timetable=CronTriggerTimetable("* * * * *", timezone="UTC"), + assets=[asset], + ), + max_active_runs=1, + dagrun_timeout=datetime.timedelta(minutes=2), + session=session, + ): + EmptyOperator(task_id="dummy_task") + + logical_date = timezone.utcnow() - datetime.timedelta(minutes=3) + dag_run = dag_maker.create_dagrun( + run_id="asset_and_time_wait_timeout", + state=DagRunState.QUEUED, + run_type=DagRunType.SCHEDULED, + logical_date=logical_date, + data_interval=DataInterval.exact(logical_date), + session=session, + ) + dag_run.queued_at = timezone.utcnow() - datetime.timedelta(minutes=3) + dag_maker.dag_model.exceeds_max_non_backfill = True + session.flush() + + job_runner = SchedulerJobRunner(job=Job(), executors=[MockExecutor()]) + job_runner._start_queued_dagruns(session) + session.flush() + + dag_run = session.get(DagRun, dag_run.id) + assert dag_run is not None + assert dag_run.state == DagRunState.FAILED + + session.refresh(dag_maker.dag_model) + assert dag_maker.dag_model.exceeds_max_non_backfill is False + + +@time_machine.travel("2026-03-29 22:40:00+00:00") +@pytest.mark.usefixtures("disable_load_example") +@pytest.mark.need_serialized_dag +def test_start_queued_dagruns_asset_and_time_times_out_older_run_before_starting_newer_one( + session: Session, dag_maker +): + asset = Asset( + uri="test://asset-and-time-timeout-releases-next", name="asset-and-time-timeout-releases-next" + ) + with dag_maker( + dag_id="asset-and-time-timeout-releases-next", + schedule=AssetAndTimeSchedule( + timetable=CronTriggerTimetable("* * * * *", timezone="UTC"), + assets=[asset], + ), + dagrun_timeout=datetime.timedelta(minutes=2), + session=session, + ): + EmptyOperator(task_id="dummy_task") + + older_logical_date = timezone.utcnow() - datetime.timedelta(minutes=3) + older_dag_run = dag_maker.create_dagrun( + run_id="asset_and_time_older_waiting_run", + state=DagRunState.QUEUED, + run_type=DagRunType.SCHEDULED, + logical_date=older_logical_date, + data_interval=DataInterval.exact(older_logical_date), + session=session, + ) + newer_logical_date = timezone.utcnow() - datetime.timedelta(minutes=1) + newer_dag_run = dag_maker.create_dagrun( + run_id="asset_and_time_newer_waiting_run", + state=DagRunState.QUEUED, + run_type=DagRunType.SCHEDULED, + logical_date=newer_logical_date, + data_interval=DataInterval.exact(newer_logical_date), + session=session, + ) + older_dag_run.queued_at = timezone.utcnow() - datetime.timedelta(minutes=3) + newer_dag_run.queued_at = timezone.utcnow() - datetime.timedelta(minutes=1) + + asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == asset.uri)) + session.add( + AssetDagRunQueue( + asset_id=asset_id, + target_dag_id=older_dag_run.dag_id, + created_at=timezone.utcnow(), + ) + ) + session.flush() + + job_runner = SchedulerJobRunner(job=Job(), executors=[MockExecutor()]) + job_runner._start_queued_dagruns(session) + session.flush() + + older_dag_run = session.get(DagRun, older_dag_run.id) + newer_dag_run = session.get(DagRun, newer_dag_run.id) + assert older_dag_run is not None + assert older_dag_run.state == DagRunState.FAILED + assert newer_dag_run is not None + assert newer_dag_run.state == DagRunState.RUNNING + + remaining_adrq = session.scalar( + select(AssetDagRunQueue).where( + AssetDagRunQueue.target_dag_id == newer_dag_run.dag_id, + AssetDagRunQueue.asset_id == asset_id, + ) + ) + assert remaining_adrq is None + + class TestSchedulerJobQueriesCount: """ These tests are designed to detect changes in the number of queries for From 7c7cb77adbfb75a0415ef25b03ac951ac7b7b507 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Sun, 29 Mar 2026 21:38:15 -0700 Subject: [PATCH 05/14] add doc for task sdk --- .../docs/authoring-and-scheduling/asset-scheduling.rst | 6 +++--- airflow-core/docs/authoring-and-scheduling/timetable.rst | 6 ++---- task-sdk/docs/api.rst | 2 ++ 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst index e3979f3de374b..82cb0f522c966 100644 --- a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst +++ b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst @@ -152,11 +152,11 @@ Gate scheduled runs on asset updates Use ``AssetAndTimeSchedule`` when you want a Dag to follow a normal time-based timetable but only start after specific assets have been updated. Airflow creates the DagRun at the scheduled time and keeps it queued until every required asset has queued an event. When the DagRun starts, those asset events are consumed so the next scheduled run waits for new updates. This does not create additional asset-triggered runs. +If ``dagrun_timeout`` is set on the DAG, it also limits how long an ``AssetAndTimeSchedule`` run may remain queued while waiting for its asset condition to become ready. + .. code-block:: python - from airflow.sdk import DAG, Asset - from airflow.timetables.assets import AssetAndTimeSchedule - from airflow.timetables.trigger import CronTriggerTimetable + from airflow.sdk import DAG, Asset, AssetAndTimeSchedule, CronTriggerTimetable example_asset = Asset("s3://asset/example.csv") diff --git a/airflow-core/docs/authoring-and-scheduling/timetable.rst b/airflow-core/docs/authoring-and-scheduling/timetable.rst index 87d55a4c9054c..976d1ac5c51e5 100644 --- a/airflow-core/docs/authoring-and-scheduling/timetable.rst +++ b/airflow-core/docs/authoring-and-scheduling/timetable.rst @@ -279,8 +279,7 @@ Here's an example of a Dag using ``AssetOrTimeSchedule``: .. code-block:: python - from airflow.timetables.assets import AssetOrTimeSchedule - from airflow.timetables.trigger import CronTriggerTimetable + from airflow.sdk import AssetOrTimeSchedule, CronTriggerTimetable @dag( @@ -297,8 +296,7 @@ Here's an example of a Dag using ``AssetAndTimeSchedule`` to require both the ti .. code-block:: python - from airflow.timetables.assets import AssetAndTimeSchedule - from airflow.timetables.trigger import CronTriggerTimetable + from airflow.sdk import AssetAndTimeSchedule, CronTriggerTimetable @dag( diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst index cb9789f5bb69f..4b469e3f2c334 100644 --- a/task-sdk/docs/api.rst +++ b/task-sdk/docs/api.rst @@ -193,6 +193,8 @@ Assets Timetables ---------- +.. autoapiclass:: airflow.sdk.AssetAndTimeSchedule + .. autoapiclass:: airflow.sdk.AssetOrTimeSchedule .. autoapiclass:: airflow.sdk.CronDataIntervalTimetable From ba58a22b3b5e596ef78191c76e514647e8697a28 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Mon, 30 Mar 2026 10:00:59 -0700 Subject: [PATCH 06/14] fix CI error --- airflow-core/tests/unit/jobs/test_scheduler_job.py | 10 +++++----- task-sdk/src/airflow/sdk/__init__.pyi | 2 ++ .../src/airflow/sdk/definitions/timetables/assets.py | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 3774862ee1b3e..ee36bc71b60ff 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -9017,7 +9017,7 @@ def test_start_queued_dagruns_asset_and_time_uses_asset_evaluator(session: Sessi ): EmptyOperator(task_id="dummy_task") - logical_date = timezone.utcnow() - datetime.timedelta(minutes=1) + logical_date = pendulum.now("UTC") - datetime.timedelta(minutes=1) dag_run = dag_maker.create_dagrun( run_id="asset_and_time_queued", state=DagRunState.QUEUED, @@ -9060,7 +9060,7 @@ def test_start_queued_dagruns_asset_and_time_deletes_only_selected_adrq_rows(ses ): EmptyOperator(task_id="dummy_task") - logical_date = timezone.utcnow() - datetime.timedelta(minutes=1) + logical_date = pendulum.now("UTC") - datetime.timedelta(minutes=1) dag_run = dag_maker.create_dagrun( run_id="asset_and_time_selected_rows", state=DagRunState.QUEUED, @@ -9135,7 +9135,7 @@ def test_start_queued_dagruns_asset_and_time_times_out_while_waiting_for_assets( ): EmptyOperator(task_id="dummy_task") - logical_date = timezone.utcnow() - datetime.timedelta(minutes=3) + logical_date = pendulum.now("UTC") - datetime.timedelta(minutes=3) dag_run = dag_maker.create_dagrun( run_id="asset_and_time_wait_timeout", state=DagRunState.QUEUED, @@ -9180,7 +9180,7 @@ def test_start_queued_dagruns_asset_and_time_times_out_older_run_before_starting ): EmptyOperator(task_id="dummy_task") - older_logical_date = timezone.utcnow() - datetime.timedelta(minutes=3) + older_logical_date = pendulum.now("UTC") - datetime.timedelta(minutes=3) older_dag_run = dag_maker.create_dagrun( run_id="asset_and_time_older_waiting_run", state=DagRunState.QUEUED, @@ -9189,7 +9189,7 @@ def test_start_queued_dagruns_asset_and_time_times_out_older_run_before_starting data_interval=DataInterval.exact(older_logical_date), session=session, ) - newer_logical_date = timezone.utcnow() - datetime.timedelta(minutes=1) + newer_logical_date = pendulum.now("UTC") - datetime.timedelta(minutes=1) newer_dag_run = dag_maker.create_dagrun( run_id="asset_and_time_newer_waiting_run", state=DagRunState.QUEUED, diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi index 7e6d211674eba..39f6cc4321065 100644 --- a/task-sdk/src/airflow/sdk/__init__.pyi +++ b/task-sdk/src/airflow/sdk/__init__.pyi @@ -85,6 +85,7 @@ from airflow.sdk.definitions.retry_policy import ( from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup from airflow.sdk.definitions.template import literal as literal from airflow.sdk.definitions.timetables.assets import ( + AssetAndTimeSchedule, AssetOrTimeSchedule, PartitionedAssetTimetable, ) @@ -114,6 +115,7 @@ __all__ = [ "AssetAlias", "AssetAll", "AssetAny", + "AssetAndTimeSchedule", "AssetOrTimeSchedule", "AssetWatcher", "BaseAsyncOperator", diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/assets.py b/task-sdk/src/airflow/sdk/definitions/timetables/assets.py index 47109ae691fa2..7cf239408f308 100644 --- a/task-sdk/src/airflow/sdk/definitions/timetables/assets.py +++ b/task-sdk/src/airflow/sdk/definitions/timetables/assets.py @@ -82,7 +82,7 @@ def __attrs_post_init__(self) -> None: @attrs.define(kw_only=True) class AssetAndTimeSchedule(BaseTimetable): """ - Combine time-based scheduling with asset gating. + Combine time-based scheduling with asset conditions. :param assets: An asset or list of assets, in the same format as ``DAG(schedule=...)`` when using event-driven scheduling. This is used From a1d719e1fcf0d8f61ed476184d067fb36c7e02b1 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Mon, 30 Mar 2026 10:34:39 -0700 Subject: [PATCH 07/14] fix CI error #2 --- airflow-core/tests/unit/timetables/test_assets_timetable.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/timetables/test_assets_timetable.py b/airflow-core/tests/unit/timetables/test_assets_timetable.py index 50c67c162abbf..be6d624f85ffa 100644 --- a/airflow-core/tests/unit/timetables/test_assets_timetable.py +++ b/airflow-core/tests/unit/timetables/test_assets_timetable.py @@ -132,9 +132,7 @@ def sdk_asset_timetable(test_timetable, test_assets) -> SdkAssetOrTimeSchedule: @pytest.fixture -def sdk_asset_and_time_timetable( - test_timetable: MockTimetable, test_assets: list[Asset] -) -> SdkAssetAndTimeSchedule: +def sdk_asset_and_time_timetable(test_timetable, test_assets) -> SdkAssetAndTimeSchedule: return SdkAssetAndTimeSchedule(timetable=test_timetable, assets=test_assets) From 283a0c29a6613326553f14d2b2bc256b652a0ffe Mon Sep 17 00:00:00 2001 From: Aaron Chen Date: Fri, 3 Apr 2026 21:38:04 -0700 Subject: [PATCH 08/14] Update task-sdk/src/airflow/sdk/__init__.py Co-authored-by: Wei Lee --- task-sdk/src/airflow/sdk/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py index 63e7131c403e5..e0763d58a0168 100644 --- a/task-sdk/src/airflow/sdk/__init__.py +++ b/task-sdk/src/airflow/sdk/__init__.py @@ -182,8 +182,8 @@ "Asset": ".definitions.asset", "AssetAlias": ".definitions.asset", "AssetAll": ".definitions.asset", - "AssetAny": ".definitions.asset", "AssetAndTimeSchedule": ".definitions.timetables.assets", + "AssetAny": ".definitions.asset", "AssetOrTimeSchedule": ".definitions.timetables.assets", "AssetWatcher": ".definitions.asset", "AsyncCallback": ".definitions.callback", From 32a7f5a90e635b5fdbf2084ce243f0a4519d24fe Mon Sep 17 00:00:00 2001 From: Aaron Chen Date: Fri, 3 Apr 2026 21:39:30 -0700 Subject: [PATCH 09/14] Update task-sdk/src/airflow/sdk/__init__.pyi Co-authored-by: Wei Lee --- task-sdk/src/airflow/sdk/__init__.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi index 39f6cc4321065..a128940339b4c 100644 --- a/task-sdk/src/airflow/sdk/__init__.pyi +++ b/task-sdk/src/airflow/sdk/__init__.pyi @@ -114,8 +114,8 @@ __all__ = [ "Asset", "AssetAlias", "AssetAll", - "AssetAny", "AssetAndTimeSchedule", + "AssetAny", "AssetOrTimeSchedule", "AssetWatcher", "BaseAsyncOperator", From 86a5260e21092c9b539ce10137308c6aea698c17 Mon Sep 17 00:00:00 2001 From: Aaron Chen Date: Fri, 3 Apr 2026 21:40:15 -0700 Subject: [PATCH 10/14] Term modify: DAG -> Dag Co-authored-by: Wei Lee --- airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst index 82cb0f522c966..80d07352f66ab 100644 --- a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst +++ b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst @@ -152,7 +152,7 @@ Gate scheduled runs on asset updates Use ``AssetAndTimeSchedule`` when you want a Dag to follow a normal time-based timetable but only start after specific assets have been updated. Airflow creates the DagRun at the scheduled time and keeps it queued until every required asset has queued an event. When the DagRun starts, those asset events are consumed so the next scheduled run waits for new updates. This does not create additional asset-triggered runs. -If ``dagrun_timeout`` is set on the DAG, it also limits how long an ``AssetAndTimeSchedule`` run may remain queued while waiting for its asset condition to become ready. +If ``dagrun_timeout`` is set on the Dag, it also limits how long an ``AssetAndTimeSchedule`` run may remain queued while waiting for its asset condition to become ready. .. code-block:: python From a2b715ea706c7fb0f2b1448ec12f2b5c6b646202 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Fri, 10 Apr 2026 15:00:10 -0700 Subject: [PATCH 11/14] Enhance AssetAndTimeSchedule handling by deferring scheduled runs until all required assets are queued --- airflow-core/docs/authoring-and-scheduling/timetable.rst | 6 +++--- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/timetable.rst b/airflow-core/docs/authoring-and-scheduling/timetable.rst index 976d1ac5c51e5..e729b2c448cb9 100644 --- a/airflow-core/docs/authoring-and-scheduling/timetable.rst +++ b/airflow-core/docs/authoring-and-scheduling/timetable.rst @@ -301,9 +301,9 @@ Here's an example of a Dag using ``AssetAndTimeSchedule`` to require both the ti @dag( schedule=AssetAndTimeSchedule( - timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), assets=(dag1_asset & dag2_asset) - ) - # Additional arguments here, replace this comment with actual arguments + timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), + assets=(dag1_asset & dag2_asset), + ), ) def example_gated_dag(): # Dag tasks go here diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index f4a0919df944c..46fda869a77d8 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2320,7 +2320,8 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): ) continue # For AssetAndTimeSchedule, defer starting until all required assets are queued. - if isinstance(dag.timetable, AssetAndTimeSchedule): + # Only gate scheduled runs; manual and backfill runs should start immediately. + if isinstance(dag.timetable, AssetAndTimeSchedule) and dag_run.run_type == DagRunType.SCHEDULED: # Reuse dagrun_timeout to fail runs that wait in QUEUED for assets for too long. if ( dag.dagrun_timeout From f5b1453def159471ad7f625813495ab06ccfd71a Mon Sep 17 00:00:00 2001 From: nailo2c Date: Tue, 28 Apr 2026 12:20:19 -0700 Subject: [PATCH 12/14] Fix redundant DagRun type check --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 46fda869a77d8..74582de531b5e 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2335,15 +2335,7 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): dag_run.run_id, dag_run.dag_id, ) - if ( - dag_run.run_type - in ( - DagRunType.SCHEDULED, - DagRunType.MANUAL, - DagRunType.ASSET_TRIGGERED, - ) - and dag_run.dag_model is not None - ): + if dag_run.dag_model is not None: self._set_exceeds_max_active_runs(dag_model=dag_run.dag_model, session=session) dag_run.notify_dagrun_state_changed(msg="timed_out") continue From 322fb8cc9d82c4d1f5828041bf0f6d427385c3b0 Mon Sep 17 00:00:00 2001 From: Aaron Chen Date: Thu, 30 Apr 2026 11:55:51 -0700 Subject: [PATCH 13/14] Update airflow-core/src/airflow/timetables/assets.py Co-authored-by: Wei Lee --- airflow-core/src/airflow/timetables/assets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/timetables/assets.py b/airflow-core/src/airflow/timetables/assets.py index c73c94988bbb0..5a9baff353711 100644 --- a/airflow-core/src/airflow/timetables/assets.py +++ b/airflow-core/src/airflow/timetables/assets.py @@ -110,7 +110,7 @@ def __init__( self, *, timetable: Timetable, - assets: Collection[Asset] | BaseAsset | SerializedAssetBase, + assets: Collection[SerializedAsset] | SerializedAssetBase, ) -> None: self.timetable = timetable From 9b94c1be8d4caae13c8236cf75a7538b551c23a8 Mon Sep 17 00:00:00 2001 From: nailo2c Date: Thu, 30 Apr 2026 14:43:54 -0700 Subject: [PATCH 14/14] Remove unused Asset import and fix test fixtures for new annotation The annotation on AssetAndTimeSchedule.__init__ was migrated to Collection[SerializedAsset] | SerializedAssetBase, leaving the TYPE_CHECKING-only Asset import unreferenced and breaking mypy on two test files that passed SDK Assets to the core class. The scheduler tests now import AssetAndTimeSchedule (and the matching CronTriggerTimetable) from airflow.sdk so the types align with how DAG authors use it; the timetable test fixture mirrors core_asset_timetable by constructing SerializedAsset directly. --- airflow-core/src/airflow/timetables/assets.py | 1 - .../tests/unit/jobs/test_scheduler_job.py | 4 ++-- .../unit/timetables/test_assets_timetable.py | 15 +++++---------- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/timetables/assets.py b/airflow-core/src/airflow/timetables/assets.py index 5a9baff353711..84633ce6cec1a 100644 --- a/airflow-core/src/airflow/timetables/assets.py +++ b/airflow-core/src/airflow/timetables/assets.py @@ -32,7 +32,6 @@ import pendulum - from airflow.sdk.definitions.asset import Asset from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index ee36bc71b60ff..44bbc1e57a516 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -90,8 +90,10 @@ DAG, Asset, AssetAlias, + AssetAndTimeSchedule, AssetWatcher, CronPartitionTimetable, + CronTriggerTimetable, IdentityMapper, StartOfHourMapper, task, @@ -100,9 +102,7 @@ from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable from airflow.serialization.definitions.dag import SerializedDAG from airflow.serialization.serialized_objects import LazyDeserializedDAG -from airflow.timetables.assets import AssetAndTimeSchedule from airflow.timetables.base import DagRunInfo, DataInterval -from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils.session import create_session, provide_session from airflow.utils.sqlalchemy import with_row_locks from airflow.utils.state import CallbackState, DagRunState, State, TaskInstanceState diff --git a/airflow-core/tests/unit/timetables/test_assets_timetable.py b/airflow-core/tests/unit/timetables/test_assets_timetable.py index be6d624f85ffa..6b5ba85ff4d88 100644 --- a/airflow-core/tests/unit/timetables/test_assets_timetable.py +++ b/airflow-core/tests/unit/timetables/test_assets_timetable.py @@ -137,16 +137,11 @@ def sdk_asset_and_time_timetable(test_timetable, test_assets) -> SdkAssetAndTime @pytest.fixture -def core_asset_and_time_timetable( - test_timetable: MockTimetable, test_assets: list[Asset] -) -> CoreAssetAndTimeSchedule: - """ - Pytest fixture for creating an AssetAndTimeSchedule object. - - :param test_timetable: The test timetable instance. - :param test_assets: A list of Asset instances. - """ - return CoreAssetAndTimeSchedule(timetable=test_timetable, assets=test_assets) +def core_asset_and_time_timetable(test_timetable: MockTimetable) -> CoreAssetAndTimeSchedule: + return CoreAssetAndTimeSchedule( + timetable=test_timetable, + assets=SerializedAssetAll([SerializedAsset("test_asset", "test://asset/", "asset", {}, [])]), + ) @pytest.fixture