Динамическая генерация DAG в Airflow
Обзор
Традиционные DAG удобно использовать для конвейера данных с одним источником, например, когда нужно извлечь данные из файла, обработать их и загрузить результаты.
В случаях, когда необходимо запустить один и тот же пайплайн с разными параметрами или для нескольких источников данных, лучшим решением будет генерировать DAG динамически для каждого случая.
Существует два способа генерации DAG с вариативными параметрами:
-
Динамическая генерация DAG (dynamic DAGs generation)
В динамически генерируемых DAG структура DAG меняется в зависимости от определенных статических значений (файлов конфигурации, переменных среды, имен файлов и т.д.).
-
Динамический мэппинг задач (dynamic task mapping)
Динамический мэппинг задач позволяет генерировать параллельные задачи во время выполнения на основе вывода предыдущей задачи.
Для удобства примеры в этой статье написаны с использованием API TaskFlow, но вы также можете динамически генерировать DAG, используя традиционный синтаксис.
Динамическая генерация DAG
Процесс динамического создания DAG похож на процесс создания обычного DAG. Поскольку Airflow выполняет весь Python-код, размещенный в директории dags, вы можете запустить выполнение любого Python-файла, который генерирует объекты DAG.
Частота генерации новых DAG определяется параметром min_file_process_interval
, значение которого по умолчанию составляет 30 секунд. Вы можете изменить значение этого параметра, отредактировав поле cfg_properties_template через ADCM.
Чтобы отредактировать свойство min_file_process_interval
через ADCM:
-
Перейдите в веб-интерфейс ADCM и выберите нужный кластер ADH.
-
Перейдите в Services → Airflow2 → Primary configuration и включите опцию Show advanced.
-
Откройте раздел airflow.cfg и выберите cfg_properties_template.
-
Установите желаемое значение для
min_file_process_interval
. -
Сохраните конфигурацию, выбрав Save → Create, и перезапустите сервис, нажав Actions → Restart.
ВНИМАНИЕ
Если значение параметра |
Чтобы создать динамически генерируемый DAG:
-
В директории dags домашнего каталога Airflow добавьте новый файл Python, который будет содержать код генератора DAG:
$ sudo vi /opt/airflow/dags/dag_generator.py
-
Добавьте код для генератора DAG:
from airflow import DAG from airflow.decorators import task from datetime import datetime (1) for file in ("dev_data.csv", "test_data.csv", "prod_data.csv"): (2) dag_id = f"generated_dag_{file}" @dag(dag_id=dag_id, schedule="@daily", default_args=default_args, catchup=False, start_date=datetime(2025,1,1)) (3) def create_dag(filename): @task (4) def extract(filename): return filename @task def process(filename): return filename @task def load(filename): print(filename) return filename load(process(extract(filename))) (5) create_dag(filename) (6)
1 Импорт зависимостей. 2 Создание цикла для генерации DAG. 3 Создание функции DAG. 4 Добавление задач. 5 Определение зависимостей задач через вызов функций. 6 Вызов функции DAG.
При необходимости конфигурации DAG с помощью переменных рекомендуется использовать переменные окружения, а не переменные Airflow.
Использование переменных Airflow в коде верхнего уровня создает соединение с базой данных, где хранятся метаданные, что может замедлить синтаксический анализ и создать дополнительную нагрузку на базу данных.
Вы также можете импортировать переменные из одного из нескольких внешних источников, таких как файлы конфигурации.
Например, в директории include/dag-configs/ домашнего каталога Airflow размещено два файла конфигурации JSON. Файлы имеют следующее содержимое:
-
dev_config.json:
{ "dag_id": "development_dag", "schedule": "@daily", "input": "dev_data.csv" }
-
prod_config.json:
{ "dag_id": "production_dag", "schedule": "@daily", "input": "prod_data.csv" }
Вы можете создать скрипт Python, который будет генерировать отдельные файлы DAG для каждого файла конфигурации, используя указанный файл DAG в качестве шаблона.
Чтобы генерировать DAG с использованием шаблона:
-
В домашней директории Airflow добавьте новый файл Python, который будет содержать шаблон DAG:
$ sudo vi /opt/airflow/include/templates/dag_template.py
-
Добавьте код, который будет использован генератором в качестве шаблона для DAG. Например, можно использовать код из предыдущего примера и определить параметры, которые будут меняться при генерации:
from airflow import DAG from airflow.decorators import task from datetime import datetime @dag(dag_id = "dag_id_placeholder", schedule="schedule_placeholder", catchup=False, start_date=datetime(2025,1,1)) (1) def create_dag(filename): @task def extract(filename): return filename @task def process(filename): return filename @task def load(filename): print(filename) return filename load(process(extract("input_placeholder"))) (2) create_dag(filename)
1 Добавление переменных, куда будут подставляться значения параметров dag_id
иschedule_interval
.2 Добавление переменной для входного значения. -
В домашней директории Airflow добавьте файл Python, который будет генерировать DAG с использованием шаблона:
$ sudo vi /opt/airflow/include/scripts/dag_generator.py
-
Напишите код генератора DAG:
import json import os import shutil import fileinput config_filepath = "include/dag-configs/" (1) template_filepath = "include/templates/dag_template.py" (2) for filename in os.listdir(config_filepath): (3) if filename.endswith('.json'): config = json.load(open(f"include/data/{filename}")) generated_dag = f"dags/generated_dag_{config['dag_id']}.py" shutil.copyfile(template_filepath, generated_dag) for line in fileinput.input(generated_dag, inplace=True): line = line.replace("dag_id_placeholder", config['dag_id']) line = line.replace("schedule_placeholder", config['schedule']) line = line.replace("input_placeholder", config['input']) print(line, end="")
1 Путь к файлам конфигурации. 2 Путь к файлу шаблона. 3 Описание логики замены переменных на нужные значения из файлов конфигурации.
При выполнении скрипта директория dag-configs проверяется на наличие доступных файлов конфигурации JSON и генерируется файл DAG для каждого файла конфигурации на основе указанного шаблона. Переменные в шаблоне сопоставляются с полями из файлов конфигурации и заменяются на нужные значения.
После генерации файлов Python Airflow загрузит их как обычные DAG.
Динамический мэппинг задач
В примерах выше демонстрируется, как генерировать DAG с разными параметрами. При этом количество задач в сгенерированных DAG остается прежним и не может меняться.
Чтобы создавать задачи в DAG на основе меняющихся выходных данных другой задачи, можно использовать динамический мэппинг задач.
Этот подход использует фреймворк MapReduce и работает с помощью метода expand()
. Этот метод создает новые задачи путем копирования той, которая его вызывает, с использованием предоставленных аргументов и вывода предыдущей задачи.
Метод expand()
используется с оператором и принимает список или словарь значений в качестве входных данных. Эти значения также могут быть выходными данными другой задачи в формате XComArg
.
Пример динамического мэппинга задач:
from airflow.decorators import dag, task
from datetime import datetime
@dag(dag_id = "dynamic_task_dag", schedule="@daily", catchup=False, start_date=datetime(2025,1,1))
def create_tasks():
@task
def download_files(file: str): (1)
print(file)
files = download_files.expand(file=["file_a", "file_b", "file_c"]) (2)
download_files(file)
create_tasks()
1 | Создание функции задачи. В данном случае — функции, которая выводит содержимое файла. |
2 | Вызов метода expand() и передача ему списка файлов в качестве аргумента. DAG сгенерирует три задачи для каждого файла. |
Пример динамического мэппинга задач, когда вывод предыдущей задачи не определен:
from airflow.decorators import dag, task
from datetime import datetime
import random
@dag(dag_id = "dynamic_task_dag", schedule="@daily",
catchup=False, start_date=datetime(2025,1,1))
def create_tasks():
@task
def get_files():
return ["file_{i}" for i in range(random.randint(3,5))] (1)
@task
def download_files(file: str):
print(file)
files = download_files.expand(file=get_files()) (2)
create_tasks()
1 | Создание функции со случайным выводом. |
2 | Вызов метода expand() и передача ему вывода функции в качестве аргумента. DAG сгенерирует задачу для каждого файла. |