Использование сенсоров в Airflow

Сенсор в Airflow — это оператор, который используется, когда в DAG требуется дождаться выполнения условия для продолжения выполнения задач.

Условия срабатывания сенсора зависят от его типа. Airflow предоставляет набор стандартных операторов, которые позволяют, например, ждать истечения определенного времени (TimeDeltaSensor), ждать появления файла (FileSensor) или завершения задачи в другом DAG (ExternalTaskSensor). Когда условие сенсора выполняется, он может запустить последующие задачи.

Параметры сенсора

У сенсоров есть следующие базовые параметры:

  • poke_interval — время, в течение которого задача должна ждать перед следующей проверкой условия. Значение по умолчанию — 60 секунд.

  • timeout — максимальное время работы сенсора. Значение по умолчанию — 7 дней или 60 * 60 * 24 * 7 секунд. Когда время истечет, задача сенсора будет завершена. Значение должно быть меньше, чем дата следующего запуска DAG, чтобы избежать зависания задач.

  • mode — индикатор того, как долго задача сенсора будет занимать рабочий слот. Может принимать одно из двух значений:

    • poke — сенсор всегда занимает слот, пока не завершится;

    • reschedule — сенсор занимает слот только во время проверок, а на время, указанное в poke_interval, планировщик освобождает слот.

Работа с сенсорами

Операторы сенсоров являются производными от класса airflow.sensors.base и наследуют его атрибуты. Airflow поддерживает несколько сенсоров, которые описаны в таблице ниже, но некоторые провайдеры предоставляют также дополнительные сенсоры.

Стандартные сенсоры Airflow
Название Описание

airflow.sensors.bash

Выполняет команду/скрипт bash, который можно использовать в качестве настраиваемого сенсора. Команда должна возвращать 0 при успешном выполнении, а в остальных случаях — любое другое значение

airflow.sensors.date_time

Приостанавливает выполнение DAG до наступления определенной даты и времени. Главным преимуществом этого сенсора является идемпотентность для target_time. Он обрабатывает некоторые случаи, для которых TimeSensor и TimeDeltaSensor не подходят

airflow.sensors.external_task

Ожидает завершения задачи в другом DAG

airflow.sensors.filesystem

Ожидает появления файла или директории в системе. Если указанный путь — непустая директория, то сенсор вернет значение True

airflow.sensors.python

Выполняет произвольный Python-код и ждет его завершения. Рекомендуется использовать декоратор @task.sensor вместо классического PythonSensor для выполнения Python-функций

airflow.sensors.time_delta

Находится в ожидании определенное время

airflow.sensors.time_sensor

Ожидает наступления указанного времени дня

airflow.sensors.weekday

Ожидает определенного дня недели. Например, если день выполнения задачи — 2018-12-22 (суббота), а в сенсор передается значение FRIDAY (пятница), задача будет ждать до следующей пятницы

Более подробное описание каждого сенсора доступно в документации Airflow.

Пример возможного использования сенсора в DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync (1)

dag = DAG(
    dag_id="sensor_example",
    schedule=None,
    start_date=datetime(2023, 6, 4),
)

time_sensor_example = TimeDeltaSensor( (2)
   task_id='time_sensor_example',
   dag=dag,
   delta=timedelta(minutes=1), (3)
   poke_interval=10, (4)
   timeout=30, (5)
   soft_fail=True, (6)
)
1 Импорт операторов сенсоров. TimeDeltaSensorAsync — это асинхронная версия оператора TimeDeltaSensor.
2 Создание экземпляра сенсора. Если режим сенсора не указан, по умолчанию используется poke.
3 Параметр delta указывает, как долго ждать, прежде чем задача может быть отмечена как выполненная.
4 Параметр poke_interval определяет, как часто сенсор проверяет условие (в секундах).
5 Параметр timeout определяет, как долго сенсор будет работать.
6 Параметр soft_fail отмечает задачу сенсора как SKIPPED, если она не выполнена.

Чтобы избежать блокировки (deadlock) задач и других проблем с производительностью, рассмотрите следующие рекомендации по работе с сенсорами:

  • Устанавливайте timeout для сенсора, который подойдет для вашей задачи. Значение по умолчанию для этого параметра может быть слишком большим в некоторых случаях.

  • Используйте режим reschedule для долго работающих сенсоров, чтобы они не занимали рабочий слот постоянно, а режим poke — для задач с частой проверкой условий (т.е. менее 5 минут), чтобы избежать перегрузки планировщика.

  • Убедитесь, что параметр poke_interval подходит для конкретной задачи. Значение по умолчанию предполагает очень частые проверки условий.

Создание кастомного сенсора

Чтобы создать кастомный сенсор, необходимо унаследовать класс BaseSensorOperator. В примере ниже демонстрируется создание нового сенсора, который проверяет наличие файла в директории.

  1. Добавьте файл Python, который будет содержать код сенсора, в каталог dags домашней директории Airflow.

    $ sudo vi /opt/airflow/dags/custom_sensor.py
  2. Напишите код для сенсора:

    import os
    from airflow.sensors.base import BaseSensorOperator (1)
    
    class MyCustomFileSensor(BaseSensorOperator): (2)
        def __init__(self, filepath, *args, **kwargs): (3)
            self.filepath = filepath
            super().__init__(*args, **kwargs)
    
        def poke(self, context): (4)
            print(f'checking if a file {self.filepath} exists...')
            return os.path.exists(self.filepath) (5)
    1 Импорт базового класса сенсора.
    2 Наследование базового класса.
    3 Добавление дополнительного аргумента filepath и наследование аргументов родительского класса.
    4 Переопределение метода poke.
    5 Python-логика для проверки существования файла.
  3. Вызовите новый сенсор внутри любого DAG:

    from custom_sensor import MyCustomFileSensor (1)
    
    check_file = MyCustomFileSensor(    (2)
        task_id='check_file',
        filepath='/opt/airflow/test_file.txt',
        poke_interval=10,
        timeout=60,
        dag=dag
    )
    1 Импорт класса кастомного сенсора.
    2 Вызов функции сенсора с нужными параметрами.
  4. Перейдите в веб-интерфейс Airflow, чтобы запустить DAG и убедиться, что функция кастомного сенсора работает корректно.

    Airflow UI: страница DAG
    Airflow UI: страница DAG
    Airflow UI: страница DAG
    Airflow UI: страница DAG

    На странице DAG видно, что задача check_file с классом MyCustomFileSensor успешно завершена.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней