Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions airflow-core/docs/authoring-and-scheduling/timetable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,63 @@ must be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta`
def example_dag():
pass

.. versionadded:: 3.0.0
The ``run_immediately`` argument was introduced in Airflow 3.

The optional ``run_immediately`` argument controls which cron point is scheduled when a Dag is first
enabled or re-enabled after a pause. It has no effect when ``catchup=True`` (in that case the
scheduler always continues from where it left off).

Comment thread
manipatnam marked this conversation as resolved.
* ``run_immediately=True`` *(default)* — schedule the **most recent past** cron point immediately.
* ``run_immediately=False`` — skip the past cron point and wait for the **next future** cron point.
* ``run_immediately=timedelta(...)`` — schedule the most recent past cron point only if it fired
within the given window; otherwise wait for the **next future** cron point.

.. code-block:: python

from datetime import datetime, timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
# Runs every 10 minutes.
# run_immediately=False: always skip the most recent past slot and wait
# for the next 10-minute boundary.
schedule=CronTriggerTimetable(
"*/10 * * * *",
timezone="UTC",
run_immediately=False,
),
start_date=datetime(2024, 1, 1),
catchup=False,
...,
)
def example_dag():
pass


@dag(
# Runs hourly.
# run_immediately=timedelta(minutes=10): run the most recent past slot
# only if it fired within the last 10 minutes; otherwise wait for next.
schedule=CronTriggerTimetable(
"0 * * * *",
timezone="UTC",
run_immediately=timedelta(minutes=10),
),
start_date=datetime(2024, 1, 1),
catchup=False,
...,
)
def example_dag_with_buffer():
pass

.. note::

``run_immediately`` is a parameter of ``CronTriggerTimetable``, **not** of the ``DAG``
constructor. Passing it directly to ``DAG(run_immediately=...)`` has no effect.


.. _MultipleCronTriggerTimetable:

Expand All @@ -169,7 +226,7 @@ This is similar to CronTriggerTimetable_ except it takes multiple cron expressio
def example_dag():
pass

The same optional ``interval`` argument as CronTriggerTimetable_ is also available.
The same optional ``interval`` and ``run_immediately`` arguments as CronTriggerTimetable_ are also available.

.. code-block:: python

Expand Down Expand Up @@ -350,10 +407,11 @@ The following is another example showing the difference in the case of skipping

Suppose there are two running Dags with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the Dags at 3PM on January 31st and re-enable them at 3PM on February 2nd,

- `CronTriggerTimetable`_ skips the Dag runs that were supposed to trigger on February 1st and 2nd. The next Dag run will be triggered at 12AM on February 3rd.
- `CronDataIntervalTimetable`_ skips the Dag runs that were supposed to trigger on February 1st only. A Dag run for February 2nd is immediately triggered after you re-enable the Dag.
- Both `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ skip the Dag run that was supposed to trigger on February 1st. A Dag run for February 2nd is immediately triggered after you re-enable the Dag.

The difference between the two timetables in this scenario is the ``run_id`` timestamp: for ``CronTriggerTimetable``, the ``run_id`` reflects midnight on February 2nd (the trigger time), while for ``CronDataIntervalTimetable``, the ``run_id`` reflects midnight on February 1st (the start of the data interval being processed).

In these examples, you see how a trigger timetable creates Dag runs more intuitively and similar to what
In the first example (enabling a new Dag), you see how a trigger timetable creates Dag runs more intuitively and similar to what
people expect a workflow to behave, while a data interval timetable is designed heavily around the data
interval it processes, and does not reflect a workflow's own properties.

Expand Down
104 changes: 48 additions & 56 deletions airflow-core/src/airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ def next_dagrun_info(
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
start_time_candidates.append(self._calc_first_run())
# _calc_first_run respects run_immediately to decide between the
# most recent past cron point and the next future one.
start_time_candidates = [
self._calc_first_run(),
self._get_next(last_automated_data_interval.end),
]
else:
start_time_candidates = [self._calc_first_run()]
if restriction.earliest is not None:
start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)
Expand Down Expand Up @@ -165,19 +168,17 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable):
:param timezone: Which timezone to use to interpret the cron string
:param interval: timedelta that defines the data interval start. Default 0.

*run_immediately* controls, if no *start_time* is given to the DAG, when
the first run of the DAG should be scheduled. It has no effect if there
already exist runs for this DAG.

* If *True*, always run immediately the most recent possible DAG run.
* If *False*, wait to run until the next scheduled time in the future.
* If passed a ``timedelta``, will run the most recent possible DAG run
if that run's ``data_interval_end`` is within timedelta of now.
* If *None*, the timedelta is calculated as 10% of the time between the
most recent past scheduled time and the next scheduled time. E.g. if
running every hour, this would run the previous time if less than 6
minutes had past since the previous run time, otherwise it would wait
until the next hour.
*run_immediately* controls which cron point is scheduled when a Dag is
first enabled or re-enabled after a pause. It always takes effect
regardless of whether ``start_date`` is set, but has no effect when
``catchup=True``.

.. versionadded:: 3.0.0

* If *True* (default), always run the most recent past cron point immediately.
* If *False*, skip the past cron point and wait for the next future one.
* If passed a ``timedelta``, run the most recent past cron point only if it
is within that timedelta of now; otherwise wait for the next future one.
"""

def __init__(
Expand All @@ -186,7 +187,7 @@ def __init__(
*,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
run_immediately: bool | datetime.timedelta = False,
run_immediately: bool | datetime.timedelta = True,
) -> None:
super().__init__(cron, timezone)
self._interval = interval
Comment thread
manipatnam marked this conversation as resolved.
Expand All @@ -200,7 +201,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable:
data["expression"],
timezone=parse_timezone(data["timezone"]),
interval=decode_interval(data["interval"]),
run_immediately=decode_run_immediately(data.get("run_immediately", False)),
run_immediately=decode_run_immediately(data.get("run_immediately", True)),
)

def serialize(self) -> dict[str, Any]:
Expand All @@ -215,26 +216,21 @@ def serialize(self) -> dict[str, Any]:

def _calc_first_run(self) -> DateTime:
"""
If no start_time is set, determine the start.
Determine which cron point to schedule next based on ``run_immediately``.

If True, always prefer past run, if False, never. If None, if within 10% of next run,
if timedelta, if within that timedelta from past run.
If *True*, always run the most recent past cron point.
If *False*, always wait for the next future cron point.
If a ``timedelta``, run the most recent past cron point only if it falls
within that window of now; otherwise wait for the next future cron point.
"""
now = coerce_datetime(utcnow())
past_run_time = self._align_to_prev(now)
next_run_time = self._align_to_next(now)
if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true.
return past_run_time

gap_between_runs = next_run_time - past_run_time
gap_to_past = now - past_run_time
if isinstance(self._run_immediately, datetime.timedelta):
buffer_between_runs = self._run_immediately
else:
buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5))
if gap_to_past <= buffer_between_runs:
return past_run_time
return next_run_time
if now - past_run_time <= self._run_immediately:
return past_run_time
return self._align_to_next(now)


class MultipleCronTriggerTimetable(Timetable):
Expand All @@ -253,7 +249,7 @@ def __init__(
*crons: str,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
run_immediately: bool | datetime.timedelta = False,
run_immediately: bool | datetime.timedelta = True,
) -> None:
if not crons:
raise ValueError("cron expression required")
Expand Down Expand Up @@ -373,19 +369,15 @@ class CronPartitionTimetable(CronTriggerTimetable):
The partition key will be derived from the partition date.
:param key_format: How to translate the partition date into a string partition key.

*run_immediately* controls, if no *start_time* is given to the Dag, when
the first run of the Dag should be scheduled. It has no effect if there
already exist runs for this Dag.
*run_immediately* controls which cron point is scheduled when a Dag is
first enabled or re-enabled after a pause. It always takes effect
regardless of whether ``start_date`` is set, but has no effect when
``catchup=True``.

* If *True*, always run immediately the most recent possible Dag run.
* If *False*, wait to run until the next scheduled time in the future.
* If passed a ``timedelta``, will run the most recent possible Dag run
if that run's ``data_interval_end`` is within timedelta of now.
* If *None*, the timedelta is calculated as 10% of the time between the
most recent past scheduled time and the next scheduled time. E.g. if
running every hour, this would run the previous time if less than 6
minutes had past since the previous run time, otherwise it would wait
until the next hour.
* If *True* (default), always run the most recent past cron point immediately.
* If *False*, skip the past cron point and wait for the next future one.
* If passed a ``timedelta``, run the most recent past cron point only if it
is within that timedelta of now; otherwise wait for the next future one.

# todo: AIP-76 talk about how we can have auto-reprocessing of partitions
# todo: AIP-76 we could allow a tuple of integer + time-based
Expand All @@ -399,7 +391,7 @@ def __init__(
*,
timezone: str | Timezone | FixedTimezone,
run_offset: int | datetime.timedelta | relativedelta | None = None,
run_immediately: bool | datetime.timedelta = False,
run_immediately: bool | datetime.timedelta = True,
# todo: AIP-76 we can't infer partition date from this, so we need to store it separately.
key_format: str = r"%Y-%m-%dT%H:%M:%S",
) -> None:
Comment thread
manipatnam marked this conversation as resolved.
Expand All @@ -426,7 +418,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable:
cron=data["expression"],
timezone=parse_timezone(data["timezone"]),
run_offset=offset,
run_immediately=decode_run_immediately(data.get("run_immediately", False)),
run_immediately=decode_run_immediately(data.get("run_immediately", True)),
key_format=data["key_format"],
)

Expand Down Expand Up @@ -485,15 +477,15 @@ def next_dagrun_info_v2(
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
start_time_candidates = [prev_candidate]
if last_dagrun_info is not None:
next_candidate = self._get_next(last_dagrun_info.run_after)
start_time_candidates.append(next_candidate)
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
first_run = self._calc_first_run()
start_time_candidates.append(first_run)
# _calc_first_run respects run_immediately to decide between the
# most recent past cron point and the next future one.
start_time_candidates = [
self._calc_first_run(),
self._get_next(last_dagrun_info.run_after),
]
else:
start_time_candidates = [self._calc_first_run()]
if restriction.earliest is not None:
earliest = self._align_to_next(restriction.earliest)
start_time_candidates.append(earliest)
Expand Down
Loading
Loading