Работа с TaskFlow
TaskFlow API, добавленный в Airflow 2, представляет новый подход к созданию DAG, который позволяет передавать параметры от функции к функции напрямую и помогает сделать код DAG более лаконичным и удобным для чтения.
В предыдущих версиях Airflow наиболее распространенным способом написания DAG было использование операторов Python (PythonOperator). В традиционном подходе к созданию DAG задачи описывались следующим образом:
def extract_people(path: str, **context) -> None:
extracted_people = pd.read_csv(path,sep=',',header=1)
context['ti'].xcom_push(key='extracted_people', value=extracted_people)
def load_people(path: str, **context) -> None:
extracted_people = context['ti'].xcom_pull(key='extracted_people', task_ids=['extract_people'])[0]
loaded_people = pd.DataFrame(extracted_people[0])
loaded_people.to_csv(path, index=None)
task_extract_people = PythonOperator(
task_id='extract_people',
python_callable=extract_people,
op_kwargs={'path': '/opt/airflow/input/people_ages_titles.csv'},
dag=dag,
provide_context=True
)
task_load_people = PythonOperator(
task_id='load_people',
python_callable=load_people,
op_kwargs={'path': '/opt/airflow/output/loaded_people.csv'},
dag=dag,
provide_context=True
)
task_extract_people >> task_load_people
Это упрощенная версия традиционного DAG. Полный пример доступен в статье Создание простого DAG.
Пример выше иллюстрирует ограничения традиционного подхода, которые могут привести к раздутому коду:
-
Необходимость использования объекта контекста для передачи данных в следующую задачу через сервис XCom.
-
Необходимость отдельно определять операторы, функции и входные параметры, которые будут использованы в задачах.
В TaskFlow API есть специальные декораторы, которые убирают необходимость использовать традиционные операторы и обеспечивают передачу данных между задачами с помощью функций того же XCom, но без необходимости их явного вызова.
При создании декорированной функции не нужно указывать task_id
или python_callable
. TaskFlow использует имя функции в качестве идентификатора задачи, но вы можете также передать task_id
в декораторе, чтобы задать другое имя.
Airflow поддерживает декораторы для задач, групп задач и DAG. В парадигме TaskFlow тот же пример будет выглядеть следующим образом:
@task(task_id="extract_people", retries=2) (1)
def extract_people(path: str) -> pd.DataFrame: (2)
extracted_people = pd.read_csv(path, sep=',', header=1)
return extracted_people (3)
@task()
def transform_people(extracted_people: pd.DataFrame) -> list:
transformed_people = []
for person in extracted_people.values:
if int(person[1]) <= 18:
transformed_people.append({
'Name': person[0],
'Age': person[1]
})
return transformed_people
extracted_data = extract_people(SOURCE_PATH) (4)
transformed_data = transform_people(extracted_data)
1 | Определите задачу с помощью декоратора @task . При необходимости передайте нужные параметры задачи в декораторе. |
2 | Определите функцию задачи. |
3 | Здесь больше нет необходимости использовать xcom_pull и xcom_push , поскольку возвращаемое функцией значение автоматически передается другим задачам через XCom, когда определяются зависимости. |
4 | Определите зависимости задач, вызвав функции задач. |
Использование TaskFlow с традиционными операторами
В то время, как традиционные операторы предлагают более детальный контроль над конвейерами данных, TaskFlow обеспечивает более четкие переходы в рабочих процессах и более читаемый код.
В Airflow вы можете использовать как традиционные операторы, так и API TaskFlow.
Передача данных между задачами
Чтобы перенести данные из декорированной задачи в традиционную, используйте атрибут .output
объекта задачи при вызове декорированной функции.
Например:
def traditional_function():
return 10
@task
def taskflow_function(x):
return x + 5
sum_task = PythonOperator(
task_id="sum_task",
python_callable=traditional_function,
)
taskflow_function(sum_task.output) # the task will return 15
Чтобы перенести данные из традиционной задачи в декорированную, укажите вызванную декорированную задачу непосредственно в параметре традиционного оператора.
Например:
@task
def taskflow_function():
return 10
def traditional_function(x):
return x + 5
sum_task = PythonOperator(
task_id="sum_task",
python_callable=traditional_function,
op_args=[taskflow_function()]
)
traditional_function >> taskflow_function()
# sum_task will return 15
Кроме того, существует возможность передавать данные с помощью xcom_pull
и xcom_push
между традиционными и декорированными функциями.
Поскольку декораторы TaskFlow используют ту же традиционную функциональность XCom для сохранения возвращаемых значений функции, можно извлечь данные из декорированной задачи, используя тот же синтаксис xcom_pull
.
Например:
@task
def taskflow_function():
return 10
def traditional_function(**kwargs):
ti = kwargs["ti"]
received_data = ti.xcom_pull(task_ids="taskflow_function")
print(received_data)
traditional_task = PythonOperator(
task_id="traditional_task",
python_callable=traditional_function,
)
traditional_function >> taskflow_function()
# sum_task will return 15
ВАЖНО
TaskFlow сохраняет возвращаемое значение функции только в том случае, если оно используется где-либо еще в DAG.
|
Кросс-функциональные зависимости
При использовании двух типов функций в одном DAG необходимо определить зависимости в соответствии с каждым типом функции: передача параметров от функции к функции — для TaskFlow, и использование операторов битового сдвига — для традиционных функций.
Например:
@task
def taskflow_function_10():
return 10
@task
def taskflow_function_20(x):
return x + 20
def traditional_function(x):
return x + 30
sum_task = PythonOperator(
task_id="sum_task",
python_callable=traditional_function,
)
taskflow_function_20(taskflow_function_10()) >> traditional_function
# sum_task will return 60
Структура DAG в TaskFlow
Задачи в Airflow записываются как функции Python, декорированные @task
. При этом сами задачи должны находиться внутри функции DAG, декорированной @dag
.
Например:
from datetime import timedelta, datetime
import pandas as pd
from airflow.decorators import dag, task (1)
SOURCE_PATH = '/opt/airflow/input/people_ages_titles.csv'
TARGET_PATH = '/opt/airflow/output/loaded_people.csv'
@dag( (2)
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 12, 17),
catchup=False,
tags=['example'],
)
def etl_example():
@task() (3)
def extract_people(path: str) -> pd.DataFrame:
extracted_people = pd.read_csv(path, sep=',', header=1)
return extracted_people
@task()
def transform_people(extracted_people: pd.DataFrame) -> list:
transformed_people = []
for person in extracted_people.values:
if int(person[1]) <= 18:
transformed_people.append({
'Name': person[0],
'Age': person[1]
})
return transformed_people
@task()
def load_people(transformed_people: list, path: str) -> None:
loaded_people = pd.DataFrame(transformed_people)
loaded_people.to_csv(path, index=None)
extracted_data = extract_people(SOURCE_PATH) (4)
transformed_data = transform_people(extracted_data)
load_people(transformed_data, TARGET_PATH)
etl_dag = etl_example() (5)
1 | Импортируйте декораторы задач (task) и DAG. |
2 | Создайте экземпляр DAG с помощью декоратора @dag и его функцию. По умолчанию имя функции используется в качестве DAG ID. |
3 | Внутри функции DAG определите функции задач. |
4 | Определите зависимости, чтобы обеспечить передачу данных между задачами. |
5 | Вызовите функцию DAG. Все декорированные функции в DAG должны быть вызваны в файле этого DAG, чтобы Airflow мог их зарегистрировать. |