Кастомизация расписания DAG
Обзор
Airflow предоставляет возможность запускать конвейеры данных по расписанию, позволяя задавать временной интервал (data interval) и логическую дату (logical date) каждого запуска DAG.
Расписание DAG можно определить с помощью одного из следующих инструментов:
Все параметры расписания, такие как интервал, дата, время и другие, определяются во внутреннем расписании DAG (timetable). DAG, запланированные с помощью выражения cron или объекта timedelta
, также внутренне преобразуются в расписание.
В Airflow доступны расписания по умолчанию, которые обрабатывают выражения cron или timedelta
, но для DAG с более специфичными требованиями к расписанию может потребоваться кастомный класс расписания DAG.
Примеры случаев, когда требуется кастомный класс timetable
:
-
Конвейеры данных запускаются в разное время каждый день.
-
Расписание DAG требует использования другого формата календаря.
-
DAG запускается с перекрывающимися интервалами данных. Например, запуск DAG должен охватывать период предыдущих запусков.
-
DAG запускается с неравными временными интервалами.
Cron-расписания
Выражения Cron в Airflow следуют стандартному синтаксису cron, который состоит из пяти полей, разделенных пробелами, где каждое поле может принимать значения, перечисленные в таблице ниже.
Сron-выражения задаются в формате <1> <2> <3> <4> <5>
, где:
-
<1>
— минуты в диапазоне: 0-59. -
<2>
— час в диапазоне: 0-23. -
<3>
— день месяца в диапазоне: 1-31. -
<4>
— месяц в году. Диапазон от 1 до 12, где 1 — это январь. -
<5>
— числовой индикатор дня недели. Каждый день представлен числом в диапазоне от 0 до 6, где 0 — это воскресенье.
Например, расписание DAG, представленное выражением cron 0 18 * * 5
, запускается каждую пятницу в 18.00.
Возможные значения | Пример |
---|---|
Единственное значение |
5 |
Диапазон значений, определяемый с помощью тире |
1-5 |
Список значений, разделенных запятыми |
1,3,5 |
Каждое возможное значение (звездочка) |
* |
В Airflow предусмотрены два метода для обработки cron-расписаний: CronTriggerTimetable
и CronDataIntervalTimetable
. Оба расписания запускают DAG в одно и то же время, но временная метка для run_id
у DAG будет разной.
Например, если DAG с расписанием, определенным как 0 0 * * *
(12.00 каждый день), включается 31 января в 15.00, CronTriggerTimetable
запустит DAG 1 февраля в 12.00, но CronDataIntervalTimetable
запустит DAG немедленно, поскольку запуск DAG для ежедневного временного интервала, начинающегося 31 января в 12.00, еще не произошел.
Пример DAG для CronTriggerTimetable
, который запускает DAG в 1 час ночи в среду:
from datetime import timedelta
from airflow import DAG
from airflow.timetables.trigger import CronTriggerTimetable
def cron_timetable(*args, **kwargs):
return CronTriggerTimetable(kwargs['cron_expression'])
dag = DAG(
dag_id='my_dag',
timetable=cron_timetable(cron_expression='0 1 * * 3', timezone="UTC"),
start_date=days_ago(2),
tags=['example'],
)
@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)
def example_dag():
pass
При использовании CronDataIntervalTimetable
можно указать статический интервал данных с помощью аргумента interval
. Он должен использовать объекты datetime.timedelta
или dateutil.relativedelta.relativedelta
.
Если статический интервал указан, временной интервал запуска DAG охватывает указанную продолжительность и заканчивается временем срабатывания (trigger time).
Например:
from datetime import timedelta
from airflow.timetables.trigger import CronTriggerTimetable
@dag(
schedule=CronTriggerTimetable(
"0 18 * * 5",
timezone="UTC",
interval=timedelta(days=4, hours=9),
),
...,
)
def example_dag():
pass
DAG из приведенного выше примера будет запускаться каждую пятницу в 18.00.
Timedelta-расписания
Расписания, использующие timedelta, позволяют запускать DAG с указанным временным интервалом. Это расписание опирается на значение временного интервала, предоставленное пользователем, и не обязательно согласует даты выполнения с началом дня или часа.
Вы можете использовать timedelta-расписание, указав значения datetime.timedelta
или dateutil.relativedelta.relativedelta
в параметре schedule
.
Например:
@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
pass
Расписание событий
Расписание событий позволяет запускать DAG в указанные даты и время. Список дат должен быть небольшого размера, поскольку он должен загружаться каждый раз при обработке DAG.
Например:
from airflow import DAG
import pendulum
from airflow.timetables.events import EventsTimetable
@dag(
schedule=EventsTimetable(
event_dates=[
pendulum.datetime(2024, 4, 5, 8, 27, tz="America/Vancouver"),
pendulum.datetime(2024, 4, 17, 8, 27, tz="America/Vancouver"),
pendulum.datetime(2024, 4, 22, 20, 50, tz="America/Vancouver"),
],
description="Example dates",
restrict_to_events=False,
),
...,
)
def example_dag():
pass
Кастомное расписание
Кастомные расписания позволяют задавать произвольный график запусков DAG в коде Python, когда расписаний на основе cron и timedelta
недостаточно.
Чтобы создать кастомное расписание, необходимо:
-
создать класс, который наследует класс расписания (
Timetable
); -
реализовать методы
next_dagrun_info()
иinfer_manual_data_interval()
, оба из которых возвращают объектDataInterval
; -
зарегистрировать расписание как часть плагина Airflow.
Метод next_dagrun_info()
возвращает интервал данных для стандартного расписания DAG и описывает логику для его расчета. Этот метод также содержит логику для расчета параметров start_date
, end_date
и catchup
.
Метод infer_manual_data_interval()
описывает, как определяется интервал работы DAG в случае ручного запуска.
В примере ниже показано, как написать DAG с кастомным расписанием.
Чтобы создать DAG с кастомным расписанием:
-
В директории plugins домашнего каталога Airflow добавьте новый файл Python, который будет содержать кастомное расписание:
$ sudo vi /opt/airflow/plugins/custom_timetable.py
-
В файле custom_timetable.py определите структуру расписания:
from datetime import timedelta from typing import Optional from pendulum import Date, DateTime, Time, timezone from airflow.plugins_manager import AirflowPlugin from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable (1) UTC = timezone("UTC") (2) class CustomTimetable(Timetable): (3) pass class CustomTimetablePlugin(AirflowPlugin): (4) name = "custom_timetable_plugin" timetables = [CustomTimetable]
1 Импорт зависимостей. 2 Объявление глобальных переменных. 3 Наследование базового класса. 4 Регистрация расписания как плагина Airflow. -
В классе
CustomTimetable
напишите реализацию методаnext_dagrun_info()
:class CustomTimetable(Timetable): def next_dagrun_info( self, *, last_automated_data_interval: Optional[DataInterval], restriction: TimeRestriction, ) -> Optional[DagRunInfo]: if last_automated_data_interval is not None: (1) last_start = last_automated_data_interval.start delta = timedelta(days=1) if last_start.hour == 6: next_start = last_start.set(hour=16, minute=30).replace(tzinfo=UTC) next_end = (last_start+delta).replace(tzinfo=UTC) else: next_start = (last_start+delta).set(hour=6, minute=0).replace(tzinfo=UTC) next_end = (last_start+delta).replace(tzinfo=UTC) else: (2) next_start = restriction.earliest if next_start is None: return None if not restriction.catchup: (3) next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) next_start = next_start.set(hour=6, minute=0).replace(tzinfo=UTC) next_end = next_start.set(hour=16, minute=30).replace(tzinfo=UTC) if restriction.latest is not None and next_start > restriction.latest: (4) return None return DagRunInfo.interval(start=next_start, end=next_end)
1 Описание логики для случая, когда уже был предыдущий запуск DAG. 2 Описание логики первого запуска DAG. 3 Реализация поведения catchup. 4 Проверка того, что следующий запуск DAG не начнется вне расписания. -
В классе
CustomTimetable
опишите логику планирования запусков в случае ручного запуска в методеinfer_manual_data_interval()
:class CustomTimetable(Timetable): def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval: delta = timedelta(days=1) if run_after >= run_after.set(hour=6, minute=0) and run_after <= run_after.set(hour=16, minute=30): start = (run_after-delta).set(hour=16, minute=30, second=0).replace(tzinfo=UTC) end = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC) elif run_after >= run_after.set(hour=16, minute=30) and run_after.hour <= 23: start = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC) end = run_after.set(hour=16, minute=30, second=0).replace(tzinfo=UTC) else: start = (run_after-delta).set(hour=6, minute=0).replace(tzinfo=UTC) end = (run_after-delta).set(hour=16, minute=30).replace(tzinfo=UTC) return DataInterval(start=start, end=end)
-
Используйте кастомное расписание в DAG:
from custom_timetable import CustomTimetable (1) @dag( dag_id="example_dag", start_date=datetime(2024, 3, 10), schedule=CustomTimetable(), (2) default_args={ "retries": 1, "retry_delay": duration(minutes=3), }, catchup=True )
1 Импорт кастомного расписания. 2 Вызов кастомного класса для использования его в качестве расписание запуска DAG. Дополнительную информацию о запуске DAG можно получить в статье Создание простого DAG.
ВАЖНО
Добавление или обновление плагина Airflow, включая пользовательские расписания, требует перезапуска Airflow Scheduler и веб-сервера.
|