Логирование в Airflow

Airflow ведет текстовые логи для анализа ошибок, которые могут возникнуть в процессе работы DAG. Эти логи расположены в директории /var/log/airflow/ хоста сервера Airflow, но они также доступны в пользовательском интерфейсе Airflow.

Пути к логам

 
Полный путь к лог-файлам выглядит следующим образом:

/var/log/airflow/dag_id=<DAG_ID>/run_id=<DAG_Run_ID>/task_id=<Task_ID>/<log_number>.log

Здесь:

  • <DAG_ID> — идентификатор DAG.

  • <DAG_Run_ID> — идентификатор запуска DAG, который объединяет в себе тип запуска и временную метку. Например, run_id=scheduled__2024-07-14T14:18:33.254657+00:00.

  • <Task_ID> — идентификатор задачи.

  • <log_number> — номер лог-файла (отсчет начинается с 1).

Полный путь к логам менеджера процессов (process manager) — /var/log/airflow/dag_processor_manager/.

Полный путь к логам планировщика (scheduler) — /var/log/airflow/scheduler/.

Для просмотра логов Airflow на хосте:

  1. Подключитесь к серверу Airflow с помощью SSH и запустите следующую команду:

    $ ls -la /var/log/airflow/

    Вывод в консоль приведен ниже:

    total 12
    drwxr-xr-x.  5 airflow airflow   77 Jul 15 14:18 .
    drwxr-xr-x. 16 root    root    4096 Aug  5 08:44 ..
    drwxr-xr-x. 13 airflow airflow 4096 Jul 30 10:12 dag_id=adcm_check
    drwxr-xr-x.  2 airflow airflow  109 Jul 29 23:54 dag_processor_manager
    drwxr-xr-x. 20 airflow airflow 4096 Aug  5 07:56 scheduler
  2. Откройте содержимое нужного лог-файла:

    $ cat /var/log/airflow/dag_id=adcm_check/run_id=manual__2024-07-15T14:18:43.743847+00:00/task_id=runme_1/attempt=1.log

    Вывод в консоль приведен ниже:

    [2024-07-15T14:18:46.143+0000] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: adcm_check.runme_1 manual__2024-07-15T14:18:43.743847+00:00 [queued]>
    [2024-07-15T14:18:46.144+0000] {taskinstance.py:1308} INFO - Starting attempt 1 of 4
    [2024-07-15T14:18:46.157+0000] {taskinstance.py:1327} INFO - Executing <Task(BashOperator): runme_1> on 2024-07-15 14:18:43.743847+00:00
    [2024-07-15T14:18:46.164+0000] {standard_task_runner.py:57} INFO - Started process 7082 to run task
    [2024-07-15T14:18:46.168+0000] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'adcm_check', 'runme_1', 'manual__2024-07-15T14:18:43.743847+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/adcm_check.py', '--cfg-path', '/tmp/tmpr_3qshf4']
    [2024-07-15T14:18:46.171+0000] {standard_task_runner.py:85} INFO - Job 12: Subtask runme_1
    [2024-07-15T14:18:46.233+0000] {task_command.py:410} INFO - Running <TaskInstance: adcm_check.runme_1 manual__2024-07-15T14:18:43.743847+00:00 [running]> on host elenas-adh3.ru-central1.internal
    [2024-07-15T14:18:46.326+0000] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='adcm_check' AIRFLOW_CTX_TASK_ID='runme_1' AIRFLOW_CTX_EXECUTION_DATE='2024-07-15T14:18:43.743847+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-15T14:18:43.743847+00:00'

Поиск по логам

Чтобы найти в логах интересующую вас информацию, например, сообщения об ошибках, подключитесь к хосту с логами, которые вы хотите просмотреть, и воспользуйтесь командой grep.

Например:

$ cat /var/log/airflow/dag_processor_manager/dag_processor_manager.log | grep -i -A3 -B1 error | grep -i -A3 -B1 error

Эта команда парсит лог менеджера процессов и ищет сообщения, содержащие слово error. Опция -i позволяет игнорировать различия в регистре, а опции -A3 -B1 добавляют к найденной строке с ошибкой одну строчку до нее и три строчки после.

Пример вывода команды:

File Path                                                                                                                  PID    Runtime      # DAGs    # Errors  Last Runtime    Last Run
-------------------------------------------------------------------------------------------------------------------------  -----  ---------  --------  ----------  --------------  -------------------
/opt/airflow/lib/python3.10/site-packages/airflow/example_dags/example_setup_teardown.py                                                            0           0  0.03s           2024-08-05T10:38:25
/opt/airflow/lib/python3.10/site-packages/airflow/example_dags/example_setup_teardown_taskflow.py                                                   0           0  0.03s           2024-08-05T10:38:25
--
...
File Path                                                                                                                  PID    Runtime      # DAGs    # Errors  Last Runtime    Last Run
-------------------------------------------------------------------------------------------------------------------------  -----  ---------  --------  ----------  --------------  -------------------
/opt/airflow/lib/python3.10/site-packages/airflow/example_dags/example_setup_teardown.py                                                            0           0  0.03s           2024-08-05T10:38:55
/opt/airflow/lib/python3.10/site-packages/airflow/example_dags/example_setup_teardown_taskflow.py                                                   0           0  0.03s           2024-08-05T10:38:55
--

Уровни логирования

Airflow использует стандартный фреймворк ведения логов в Python и поддерживает следующие уровни логирования (от наименее к наиболее информативным):

  1. CRITICAL — уведомляет о серьезной ошибке, которая может остановить работу сервиса.

  2. FATAL — сообщает, что операция не может быть выполнена и будет завершена.

  3. ERROR — уведомляет, что программа работает неправильно или прекратила работу.

  4. WARN — предупреждает о потенциальных проблемах. Это означает, что программа работает не по стандартному сценарию и в будущем могут возникнуть проблемы.

  5. INFO — передает информацию о жизненном цикле или состоянии программы.

  6. DEBUG — выводит отладочную информацию о внутренних состояниях программы.

Включение одного уровня логирования включит этот уровень и все уровни выше него. Например, если установлен уровень логирования ERROR, то в логи попадут только ошибки, фатальные и критические сообщения, но не INFO, DEBUG и WARN.

Конфигурация логирования

Чтобы изменить параметры логирования через ADCM:

  1. На странице Clusters выберите нужный кластер.

  2. Перейдите на вкладку Services и нажмите на Airflow2.

  3. Выберите параметр и внесите необходимые изменения.

  4. Подтвердите изменения в конфигурации, нажав Save.

  5. В раскрывающемся меню Actions выберите Restart, убедитесь, что для параметра Apply configs from ADCM установлено значение true, и нажмите Run.

Параметры логирования Airflow:

  • Logging level — уровень логирования для сервиса Airflow.

  • Logging level for Flask-appbuilder UI — уровень логирования для Flask-appbuilder.

  • cfg_properties_template — файл конфигурации Airflow, содержащий также настройки логирования для задач.

Настройки логирования в файле конфигурации cfg_properties_template
[logging]
base_log_folder = /var/log/airflow
remote_logging = False
remote_log_conn_id =
google_key_path =

remote_base_log_folder =
encrypt_s3_logs = False
{% endraw %}
logging_level = {{ services.airflow2.config.airflow_cfg.logging_level }}
{% raw -%}
celery_logging_level =
{% endraw %}
fab_logging_level = {{ services.airflow2.config.airflow_cfg.fab_logging_level }}
{% raw -%}
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /var/log/airflow/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней