diff --git a/airflow-core/docs/authoring-and-scheduling/timetable.rst b/airflow-core/docs/authoring-and-scheduling/timetable.rst index b19cc80211392..f96aa8dc9f32f 100644 --- a/airflow-core/docs/authoring-and-scheduling/timetable.rst +++ b/airflow-core/docs/authoring-and-scheduling/timetable.rst @@ -151,6 +151,63 @@ must be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta` def example_dag(): pass +.. versionadded:: 3.0.0 + The ``run_immediately`` argument was introduced in Airflow 3. + +The optional ``run_immediately`` argument controls which cron point is scheduled when a Dag is first +enabled or re-enabled after a pause. It has no effect when ``catchup=True`` (in that case the +scheduler always continues from where it left off). + +* ``run_immediately=True`` *(default)* — schedule the **most recent past** cron point immediately. +* ``run_immediately=False`` — skip the past cron point and wait for the **next future** cron point. +* ``run_immediately=timedelta(...)`` — schedule the most recent past cron point only if it fired + within the given window; otherwise wait for the **next future** cron point. + +.. code-block:: python + + from datetime import datetime, timedelta + + from airflow.timetables.trigger import CronTriggerTimetable + + + @dag( + # Runs every 10 minutes. + # run_immediately=False: always skip the most recent past slot and wait + # for the next 10-minute boundary. + schedule=CronTriggerTimetable( + "*/10 * * * *", + timezone="UTC", + run_immediately=False, + ), + start_date=datetime(2024, 1, 1), + catchup=False, + ..., + ) + def example_dag(): + pass + + + @dag( + # Runs hourly. + # run_immediately=timedelta(minutes=10): run the most recent past slot + # only if it fired within the last 10 minutes; otherwise wait for next. + schedule=CronTriggerTimetable( + "0 * * * *", + timezone="UTC", + run_immediately=timedelta(minutes=10), + ), + start_date=datetime(2024, 1, 1), + catchup=False, + ..., + ) + def example_dag_with_buffer(): + pass + +.. note:: + + ``run_immediately`` is a parameter of ``CronTriggerTimetable``, **not** of the ``DAG`` + constructor. Passing it directly to ``DAG(run_immediately=...)`` has no effect. + .. _MultipleCronTriggerTimetable: @@ -169,7 +226,7 @@ This is similar to CronTriggerTimetable_ except it takes multiple cron expressio def example_dag(): pass -The same optional ``interval`` argument as CronTriggerTimetable_ is also available. +The same optional ``interval`` and ``run_immediately`` arguments as CronTriggerTimetable_ are also available. .. code-block:: python @@ -350,10 +407,11 @@ The following is another example showing the difference in the case of skipping Suppose there are two running Dags with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the Dags at 3PM on January 31st and re-enable them at 3PM on February 2nd, -- `CronTriggerTimetable`_ skips the Dag runs that were supposed to trigger on February 1st and 2nd. The next Dag run will be triggered at 12AM on February 3rd. -- `CronDataIntervalTimetable`_ skips the Dag runs that were supposed to trigger on February 1st only. A Dag run for February 2nd is immediately triggered after you re-enable the Dag. +- Both `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ skip the Dag run that was supposed to trigger on February 1st. A Dag run for February 2nd is immediately triggered after you re-enable the Dag. + +The difference between the two timetables in this scenario is the ``run_id`` timestamp: for ``CronTriggerTimetable``, the ``run_id`` reflects midnight on February 2nd (the trigger time), while for ``CronDataIntervalTimetable``, the ``run_id`` reflects midnight on February 1st (the start of the data interval being processed). -In these examples, you see how a trigger timetable creates Dag runs more intuitively and similar to what +In the first example (enabling a new Dag), you see how a trigger timetable creates Dag runs more intuitively and similar to what people expect a workflow to behave, while a data interval timetable is designed heavily around the data interval it processes, and does not reflect a workflow's own properties. diff --git a/airflow-core/src/airflow/timetables/trigger.py b/airflow-core/src/airflow/timetables/trigger.py index d530d5b898a82..bafb8061cbb68 100644 --- a/airflow-core/src/airflow/timetables/trigger.py +++ b/airflow-core/src/airflow/timetables/trigger.py @@ -87,12 +87,15 @@ def next_dagrun_info( else: next_start_time = self._align_to_next(restriction.earliest) else: - start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))] if last_automated_data_interval is not None: - start_time_candidates.append(self._get_next(last_automated_data_interval.end)) - elif restriction.earliest is None: - # Run immediately has no effect if there is restriction on earliest - start_time_candidates.append(self._calc_first_run()) + # _calc_first_run respects run_immediately to decide between the + # most recent past cron point and the next future one. + start_time_candidates = [ + self._calc_first_run(), + self._get_next(last_automated_data_interval.end), + ] + else: + start_time_candidates = [self._calc_first_run()] if restriction.earliest is not None: start_time_candidates.append(self._align_to_next(restriction.earliest)) next_start_time = max(start_time_candidates) @@ -165,19 +168,17 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable): :param timezone: Which timezone to use to interpret the cron string :param interval: timedelta that defines the data interval start. Default 0. - *run_immediately* controls, if no *start_time* is given to the DAG, when - the first run of the DAG should be scheduled. It has no effect if there - already exist runs for this DAG. - - * If *True*, always run immediately the most recent possible DAG run. - * If *False*, wait to run until the next scheduled time in the future. - * If passed a ``timedelta``, will run the most recent possible DAG run - if that run's ``data_interval_end`` is within timedelta of now. - * If *None*, the timedelta is calculated as 10% of the time between the - most recent past scheduled time and the next scheduled time. E.g. if - running every hour, this would run the previous time if less than 6 - minutes had past since the previous run time, otherwise it would wait - until the next hour. + *run_immediately* controls which cron point is scheduled when a Dag is + first enabled or re-enabled after a pause. It always takes effect + regardless of whether ``start_date`` is set, but has no effect when + ``catchup=True``. + + .. versionadded:: 3.0.0 + + * If *True* (default), always run the most recent past cron point immediately. + * If *False*, skip the past cron point and wait for the next future one. + * If passed a ``timedelta``, run the most recent past cron point only if it + is within that timedelta of now; otherwise wait for the next future one. """ def __init__( @@ -186,7 +187,7 @@ def __init__( *, timezone: str | Timezone | FixedTimezone, interval: datetime.timedelta | relativedelta = datetime.timedelta(), - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, ) -> None: super().__init__(cron, timezone) self._interval = interval @@ -200,7 +201,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable: data["expression"], timezone=parse_timezone(data["timezone"]), interval=decode_interval(data["interval"]), - run_immediately=decode_run_immediately(data.get("run_immediately", False)), + run_immediately=decode_run_immediately(data.get("run_immediately", True)), ) def serialize(self) -> dict[str, Any]: @@ -215,26 +216,21 @@ def serialize(self) -> dict[str, Any]: def _calc_first_run(self) -> DateTime: """ - If no start_time is set, determine the start. + Determine which cron point to schedule next based on ``run_immediately``. - If True, always prefer past run, if False, never. If None, if within 10% of next run, - if timedelta, if within that timedelta from past run. + If *True*, always run the most recent past cron point. + If *False*, always wait for the next future cron point. + If a ``timedelta``, run the most recent past cron point only if it falls + within that window of now; otherwise wait for the next future cron point. """ now = coerce_datetime(utcnow()) past_run_time = self._align_to_prev(now) - next_run_time = self._align_to_next(now) if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true. return past_run_time - - gap_between_runs = next_run_time - past_run_time - gap_to_past = now - past_run_time if isinstance(self._run_immediately, datetime.timedelta): - buffer_between_runs = self._run_immediately - else: - buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5)) - if gap_to_past <= buffer_between_runs: - return past_run_time - return next_run_time + if now - past_run_time <= self._run_immediately: + return past_run_time + return self._align_to_next(now) class MultipleCronTriggerTimetable(Timetable): @@ -253,7 +249,7 @@ def __init__( *crons: str, timezone: str | Timezone | FixedTimezone, interval: datetime.timedelta | relativedelta = datetime.timedelta(), - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, ) -> None: if not crons: raise ValueError("cron expression required") @@ -373,19 +369,15 @@ class CronPartitionTimetable(CronTriggerTimetable): The partition key will be derived from the partition date. :param key_format: How to translate the partition date into a string partition key. - *run_immediately* controls, if no *start_time* is given to the Dag, when - the first run of the Dag should be scheduled. It has no effect if there - already exist runs for this Dag. + *run_immediately* controls which cron point is scheduled when a Dag is + first enabled or re-enabled after a pause. It always takes effect + regardless of whether ``start_date`` is set, but has no effect when + ``catchup=True``. - * If *True*, always run immediately the most recent possible Dag run. - * If *False*, wait to run until the next scheduled time in the future. - * If passed a ``timedelta``, will run the most recent possible Dag run - if that run's ``data_interval_end`` is within timedelta of now. - * If *None*, the timedelta is calculated as 10% of the time between the - most recent past scheduled time and the next scheduled time. E.g. if - running every hour, this would run the previous time if less than 6 - minutes had past since the previous run time, otherwise it would wait - until the next hour. + * If *True* (default), always run the most recent past cron point immediately. + * If *False*, skip the past cron point and wait for the next future one. + * If passed a ``timedelta``, run the most recent past cron point only if it + is within that timedelta of now; otherwise wait for the next future one. # todo: AIP-76 talk about how we can have auto-reprocessing of partitions # todo: AIP-76 we could allow a tuple of integer + time-based @@ -399,7 +391,7 @@ def __init__( *, timezone: str | Timezone | FixedTimezone, run_offset: int | datetime.timedelta | relativedelta | None = None, - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, # todo: AIP-76 we can't infer partition date from this, so we need to store it separately. key_format: str = r"%Y-%m-%dT%H:%M:%S", ) -> None: @@ -426,7 +418,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable: cron=data["expression"], timezone=parse_timezone(data["timezone"]), run_offset=offset, - run_immediately=decode_run_immediately(data.get("run_immediately", False)), + run_immediately=decode_run_immediately(data.get("run_immediately", True)), key_format=data["key_format"], ) @@ -485,15 +477,15 @@ def next_dagrun_info_v2( else: next_start_time = self._align_to_next(restriction.earliest) else: - prev_candidate = self._align_to_prev(coerce_datetime(utcnow())) - start_time_candidates = [prev_candidate] if last_dagrun_info is not None: - next_candidate = self._get_next(last_dagrun_info.run_after) - start_time_candidates.append(next_candidate) - elif restriction.earliest is None: - # Run immediately has no effect if there is restriction on earliest - first_run = self._calc_first_run() - start_time_candidates.append(first_run) + # _calc_first_run respects run_immediately to decide between the + # most recent past cron point and the next future one. + start_time_candidates = [ + self._calc_first_run(), + self._get_next(last_dagrun_info.run_after), + ] + else: + start_time_candidates = [self._calc_first_run()] if restriction.earliest is not None: earliest = self._align_to_next(restriction.earliest) start_time_candidates.append(earliest) diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py b/airflow-core/tests/unit/timetables/test_trigger_timetable.py index 114080685ca5a..735fc97508be5 100644 --- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py +++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py @@ -59,7 +59,7 @@ [ pytest.param( None, - YESTERDAY + DELTA_FROM_MIDNIGHT, + CURRENT_TIME + DELTA_FROM_MIDNIGHT, id="first-run", ), pytest.param( @@ -79,11 +79,11 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( last_automated_data_interval: DataInterval | None, next_start_time: pendulum.DateTime, ) -> None: - """If ``catchup=False`` and start_date is a day before""" + """If ``catchup=False`` and start_date is a day before, run_immediately=False skips the past run""" timetable = CronTriggerTimetable( "30 16 * * *", timezone=utc, - run_immediately=False, # Should have no effect since earliest is not None + run_immediately=False, ) next_info = timetable.next_dagrun_info( last_automated_data_interval=last_automated_data_interval, @@ -480,7 +480,8 @@ def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data: dic ("run_immediately", "current_time", "correct_interval"), [ (True, WAY_AFTER, PREVIOUS), - (False, JUST_AFTER, PREVIOUS), + (True, JUST_AFTER, PREVIOUS), + (False, JUST_AFTER, NEXT), (False, WAY_AFTER, NEXT), (datetime.timedelta(minutes=10), JUST_AFTER, PREVIOUS), (datetime.timedelta(minutes=10), WAY_AFTER, NEXT), @@ -501,18 +502,20 @@ def test_run_immediately(catchup, run_immediately, current_time, correct_interva @pytest.mark.parametrize("catchup", [True, False]) -def test_run_immediately_fast_dag(catchup): +def test_run_immediately_false_skips_to_next(catchup): + """With run_immediately=False, always skip to the next cron point regardless of how close now is.""" timetable = CronTriggerTimetable( - "*/10 3 * * *", # Runs every 10 minutes, so falls back to 5 min hardcoded limit on buffer time + "*/10 3 * * *", timezone=utc, run_immediately=False, ) + next_10min = DagRunInfo.exact(pendulum.datetime(year=2024, month=8, day=15, hour=3, minute=10)) with time_machine.travel(JUST_AFTER, tick=False): next_info = timetable.next_dagrun_info( last_automated_data_interval=None, restriction=TimeRestriction(earliest=None, latest=None, catchup=catchup), ) - assert next_info == PREVIOUS + assert next_info == next_10min @pytest.mark.parametrize( @@ -593,7 +596,7 @@ def test_multi_serialization(): "expressions": ["@every 30s", "*/2 * * * *"], "timezone": "UTC", "interval": 600.0, - "run_immediately": False, + "run_immediately": True, } tt = MultipleCronTriggerTimetable.deserialize(data) @@ -603,7 +606,7 @@ def test_multi_serialization(): assert tt._timetables[1]._expression == "*/2 * * * *" assert tt._timetables[0]._timezone == tt._timetables[1]._timezone == utc assert tt._timetables[0]._interval == tt._timetables[1]._interval == datetime.timedelta(minutes=10) - assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is False + assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is True @pytest.mark.db_test @@ -818,6 +821,75 @@ def test_next_run_info_from_dag_model(schedule, partition_key, expected, dag_mak assert info == expected +def test_run_immediately_false_with_start_date(): + """run_immediately=False should be respected even when start_date (earliest) is set. + + With run_immediately=False, the timetable always skips the past cron point + and waits for the next future one — regardless of how recently the boundary fired. + """ + timetable = CronTriggerTimetable( + "*/10 * * * *", + timezone=utc, + run_immediately=False, + ) + # 4 minutes past the 7:10 boundary — should still skip to 7:20 + current_time = pendulum.datetime(2024, 1, 1, 7, 14, tz=utc) + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=TimeRestriction( + earliest=pendulum.datetime(2024, 1, 1, tz=utc), + latest=None, + catchup=False, + ), + ) + assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 1, 1, 7, 20, tz=utc)) + + +def test_run_immediately_true_with_start_date(): + """run_immediately=True (default) runs the most recent past cron point even when start_date is set.""" + timetable = CronTriggerTimetable( + "*/10 * * * *", + timezone=utc, + run_immediately=True, + ) + # Unpause at 7:14 — should run 7:10 immediately + current_time = pendulum.datetime(2024, 1, 1, 7, 14, tz=utc) + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=TimeRestriction( + earliest=pendulum.datetime(2024, 1, 1, tz=utc), + latest=None, + catchup=False, + ), + ) + assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 1, 1, 7, 10, tz=utc)) + + +def test_run_immediately_false_after_unpause(): + """After a pause/resume, run_immediately=False skips the past boundary and waits for the next.""" + timetable = CronTriggerTimetable( + "0 0 * * *", # @daily + timezone=utc, + run_immediately=False, + ) + # Last run was Jan 31 midnight, unpause at 3PM Feb 2 + last_interval = DataInterval.exact(pendulum.datetime(2024, 1, 31, tz=utc)) + current_time = pendulum.datetime(2024, 2, 2, 15, tz=utc) + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_interval, + restriction=TimeRestriction( + earliest=pendulum.datetime(2024, 1, 1, tz=utc), + latest=None, + catchup=False, + ), + ) + # run_immediately=False: skip Feb 2 (most recent past boundary) and wait for Feb 3 + assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 2, 3, tz=utc)) + + def test_generate_run_id_without_partition_key() -> None: """ Tests the generate_run_id method of CronPartitionTimetable. diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py index 4bfc5090230fe..7190211913580 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py @@ -514,7 +514,7 @@ def test_serialize_timetable_with_dataset_or_time_schedule(): dag_id="test", start_date=datetime.datetime(2025, 1, 1), schedule=AssetOrTimeSchedule( - timetable=CronTriggerTimetable("0 0 * 3 *", timezone="UTC"), + timetable=CronTriggerTimetable("0 0 * 3 *", timezone="UTC", run_immediately=True), assets=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2")) & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})), ), @@ -527,7 +527,7 @@ def test_serialize_timetable_with_dataset_or_time_schedule(): "expression": "0 0 * 3 *", "timezone": "UTC", "interval": 0.0, - "run_immediately": False, + "run_immediately": True, }, }, "asset_condition": { diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 36fe5125a7f0f..72e52f90ca2f6 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -2627,7 +2627,7 @@ def test_dag_info_schedule_asset_or_time_schedule(self): dag_id="dag_id", start_date=datetime.datetime(2024, 6, 1), schedule=AssetOrTimeSchedule( - timetable=CronTriggerTimetable("*/4 3 * * *", timezone="UTC"), + timetable=CronTriggerTimetable("*/4 3 * * *", timezone="UTC", run_immediately=True), assets=((Asset("uri1", extra={"a": 1}) | Asset("uri2")) & (Asset("uri3") | Asset("uri4"))), ), ) @@ -2691,7 +2691,7 @@ def test_dag_info_schedule_asset_or_time_schedule(self): "expression": "*/4 3 * * *", "timezone": "UTC", "interval": 0.0, - "run_immediately": False, + "run_immediately": True, }, }, }, diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py index 608f4c0a57f5d..ca77aaeaa021d 100644 --- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py +++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py @@ -67,23 +67,19 @@ class CronTriggerTimetable(CronMixin, BaseTimetable): :param timezone: Which timezone to use to interpret the cron string :param interval: timedelta that defines the data interval start. Default 0. - *run_immediately* controls, if no *start_time* is given to the Dag, when - the first run of the Dag should be scheduled. It has no effect if there - already exist runs for this Dag. - - * If *True*, always run immediately the most recent possible Dag run. - * If *False*, wait to run until the next scheduled time in the future. - * If passed a ``timedelta``, will run the most recent possible Dag run - if that run's ``data_interval_end`` is within timedelta of now. - * If *None*, the timedelta is calculated as 10% of the time between the - most recent past scheduled time and the next scheduled time. E.g. if - running every hour, this would run the previous time if less than 6 - minutes had past since the previous run time, otherwise it would wait - until the next hour. + *run_immediately* controls which cron point is scheduled when a Dag is + first enabled or re-enabled after a pause. It always takes effect + regardless of whether ``start_date`` is set, but has no effect when + ``catchup=True``. + + * If *True* (default), always run the most recent past cron point immediately. + * If *False*, skip the past cron point and wait for the next future one. + * If passed a ``timedelta``, run the most recent past cron point only if it + is within that timedelta of now; otherwise wait for the next future one. """ interval: datetime.timedelta | relativedelta = attrs.field(kw_only=True, default=datetime.timedelta()) - run_immediately: bool | datetime.timedelta = attrs.field(kw_only=True, default=False) + run_immediately: bool | datetime.timedelta = attrs.field(kw_only=True, default=True) @attrs.define(init=False) @@ -105,7 +101,7 @@ def __init__( *crons: str, timezone: str | Timezone | FixedTimezone, interval: datetime.timedelta | relativedelta = datetime.timedelta(), - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, ) -> None: if not crons: raise ValueError("cron expression required") @@ -138,18 +134,15 @@ class CronPartitionTimetable(CronTriggerTimetable): The partition key will be derived from the partition date. :param key_format: How to translate the partition date into a string partition key. - *run_immediately* controls, if no *start_time* is given to the Dag, when - the first run of the Dag should be scheduled. It has no effect if there already exist runs for this Dag. + *run_immediately* controls which cron point is scheduled when a Dag is + first enabled or re-enabled after a pause. It always takes effect + regardless of whether ``start_date`` is set, but has no effect when + ``catchup=True``. - * If *True*, always run immediately the most recent possible Dag run. - * If *False*, wait to run until the next scheduled time in the future. - * If passed a ``timedelta``, will run the most recent possible Dag run - if that run's ``data_interval_end`` is within timedelta of now. - * If *None*, the timedelta is calculated as 10% of the time between the - most recent past scheduled time and the next scheduled time. E.g. if - running every hour, this would run the previous time if less than 6 - minutes had past since the previous run time, otherwise it would wait - until the next hour. + * If *True* (default), always run the most recent past cron point immediately. + * If *False*, skip the past cron point and wait for the next future one. + * If passed a ``timedelta``, run the most recent past cron point only if it + is within that timedelta of now; otherwise wait for the next future one. # todo: AIP-76 talk about how we can have auto-reprocessing of partitions # todo: AIP-76 we could allow a tuple of integer + time-based @@ -166,7 +159,7 @@ def __init__( *, timezone: str | Timezone | FixedTimezone, run_offset: int | datetime.timedelta | relativedelta | None = None, - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, # todo: AIP-76 we can't infer partition date from this, so we need to store it separately key_format: str = r"%Y-%m-%dT%H:%M:%S", ) -> None: