Использование сенсоров в Airflow
Сенсор в Airflow — это оператор, который используется, когда в DAG требуется дождаться выполнения условия для продолжения выполнения задач.
Условия срабатывания сенсора зависят от его типа. Airflow предоставляет набор стандартных операторов, которые позволяют, например, ждать истечения определенного времени (TimeDeltaSensor), ждать появления файла (FileSensor) или завершения задачи в другом DAG (ExternalTaskSensor). Когда условие сенсора выполняется, он может запустить последующие задачи.
Параметры сенсора
У сенсоров есть следующие базовые параметры:
-
poke_interval
— время, в течение которого задача должна ждать перед следующей проверкой условия. Значение по умолчанию — 60 секунд. -
timeout
— максимальное время работы сенсора. Значение по умолчанию — 7 дней или 60 * 60 * 24 * 7 секунд. Когда время истечет, задача сенсора будет завершена. Значение должно быть меньше, чем дата следующего запуска DAG, чтобы избежать зависания задач. -
mode
— индикатор того, как долго задача сенсора будет занимать рабочий слот. Может принимать одно из двух значений:-
poke
— сенсор всегда занимает слот, пока не завершится; -
reschedule
— сенсор занимает слот только во время проверок, а на время, указанное вpoke_interval
, планировщик освобождает слот.
-
Работа с сенсорами
Операторы сенсоров являются производными от класса airflow.sensors.base
и наследуют его атрибуты. Airflow поддерживает несколько сенсоров, которые описаны в таблице ниже, но некоторые провайдеры предоставляют также дополнительные сенсоры.
Название | Описание |
---|---|
airflow.sensors.bash |
Выполняет команду/скрипт bash, который можно использовать в качестве настраиваемого сенсора. Команда должна возвращать |
airflow.sensors.date_time |
Приостанавливает выполнение DAG до наступления определенной даты и времени. Главным преимуществом этого сенсора является идемпотентность для |
airflow.sensors.external_task |
Ожидает завершения задачи в другом DAG |
airflow.sensors.filesystem |
Ожидает появления файла или директории в системе. Если указанный путь — непустая директория, то сенсор вернет значение |
airflow.sensors.python |
Выполняет произвольный Python-код и ждет его завершения. Рекомендуется использовать декоратор |
airflow.sensors.time_delta |
Находится в ожидании определенное время |
airflow.sensors.time_sensor |
Ожидает наступления указанного времени дня |
airflow.sensors.weekday |
Ожидает определенного дня недели. Например, если день выполнения задачи — |
Более подробное описание каждого сенсора доступно в документации Airflow.
Пример возможного использования сенсора в DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync (1)
dag = DAG(
dag_id="sensor_example",
schedule=None,
start_date=datetime(2023, 6, 4),
)
time_sensor_example = TimeDeltaSensor( (2)
task_id='time_sensor_example',
dag=dag,
delta=timedelta(minutes=1), (3)
poke_interval=10, (4)
timeout=30, (5)
soft_fail=True, (6)
)
1 | Импорт операторов сенсоров. TimeDeltaSensorAsync — это асинхронная версия оператора TimeDeltaSensor . |
2 | Создание экземпляра сенсора. Если режим сенсора не указан, по умолчанию используется poke . |
3 | Параметр delta указывает, как долго ждать, прежде чем задача может быть отмечена как выполненная. |
4 | Параметр poke_interval определяет, как часто сенсор проверяет условие (в секундах). |
5 | Параметр timeout определяет, как долго сенсор будет работать. |
6 | Параметр soft_fail отмечает задачу сенсора как SKIPPED , если она не выполнена. |
Создание кастомного сенсора
Чтобы создать кастомный сенсор, необходимо унаследовать класс BaseSensorOperator
. В примере ниже демонстрируется создание нового сенсора, который проверяет наличие файла в директории.
-
Добавьте файл Python, который будет содержать код сенсора, в каталог dags домашней директории Airflow.
$ sudo vi /opt/airflow/dags/custom_sensor.py
-
Напишите код для сенсора:
import os from airflow.sensors.base import BaseSensorOperator (1) class MyCustomFileSensor(BaseSensorOperator): (2) def __init__(self, filepath, *args, **kwargs): (3) self.filepath = filepath super().__init__(*args, **kwargs) def poke(self, context): (4) print(f'checking if a file {self.filepath} exists...') return os.path.exists(self.filepath) (5)
1 Импорт базового класса сенсора. 2 Наследование базового класса. 3 Добавление дополнительного аргумента filepath
и наследование аргументов родительского класса.4 Переопределение метода poke
.5 Python-логика для проверки существования файла. -
Вызовите новый сенсор внутри любого DAG:
from custom_sensor import MyCustomFileSensor (1) check_file = MyCustomFileSensor( (2) task_id='check_file', filepath='/opt/airflow/test_file.txt', poke_interval=10, timeout=60, dag=dag )
1 Импорт класса кастомного сенсора. 2 Вызов функции сенсора с нужными параметрами. -
Перейдите в веб-интерфейс Airflow, чтобы запустить DAG и убедиться, что функция кастомного сенсора работает корректно.
Airflow UI: страница DAGAirflow UI: страница DAGНа странице DAG видно, что задача
check_file
с классомMyCustomFileSensor
успешно завершена.