Использование шаблонов Jinja в Airflow
Обзор
Airflow поддерживает использование шаблонов, что позволяет обновлять код DAG динамически во время выполнения с использованием переменных, макросов и управляющих конструкций. Эта функция позволяет в зависимости от контекста выполнения менять в DAG параметры, которые используют информацию о времени выполнения, экземплярах задач и пользовательскую логику.
В Airflow используется фреймворк шаблонов Jinja. Это шаблонизатор на Python, который позволяет встраивать динамические выражения и управляющие конструкции в код.
В этой статье описано использование шаблонов Jinja внутри DAG и операторов в Airflow: рассмотрены основные шаблоны, шаблонизируемые поля, макросы и примеры использования.
Jinja в Airflow
Используя Jinja, Airflow рендерит определенные поля, называемые шаблонизируемыми (templated fields), во время выполнения. Эти поля определяются на уровне операторов.
Типовые операторы, такие как BashOperator, PythonOperator, EmailOperator и SqlOperator, имеют шаблонизируемые поля вроде bash_command,
sql и subject.
Все доступные опции описаны в справочнике шаблонов Airflow.
Пример использования шаблонов Jinja:
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime (1)
with DAG("templating_example", start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False) as dag: (2)
t1 = BashOperator(
task_id='print_date',
bash_command='echo "Execution date is {{ ds }}"', (3)
)
| 1 | Импорт зависимостей. |
| 2 | Инициализация DAG. |
| 3 | Добавление шаблона в строку вывода. В этом примере {{ ds }} будет заменен на дату выполнения во время запуска (например, 2025-04-22). |
Ограничения использования шаблонов
Для использования шаблонов в параметрах операторов используется синтаксис {{ … }} и {% … %}.
В Airflow поддерживаются следующие типы шаблонов:
-
Переменные. Например:
{{ ds }},{{ task_instance }}и другие. -
Управляющие конструкции. Например:
{% if … %},{% for … %}. -
Пользовательские выражения. Например:
{{ macros.ds_add(ds, 7) }}.
Чтобы узнать, какие поля являются шаблонизируемыми, обратитесь к документации нужного оператора. Также можно определить собственные шаблонизируемые поля, дополнив код оператора.
Путь поиска шаблонов
Airflow может загружать внешние шаблоны из разных директорий. Это позволяет разделять шаблоны по типам, например: SQL-запросы или шаблоны отчетов.
Пример:
dag = DAG(
dag_id='template_path_example',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
template_searchpath=['/opt/airflow/templates/sql']
)
где template_searchpath — путь к шаблонам SQL.
PythonOperator
Jinja можно использовать в Python-функциях, подставляя значения шаблона до выполнения задачи. Шаблонные строки можно передать через op_kwargs.
Например:
from airflow.operators.python import PythonOperator
def greet_user(execution_date, **kwargs):
print(f"Hello! Today is {execution_date}")
PythonOperator(
task_id='greet',
python_callable=greet_user,
op_kwargs={"execution_date": "{{ ds }}"},
dag=dag,
)
Airflow рендерит "{{ ds }}" в фактическую дату до вызова greet_user.
Макросы и фильтры
Airflow включает встроенные макросы, такие как macros.ds_add, macros.datetime, фильтры вроде | ds_format, | replace и другие макросы.
Например:
BashOperator(
task_id='seven_days_later',
bash_command='echo "In 7 days: {{ macros.ds_add(ds, 7) }}"',
)
Также можно определить пользовательские макросы внутри DAG или загрузить их через плагины.
Создание DAG с использованием шаблонов Jinja
Ниже приведен пример создания DAG, который генерирует ежедневный отчет в формате Markdown с использованием шаблонов Jinja.
Шаг 1. Подготовка файла шаблона
Создайте файл шаблона Jinja, который будет использоваться для генерации отчетов.
-
Создайте директорию для шаблонов в среде Airflow:
$ sudo mkdir -p /opt/airflow/templates -
В директории templates создайте файл с именем report_template.md и следующим содержимым:
# Daily report **Date:** {{ ds }} **Task Status:** {{ task_instance.state }} ## Summary - DAG ID: {{ dag.dag_id }} - Task ID: {{ task.task_id }}Значения в фигурных скобках — это плейсхолдеры, которые будут заполнены динамически во время выполнения.
Шаг 2. Создание Python-кода для DAG
Создайте файл dynamic_report_dag.py в директории DAG и добавьте следующий код:
from airflow import DAG (1)
from airflow.operators.python import PythonOperator
from datetime import datetime
from jinja2 import Environment, FileSystemLoader
def render_report(ds, **context): (2)
env = Environment(loader=FileSystemLoader('/opt/airflow/templates'))
template = env.get_template('report_template.md')
report = template.render(
ds=ds,
dag=context['dag'],
task=context['task'],
task_instance=context['ti']
)
output_path = f"/opt/airflow/reports/report_{ds}.md" (3)
with open(output_path, "w") as f:
f.write(report)
with DAG( (4)
dag_id="dynamic_report_dag",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
catchup=False,
template_searchpath=['/opt/airflow/templates'],
) as dag:
generate_report = PythonOperator(
task_id='generate_report',
python_callable=render_report,
provide_context=True,
)
| 1 | Импорт зависимостей. |
| 2 | Определение функции для рендеринга шаблона Jinja. |
| 3 | Сохранение отчета. |
| 4 | Инициализация DAG. Дополнительную информацию о создании DAG можно получить в статье Создание простого DAG. |
Шаг 3. Подготовка директории для отчетов
-
Создайте директорию, в которую будут сохраняться сгенерированные отчеты:
$ sudo mkdir -p /opt/airflow/reports -
Настройте необходимые права доступа:
$ sudo chmod -R 755 /opt/airflow/reports/ $ sudo chown -R airflow:airflow /opt/airflow/reports
Шаг 4. Запуск DAG
-
Перейдите в веб-интерфейс Airflow.
-
Найдите и снимите с паузы DAG
dynamic_report_dag. -
Запустите DAG.
После его выполнения в директории reports появится файл с текущей датой в названии и со следующим содержимым:
# Daily report **Date:** 2025-04-28 **Task Status:** running ## Summary - DAG ID: dynamic_report_dag - Task ID: generate_report
Файл содержит дату выполнения, статус задачи, DAG ID и ID задачи, соответствующие контексту выполнения.