Кастомизация расписания DAG

Обзор

Airflow предоставляет возможность запускать конвейеры данных по расписанию, позволяя задавать временной интервал (data interval) и логическую дату (logical date) каждого запуска DAG.

Расписание DAG можно определить с помощью одного из следующих инструментов:

Все параметры расписания, такие как интервал, дата, время и другие, определяются во внутреннем расписании DAG (timetable). DAG, запланированные с помощью выражения cron или объекта timedelta, также внутренне преобразуются в расписание.

В Airflow доступны расписания по умолчанию, которые обрабатывают выражения cron или timedelta, но для DAG с более специфичными требованиями к расписанию может потребоваться кастомный класс расписания DAG.

Примеры случаев, когда требуется кастомный класс timetable:

  • Конвейеры данных запускаются в разное время каждый день.

  • Расписание DAG требует использования другого формата календаря.

  • DAG запускается с перекрывающимися интервалами данных. Например, запуск DAG должен охватывать период предыдущих запусков.

  • DAG запускается с неравными временными интервалами.

Cron-расписания

Выражения Cron в Airflow следуют стандартному синтаксису cron, который состоит из пяти полей, разделенных пробелами, где каждое поле может принимать значения, перечисленные в таблице ниже.

Сron-выражения задаются в формате <1> <2> <3> <4> <5>, где:

  • <1> — минуты в диапазоне: 0-59.

  • <2> — час в диапазоне: 0-23.

  • <3> — день месяца в диапазоне: 1-31.

  • <4> — месяц в году. Диапазон от 1 до 12, где 1 — это январь.

  • <5> — числовой индикатор дня недели. Каждый день представлен числом в диапазоне от 0 до 6, где 0 — это воскресенье.

Например, расписание DAG, представленное выражением cron 0 18 * * 5, запускается каждую пятницу в 18.00.

Возможные значения Пример

Единственное значение

5

Диапазон значений, определяемый с помощью тире

1-5

Список значений, разделенных запятыми

1,3,5

Каждое возможное значение (звездочка)

*

В Airflow предусмотрены два метода для обработки cron-расписаний: CronTriggerTimetable и CronDataIntervalTimetable. Оба расписания запускают DAG в одно и то же время, но временная метка для run_id у DAG будет разной.

Например, если DAG с расписанием, определенным как 0 0 * * * (12.00 каждый день), включается 31 января в 15.00, CronTriggerTimetable запустит DAG 1 февраля в 12.00, но CronDataIntervalTimetable запустит DAG немедленно, поскольку запуск DAG для ежедневного временного интервала, начинающегося 31 января в 12.00, еще не произошел.

Пример DAG для CronTriggerTimetable, который запускает DAG в 1 час ночи в среду:

from datetime import timedelta
from airflow import DAG
from airflow.timetables.trigger import CronTriggerTimetable

def cron_timetable(*args, **kwargs):
    return CronTriggerTimetable(kwargs['cron_expression'])

dag = DAG(
    dag_id='my_dag',
    timetable=cron_timetable(cron_expression='0 1 * * 3', timezone="UTC"),
    start_date=days_ago(2),
    tags=['example'],
)

@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)
def example_dag():
    pass

При использовании CronDataIntervalTimetable можно указать статический интервал данных с помощью аргумента interval. Он должен использовать объекты datetime.timedelta или dateutil.relativedelta.relativedelta.

Если статический интервал указан, временной интервал запуска DAG охватывает указанную продолжительность и заканчивается временем срабатывания (trigger time).

Например:

from datetime import timedelta
from airflow.timetables.trigger import CronTriggerTimetable

@dag(
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

DAG из приведенного выше примера будет запускаться каждую пятницу в 18.00.

Timedelta-расписания

Расписания, использующие timedelta, позволяют запускать DAG с указанным временным интервалом. Это расписание опирается на значение временного интервала, предоставленное пользователем, и не обязательно согласует даты выполнения с началом дня или часа.

Вы можете использовать timedelta-расписание, указав значения datetime.timedelta или dateutil.relativedelta.relativedelta в параметре schedule.

Например:

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

Расписание событий

Расписание событий позволяет запускать DAG в указанные даты и время. Список дат должен быть небольшого размера, поскольку он должен загружаться каждый раз при обработке DAG.

Например:

from airflow import DAG
import pendulum
from airflow.timetables.events import EventsTimetable

@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2024, 4, 5, 8, 27, tz="America/Vancouver"),
            pendulum.datetime(2024, 4, 17, 8, 27, tz="America/Vancouver"),
            pendulum.datetime(2024, 4, 22, 20, 50, tz="America/Vancouver"),
        ],
        description="Example dates",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

Кастомное расписание

Кастомные расписания позволяют задавать произвольный график запусков DAG в коде Python, когда расписаний на основе cron и timedelta недостаточно.

Чтобы создать кастомное расписание, необходимо:

  • создать класс, который наследует класс расписания (Timetable);

  • реализовать методы next_dagrun_info() и infer_manual_data_interval(), оба из которых возвращают объект DataInterval;

  • зарегистрировать расписание как часть плагина Airflow.

Метод next_dagrun_info() возвращает интервал данных для стандартного расписания DAG и описывает логику для его расчета. Этот метод также содержит логику для расчета параметров start_date, end_date и catchup.

Метод infer_manual_data_interval() описывает, как определяется интервал работы DAG в случае ручного запуска.

В примере ниже показано, как написать DAG с кастомным расписанием.

Чтобы создать DAG с кастомным расписанием:

  1. В директории plugins домашнего каталога Airflow добавьте новый файл Python, который будет содержать кастомное расписание:

    $ sudo vi /opt/airflow/plugins/custom_timetable.py
  2. В файле custom_timetable.py определите структуру расписания:

    from datetime import timedelta
    from typing import Optional
    from pendulum import Date, DateTime, Time, timezone
    from airflow.plugins_manager import AirflowPlugin
    from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable (1)
    
    UTC = timezone("UTC") (2)
    
    class CustomTimetable(Timetable): (3)
        pass
    
    class CustomTimetablePlugin(AirflowPlugin): (4)
        name = "custom_timetable_plugin"
        timetables = [CustomTimetable]
    1 Импорт зависимостей.
    2 Объявление глобальных переменных.
    3 Наследование базового класса.
    4 Регистрация расписания как плагина Airflow.
  3. В классе CustomTimetable напишите реализацию метода next_dagrun_info():

    class CustomTimetable(Timetable):
    
        def next_dagrun_info(
            self,
            *,
            last_automated_data_interval: Optional[DataInterval],
            restriction: TimeRestriction,
        ) -> Optional[DagRunInfo]:
            if last_automated_data_interval is not None:  (1)
                last_start = last_automated_data_interval.start
                delta = timedelta(days=1)
                if last_start.hour == 6:
                    next_start = last_start.set(hour=16, minute=30).replace(tzinfo=UTC)
                    next_end = (last_start+delta).replace(tzinfo=UTC)
                else:
                    next_start = (last_start+delta).set(hour=6, minute=0).replace(tzinfo=UTC)
                    next_end = (last_start+delta).replace(tzinfo=UTC)
            else: (2)
                next_start = restriction.earliest
                if next_start is None:
                    return None
                if not restriction.catchup: (3)
                    next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
                next_start = next_start.set(hour=6, minute=0).replace(tzinfo=UTC)
                next_end = next_start.set(hour=16, minute=30).replace(tzinfo=UTC)
            if restriction.latest is not None and next_start > restriction.latest: (4)
                return None
            return DagRunInfo.interval(start=next_start, end=next_end)
    1 Описание логики для случая, когда уже был предыдущий запуск DAG.
    2 Описание логики первого запуска DAG.
    3 Реализация поведения catchup.
    4 Проверка того, что следующий запуск DAG не начнется вне расписания.
  4. В классе CustomTimetable опишите логику планирования запусков в случае ручного запуска в методе infer_manual_data_interval():

    class CustomTimetable(Timetable):
    
        def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
            delta = timedelta(days=1)
            if run_after >= run_after.set(hour=6, minute=0) and run_after <= run_after.set(hour=16, minute=30):
                start = (run_after-delta).set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
                end = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
            elif run_after >= run_after.set(hour=16, minute=30) and run_after.hour <= 23:
                start = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
                end = run_after.set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
            else:
                start = (run_after-delta).set(hour=6, minute=0).replace(tzinfo=UTC)
                end = (run_after-delta).set(hour=16, minute=30).replace(tzinfo=UTC)
            return DataInterval(start=start, end=end)
  5. Используйте кастомное расписание в DAG:

    from custom_timetable import CustomTimetable (1)
    
    @dag(
        dag_id="example_dag",
        start_date=datetime(2024, 3, 10),
        schedule=CustomTimetable(), (2)
        default_args={
            "retries": 1,
            "retry_delay": duration(minutes=3),
        },
        catchup=True
    )
    1 Импорт кастомного расписания.
    2 Вызов кастомного класса для использования его в качестве расписание запуска DAG.

    Дополнительную информацию о запуске DAG можно получить в статье Создание простого DAG.

ВАЖНО
Добавление или обновление плагина Airflow, включая пользовательские расписания, требует перезапуска Airflow Scheduler и веб-сервера.
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней