Создание простого DAG
Данный пример поясняет процесс создания и запуска вашего первого DAG с использованием файлов CSV. Этот DAG будет имитировать процесс ETL (Extract, Transform, Load) и включать следующие задачи:
-
Extract. Эта задача загружает записи из исходного файла CSV и передает их в следующую задачу. Мы будем использовать тестовый файл people_ages_titles.csv, содержащий список из 1000 человек с их именами и возрастом.
-
Transform. Эта задача выбирает записи, полученные от предыдущей задачи, по следующему критерию: возраст людей должен быть не старше 18 лет.
-
Load. Эта задача загружает обработанные записи, полученные от предыдущей задачи, в новый файл CSV.
ПРИМЕЧАНИЕ
|
Шаг 1. Подготовка DAG-файла
Подготовьте DAG с помощью следующих шагов:
-
Скопируйте файл ETL_test.py в папку вашего DAG, которая обычно расположена в домашнем каталоге Airflow и называется dags. Выясним содержимое файла:
import pandas as pd (1) from datetime import timedelta, datetime from airflow import DAG from airflow.operators.python_operator import PythonOperator dag = DAG('ETL_test', (2) schedule_interval=timedelta(days=1), start_date=datetime(2021, 12, 17, 0)) def extract_people(path: str, **context) -> None: (3) extracted_people = pd.read_csv(path,sep=',',header=1) context['ti'].xcom_push(key='extracted_people', value=extracted_people) def transform_people(**context) -> None: (4) extracted_people = context['ti'].xcom_pull(key='extracted_people', task_ids=['extract_people'])[0] transformed_people = [] for person in extracted_people.values: if int(person[1]) <= 18: transformed_people.append({ 'Name': person[0], 'Age': person[1] }) context['ti'].xcom_push(key='transformed_people', value=transformed_people) def load_people(path: str, **context) -> None: (5) transformed_people = context['ti'].xcom_pull(key='transformed_people', task_ids=['transform_people']) loaded_people = pd.DataFrame(transformed_people[0]) loaded_people.to_csv(path, index=None) task_extract_people = PythonOperator( (6) task_id='extract_people', python_callable=extract_people, op_kwargs={'path': '/opt/airflow/input/people_ages_titles.csv'}, dag=dag, provide_context=True ) task_transform_people = PythonOperator( (6) task_id='transform_people', python_callable=transform_people, dag=dag, provide_context=True ) task_load_people = PythonOperator( (6) task_id='load_people', python_callable=load_people, op_kwargs={'path': '/opt/airflow/output/loaded_people.csv'}, dag=dag, provide_context=True ) task_extract_people >> task_transform_people >> task_load_people (7)
1 Импортирование. Импортируем пакет pandas для обработки файлов CSV. 2 Определение DAG. Создаем объект ETL_test
типа DAG и планируем его запуск, начиная с 2021-12-17 00:00:00.3 Первая функция Python. Ее использует первая задача extract_people
. Эта функция считывает файл CSV по пути, заданному переменнойpath
, с помощью модуляpandas
. Она использует объектcontext
для передачи загруженных данных через сервис транспортирования данных XCOM в следующую задачуtransform_people
.4 Вторая функция Python. Ее использует вторая задача transform_people
. Эта функция получает данные, собранные первой задачейextract_people
, и выделяет из них записи с определенными значениями колонкиAge
(меньшими или равными 18). Она копирует найденные данные в массив и делает этот массив доступным для следующей задачиload_people
.5 Третья функция Python. Ее использует третья задача load_people
. Эта функция считывает данные, сформированные второй задачейtransform_people
, и сохраняет их в новом файле по пути, заданному параметромpath
. Для этой цели она использует модульpandas
.6 Декларирование задач. Этот фрагмент определяет операторы, функции и входные параметры, необходимые для задач. Он также привязывает задачи к графу DAG, заданному ранее. 7 Определение зависимостей. Этот фрагмент задает зависимости между созданными задачами. ВАЖНОЕсли вы используете AirFlow, установленный в контейнере Docker (как это поставляется в составе ADH), учитывайте различия в используемых путях. Например, используйте /opt/airflow/ вместо /srv/airflow/home/.
-
Убедитесь, что файл расположен в каталоге DAG с помощью следующей команды:
$ ls -la /srv/airflow/home/dags/
Вывод на консоль приведен ниже:
total 8 drwxr-xr-x. 3 50000 50000 65 Dec 18 13:11 . drwxr-xr-x. 6 50000 50000 126 Dec 18 11:07 .. -rw-r--r--. 1 50000 50000 799 Dec 16 07:18 adcm_check.py -rw-r--r-- 1 root root 1771 Dec 18 13:10 ETL_test.py drwxr-xr-x. 2 50000 50000 74 Dec 18 13:10 __pycache__
-
Создайте папки input и output в домашнем каталоге Airflow:
$ sudo mkdir /srv/airflow/home/input $ sudo mkdir /srv/airflow/home/output
Убедитесь, что эти папки созданы в домашнем каталоге Airflow:
$ ls -la /srv/airflow/home/
Вывод на консоль приведен ниже:
total 48 drwxr-xr-x. 6 50000 50000 126 Dec 18 13:58 . drwxr-xr-x. 4 root root 35 Dec 16 07:17 .. -rw-r--r--. 1 root root 37224 Dec 16 07:18 airflow.cfg -rw-r--r-- 1 50000 50000 3 Dec 18 11:07 airflow-webserver.pid drwxr-xr-x. 3 50000 50000 65 Dec 18 13:11 dags drwxr-xr-x 2 root root 6 Dec 18 13:57 input drwxr-xr-x. 7 50000 50000 121 Dec 17 15:55 logs drwxr-xr-x 2 root root 6 Dec 18 13:58 output -rw-r--r--. 1 50000 50000 2528 Dec 16 07:19 unittests.cfg
-
Загрузите исходный файл people_ages_titles.csv в локальную файловую систему сервера Airflow. Убедитесь, что файл загружен успешно, используя следующую команду:
$ ls -la ~
Вывод на консоль приведен ниже:
total 64 drwx------. 7 dasha dasha 196 Dec 17 15:44 . drwxr-xr-x. 3 root root 19 Aug 31 11:55 .. drwx------. 3 dasha dasha 17 Aug 31 15:29 .ansible -rw-------. 1 dasha dasha 23583 Dec 17 18:17 .bash_history -rw-r--r--. 1 dasha dasha 18 Apr 1 2020 .bash_logout -rw-r--r--. 1 dasha dasha 193 Apr 1 2020 .bash_profile -rw-r--r--. 1 dasha dasha 231 Apr 1 2020 .bashrc drwx------. 3 dasha dasha 17 Dec 7 11:46 .cache -rw-rw-r--. 1 dasha dasha 7217 Nov 25 07:05 dasha -rw-rw-r--. 1 dasha dasha 17661 Dec 17 15:44 people_ages_titles.csv drwxrw----. 3 dasha dasha 19 Dec 6 14:06 .pki drwxrwxr-x. 2 dasha dasha 21 Nov 30 09:25 .sqlline drwx------. 2 dasha dasha 29 Aug 31 11:55 .ssh
-
Скопируйте файл people_ages_titles.csv в ранее созданную папку input:
$ sudo cp ~/people_ages_titles.csv /srv/airflow/home/input
Убедитесь, что операция выполнилась успешно, используя следующую команду:
$ ls -la /srv/airflow/home/input
Вывод на консоль приведен ниже:
total 20 drwxr-xr-x 2 root root 36 Dec 18 14:03 . drwxr-xr-x. 6 50000 50000 126 Dec 18 13:58 .. -rw-r--r-- 1 root root 17661 Dec 18 14:03 people_ages_titles.csv
-
Измените владельца созданных папок input and output, включая их содержимое, на
airflow
. В противном случае возможно возникновение ошибок доступа к файлам в процессе исполнения DAG. Для назначения владельца используйте следующие команды:$ sudo chown -R 50000:50000 /srv/airflow/home/input $ sudo chown -R 50000:50000 /srv/airflow/home/output
Для проверки результата воспользуйтесь следующей командой:
$ ls -la /srv/airflow/home/
Вывод на консоль приведен ниже:
total 48 drwxr-xr-x. 6 50000 50000 126 Dec 18 13:58 . drwxr-xr-x. 4 root root 35 Dec 16 07:17 .. -rw-r--r--. 1 root root 37224 Dec 16 07:18 airflow.cfg -rw-r--r-- 1 50000 50000 3 Dec 18 11:07 airflow-webserver.pid drwxr-xr-x. 3 50000 50000 65 Dec 18 13:11 dags drwxr-xr-x 2 50000 50000 36 Dec 18 14:03 input drwxr-xr-x. 8 50000 50000 137 Dec 18 14:12 logs drwxr-xr-x 2 50000 50000 6 Dec 18 13:58 output -rw-r--r--. 1 50000 50000 2528 Dec 16 07:19 unittests.cfg
Шаг 2. Запуск DAG через Web-интерфейс
Для запуска DAG и проверки результатов воспользуйтесь следующими шагами:
-
Откройте домашнюю страницу Web-интерфейса Airflow. Если все настроено правильно, вы увидите состояние графа во вкладке DAGs. В противном случае будут отображены ошибки.
Отображение нового DAGОтображение нового DAG -
Активизируйте DAG с помощью одного из способов:
-
Нажмите переключатель и ожидайте запуска согласно настроенному расписанию.
-
Откройте DAG и щелкните на иконке Trigger Dag для немедленного запуска DAG.
Активирование DAGАктивирование DAG
-
-
После запуска DAG вы можете увидеть информацию о его состоянии на домашней странице: количество успешных (successful), неудачных (failed), активных (running) и других запусков DAG, а также состояния экземпляров задач. Для просмотра подробной информации о запусках конкретного DAG (DAG runs), щелкните на названии DAG.
DAG выполняетсяDAG выполняется -
Просматривая подробную информацию, вы можете увидеть состояния всех запусков DAG и экземпляров задач. При необходимости измените режим просмотра на Graph View для анализа длительности выполнения задач или выполнения других действий, доступных на текущей странице.
Подробный обзор запусков DAGПодробный обзор запусков DAG
Если в процессе исполнения DAG появятся ошибки, страница будет выглядеть как представлено ниже.


Шаг 3. Проверка результатов
В завершение, проверим исполнил ли DAG все задачи корректно. Убедитесь, что выходной файл loaded_people.csv появился в папке output:
-
Проверьте содержимое папки output:
$ ls -la /srv/airflow/home/output/
Вывод на консоль приведен ниже:
total 4 drwxr-xr-x 2 50000 50000 31 Dec 18 14:25 . drwxr-xr-x. 6 50000 50000 126 Dec 18 13:58 .. -rw-r--r-- 1 50000 50000 832 Dec 18 14:25 loaded_people.csv
-
Получите выходные данные:
$ cat /srv/airflow/home/output/loaded_people.csv
Данные выглядят следующим образом:
Name,Age Arnold Bettie,18 Campbell Todd,18 Thompson Warren,18 Stokes Mittie,18 Davis Alan,18 Vaughn Belle,18 Munoz Ricardo,18 Norton Bertha,18 McGee Isabelle,18 Bailey Cameron,18 James Loretta,18 Myers Ricky,18 Torres Elsie,18 Phelps Rena,18 Hill Katharine,18 Hodges Abbie,18 Fitzgerald Timothy,18 Miller Francis,18 Banks Catherine,18 Casey Louise,18