Work with TaskFlow
The TaskFlow API, added in Airflow 2, introduces a new approach for creating DAGs that enables direct function-to-function parameter passing and helps make DAG code more concise and easier to read.
In previous Airflow versions, the most common way to write a DAG was to use Python operators. With this approach, tasks are defined as follows:
def extract_people(path: str, **context) -> None:
    extracted_people = pd.read_csv(path, sep=',', header=1)
    # Explicitly push the result to XCom so the next task can read it
    context['ti'].xcom_push(key='extracted_people', value=extracted_people)

def load_people(path: str, **context) -> None:
    # Explicitly pull the data pushed by the extract_people task
    extracted_people = context['ti'].xcom_pull(key='extracted_people', task_ids=['extract_people'])
    loaded_people = pd.DataFrame(extracted_people[0])
    loaded_people.to_csv(path, index=None)

task_extract_people = PythonOperator(
    task_id='extract_people',
    python_callable=extract_people,
    op_kwargs={'path': '/opt/airflow/input/people_ages_titles.csv'},
    dag=dag,
    provide_context=True
)

task_load_people = PythonOperator(
    task_id='load_people',
    python_callable=load_people,
    op_kwargs={'path': '/opt/airflow/output/loaded_people.csv'},
    dag=dag,
    provide_context=True
)

task_extract_people >> task_load_people
This is a simplified version of the traditional DAG. The full example is available in the Create a simple DAG article.
The example above illustrates the limitations of the traditional approach that can lead to bloated code:
- You need to use a context object to transfer the extracted data to the next task through the XCom data transport service.
- You have to separately define the operators, the functions, and the input parameters that the tasks will use.
The TaskFlow API introduces decorators that handle data transfer between tasks through the same XCom mechanism (without the need to call it explicitly) and eliminate the need to instantiate Python operators.
When you write a decorated function, you don’t need to specify task_id or python_callable. TaskFlow uses the name of the function as the task ID, but you can pass a task_id to the decorator as a parameter.
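For instance, a minimal sketch of both options (the function names here are hypothetical and not part of the example DAG):
from airflow.decorators import task

@task
def fetch_data() -> list:
    # task_id defaults to the function name, "fetch_data"
    return [1, 2, 3]

@task(task_id="store_data")
def save_data(records: list) -> None:
    # the explicit task_id "store_data" overrides the function name
    print(records)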
Airflow supports decorators for tasks, groups of tasks, and DAGs. In the TaskFlow paradigm, the same example would look like this:
@task(task_id="extract_people", retries=2) (1)
def extract_people(path: str) -> pd.DataFrame: (2)
extracted_people = pd.read_csv(path, sep=',', header=1)
return extracted_people (3)
@task()
def transform_people(extracted_people: pd.DataFrame) -> list:
transformed_people = []
for person in extracted_people.values:
if int(person[1]) <= 18:
transformed_people.append({
'Name': person[0],
'Age': person[1]
})
return transformed_people
extracted_data = extract_people(SOURCE_PATH) (4)
transformed_data = transform_people(extracted_data)
1 | Define a task by using the @task decorator. If required, pass the necessary task-level parameters to the decorator. |
2 | Define the task’s function. |
3 | There’s no need to use xcom_pull and xcom_push since the function’s return value is automatically passed to other tasks via XCom when the dependencies are defined. |
4 | Define your task dependencies by calling the task functions. |
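The example above covers the @task decorator. Airflow also provides a decorator for groups of tasks, mentioned earlier. The following is a minimal sketch with hypothetical task names, assuming the task_group decorator available in recent Airflow 2 releases:
from airflow.decorators import task, task_group

@task
def extract() -> list:
    return [1, 2, 3]

@task
def double(values: list) -> list:
    return [v * 2 for v in values]

@task_group
def etl_group():
    # tasks defined here are grouped under "etl_group" in the Airflow UI
    double(extract())
Calling etl_group() inside a DAG function registers the whole group, which appears as a single collapsible node in the graph view.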
Use TaskFlow with traditional operators
While traditional operators offer more fine-grained control over data pipelines, TaskFlow produces cleaner transitions between tasks and more readable code.
In Airflow, you can combine traditional operators and the TaskFlow API in the same DAG.
Pass data between tasks
To transfer data from a traditional task to a decorated one, use the .output attribute of the traditional operator when calling the decorated function.
For example:
def traditional_function():
    return 10

@task
def taskflow_function(x):
    return x + 5

sum_task = PythonOperator(
    task_id="sum_task",
    python_callable=traditional_function,
)

taskflow_function(sum_task.output)  # the decorated task will return 15
To transfer data from a decorated task to a traditional one, pass the decorated function call to the traditional operator in op_args.
For example:
@task
def taskflow_function():
    return 10

def traditional_function(x):
    return x + 5

sum_task = PythonOperator(
    task_id="sum_task",
    python_callable=traditional_function,
    op_args=[taskflow_function()]
)
# sum_task will return 15
Additionally, you can use xcom_pull and xcom_push between traditional and decorated functions.
Since TaskFlow decorators use the same XCom functionality to save a function’s return values, you can pull from a decorated task using the same xcom_pull syntax.
For example:
@task
def taskflow_function():
    return 10

def traditional_function(**kwargs):
    ti = kwargs["ti"]
    received_data = ti.xcom_pull(task_ids="taskflow_function")
    print(received_data)

traditional_task = PythonOperator(
    task_id="traditional_task",
    python_callable=traditional_function,
)

taskflow_function() >> traditional_task
# traditional_task will print 10
IMPORTANT
TaskFlow saves the return value of a function only if the return value is used elsewhere in the DAG.
Cross-functional dependencies
When using both types of functions in the same DAG, you have to define dependencies according to each function type: function-to-function parameter passing for TaskFlow functions and bitshift operators (>> and <<) for traditional operators.
For example:
@task
def taskflow_function_10():
    return 10

@task
def taskflow_function_20(x):
    return x + 20

def traditional_function(x):
    return x + 30

# Chain the decorated tasks via parameter passing,
# then feed the result into the traditional operator
transformed_data = taskflow_function_20(taskflow_function_10())

sum_task = PythonOperator(
    task_id="sum_task",
    python_callable=traditional_function,
    op_args=[transformed_data],
)

transformed_data >> sum_task
# sum_task will return 60
TaskFlow DAG structure
In Airflow 2, tasks are written as Python functions decorated with @task. When creating a DAG, the task functions must be located inside the DAG function, which is decorated with @dag.
For example:
from datetime import timedelta, datetime
import pandas as pd
from airflow.decorators import dag, task (1)

SOURCE_PATH = '/opt/airflow/input/people_ages_titles.csv'
TARGET_PATH = '/opt/airflow/output/loaded_people.csv'

@dag( (2)
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 12, 17),
    catchup=False,
    tags=['example'],
)
def etl_example():
    @task() (3)
    def extract_people(path: str) -> pd.DataFrame:
        extracted_people = pd.read_csv(path, sep=',', header=1)
        return extracted_people

    @task()
    def transform_people(extracted_people: pd.DataFrame) -> list:
        transformed_people = []
        for person in extracted_people.values:
            if int(person[1]) <= 18:
                transformed_people.append({
                    'Name': person[0],
                    'Age': person[1]
                })
        return transformed_people

    @task()
    def load_people(transformed_people: list, path: str) -> None:
        loaded_people = pd.DataFrame(transformed_people)
        loaded_people.to_csv(path, index=None)

    extracted_data = extract_people(SOURCE_PATH) (4)
    transformed_data = transform_people(extracted_data)
    load_people(transformed_data, TARGET_PATH)

etl_dag = etl_example() (5)
1 | Import the task and DAG decorators. |
2 | Instantiate a DAG by applying the @dag decorator to the Python function etl_example. By default, the name of the function acts as the DAG identifier. |
3 | Inside the DAG function, define your task functions. |
4 | Define the dependencies to ensure data transfer between the tasks. |
5 | Invoke the DAG function. All the decorated functions in your DAG must be called in that DAG’s file so that Airflow can register them. |
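If the function name doesn’t suit as an identifier, you can pass dag_id to the @dag decorator, just as task_id can be passed to @task. A minimal sketch (the DAG below is hypothetical, not part of the example above):
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="people_etl",  # overrides the default identifier taken from the function name
    schedule_interval=None,
    start_date=datetime(2021, 12, 17),
    catchup=False,
)
def etl_example_named():
    @task()
    def say_hello():
        print("hello")

    say_hello()

etl_dag = etl_example_named()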