Создание простого DAG

Данный пример поясняет процесс создания и запуска вашего первого DAG с использованием файлов CSV. Этот DAG будет имитировать процесс ETL (Extract, Transform, Load) и включать следующие задачи:

  • Extract. Эта задача загружает записи из исходного файла CSV и передает их в следующую задачу. Мы будем использовать тестовый файл people_ages_titles.csv, содержащий список из 1000 человек с их именами и возрастом.

  • Transform. Эта задача выбирает записи, полученные от предыдущей задачи, по следующему критерию: возраст людей должен быть не старше 18 лет.

  • Load. Эта задача загружает обработанные записи, полученные от предыдущей задачи, в новый файл CSV.

ПРИМЕЧАНИЕ
  • На практике вы можете модифицировать этот пример путем замены входных файлов на соединения с входными базами данных.

  • Другие примеры можно найти в документации Airflow.

  • Описанный в данной статье пример актуален для версии Airflow 1.10.11.

Шаг 1. Подготовка DAG-файла

Подготовьте DAG с помощью следующих шагов:

  1. Скопируйте файл ETL_test.py в папку вашего DAG, которая обычно расположена в домашнем каталоге Airflow и называется dags. Выясним содержимое файла:

    import pandas as pd (1)
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    
    dag = DAG('ETL_test', (2)
        schedule_interval=timedelta(days=1),
        start_date=datetime(2021, 12, 17, 0))
    
    def extract_people(path: str, **context) -> None: (3)
        extracted_people = pd.read_csv(path,sep=',',header=1)
        context['ti'].xcom_push(key='extracted_people', value=extracted_people)
    
    def transform_people(**context) -> None: (4)
        extracted_people = context['ti'].xcom_pull(key='extracted_people', task_ids=['extract_people'])[0]
        transformed_people = []
        for person in extracted_people.values:
            if int(person[1]) <= 18:
                transformed_people.append({
                    'Name': person[0],
                    'Age': person[1]
                })
        context['ti'].xcom_push(key='transformed_people', value=transformed_people)
    
    def load_people(path: str, **context) -> None: (5)
        transformed_people = context['ti'].xcom_pull(key='transformed_people', task_ids=['transform_people'])
        loaded_people = pd.DataFrame(transformed_people[0])
        loaded_people.to_csv(path, index=None)
    
    task_extract_people = PythonOperator( (6)
        task_id='extract_people',
        python_callable=extract_people,
        op_kwargs={'path': '/opt/airflow/input/people_ages_titles.csv'},
        dag=dag,
        provide_context=True
    )
    
    task_transform_people = PythonOperator( (6)
        task_id='transform_people',
        python_callable=transform_people,
        dag=dag,
        provide_context=True
    )
    
    task_load_people = PythonOperator( (6)
        task_id='load_people',
        python_callable=load_people,
        op_kwargs={'path': '/opt/airflow/output/loaded_people.csv'},
        dag=dag,
        provide_context=True
    )
    
    task_extract_people >> task_transform_people >> task_load_people (7)
    1 Импортирование. Импортируем пакет pandas для обработки файлов CSV.
    2 Определение DAG. Создаем объект ETL_test типа DAG и планируем его запуск, начиная с 2021-12-17 00:00:00.
    3 Первая функция Python. Ее использует первая задача extract_people. Эта функция считывает файл CSV по пути, заданному переменной path, с помощью модуля pandas. Она использует объект context для передачи загруженных данных через сервис транспортирования данных XCOM в следующую задачу transform_people.
    4 Вторая функция Python. Ее использует вторая задача transform_people. Эта функция получает данные, собранные первой задачей extract_people, и выделяет из них записи с определенными значениями колонки Age (меньшими или равными 18). Она копирует найденные данные в массив и делает этот массив доступным для следующей задачи load_people.
    5 Третья функция Python. Ее использует третья задача load_people. Эта функция считывает данные, сформированные второй задачей transform_people, и сохраняет их в новом файле по пути, заданному параметром path. Для этой цели она использует модуль pandas.
    6 Декларирование задач. Этот фрагмент определяет операторы, функции и входные параметры, необходимые для задач. Он также привязывает задачи к графу DAG, заданному ранее.
    7 Определение зависимостей. Этот фрагмент задает зависимости между созданными задачами.
    ВАЖНО

    Если вы используете AirFlow, установленный в контейнере Docker (как это поставляется в составе ADH), учитывайте различия в используемых путях. Например, используйте /opt/airflow/ вместо /srv/airflow/home/.

  2. Убедитесь, что файл расположен в каталоге DAG с помощью следующей команды:

    $ ls -la /srv/airflow/home/dags/

    Вывод на консоль приведен ниже:

    total 8
    drwxr-xr-x. 3 50000 50000   65 Dec 18 13:11 .
    drwxr-xr-x. 6 50000 50000  126 Dec 18 11:07 ..
    -rw-r--r--. 1 50000 50000  799 Dec 16 07:18 adcm_check.py
    -rw-r--r--  1 root  root  1771 Dec 18 13:10 ETL_test.py
    drwxr-xr-x. 2 50000 50000   74 Dec 18 13:10 __pycache__
  3. Создайте папки input и output в домашнем каталоге Airflow:

    $ sudo mkdir /srv/airflow/home/input
    $ sudo mkdir /srv/airflow/home/output

    Убедитесь, что эти папки созданы в домашнем каталоге Airflow:

    $ ls -la /srv/airflow/home/

    Вывод на консоль приведен ниже:

    total 48
    drwxr-xr-x. 6 50000 50000   126 Dec 18 13:58 .
    drwxr-xr-x. 4 root  root     35 Dec 16 07:17 ..
    -rw-r--r--. 1 root  root  37224 Dec 16 07:18 airflow.cfg
    -rw-r--r--  1 50000 50000     3 Dec 18 11:07 airflow-webserver.pid
    drwxr-xr-x. 3 50000 50000    65 Dec 18 13:11 dags
    drwxr-xr-x  2 root  root      6 Dec 18 13:57 input
    drwxr-xr-x. 7 50000 50000   121 Dec 17 15:55 logs
    drwxr-xr-x  2 root  root      6 Dec 18 13:58 output
    -rw-r--r--. 1 50000 50000  2528 Dec 16 07:19 unittests.cfg
  4. Загрузите исходный файл people_ages_titles.csv в локальную файловую систему сервера Airflow. Убедитесь, что файл загружен успешно, используя следующую команду:

    $ ls -la ~

    Вывод на консоль приведен ниже:

    total 64
    drwx------. 7 dasha dasha   196 Dec 17 15:44 .
    drwxr-xr-x. 3 root  root     19 Aug 31 11:55 ..
    drwx------. 3 dasha dasha    17 Aug 31 15:29 .ansible
    -rw-------. 1 dasha dasha 23583 Dec 17 18:17 .bash_history
    -rw-r--r--. 1 dasha dasha    18 Apr  1  2020 .bash_logout
    -rw-r--r--. 1 dasha dasha   193 Apr  1  2020 .bash_profile
    -rw-r--r--. 1 dasha dasha   231 Apr  1  2020 .bashrc
    drwx------. 3 dasha dasha    17 Dec  7 11:46 .cache
    -rw-rw-r--. 1 dasha dasha  7217 Nov 25 07:05 dasha
    -rw-rw-r--. 1 dasha dasha 17661 Dec 17 15:44 people_ages_titles.csv
    drwxrw----. 3 dasha dasha    19 Dec  6 14:06 .pki
    drwxrwxr-x. 2 dasha dasha    21 Nov 30 09:25 .sqlline
    drwx------. 2 dasha dasha    29 Aug 31 11:55 .ssh
  5. Скопируйте файл people_ages_titles.csv в ранее созданную папку input:

    $ sudo cp ~/people_ages_titles.csv /srv/airflow/home/input

    Убедитесь, что операция выполнилась успешно, используя следующую команду:

    $ ls -la /srv/airflow/home/input

    Вывод на консоль приведен ниже:

    total 20
    drwxr-xr-x  2 root  root     36 Dec 18 14:03 .
    drwxr-xr-x. 6 50000 50000   126 Dec 18 13:58 ..
    -rw-r--r--  1 root  root  17661 Dec 18 14:03 people_ages_titles.csv
  6. Измените владельца созданных папок input and output, включая их содержимое, на airflow. В противном случае возможно возникновение ошибок доступа к файлам в процессе исполнения DAG. Для назначения владельца используйте следующие команды:

    $ sudo chown -R 50000:50000 /srv/airflow/home/input
    $ sudo chown -R 50000:50000 /srv/airflow/home/output

    Для проверки результата воспользуйтесь следующей командой:

    $ ls -la /srv/airflow/home/

    Вывод на консоль приведен ниже:

    total 48
    drwxr-xr-x. 6 50000 50000   126 Dec 18 13:58 .
    drwxr-xr-x. 4 root  root     35 Dec 16 07:17 ..
    -rw-r--r--. 1 root  root  37224 Dec 16 07:18 airflow.cfg
    -rw-r--r--  1 50000 50000     3 Dec 18 11:07 airflow-webserver.pid
    drwxr-xr-x. 3 50000 50000    65 Dec 18 13:11 dags
    drwxr-xr-x  2 50000 50000    36 Dec 18 14:03 input
    drwxr-xr-x. 8 50000 50000   137 Dec 18 14:12 logs
    drwxr-xr-x  2 50000 50000     6 Dec 18 13:58 output
    -rw-r--r--. 1 50000 50000  2528 Dec 16 07:19 unittests.cfg

