Добавление кастомных операторов и хуков в Airflow

Обзор

Airflow предоставляет стандартные операторы, которые позволяют, например, выполнять команды bash, вызывать произвольные функции Python, создавать соединение с внешним приложением или проверять выполнение определенного условия.

Airflow предоставляет "из коробки" большое количество операторов, но в некоторых случаях необходимо изменить функцинальность существующего оператора или создать новый.

В этой статье описывается, как создавать пользовательские операторы и хуки. О создании кастомных сенсоров в Airflow можно прочитать в статье Использование сенсоров в Airflow.

При создании любого оператора Airflow необходимо:

  1. Написать код для нового оператора в методе __init__.

  2. Наследовать класс Airflow, который нужно переписать или изменить, переопределив методы в производном классе:

    • В конструкторе (constructor) определите параметры нового оператора.

    • В методе execute напишите код, который будет выполняться при вызове оператора.

  3. Поместите файл с оператором в директорию, которая присутствует в PYTHONPATH Airflow. По умолчанию это директории dags, plugins и config.

В примерах ниже показывается, как написать кастомные операторы и хуки, руководствуясь этими шагами.

Разделение логики между операторами и хуками

 

Оператор должен содержать бизнес-логику задачи и состояния в памяти. Если вам необходимо создать соединение с внешним сервисом (API, базой данных и т.д.), лучше написать его как хук.

Хук не должен содержать логику оператора, а только предоставлять методы, которые затем будут использоваться оператором для взаимодействия с внешним сервисом.

Разделив логику на два компонента, можно избежать дублирования кода и создать несколько операторов, которые используют один и тот же хук.

Взаимодействие операторов и хуков
Взаимодействие операторов и хуков
Взаимодействие операторов и хуков
Взаимодействие операторов и хуков

Создание кастомного оператора

Ниже приведен пример создания кастомного оператора на основе класса airflow.models.baseoperator.BaseOperator:

  1. В директории dags в домашнем каталоге Airflow добавьте файл Python, который будет содержать код оператора:

    $ sudo vi /opt/airflow/dags/custom_operator.py
  2. Добавьте код оператора:

    from airflow.models.baseoperator import BaseOperator  (1)
    
    class CustomOperator(BaseOperator):  (2)
        def __init__(self, name: str, **kwargs) -> None:  (3)
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):  (4)
            message = f"Hello, {self.name}!"
            print(message)
            return message
    1 Импортируйте базовый класс оператора.
    2 Наследуйте базовый класс в своем операторе.
    3 Добавьте дополнительные аргументы и сохраните аргументы родительского класса.
    4 Переопределите метод execute, задав желаемую функциональность.
  3. Вызовите кастомный оператор в DAG:

    from custom_operator import CustomOperator (1)
    
    print_message = CustomOperator(    (2)
        task_id='print_message',
        name='CustomOperator',
        poke_interval=10,
        timeout=60,
        dag=dag
    )
    1 Импортируйте класс кастомного оператора.
    2 Вызовите функцию оператора.

Создание кастомного хука

Процесс создания кастомного хука отличается от процесса создания кастомного оператора тремя шагами:

  • Хук должен быть унаследован от класса другого хука, например, класса BaseHook или любого другого.

  • В код хука можно включить метод .get_conn(), оборачивающий вызов метода .get_connection(), чтобы получить информацию о соединении Airflow.

  • Должно существовать соединение (Airflow connection), которое будет использоваться хуком.

Ниже приведен пример того, как создать пользовательскую функцию хука, которая инициирует новое соединение с внешним сервисом:

  1. В директории dags домашнего каталога Airflow добавьте файл Python, который будет содержать кастомный хук:

    $ sudo vi /opt/airflow/dags/custom_hook.py
  2. Добавьте код хука:

    from airflow.hooks.base import BaseHook (1)
    
    class CustomHook(BaseHook): (2)
    
        conn_name_attr = "custom_conn_id" (3)
        default_conn_name = "custom_conn_default" (4)
        conn_type = "http" (5)
        hook_name = "CustomHook" (6)
    
        def __init__(
            self, custom_conn_id: str = default_conn_name, *args, **kwargs
        ) -> None: (7)
            super().__init__(*args, **kwargs) (8)
            self.custom_conn_id = custom_conn_id (9)
            self.get_conn() (10)
    
        def get_conn(self):
            conn_id = getattr(self, self.conn_name_attr)
            conn = self.get_connection(conn_id)
            return conn
    1 Импортируйте базовый класс хука.
    2 Определите класс для наследования от базового класса.
    3 Добавьте параметр, который сохраняет ID соединения.
    4 Укажите ID соединения по умолчанию.
    5 Укажите тип соединения.
    6 Укажите имя хука.
    7 Определите метод .init().
    8 Инициализируйте родительский хук.
    9 Назначьте переменные класса.
    10 При необходимости вызовите метод .get_conn() при инициализации.
  3. Вызовите новый хук внутри DAG:

    from custom_hook import CustomHook
  4. Чтобы использовать пользовательский хук, необходимо создать соединение Airflow с параметрами, указанными в коде хука.

    Добавление нового соединения в веб-интерфейсе Airflow
    Добавление нового соединения в веб-интерфейсе Airflow
    Добавление нового соединения в веб-интерфейсе Airflow
    Добавление нового соединения в веб-интерфейсе Airflow

Более подробно о веб-интерфейсе Airflow читайте в статье Обзор UI в Airflow.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней