Airflow

Airflow — это платформа, позволяющая разрабатывать, планировать, запускать и отслеживать выполнение сложных процессов обработки данных (workflows). Airflow идеально подходит для описания ETL/ELT-процессов, но также может быть полезен для периодического запуска и мониторинга любых иных задач. Для описания процессов Aiflow использует язык программирования Python.

Модель данных

В основе модели данных Airflow лежат следующие объекты.

DAG

Это ключевой объект модели данных Airflow, представляющий семантически связанную группу задач, которые должны выполняться в строго определенном порядке — в соответствии с указанным расписанием. Визуально DAG выглядит как ориентированный ациклический граф (directed acyclic graph), то есть граф без циклических зависимостей.

Простой DAG, изображенный ниже, состоит из трех задач (Task). Обратите внимание, что DAG определяет порядок запуска задач, а также зависимости между ними. Дополнительно Airflow позволяет настроить расписание для обработки DAG, действия в случае неудачного запуска, количество повторных попыток запуска и многое другое. При этом DAG не анализирует, что именно он запускает — за настройку обработки данных отвечают непосредственно сами задачи (Task).

dag dark
Простой DAG
dag light
Простой DAG

Task

Задача (Task) — это отдельный узел DAG, который описывает операции, применяемые к данным. Такими операциями могут быть загрузка данных из внешних источников, агрегация и индексирование, удаление дубликатов и другие типовые ETL-операции.

Operator

Оператор (Operator) — это шаблон для выполнения задач. Если задачи описывают применяемые к данным действия, то операторы определяют конкретные действия на уровне кода: с использованием Python-функций, Bash-скриптов и так далее.

Airflow поддерживает множество встроенных операторов. Кроме этого, доступны операторы из community provider packages. Также есть возможность создать свой собственный Operator путем расширения базового класса BaseOperator.

Отдельный вид операторов — Сенсоры (Sensors) — позволяют обрабатывать реакцию на определенные события: наступление заданного времени, загрузку специфических данных, запуск других DAG/Task и так далее.

Примеры полезных Airflow-операторов приведены ниже.

Операторы Airflow
Оператор Описание

PythonOperator

Выполнение Python-кода

BranchPythonOperator

Выполнение программного кода в зависимости от заданных условий

BashOperator

Запуск Bash-скриптов

SimpleHttpOperator

Отправка HTTP-запросов

PostgresOperator

Отправка запросов в базу данных PostgreSQL

MySqlOperator

Отправка запросов в базу данных MySQL

SqlSensor

Проверка выполнения SQL-запроса

DockerOperator

Запуск Docker-контейнера для задачи (Task)

KubernetesPodOperator

Создание отдельного пода (Pod) Kubernetes для задачи (Task)

EmailOperator

Отправка электронной почты (Email)

DummyOperator

"Пустой" оператор, используемый для группировки задач (Task)

Idempotence

Одна из ключевых концепций, лежащих в основе AirFlow — это хранение информации о каждом запуске DAG в соответствии с настроенным расписанием. Например, если указать, что DAG должен запускаться начиная с 01.01.2022 00:00:00 один раз в сутки, AirFlow будет сохранять информацию о запусках DAG для следующих временных меток: 01.01.2022 00:00:00, 02.01.2022 00:00:00, 03.01.2022 00:00:00 и так далее. Временные метки, приведенные в нашем примере, называются execution_date, соответствующие им экземпляры DAG — DAG Run, а экземпляры задач (Task), связанные с каждым конкретным DAG Run — Task Instance.

Эта концепция крайне важна для поддержания идемпотентности данных (idempotence): запуск или перезапуск задачи с одинаковым набором данных для определенной даты в прошлом всегда приводит к одним и тем же результатам. Дополнительно появляется возможность запускать задачи одного DAG для разных временных меток одновременно.

data model dark
Концепция Task Instance и Dag Run
data model light
Концепция Task Instance и Dag Run

Архитектура