Шаг 2. Запуск DAG через Web-интерфейс

Для запуска DAG и проверки результатов воспользуйтесь следующими шагами:

  1. Откройте домашнюю страницу Web-интерфейса Airflow. Если все настроено правильно, вы увидите состояние графа во вкладке DAGs. В противном случае будут отображены ошибки.

    airflow dag 01 dark
    Отображение нового DAG
    airflow dag 01 light
    Отображение нового DAG
  2. Активизируйте DAG с помощью одного из способов:

    • Нажмите переключатель и ожидайте запуска согласно настроенному расписанию.

    • Откройте DAG и щелкните на иконке Trigger Dag для немедленного запуска DAG.

      airflow dag 02 dark
      Активирование DAG
      airflow dag 02 light
      Активирование DAG
  3. После запуска DAG вы можете увидеть информацию о его состоянии на домашней странице: количество успешных (successful), неудачных (failed), активных (running) и других запусков DAG, а также состояния экземпляров задач. Для просмотра подробной информации о запусках конкретного DAG (DAG runs), щелкните на названии DAG.

    airflow dag 03 dark
    DAG выполняется
    airflow dag 03 light
    DAG выполняется
  4. Просматривая подробную информацию, вы можете увидеть состояния всех запусков DAG и экземпляров задач. При необходимости измените режим просмотра на Graph View для анализа длительности выполнения задач или выполнения других действий, доступных на текущей странице.

    airflow dag 04 dark
    Подробный обзор запусков DAG
    airflow dag 04 light
    Подробный обзор запусков DAG

Если в процессе исполнения DAG появятся ошибки, страница будет выглядеть как представлено ниже.

airflow dag 05 dark
Страница с ошибками
airflow dag 05 light
Страница с ошибками

Шаг 3. Проверка результатов

В завершение, проверим исполнил ли DAG все задачи корректно. Убедитесь, что выходной файл loaded_people.csv появился в папке output:

  1. Проверьте содержимое папки output:

    $ ls -la /srv/airflow/home/output/

    Вывод на консоль приведен ниже:

    total 4
    drwxr-xr-x  2 50000 50000  31 Dec 18 14:25 .
    drwxr-xr-x. 6 50000 50000 126 Dec 18 13:58 ..
    -rw-r--r--  1 50000 50000 832 Dec 18 14:25 loaded_people.csv
  2. Получите выходные данные:

    $ cat /srv/airflow/home/output/loaded_people.csv

    Данные выглядят следующим образом:

    Name,Age
    Arnold Bettie,18
    Campbell Todd,18
    Thompson Warren,18
    Stokes Mittie,18
    Davis Alan,18
    Vaughn Belle,18
    Munoz Ricardo,18
    Norton Bertha,18
    McGee Isabelle,18
    Bailey Cameron,18
    James Loretta,18
    Myers Ricky,18
    Torres Elsie,18
    Phelps Rena,18
    Hill Katharine,18
    Hodges Abbie,18
    Fitzgerald Timothy,18
    Miller Francis,18
    Banks Catherine,18
    Casey Louise,18
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней