Использование шаблонов Jinja в Airflow

Обзор

Airflow поддерживает использование шаблонов, что позволяет обновлять код DAG динамически во время выполнения с использованием переменных, макросов и управляющих конструкций. Эта функция позволяет в зависимости от контекста выполнения менять в DAG параметры, которые используют информацию о времени выполнения, экземплярах задач и пользовательскую логику.

В Airflow используется фреймворк шаблонов Jinja. Это шаблонизатор на Python, который позволяет встраивать динамические выражения и управляющие конструкции в код.

В этой статье описано использование шаблонов Jinja внутри DAG и операторов в Airflow: рассмотрены основные шаблоны, шаблонизируемые поля, макросы и примеры использования.

Jinja в Airflow

Используя Jinja, Airflow рендерит определенные поля, называемые шаблонизируемыми (templated fields), во время выполнения. Эти поля определяются на уровне операторов. Типовые операторы, такие как BashOperator, PythonOperator, EmailOperator и SqlOperator, имеют шаблонизируемые поля вроде bash_command, sql и subject.

Все доступные опции описаны в справочнике шаблонов Airflow.

Пример использования шаблонов Jinja:

from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime (1)

with DAG("templating_example", start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False) as dag: (2)
    t1 = BashOperator(
        task_id='print_date',
        bash_command='echo "Execution date is {{ ds }}"', (3)
    )
1 Импорт зависимостей.
2 Инициализация DAG.
3 Добавление шаблона в строку вывода. В этом примере {{ ds }} будет заменен на дату выполнения во время запуска (например, 2025-04-22).

Ограничения использования шаблонов

Для использования шаблонов в параметрах операторов используется синтаксис {{ …​ }} и {% …​ %}.

В Airflow поддерживаются следующие типы шаблонов:

  • Переменные. Например: {{ ds }}, {{ task_instance }} и другие.

  • Управляющие конструкции. Например: {% if …​ %}, {% for …​ %}.

  • Пользовательские выражения. Например: {{ macros.ds_add(ds, 7) }}.

Чтобы узнать, какие поля являются шаблонизируемыми, обратитесь к документации нужного оператора. Также можно определить собственные шаблонизируемые поля, дополнив код оператора.

Путь поиска шаблонов

Airflow может загружать внешние шаблоны из разных директорий. Это позволяет разделять шаблоны по типам, например: SQL-запросы или шаблоны отчетов.

Пример:

dag = DAG(
    dag_id='template_path_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    template_searchpath=['/opt/airflow/templates/sql']
)

где template_searchpath — путь к шаблонам SQL.

PythonOperator

Jinja можно использовать в Python-функциях, подставляя значения шаблона до выполнения задачи. Шаблонные строки можно передать через op_kwargs.

Например:

from airflow.operators.python import PythonOperator

def greet_user(execution_date, **kwargs):
    print(f"Hello! Today is {execution_date}")

PythonOperator(
    task_id='greet',
    python_callable=greet_user,
    op_kwargs={"execution_date": "{{ ds }}"},
    dag=dag,
)

Airflow рендерит "{{ ds }}" в фактическую дату до вызова greet_user.

Макросы и фильтры

Airflow включает встроенные макросы, такие как macros.ds_add, macros.datetime, фильтры вроде | ds_format, | replace и другие макросы.

Например:

BashOperator(
    task_id='seven_days_later',
    bash_command='echo "In 7 days: {{ macros.ds_add(ds, 7) }}"',
)

Также можно определить пользовательские макросы внутри DAG или загрузить их через плагины.

Создание DAG с использованием шаблонов Jinja

Ниже приведен пример создания DAG, который генерирует ежедневный отчет в формате Markdown с использованием шаблонов Jinja.

Шаг 1. Подготовка файла шаблона

Создайте файл шаблона Jinja, который будет использоваться для генерации отчетов.

  1. Создайте директорию для шаблонов в среде Airflow:

    $ sudo mkdir -p /opt/airflow/templates
  2. В директории templates создайте файл с именем report_template.md и следующим содержимым:

    # Daily report
    
    **Date:** {{ ds }}
    **Task Status:** {{ task_instance.state }}
    
    ## Summary
    - DAG ID: {{ dag.dag_id }}
    - Task ID: {{ task.task_id }}

    Значения в фигурных скобках — это плейсхолдеры, которые будут заполнены динамически во время выполнения.

Шаг 2. Создание Python-кода для DAG

Создайте файл dynamic_report_dag.py в директории DAG и добавьте следующий код:

from airflow import DAG (1)
from airflow.operators.python import PythonOperator
from datetime import datetime
from jinja2 import Environment, FileSystemLoader

def render_report(ds, **context): (2)
    env = Environment(loader=FileSystemLoader('/opt/airflow/templates'))
    template = env.get_template('report_template.md')
    report = template.render(
        ds=ds,
        dag=context['dag'],
        task=context['task'],
        task_instance=context['ti']
    )

    output_path = f"/opt/airflow/reports/report_{ds}.md" (3)
    with open(output_path, "w") as f:
        f.write(report)

with DAG(  (4)
    dag_id="dynamic_report_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    template_searchpath=['/opt/airflow/templates'],
) as dag:

    generate_report = PythonOperator(
        task_id='generate_report',
        python_callable=render_report,
        provide_context=True,
    )
1 Импорт зависимостей.
2 Определение функции для рендеринга шаблона Jinja.
3 Сохранение отчета.
4 Инициализация DAG. Дополнительную информацию о создании DAG можно получить в статье Создание простого DAG.

Шаг 3. Подготовка директории для отчетов

  1. Создайте директорию, в которую будут сохраняться сгенерированные отчеты:

    $ sudo mkdir -p /opt/airflow/reports
  2. Настройте необходимые права доступа:

    $ sudo chmod -R 755 /opt/airflow/reports/
    $ sudo chown -R airflow:airflow /opt/airflow/reports

Шаг 4. Запуск DAG

  1. Перейдите в веб-интерфейс Airflow.

  2. Найдите и снимите с паузы DAG dynamic_report_dag.

  3. Запустите DAG.

После его выполнения в директории reports появится файл с текущей датой в названии и со следующим содержимым:

# Daily report

**Date:** 2025-04-28
**Task Status:** running

## Summary
- DAG ID: dynamic_report_dag
- Task ID: generate_report

Файл содержит дату выполнения, статус задачи, DAG ID и ID задачи, соответствующие контексту выполнения.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней