Customize DAG scheduling

Overview

Airflow provides the ability to run data pipelines on a schedule, allowing you to define the data interval and logical date of each DAG run.

A DAG schedule can be defined with one of the following tools:

  • built-in timedelta object of the Python datetime module;

  • Linux Cron;

  • custom timetable classes.

All schedule parameters, such as the data interval and the date and time of runs, are determined by a DAG’s internal timetable. DAGs scheduled with a cron expression or a timedelta object are internally converted into a timetable as well.

Airflow has default timetables that handle cron expressions and timedeltas, but for DAGs with a more specific schedule, you might need to create a custom timetable class and pass it to the DAG’s schedule argument.

Examples of cases where a custom timetable is needed:

  • Data pipelines start at different times each day.

  • The DAG schedule requires the use of a different calendar.

  • DAG runs with overlapping data intervals. For example, a DAG run must cover the period of the previous runs.

  • DAG runs with uneven intervals.

Cron-based schedule

Cron expressions in Airflow follow the standard cron syntax, which consists of five fields separated by spaces, where each field accepts the value forms listed in the table below.

A cron expression syntax is <1> <2> <3> <4> <5>, where:

  • <1> — the minute in the range: 0-59.

  • <2> — the hour in the range: 0-23.

  • <3> — the day of the month in the range: 1-31.

  • <4> — the month in the year. The range is from 1 to 12, where 1 is January.

  • <5> — the numeric indicator of a day in the week. Each day is represented by a number in a range from 0 to 6, where 0 is Sunday.

For example, a DAG’s schedule represented by a cron expression 0 18 * * 5 runs every Friday at 6 PM.

Possible value                                       Example
A single value                                       5
A range of values defined using a dash               1-5
A list of values separated by commas                 1,3,5
Every possible value, represented by an asterisk     *
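
For illustration, the following sketch schedules a DAG using the cron expression from the example above, passed directly as a string; as noted earlier, Airflow converts such strings into a timetable internally. The DAG name, start date, and catchup setting are placeholder values, not from the original example.

import pendulum
from airflow.decorators import dag

# A DAG scheduled with a plain cron string: every Friday at 6 PM UTC
@dag(
    dag_id="cron_string_example",  # hypothetical DAG name
    schedule="0 18 * * 5",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def cron_string_dag():
    pass

cron_string_dag()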

Airflow has two timetables for handling cron schedules: CronTriggerTimetable and CronDataIntervalTimetable. Both timetables trigger DAG runs at the same times, but the timestamp used in the run_id of the resulting DAG runs differs between them.

For example, if a DAG with the schedule defined as 0 0 * * * (12 AM every day) is enabled at 3 PM on January 31, CronTriggerTimetable will trigger a new DAG run at 12 AM on February 1, while CronDataIntervalTimetable will trigger the DAG immediately, because the DAG run for the daily time interval beginning at 12 AM on January 31 has not occurred yet.

An example of a DAG that uses CronTriggerTimetable to trigger a run at 1 AM every Wednesday:

from airflow import DAG
from airflow.decorators import dag
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils.dates import days_ago

def cron_timetable(*args, **kwargs):
    # Build a timetable from the passed cron expression and time zone
    return CronTriggerTimetable(kwargs['cron_expression'], timezone=kwargs['timezone'])

# Passing the timetable to the DAG constructor
my_dag = DAG(
    dag_id='my_dag',
    schedule=cron_timetable(cron_expression='0 1 * * 3', timezone="UTC"),
    start_date=days_ago(2),
    tags=['example'],
)

# The same schedule defined with the @dag decorator
@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)
def example_dag():
    pass
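
For comparison, below is a minimal sketch of a daily schedule expressed with CronDataIntervalTimetable (assuming Airflow 2.4 or later, where the schedule argument accepts timetable objects); the DAG name and start date are placeholders. With this timetable, each run covers a full day and its run_id timestamp (logical date) points to the start of that interval rather than the trigger time.

import pendulum
from airflow.decorators import dag
from airflow.timetables.interval import CronDataIntervalTimetable

# Daily schedule expressed as a data interval: each run covers one full day,
# and the run_id/logical date point to the start of the covered day.
@dag(
    dag_id="cron_data_interval_example",  # hypothetical DAG name
    schedule=CronDataIntervalTimetable("0 0 * * *", timezone="UTC"),
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def daily_interval_dag():
    pass

daily_interval_dag()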

When using CronTriggerTimetable, you can specify a static data interval using the interval argument. It accepts datetime.timedelta or dateutil.relativedelta.relativedelta objects.

If specified, the DAG’s data interval spans the specified duration and ends at the trigger time.

For example:

from datetime import timedelta
from airflow.decorators import dag
from airflow.timetables.trigger import CronTriggerTimetable

@dag(
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

The DAG from the above example runs every Friday at 6 PM, and its data interval starts 4 days and 9 hours earlier, at 9 AM on Monday, and ends at the trigger time.
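
To make the interval arithmetic explicit, the plain-Python sketch below (not Airflow code; the chosen Friday is arbitrary) computes the data interval boundaries produced by this schedule for one trigger time.

from datetime import datetime, timedelta

# Trigger time from the example: Friday, 6 PM UTC (2024-04-05 is a Friday)
trigger_time = datetime(2024, 4, 5, 18, 0)
interval = timedelta(days=4, hours=9)

data_interval_start = trigger_time - interval  # Monday, 9 AM
data_interval_end = trigger_time               # Friday, 6 PM

print(data_interval_start)  # 2024-04-01 09:00:00
print(data_interval_end)    # 2024-04-05 18:00:00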

Timedelta schedule

The timedelta schedule uses a timetable that runs DAGs at a fixed time interval. This timetable relies on the delta value provided by the user and does not necessarily align execution dates with the start of a day or an hour.

You can use a timedelta schedule by providing a datetime.timedelta or dateutil.relativedelta.relativedelta value to the schedule parameter.

For example:

import datetime

from airflow.decorators import dag

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass
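
The schedule parameter also accepts dateutil.relativedelta.relativedelta values, which is convenient for calendar-based deltas such as whole months. A minimal sketch in the same decorator style; the DAG name and start date are placeholders:

import pendulum
from dateutil.relativedelta import relativedelta
from airflow.decorators import dag

# Run once a month, with one calendar month per data interval
@dag(
    dag_id="monthly_delta_example",  # hypothetical DAG name
    schedule=relativedelta(months=1),
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def monthly_dag():
    pass

monthly_dag()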

Events timetable

The events timetable (EventsTimetable) runs DAGs at explicitly listed datetimes. Keep the list of dates reasonably small, since it is loaded every time the DAG is parsed.

For example:

import pendulum
from airflow.decorators import dag
from airflow.timetables.events import EventsTimetable

@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2024, 4, 5, 8, 27, tz="America/Vancouver"),
            pendulum.datetime(2024, 4, 17, 8, 27, tz="America/Vancouver"),
            pendulum.datetime(2024, 4, 22, 20, 50, tz="America/Vancouver"),
        ],
        description="Example dates",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

Custom timetables

Custom timetables allow users to define their own schedules in Python code when cron-based and timedelta timetables are not sufficient.

To create a custom timetable, you need to:

  • create a class that inherits from the Timetable base class;

  • implement the next_dagrun_info() method, which returns a DagRunInfo object (or None) describing the next scheduled run, and the infer_manual_data_interval() method, which returns a DataInterval object;

  • register the custom timetable as part of an Airflow plugin.

The next_dagrun_info() method contains the logic that calculates the data interval of the DAG’s next regular run and returns it as part of a DagRunInfo object. The method must also take into account the DAG’s start_date, end_date, and catchup settings, which are passed in through the restriction argument.

The infer_manual_data_interval() method determines how to define the data interval when a DAG run is triggered manually.

The example below illustrates how to write a DAG with a custom timetable.

To create a DAG with a custom timetable:

  1. In the plugins directory of your Airflow home, add a new Python file that will contain your custom timetable:

    $ sudo vi /opt/airflow/plugins/custom_timetable.py
  2. In the custom_timetable.py file, define the structure of the timetable:

    from datetime import timedelta
    from typing import Optional
    from pendulum import Date, DateTime, Time, timezone
    from airflow.plugins_manager import AirflowPlugin
    from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable (1)
    
    UTC = timezone("UTC") (2)
    
    class CustomTimetable(Timetable): (3)
        pass
    
    class CustomTimetablePlugin(AirflowPlugin): (4)
        name = "custom_timetable_plugin"
        timetables = [CustomTimetable]
    1 Import the dependencies.
    2 Declare global variables if necessary.
    3 Inherit the base class in your custom timetable.
    4 Register your timetable as an Airflow plugin.
  3. In the CustomTimetable class, write the implementation of the next_dagrun_info() method:

    class CustomTimetable(Timetable):
    
        def next_dagrun_info(
            self,
            *,
            last_automated_data_interval: Optional[DataInterval],
            restriction: TimeRestriction,
        ) -> Optional[DagRunInfo]:
            if last_automated_data_interval is not None:  (1)
                last_start = last_automated_data_interval.start
                delta = timedelta(days=1)
                if last_start.hour == 6:
                    next_start = last_start.set(hour=16, minute=30).replace(tzinfo=UTC)
                    next_end = (last_start+delta).replace(tzinfo=UTC)
                else:
                    next_start = (last_start+delta).set(hour=6, minute=0).replace(tzinfo=UTC)
                    next_end = (last_start+delta).replace(tzinfo=UTC)
            else: (2)
                next_start = restriction.earliest
                if next_start is None:
                    return None
                if not restriction.catchup: (3)
                    next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
                next_start = next_start.set(hour=6, minute=0).replace(tzinfo=UTC)
                next_end = next_start.set(hour=16, minute=30).replace(tzinfo=UTC)
            if restriction.latest is not None and next_start > restriction.latest: (4)
                return None
            return DagRunInfo.interval(start=next_start, end=next_end)
    1 Write the logic for the case when there was a previous run.
    2 Write the logic for the first run.
    3 Implement the catchup behavior.
    4 Make sure the next run does not start after the schedule’s end date.
  4. In the CustomTimetable class, write the logic that defines the data interval for manually triggered runs in the infer_manual_data_interval() method (a quick way to try this logic outside Airflow is shown after this procedure):

    class CustomTimetable(Timetable):
    
        def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
            delta = timedelta(days=1)
            if run_after >= run_after.set(hour=6, minute=0) and run_after <= run_after.set(hour=16, minute=30):
                start = (run_after-delta).set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
                end = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
            elif run_after >= run_after.set(hour=16, minute=30) and run_after.hour <= 23:
                start = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
                end = run_after.set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
            else:
                start = (run_after-delta).set(hour=6, minute=0).replace(tzinfo=UTC)
                end = (run_after-delta).set(hour=16, minute=30).replace(tzinfo=UTC)
            return DataInterval(start=start, end=end)
  5. Use the custom timetable in a DAG:

    from datetime import datetime
    from airflow.decorators import dag
    from pendulum import duration
    from custom_timetable import CustomTimetable (1)

    @dag(
        dag_id="example_dag",
        start_date=datetime(2024, 3, 10),
        schedule=CustomTimetable(), (2)
        default_args={
            "retries": 1,
            "retry_delay": duration(minutes=3),
        },
        catchup=True,
    )
    def example_dag():
        pass

    example_dag()
    1 Import the custom timetable.
    2 Instantiate the custom timetable class to use it as the DAG’s schedule.

    For more information on how to run a DAG, see the Create a simple DAG article.
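
Before relying on the custom timetable in a DAG, you can roughly check its logic outside Airflow by instantiating the class and calling its methods directly. This is only a sketch and assumes the custom_timetable.py file created above is importable from the current directory:

import pendulum
from custom_timetable import CustomTimetable

timetable = CustomTimetable()

# Data interval that a manual run started at 10 AM UTC would cover
interval = timetable.infer_manual_data_interval(
    run_after=pendulum.datetime(2024, 3, 12, 10, 0, tz="UTC")
)
print(interval.start, interval.end)
# Expected: 2024-03-11 16:30:00+00:00 2024-03-12 06:00:00+00:00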

IMPORTANT
Adding or updating an Airflow plugin, including custom timetables, requires restarting the Airflow scheduler and web server.