Архитектура Airflow основана на следующих компонентах:

  • Web Server. Предоставляет удобный пользовательский интерфейс, который позволяет запускать DAG и Task, отслеживать их расписание, статус выполнения и так далее.

  • Metadata Database. Общая база данных, которую используют Scheduler, Executor и Web Server для хранения системных метаданных: глобальных переменных, настроек подключения к источникам данных, статусов запуска Task Instance и DAG Run и так далее. Так как в основе этого компонента лежит библиотека SQLAlchemy, требуется установка совместимой с ней базы данных. Например, MySQL или PostgreSQL.

  • Планировщик (Scheduler). Сервис, отвечающий за планирование в Airflow. Он управляет как запуском запланированных процессов по расписанию, так и непосредственной отправкой Task в Executor для выполнения.

    Регулярно (по умолчанию раз в минуту) планировщик анализирует результаты парсинга DAG и проверяет наличие задач, готовых к выполнению. Если необходимые для запуска задач условия соблюдены, планировщик инициализирует создание Task Instance. Для обработки активных задач Scheduler используется Executor, определенный в их настройках.

    Определенные версии Metadata Database (PostgreSQL 9.6+ и MySQL 8+) поддерживают наличие нескольких планировщиков. Их применение обеспечивает максимальную отказоустойчивость системы.

  • Executor. Механизм, с помощью которого выполняются экземпляры задач (Task Instance). По умолчанию Executor работает внутри планировщика (в рамках одного и того же процесса). Но для production-сред рекомендуется использовать типы Executor, делегирующие выполнение задач в Worker.

    Типы Executor
    Тип Описание

    SequentialExecutor

    Запускает задачи (Task) и приостанавливает планировщик (Scheduler) на время их выполнения. Этот тип Executor рекомендуется использовать только для тестирования — для production-среды он не подходит

    LocalExecutor

    Запускает дочерний процесс для каждой задачи (Task), что позволяет обрабатывать несколько задач одновременно. Это является отличной имитацией production-среды в тестовом окружении, но также не рекомендуется для реального использования из-за низкой отказоустойчивости: в случае сбоя на машине, на которой запущен Executor, задача не передается на другие узлы

    CeleryExecutor

    Поддерживает работу множества Worker на различных узлах. Использует библиотеку Celery и требует установки брокера сообщений. Например, Redis или RabbitMQ. При увеличении нагрузки достаточно добавить дополнительный Worker. При сбое любого Worker его задачи передаются работоспособным узлам

    DaskExecutor

    Аналогичен CeleryExecutor, но вместо Celery для параллельных вычислений использует библиотеку Dask

    KubernetesExecutor

    Запускает Worker на отдельном поде (Pod) в Kubernetes для каждого экземпляра задачи (Task Instance)

    CeleryKubernetesExecutor

    Позволяет запускать CeleryExecutor и KubernetesExecutor одновременно. Конкретный тип Executor выбирается в зависимости от текущего статуса очереди задач (Task)

    DebugExecutor

    Используется для запуска и отладки пайплайнов из IDE

  • Worker. Отдельный процесс, в котором выполняются задачи (Task). Расположение Worker зависит от выбранного типа Executor.

  • DAG Directory. Директория с DAG-файлами, доступная для Scheduler, Executor и Worker.

    ВАЖНО
    Все DAG в форме python-скриптов необходимо размещать в DAG Directory. Web UI предназначен только для мониторинга DAG, не для их добавления либо изменения.

Верхнеуровневая архитектурная схема Airflow изображена ниже. В зависимости от используемых типов Executor эта схема может содержать дополнительные компоненты: например, очередь сообщений (Message Queue) между Executor и Worker в случае CeleryExecutor. Но даже в таких частных случаях можно по-прежнему рассматривать Executor и Worker как единый логический компонент, управляющий выполнением задач (Task).

arch dark
Архитектура Airflow
arch light
Архитектура Airflow
ПРИМЕЧАНИЕ
Для получения дополнительной информации по концепциям можно обратиться к документации Airflow.
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней