Добавление кастомных операторов и хуков в Airflow
Обзор
Airflow предоставляет стандартные операторы, которые позволяют, например, выполнять команды bash, вызывать произвольные функции Python, создавать соединение с внешним приложением или проверять выполнение определенного условия.
Airflow предоставляет "из коробки" большое количество операторов, но в некоторых случаях необходимо изменить функцинальность существующего оператора или создать новый.
В этой статье описывается, как создавать пользовательские операторы и хуки. О создании кастомных сенсоров в Airflow можно прочитать в статье Использование сенсоров в Airflow.
При создании любого оператора Airflow необходимо:
-
Написать код для нового оператора в методе
__init__
. -
Наследовать класс Airflow, который нужно переписать или изменить, переопределив методы в производном классе:
-
В конструкторе (constructor) определите параметры нового оператора.
-
В методе
execute
напишите код, который будет выполняться при вызове оператора.
-
-
Поместите файл с оператором в директорию, которая присутствует в
PYTHONPATH
Airflow. По умолчанию это директории dags, plugins и config.
В примерах ниже показывается, как написать кастомные операторы и хуки, руководствуясь этими шагами.
Создание кастомного оператора
Ниже приведен пример создания кастомного оператора на основе класса airflow.models.baseoperator.BaseOperator
:
-
В директории dags в домашнем каталоге Airflow добавьте файл Python, который будет содержать код оператора:
$ sudo vi /opt/airflow/dags/custom_operator.py
-
Добавьте код оператора:
from airflow.models.baseoperator import BaseOperator (1) class CustomOperator(BaseOperator): (2) def __init__(self, name: str, **kwargs) -> None: (3) super().__init__(**kwargs) self.name = name def execute(self, context): (4) message = f"Hello, {self.name}!" print(message) return message
1 Импортируйте базовый класс оператора. 2 Наследуйте базовый класс в своем операторе. 3 Добавьте дополнительные аргументы и сохраните аргументы родительского класса. 4 Переопределите метод execute
, задав желаемую функциональность. -
Вызовите кастомный оператор в DAG:
from custom_operator import CustomOperator (1) print_message = CustomOperator( (2) task_id='print_message', name='CustomOperator', poke_interval=10, timeout=60, dag=dag )
1 Импортируйте класс кастомного оператора. 2 Вызовите функцию оператора.
Создание кастомного хука
Процесс создания кастомного хука отличается от процесса создания кастомного оператора тремя шагами:
-
Хук должен быть унаследован от класса другого хука, например, класса
BaseHook
или любого другого. -
В код хука можно включить метод
.get_conn()
, оборачивающий вызов метода.get_connection()
, чтобы получить информацию о соединении Airflow. -
Должно существовать соединение (Airflow connection), которое будет использоваться хуком.
Ниже приведен пример того, как создать пользовательскую функцию хука, которая инициирует новое соединение с внешним сервисом:
-
В директории dags домашнего каталога Airflow добавьте файл Python, который будет содержать кастомный хук:
$ sudo vi /opt/airflow/dags/custom_hook.py
-
Добавьте код хука:
from airflow.hooks.base import BaseHook (1) class CustomHook(BaseHook): (2) conn_name_attr = "custom_conn_id" (3) default_conn_name = "custom_conn_default" (4) conn_type = "http" (5) hook_name = "CustomHook" (6) def __init__( self, custom_conn_id: str = default_conn_name, *args, **kwargs ) -> None: (7) super().__init__(*args, **kwargs) (8) self.custom_conn_id = custom_conn_id (9) self.get_conn() (10) def get_conn(self): conn_id = getattr(self, self.conn_name_attr) conn = self.get_connection(conn_id) return conn
1 Импортируйте базовый класс хука. 2 Определите класс для наследования от базового класса. 3 Добавьте параметр, который сохраняет ID соединения. 4 Укажите ID соединения по умолчанию. 5 Укажите тип соединения. 6 Укажите имя хука. 7 Определите метод .init()
.8 Инициализируйте родительский хук. 9 Назначьте переменные класса. 10 При необходимости вызовите метод .get_conn()
при инициализации. -
Вызовите новый хук внутри DAG:
from custom_hook import CustomHook
-
Чтобы использовать пользовательский хук, необходимо создать соединение Airflow с параметрами, указанными в коде хука.
Добавление нового соединения в веб-интерфейсе AirflowДобавление нового соединения в веб-интерфейсе Airflow
Более подробно о веб-интерфейсе Airflow читайте в статье Обзор UI в Airflow.