Generate DAGs dynamically in Airflow

Overview

Traditional DAGs are convenient for single-source pipelines, for example, a pipeline that extracts data from a file, processes it, and loads the results.

For cases where you need to run the same data pipeline with different parameters on several data sources, it’s more useful to generate DAGs dynamically for each case.

There are two ways of creating DAGs for variable scenarios:

  • Dynamic DAG generation

    In dynamically generated DAGs, the structure of the DAG changes based on static predefined values (configuration files, environment variables, file names, etc.).

  • Dynamic task mapping

    Dynamic task mapping enables generating parallel tasks at runtime based on the output of a previous task.

The examples in this article are written using the TaskFlow API for convenience. You can generate DAGs dynamically using traditional syntax as well.

Dynamic DAG generation

Creating dynamically generated DAGs is similar to creating a single DAG. Since Airflow executes all Python code in the dags folder, that code can generate any number of DAG objects.

The DAGs are regenerated every min_file_process_interval seconds (30 seconds by default). You can change this parameter by editing the cfg_properties_template field via ADCM.
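For reference, this parameter belongs to the [scheduler] section of airflow.cfg, so the corresponding fragment of cfg_properties_template might look like this (30 seconds is the default value):

[scheduler]
min_file_process_interval = 30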

To edit the min_file_process_interval property via ADCM:

  1. Go to the ADCM UI and select your ADH cluster.

  2. Navigate to Services → Airflow2 → Primary configuration and toggle Show advanced.

  3. Open the airflow.cfg section and click cfg_properties_template.

  4. Set the desired value for min_file_process_interval.

  5. Save the configuration by clicking Save → Create and restart the service by clicking Actions → Restart.

CAUTION

If the value of the min_file_process_interval parameter is too low, frequent re-parsing of many generated DAGs might overload the scheduler or a standalone dag-processor process.

To create a dynamic DAG:

  1. In the dags directory of your Airflow home, add a new Python file that will contain your DAG generator:

    $ sudo vi /opt/airflow/dags/dag_generator.py
  2. Add the code for the DAG generator:

    from airflow.decorators import dag, task
    from datetime import datetime   (1)

    for file in ("dev_data.csv", "test_data.csv", "prod_data.csv"):  (2)
        dag_id = f"generated_dag_{file}"

        @dag(dag_id=dag_id, schedule="@daily",
        catchup=False, start_date=datetime(2025,1,1)) (3)
        def create_dag(filename):

            @task (4)
            def extract(filename):
                return filename

            @task
            def process(filename):
                return filename

            @task
            def load(filename):
                print(filename)
                return filename

            load(process(extract(filename))) (5)

        create_dag(file)  (6)
    1 Import the dependencies.
    2 Create the loop for generating DAGs.
    3 Create the DAG function.
    4 Add tasks.
    5 Define your task dependencies by calling the task functions.
    6 Invoke the DAG function.
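As noted in the overview, the same DAGs can also be generated with traditional syntax. Below is a minimal sketch that assumes a single illustrative task per DAG; adapt the task to your own extract/process/load logic:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

for file in ("dev_data.csv", "test_data.csv", "prod_data.csv"):
    with DAG(dag_id=f"classic_generated_dag_{file}", schedule="@daily",
             catchup=False, start_date=datetime(2025,1,1)) as generated_dag:
        # A single illustrative task; replace it with your own logic
        PythonOperator(task_id="load",
                       python_callable=lambda filename=file: print(filename))

    # Keep a module-level reference so that every DAG produced by the loop is discovered
    globals()[generated_dag.dag_id] = generated_dag

The lambda's default argument captures the current value of file, so each generated DAG prints its own file name.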

If you want to use variables to configure your code, it’s better to use environment variables rather than Airflow variables.

Using Airflow variables in top-level code creates a connection to the metadata database, which can slow down parsing and place extra load on the database.
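For example, a minimal sketch of a generator configured through an environment variable might look like this (DAG_ENVIRONMENTS is a hypothetical variable name used for illustration):

import os
from datetime import datetime

from airflow.decorators import dag, task

# DAG_ENVIRONMENTS is a hypothetical variable, for example: DAG_ENVIRONMENTS="dev,test,prod"
environments = os.environ.get("DAG_ENVIRONMENTS", "dev").split(",")

for env in environments:

    @dag(dag_id=f"env_generated_dag_{env}", schedule="@daily",
         catchup=False, start_date=datetime(2025,1,1))
    def create_dag(environment=env):

        @task
        def report(environment):
            print(environment)

        report(environment)

    create_dag()

Reading os.environ at parse time does not open a connection to the metadata database, so it does not slow down DAG file processing.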

You can also import variables from one of several external sources, such as configuration files.

For example, suppose you have two JSON configuration files in the include/dag-configs/ directory of your Airflow home. The files have the following contents:

  • dev_config.json:

    {
        "dag_id": "development_dag",
        "schedule": "@daily",
        "input": "dev_data.csv"
    }
  • prod_config.json:

    {
        "dag_id": "production_dag",
        "schedule": "@daily",
        "input": "prod_data.csv"
    }

You can create a Python script that generates a separate DAG file for each configuration file, using a DAG file that you provide as a template.

To generate DAGs using a template:

  1. In the include/templates directory of your Airflow home, add a new Python file that will contain your DAG template:

    $ sudo vi /opt/airflow/include/templates/dag_template.py
  2. Add the code that the generator will use as a template for DAGs. For example, you can use the code from the previous example and substitute the parameters you want to change with placeholders:

    from airflow.decorators import dag, task
    from datetime import datetime

    @dag(dag_id = "dag_id_placeholder", schedule="schedule_placeholder",
    catchup=False, start_date=datetime(2025,1,1)) (1)
    def create_dag():

        @task
        def extract(filename):
            return filename

        @task
        def process(filename):
            return filename

        @task
        def load(filename):
            print(filename)
            return filename

        load(process(extract("input_placeholder"))) (2)

    create_dag()
    1 Add placeholders for the dag_id and schedule parameters.
    2 Add a placeholder for the input.
  3. In the include/scripts directory of your Airflow home, add a Python file that will generate DAGs using the template:

    $ sudo vi /opt/airflow/include/scripts/dag_generator.py
  4. Write the code for the DAG generator:

    import json
    import os
    import shutil
    import fileinput

    config_filepath = "include/dag-configs/" (1)
    template_filepath = "include/templates/dag_template.py" (2)

    for filename in os.listdir(config_filepath): (3)
        if filename.endswith('.json'):
            with open(os.path.join(config_filepath, filename)) as config_file:
                config = json.load(config_file)
            generated_dag = f"dags/generated_dag_{config['dag_id']}.py"
            shutil.copyfile(template_filepath, generated_dag)
            for line in fileinput.input(generated_dag, inplace=True):
                line = line.replace("dag_id_placeholder", config['dag_id'])
                line = line.replace("schedule_placeholder", config['schedule'])
                line = line.replace("input_placeholder", config['input'])
                print(line, end="")
    1 Specify the path to the config files.
    2 Specify the path to the template file.
    3 Describe the logic of substituting parameter placeholders with correct values from the configs.

When the script is executed, it checks the dag-configs directory for available JSON configuration files and generates a DAG file for each of them based on the specified template, replacing the placeholders with the corresponding values from the configuration file.
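Since the generator uses paths relative to the Airflow home, run it from that directory, for example:

$ cd /opt/airflow && python3 include/scripts/dag_generator.py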

Once the DAG files have been generated, Airflow will parse and load them as regular DAGs.

Dynamic task mapping

The above examples describe how to generate DAGs with different parameters, but the number of tasks in each generated DAG stays the same.

Dynamic task mapping allows you to create tasks at runtime based on the output of another task, even when that output is not known in advance.

This method follows the MapReduce model and relies on the expand() method, which creates copies of the calling task at runtime, one for each of the provided arguments.

The expand() method is used with an operator or a TaskFlow-decorated task and accepts a list or a dictionary of values as input. These values can also be the output of another task passed as an XComArg.

An example of dynamic task mapping:

from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id = "dynamic_task_dag", schedule="@daily", catchup=False, start_date=datetime(2025,1,1))
def create_tasks():

    @task
    def download_files(file: str): (1)
        print(file)

    download_files.expand(file=["file_a", "file_b", "file_c"]) (2)

create_tasks()
1 Create a task function. For example, a function that prints a file name.
2 Call expand() and pass the list of files as an argument. The DAG will generate three tasks, one for each file.

An example of dynamic task mapping where the output of the previous task is not known in advance:

from airflow.decorators import dag, task
from datetime import datetime
import random

@dag(dag_id = "dynamic_task_dag_runtime", schedule="@daily",
catchup=False, start_date=datetime(2025,1,1))
def create_tasks():

    @task
    def get_files():
        return [f"file_{i}" for i in range(random.randint(3,5))] (1)

    @task
    def download_files(file: str):
        print(file)

    download_files.expand(file=get_files()) (2)

create_tasks()
1 Create a task function that returns a list of random length.
2 Call expand() and pass the output of get_files() as an argument. The DAG will generate one task for each file.