Оптимизация производительности Airflow

Эта статья описывает методы оптимизации и рекомендуемые практики, которые помогут улучшить производительность сервиса Airflow в кластерах Arenadata Orchestrator. Описанные подходы основаны на последних рекомендациях Apache Airflow и применимы к версиям Airflow от 2.10.5 и выше.

Настройка планировщика

Планировщик (Scheduler) является основным ограничителем пропускной способности Airflow при большом количестве задач. Когда планировщик не успевает обрабатывать DAG, растут очереди задач и увеличивается время ожидания, что может приводить к перегрузке базы данных. Поэтому в плане оптимизации производительности настройка планировщика дает наибольший эффект.

Если время цикла планировщика увеличивается, проблемные места в производительности можно выявить, проверив периодичность парсинга DAG, проблемы в оптимизации кода DAG и значения параметров конфигурации, которые влияют на работу планировщика, например store_serialized_dags и min_file_process_interval.

Если задачи долго остаются в очереди, проверьте состояние Executor, длину очереди Celery broker, значения параметра worker_concurrency и возможное истощение пула соединений с БД.

Ключевые настройки

Параметр Описание Рекомендуемое начальное значение

store_serialized_dags

Включает возможность хранения сериализованных DAG в БД, чтобы планировщик читал предварительно скомпилированные DAG вместо повторной проверки Python на каждом цикле. Снижает нагрузку на CPU и I/O, особенно эффективно при большом количестве DAG

True

min_file_process_interval

Минимальное количество секунд между циклами парсинга DAG. Повышает эффективность парсинга за счет более редких обновлений DAG

30s

dag_dir_list_interval

Интервал сканирования директории dags для проверки новых или удаленных файлов. Можно увеличить для снижения I/O-нагрузки

60s

max_tis_per_query

Количество строк TaskInstance, получаемых за один цикл работы планировщика. Следует экспериментально подобрать баланс между стоимостью запроса в БД и пропускной способностью планировщика. Значение должно быть меньше core.parallelism

16–128

max_dagruns_to_create_per_loop

Максимальное количество DAG, для которых планируются запуски на один цикл (помогает распределять работу между несколькими планировщиками)

10–50

Параллелизм, высокая доступность и координация воркеров

Рекомендуется запускать планировщик на CPU-оптимизированных нодах. При высоких нагрузках на кластер используйте несколько планировщиков в режиме высокой доступности (High Availability, HA), присвоив параметру use_row_level_locking значение True и настроив опцию max_dagruns_to_create_per_loop, чтобы запуски DAG распределялись между планировщиками.

При большом числе DAG или их динамической генерации лучше увеличить CPU и память планировщика, предварительно минимизировав высокоуровневый код в DAG.

Рекомендации по дизайну DAG

Проблемы в дизайне DAG могут значительно увеличить нагрузку на планировщик и воркеры. Чтобы DAG не мешали оптимизации Airflow, используйте лучшие практики разработки DAG, описанные в этом разделе.

Парсинг и высокоуровневый код

При написании DAG важно избегать тяжелых импортов или блокирующих вызовов на верхнем уровне (вне функций задач или операторов). Планировщик и веб-сервер постоянно парсят DAG, и любые длительные операции замедляют этот процесс.

Каждый раз при парсинге DAG выполняется весь высокоуровневый код файла. Если в нем есть импорты, которые занимают продолжительное время, запросы к БД или сетевые вызовы, это сильно увеличивает время парсинга и нагружает планировщик и веб-сервер. При высокой нагрузке и большом количестве DAG такие задержки суммируются, что замедляет циклы планировщика, нарушает SLA и снижает пропускную способность кластера.

Блокирующие вызовы или большие зависимости на модульном уровне могут также вызвать нестабильность. Например, если DAG импортирует библиотеку, загрузка которой занимает несколько секунд, планировщик может зависнуть или отметить DAG как неработающий.

Тяжелые операции следует переносить внутрь функций задач или операторов, чтобы они выполнялись только при запуске задачи. Оптимизированный код DAG делает парсинг более быстрым и предсказуемым даже при большом количестве задач и частых обновлениях кода.

Пример проблемного DAG:

import pandas as pd  (1)
heavy = heavy_initialization() (2)

with DAG(
    dag_id="bad_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    @task()
    def do_work_bad():
        heavy.process()

    do_work_bad()
1 Тяжелый импорт здесь замедляет парсинг.
2 Высокоуровневая функция выполняется при парсинге, что также замедляет работу.

Пример исправленного DAG:

with DAG(
    dag_id="good_dag",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    @task()
    def do_work_good():
        from mylib import heavy_initialization   (1)
        heavy = heavy_initialization()           (2)
        heavy.process()

    do_work_good()
1 Импорт внутри задачи.
2 Вызов функции внутри задачи.

Таким образом, перенос тяжелых импортов и высокоуровневых функций в операторы задач позволяет снизить задержку парсинга и пики использования CPU планировщика.

Динамический мэппинг задач

Динамический мэппинг задач (dynamic task mapping) позволяет оптимизировать производительность Airflow в средах с большим количеством параметризованных или повторяющихся задач с помощью изменения архитектуры DAG. Эта функция позволяет создавать задачи во время выполнения, а не генерировать их при парсинге.

При таком создании DAG код пишется только для одной задачи, в которой задается список входных данных вместо того, чтобы вручную создавать все объекты задач. При запуске такого DAG Airflow автоматически дублирует эту задачу в несколько параллельных экземпляров, а затем создает отдельные задачи в базе метаданных. Это снижает нагрузку на планировщик, ускоряет парсинг и сокращает размер DAG.

XCom и передача данных между задачами

Не рекомендуется передавать большие объемы данных через XCom. Для этого лучше использовать объектные хранилища (S3, HDFS) и передавать через XCom только указатели (пути, URI).

Для небольших структурированных сообщений используйте JSON-сериализацию или удаленный XCom-бэкенд.

Настройка воркеров и Executor

Executor определяет, как Airflow выполняет задачи параллельно. В ADO используются CeleryExecutor и KubernetesExecutor в зависимости от характеристик рабочей нагрузки.

Ключевые параметры настройки Executor в ADO перечислены ниже.

Параметр Описание Значение по умолчанию

parallelism

Глобальный лимит на количество экземпляров задач, которые могут выполняться параллельно во всей среде Airflow

32

dag_concurrency

Максимальное количество экземпляров задач, разрешенных для одновременного выполнения в группе DAG. Предотвращает исчерпание ресурсов кластера одной группой DAG

16

max_active_runs_per_dag

Максимальное количество активных запусков для каждого DAG. Слишком большое значение может привести к перегрузке Executor и планировщика

16

worker_concurrency

Максимальное количество задач, выполняемых одним воркером

16

Параметры параллельной работы воркеров и выделенные им ресурсы напрямую определяют эффективность выполнения задач в кластере.

Конфигурация воркеров определяет пропускную способность, скорость отклика и стабильность всей системы. Параллелизм определяет, сколько задач один воркер может выполнять одновременно. Если значение параллелизма слишком низкое, кластер может недоиспользовать доступные ресурсы CPU и памяти, что приводит к медленной работе DAG и простою инфраструктуры. Если значение слишком высокое, воркер может попытаться выполнить больше задач, чем может обработать хост, что приводит к перегрузке CPU, исчерпанию памяти и завершению задач.

Например, воркер с 8 CPU и параметром параллелизма (worker_concurrency) 64 может попытаться запустить слишком много процессов Python параллельно, что приводит к перегрузке из-за переключения контекста и снижению производительности. Однако, если значение параллелизма равно 2 на той же машине, ресурсы будут использоваться неэффективно, что приведет к ненужному скоплению задач в очереди.

Для оптимальной производительности установите уровень параллелизма, соответствующий доступным ресурсам (ядрам, памяти) и типичному профилю ресурсов ваших задач. Для простых задач (например, запросов к базе данных или вызовов API) более высокий уровень параллелизма может быть эффективным, но для более сложных задач (например, ETL или ML) более низкий уровень параллелизма обеспечивает стабильность и лучшую пропускную способность.

Рабочие процессы Airflow можно масштабировать как вертикально (увеличивая ресурсы CPU, памяти или диска), так и горизонтально, добавляя больше рабочих узлов. Рекомендуется согласовывать мощность вашего оборудования с типом рабочей нагрузки:

  • Задачи с высокой нагрузкой на CPU (например, преобразование данных, отправка заданий Spark) выигрывают от большего количества ядер, но требуют тщательной настройки параллелизма, чтобы задачи, связанные с CPU, не перегружали инфраструктуру.

  • Задачи с высоким потреблением памяти (например, большие фреймы данных Pandas) требуют выделения большего объема памяти и более низких настроек параллелизма для предотвращения ошибок нехватки памяти.

  • Смешанные рабочие нагрузки часто выигрывают от разделения рабочих процессов на пулы или очереди, чтобы разные типы задач выполнялись на рабочих процессах, оптимизированных под их потребности.

Настройка БД метаданных (PostgreSQL)

База метаданных часто становится первым и наиболее критическим узким местом при масштабировании Airflow. Каждый сигнал планировщика, обновление состояния задачи, запись в лог и анализ DAG приводят к операциям чтения и записи в этой базе данных. Поскольку рабочая нагрузка Airflow состоит из очень большого объема небольших, часто выполняемых вставок, обновлений и запросов, база данных должна быть специально настроена для высокой параллельной записи и чтения.

Рекомендации по оптимизации базы данных:

  • Установите значение max_connections достаточно большим для поддержки компонентов Airflow (планировщика, веб-сервера, рабочих процессов, CLI), но сбалансируйте его с использованием пула соединений, чтобы избежать исчерпания памяти.

  • Увеличьте значение wal_buffers и установите правильное значение checkpoint_completion_target, чтобы очистка WAL не блокировала частые обновления.

  • Убедитесь, что хранилище оптимизировано для быстрой случайной записи (например, SSD).

  • Таблицы метаданных Airflow, такие как task_instance, dag_run и log, быстро растут. Если автоочистка недостаточно производительна, раздувание таблиц может привести к замедлению запросов. Увеличьте частоту автоочистки для этих больших таблиц или запланируйте ручные операции VACUUM/ANALYZE для обеспечения предсказуемой производительности.

  • Партиционирование или периодическое архивирование истории старых задач и логов позволяет контролировать размеры таблиц.

Также важно масштабировать базу данных вертикально (процессор, память, IOPS) по мере роста рабочих процессов. Подробнее об оптимизации Arenadata Postgres читайте в статье Оптимизация производительности.

Пул соединений и SQLAlchemy

Параметр Описание Рекомендуемое начальное значение

sql_alchemy_pool_size

Размер пула SQLAlchemy — это максимальное количество подключений к базе данных в пуле. Значение 0 означает отсутствие ограничений

20

sql_alchemy_max_overflow

Максимальный размер переполнения пула. Когда количество соединений достигает размера, заданного параметром pool_size, будут доступны дополнительные соединения в пределах этого лимита. Когда эти дополнительные соединения возвращаются в пул, они отключаются и удаляются. Общее количество одновременных соединений, которое пул допускает, равно сумме значений параметров pool_size и max_overflow, а общее количество ожидающих соединений, которое пул допускает, равно pool_size. max_overflow можно установить равным -1, чтобы обозначить отсутствие ограничения переполнения. Ограничение на общее количество одновременных соединений не накладывается. Значение по умолчанию: 10

30

sql_alchemy_pool_recycle

Время перезапуска пула SQLAlchemy — это количество секунд, в течение которых соединение может простаивать в пуле, прежде чем оно будет прекращено. Если количество подключений к базе данных будет превышено, более низкое значение настройки позволит системе быстрее восстанавливаться

1800

Увеличение sql_alchemy_pool_size сокращает время ожидания подключений к базе данных. Убедитесь, что значение max_connections Postgres соответствует общему количеству подключений Airflow и другим пользователям базы данных.

Контроль параллелизма, пулов и приоритетов

Пулы

По умолчанию Airflow планирует столько задач, сколько позволяют возможности Executor и параллелизм воркеров. Это обеспечивает максимальную пропускную способность, но может легко привести к конкуренции за ресурсы, например, к перегрузке базы данных слишком большим количеством задач, превышению ограничений скорости API или монополизации рабочего процесса заданиями с интенсивным вводом-выводом. Пулы действуют как регулятор параллелизма: они группируют задачи по категориям ресурсов и ограничивают их количество, которое может выполняться одновременно во всем кластере Airflow.

Один из лучших способов использовать эту функцию — создавать пулы для операций, ограниченных ресурсами (например, hadoop_jobs, db_ingest, api_calls), явно задавать количество слотов и назначать задачи пулам с помощью аргумента пула в операторах.

Вы можете настроить пулы с помощью Airflow CLI или веб-интерфейса Airflow.

Приоритеты и SLA

Используйте priority_weight для задач, чтобы влиять на распределение слотов при заполнении пулов. Сочетайте это с пулами, чтобы обеспечить более раннее планирование высокоприоритетных заданий.

Использование ограничений на уровне DAG (например, max_active_runs_per_dag) вместе с пулами помогает сгладить пики выполнения задач и предотвратить исчерпание ресурсов. По умолчанию, если DAG запускается часто или имеет большой объем невыполненных запусков, Airflow может попытаться запланировать несколько запусков DAG одновременно. Параметр max_active_runs_per_dag ограничивает количество запусков DAG, которые могут выполняться параллельно, обеспечивая равномерное выполнение и предотвращая перегрузку системных ресурсов. В сочетании с пулами настройки уровня DAG управляют тем, как отдельный рабочий процесс использует емкость кластера, в то время как пулы координируют параллелизм между различными рабочими процессами и типами ресурсов.

Вы также можете реализовать ограничитель скорости выполнения задач верхнего уровня, который в некоторых случаях записывает данные в промежуточную очередь (например, Kafka).

Ресурсы Web Server

Веб-сервер Airflow обслуживает запросы UI, API и метаданных DAG/задач, объем которых линейно увеличивается с количеством пользователей и активностью DAG. Недостаток ресурсов веб-сервера может привести к медленной загрузке страниц, тайм-аутам и недоступности сервиса в периоды пиковой нагрузки.

Для оптимизации производительности важно масштабировать веб-сервер как вертикально, выделяя достаточно ресурсов CPU, памяти и пропускной способности сети, так и горизонтально, запуская несколько реплик за балансировщиком нагрузки. Дополнительно можно использовать Gunicorn для воркеров, чтобы регулировать их количество и число потоков в соответствии с доступными ресурсами, что позволяет эффективно обслуживать запросы без перегрузки CPU и памяти.

При высокой нагрузке на кластеры можно использовать кеширование статического контента, ограничение количества загружаемых DAG за сеанс пользователя и вынести требовательные по ресурсам API-запросы на отдельные эндпойнты. Это поможет повысить доступность интерфейса Airflow при высокой активности пользователей.

Логирование

Избегайте ведения только локальных логов при масштабировании рабочих процессов. Настройте удаленное логирование (например, в S3/GCS/Elasticsearch) для централизации логов и сокращения дискового ввода-вывода в рабочих процессах. Настройте параметры remote_base_log_folder и remote_log_conn_id через ADCM.

Оценка результатов настройки

Перед внесением существенных изменений в конфигурацию (например, повышение уровня параллелизма или размера пула баз данных) рекомендуется провести нагрузочные тесты с репрезентативными DAG, чтобы оценить влияние на планировщик, базу данных и воркеры. Лучший способ оценить эффективность настройки производительности — это использовать A/B-тестирование, чтобы изолировать влияние других факторов.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней