Работа с TaskFlow

TaskFlow API, добавленный в Airflow 2, представляет новый подход к созданию DAG, который позволяет передавать параметры от функции к функции напрямую и помогает сделать код DAG более лаконичным и удобным для чтения.

В предыдущих версиях Airflow наиболее распространенным способом написания DAG было использование операторов Python (PythonOperator). В традиционном подходе к созданию DAG задачи описывались следующим образом:

def extract_people(path: str, **context) -> None:
    extracted_people = pd.read_csv(path,sep=',',header=1)
    context['ti'].xcom_push(key='extracted_people', value=extracted_people)

def load_people(path: str, **context) -> None:
    extracted_people = context['ti'].xcom_pull(key='extracted_people', task_ids=['extract_people'])[0]
    loaded_people = pd.DataFrame(extracted_people[0])
    loaded_people.to_csv(path, index=None)

task_extract_people = PythonOperator(
    task_id='extract_people',
    python_callable=extract_people,
    op_kwargs={'path': '/opt/airflow/input/people_ages_titles.csv'},
    dag=dag,
    provide_context=True
)

task_load_people = PythonOperator(
    task_id='load_people',
    python_callable=load_people,
    op_kwargs={'path': '/opt/airflow/output/loaded_people.csv'},
    dag=dag,
    provide_context=True
)

task_extract_people >> task_load_people

Это упрощенная версия традиционного DAG. Полный пример доступен в статье Создание простого DAG.

Пример выше иллюстрирует ограничения традиционного подхода, которые могут привести к раздутому коду:

  • Необходимость использования объекта контекста для передачи данных в следующую задачу через сервис XCom.

  • Необходимость отдельно определять операторы, функции и входные параметры, которые будут использованы в задачах.

В TaskFlow API есть специальные декораторы, которые убирают необходимость использовать традиционные операторы и обеспечивают передачу данных между задачами с помощью функций того же XCom, но без необходимости их явного вызова.

При создании декорированной функции не нужно указывать task_id или python_callable. TaskFlow использует имя функции в качестве идентификатора задачи, но вы можете также передать task_id в декораторе, чтобы задать другое имя.

Airflow поддерживает декораторы для задач, групп задач и DAG. В парадигме TaskFlow тот же пример будет выглядеть следующим образом:

@task(task_id="extract_people", retries=2) (1)
def extract_people(path: str) -> pd.DataFrame: (2)
    extracted_people = pd.read_csv(path, sep=',', header=1)
    return extracted_people (3)

@task()
def transform_people(extracted_people: pd.DataFrame) -> list:
    transformed_people = []
    for person in extracted_people.values:
        if int(person[1]) <= 18:
            transformed_people.append({
                'Name': person[0],
                'Age': person[1]
            })
    return transformed_people

extracted_data = extract_people(SOURCE_PATH) (4)
transformed_data = transform_people(extracted_data)
1 Определите задачу с помощью декоратора @task. При необходимости передайте нужные параметры задачи в декораторе.
2 Определите функцию задачи.
3 Здесь больше нет необходимости использовать xcom_pull и xcom_push, поскольку возвращаемое функцией значение автоматически передается другим задачам через XCom, когда определяются зависимости.
4 Определите зависимости задач, вызвав функции задач.

Использование TaskFlow с традиционными операторами

В то время, как традиционные операторы предлагают более детальный контроль над конвейерами данных, TaskFlow обеспечивает более четкие переходы в рабочих процессах и более читаемый код.

В Airflow вы можете использовать как традиционные операторы, так и API TaskFlow.

Передача данных между задачами

Чтобы перенести данные из декорированной задачи в традиционную, используйте атрибут .output объекта задачи при вызове декорированной функции.

Например:

def traditional_function():
    return 10

@task
def taskflow_function(x):
    return x + 5

sum_task = PythonOperator(
    task_id="sum_task",
    python_callable=traditional_function,
)

taskflow_function(sum_task.output)  # the task will return 15

Чтобы перенести данные из традиционной задачи в декорированную, укажите вызванную декорированную задачу непосредственно в параметре традиционного оператора.

Например:

@task
def taskflow_function():
    return 10

def traditional_function(x):
    return x + 5

sum_task = PythonOperator(
    task_id="sum_task",
    python_callable=traditional_function,
    op_args=[taskflow_function()]
)

traditional_function >> taskflow_function()

# sum_task will return 15

Кроме того, существует возможность передавать данные с помощью xcom_pull и xcom_push между традиционными и декорированными функциями.

Поскольку декораторы TaskFlow используют ту же традиционную функциональность XCom для сохранения возвращаемых значений функции, можно извлечь данные из декорированной задачи, используя тот же синтаксис xcom_pull.

Например:

@task
def taskflow_function():
    return 10

def traditional_function(**kwargs):
    ti = kwargs["ti"]
    received_data = ti.xcom_pull(task_ids="taskflow_function")
    print(received_data)

traditional_task = PythonOperator(
    task_id="traditional_task",
    python_callable=traditional_function,
)

traditional_function >> taskflow_function()

# sum_task will return 15
ВАЖНО
TaskFlow сохраняет возвращаемое значение функции только в том случае, если оно используется где-либо еще в DAG.

Кросс-функциональные зависимости

При использовании двух типов функций в одном DAG необходимо определить зависимости в соответствии с каждым типом функции: передача параметров от функции к функции — для TaskFlow, и использование операторов битового сдвига — для традиционных функций.

Например:

@task
def taskflow_function_10():
    return 10

@task
def taskflow_function_20(x):
    return x + 20

def traditional_function(x):
    return x + 30

sum_task = PythonOperator(
    task_id="sum_task",
    python_callable=traditional_function,
)

taskflow_function_20(taskflow_function_10()) >> traditional_function

# sum_task will return 60

Структура DAG в TaskFlow

Задачи в Airflow записываются как функции Python, декорированные @task. При этом сами задачи должны находиться внутри функции DAG, декорированной @dag.

Например:

from datetime import timedelta, datetime
import pandas as pd
from airflow.decorators import dag, task (1)


SOURCE_PATH = '/opt/airflow/input/people_ages_titles.csv'
TARGET_PATH = '/opt/airflow/output/loaded_people.csv'

@dag(                                       (2)
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 12, 17),
    catchup=False,
    tags=['example'],
)
def etl_example():

    @task()                                 (3)
    def extract_people(path: str) -> pd.DataFrame:
        extracted_people = pd.read_csv(path, sep=',', header=1)
        return extracted_people

    @task()
    def transform_people(extracted_people: pd.DataFrame) -> list:
        transformed_people = []
        for person in extracted_people.values:
            if int(person[1]) <= 18:
                transformed_people.append({
                    'Name': person[0],
                    'Age': person[1]
                })
        return transformed_people

    @task()
    def load_people(transformed_people: list, path: str) -> None:
        loaded_people = pd.DataFrame(transformed_people)
        loaded_people.to_csv(path, index=None)

    extracted_data = extract_people(SOURCE_PATH)    (4)
    transformed_data = transform_people(extracted_data)
    load_people(transformed_data, TARGET_PATH)

etl_dag = etl_example()    (5)
1 Импортируйте декораторы задач (task) и DAG.
2 Создайте экземпляр DAG с помощью декоратора @dag и его функцию. По умолчанию имя функции используется в качестве DAG ID.
3 Внутри функции DAG определите функции задач.
4 Определите зависимости, чтобы обеспечить передачу данных между задачами.
5 Вызовите функцию DAG. Все декорированные функции в DAG должны быть вызваны в файле этого DAG, чтобы Airflow мог их зарегистрировать.
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней