Use sensors in Airflow

In Airflow, a sensor is an operator that makes a DAG wait until a condition is met before the downstream tasks run.

The condition depends on the type of sensor. Airflow provides standard sensors that, for example, make a task wait for a certain time to pass (TimeDeltaSensor), wait until a file appears in the filesystem (FileSensor), or wait until a task in another DAG has finished (ExternalTaskSensor). When a sensor's condition is met, the sensor task succeeds and its downstream tasks can run.

Sensor parameters

Sensors have the following base parameters:

  • poke_interval: the time, in seconds, that the sensor waits between checks. The default is 60 seconds.

  • timeout: the maximum number of seconds a sensor is allowed to run. The default is 7 days (60 * 60 * 24 * 7 seconds), after which the sensor task fails. Keep the value below the schedule interval of your DAG to avoid hanging tasks.

  • mode: determines how the sensor task occupies a worker slot. It takes one of two values:

    • poke: the sensor holds a worker slot for its entire run, until it completes;

    • reschedule: the sensor holds a worker slot only while it checks the condition; between checks, for the duration of poke_interval, the slot is released.
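The way poke_interval and timeout interact can be pictured as a polling loop. The sketch below is a simplified model in plain Python, not Airflow's implementation: run_sensor and its clock and sleep parameters are hypothetical names introduced here for illustration.

```python
import time

DEFAULT_POKE_INTERVAL = 60           # seconds between checks
DEFAULT_TIMEOUT = 60 * 60 * 24 * 7   # 7 days, in seconds


def run_sensor(poke, poke_interval=DEFAULT_POKE_INTERVAL,
               timeout=DEFAULT_TIMEOUT,
               clock=time.monotonic, sleep=time.sleep):
    """Hypothetical model of a sensor: call poke() until it returns True.

    Returns the number of checks performed, or raises TimeoutError once
    more than `timeout` seconds have passed without success.
    """
    started = clock()
    checks = 0
    while True:
        checks += 1
        if poke():
            return checks
        if clock() - started > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


# Usage: the condition becomes true on the third check.
answers = iter([False, False, True])
checks_needed = run_sensor(lambda: next(answers), poke_interval=0.01, timeout=5)
print(checks_needed)
```

The clock and sleep arguments exist only so that the loop can be exercised with a fake clock instead of real waiting.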

Work with sensors

Sensor operators are derived from the BaseSensorOperator class in the airflow.sensors.base module and inherit its attributes. Airflow provides several standard sensors, which are described in the table below. Additional sensors are available through providers.

Standard Airflow sensors
Name Description

airflow.sensors.bash

Executes a bash command or a script that can be used as a custom sensor. The command should return 0 when it succeeds, any other value otherwise

airflow.sensors.date_time

Waits until the specified datetime. An advantage of this sensor is idempotence for the target_time. It can be applied to cases where TimeSensor and TimeDeltaSensor are not suited

airflow.sensors.external_task

Waits for a task in another DAG to complete before completing itself

airflow.sensors.filesystem

Waits for a file or folder to land in a filesystem. If the path given is a directory, then this sensor will return True if the directory is not empty

airflow.sensors.python

Repeatedly calls an arbitrary Python callable and waits until it returns True. The @task.sensor decorator is recommended over the classic PythonSensor for Python callables that check a condition

airflow.sensors.time_delta

Waits for a specified time period

airflow.sensors.time_sensor

Waits until the specified time of the day

airflow.sensors.weekday

Waits until the first specified day of the week. For example, if the execution day of the task is 2018-12-22 (Saturday) and you pass FRIDAY, the task will wait until next Friday

A more detailed description of each sensor is available in the Airflow documentation.
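The weekday example from the table can be checked with plain date arithmetic. This is only an illustration of the dates involved, not the sensor's own code; next_matching_day and the FRIDAY constant are hypothetical names.

```python
from datetime import date, timedelta

FRIDAY = 4  # date.weekday() numbers Monday as 0, so Friday is 4


def next_matching_day(start, target_weekday):
    """Return the first date on or after `start` that falls on `target_weekday`."""
    days_ahead = (target_weekday - start.weekday()) % 7
    return start + timedelta(days=days_ahead)


execution_day = date(2018, 12, 22)
wait_until = next_matching_day(execution_day, FRIDAY)

print(execution_day.weekday())  # 5, that is, Saturday
print(wait_until)               # 2018-12-28
```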

The following example shows how a sensor can be used in a DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync  # (1)

dag = DAG(
    dag_id="sensor_example",
    schedule=None,
    start_date=datetime(2023, 6, 4),
)

time_sensor_example = TimeDeltaSensor(  # (2)
    task_id="time_sensor_example",
    dag=dag,
    delta=timedelta(minutes=1),  # (3)
    poke_interval=10,  # (4)
    timeout=30,  # (5)
    soft_fail=True,  # (6)
)
1 Import the sensor operators. TimeDeltaSensorAsync is an async version of the TimeDeltaSensor operator.
2 Instantiate the sensor. If the sensor mode is not set, it defaults to poke.
3 Sets how long to wait before the task can be marked as succeeded.
4 Sets how often the sensor checks the condition, in seconds.
5 Sets the maximum time, in seconds, that the sensor may run before it fails.
6 Marks the sensor task as SKIPPED instead of FAILED if it fails.

To avoid deadlocks and other performance issues, follow these recommendations:

  • Set a timeout that suits each particular sensor. The default value for this parameter might be too long in some cases.

  • Use the reschedule mode for long-running sensors so that they do not constantly occupy a worker slot, and the poke mode for sensors with a short polling interval (i.e., less than 5 minutes) to avoid scheduler overload.

  • Make sure that the poke_interval parameter is applicable to your case. The default value might be unnecessarily frequent.
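The trade-off between the two modes can be estimated with simple arithmetic. The sketch below is a rough model under stated assumptions, not a measurement of Airflow: slot_seconds is a hypothetical helper, each check is assumed to take a fixed check_seconds, and the scheduling overhead that reschedule mode adds for every check is ignored.

```python
import math


def slot_seconds(wait_seconds, poke_interval, check_seconds, mode):
    """Estimate how long a sensor holds a worker slot (hypothetical model).

    wait_seconds  -- time until the condition becomes true
    poke_interval -- seconds between checks
    check_seconds -- assumed duration of a single check
    """
    # Checks run at 0, poke_interval, 2 * poke_interval, ... until one succeeds.
    checks = math.ceil(wait_seconds / poke_interval) + 1
    if mode == "poke":
        # The slot is held from the first check through the successful one.
        return (checks - 1) * poke_interval + check_seconds
    if mode == "reschedule":
        # The slot is held only while a check is running.
        return checks * check_seconds
    raise ValueError(f"unknown mode: {mode}")


# A two-hour wait, checked every 5 minutes, with 2-second checks.
print(slot_seconds(7200, 300, 2, "poke"))        # 7202
print(slot_seconds(7200, 300, 2, "reschedule"))  # 50
```

In this model the saving from reschedule mode shrinks as poke_interval gets shorter, while the number of times the task must be scheduled grows. This is consistent with the advice to keep poke mode for short polling intervals.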

Write a custom sensor

To create a custom sensor, define a Python class that inherits from BaseSensorOperator and overrides its poke method.

In this example, we will create a new sensor that checks the existence of a file.

  1. In the dags directory of your Airflow home, add a new Python file that will contain your custom sensor:

    $ sudo vi /opt/airflow/dags/custom_sensor.py
  2. Add the code for your sensor:

    import os
    from airflow.sensors.base import BaseSensorOperator  # (1)

    class MyCustomFileSensor(BaseSensorOperator):  # (2)
        def __init__(self, filepath, *args, **kwargs):  # (3)
            self.filepath = filepath
            super().__init__(*args, **kwargs)

        def poke(self, context):  # (4)
            print(f'checking if a file {self.filepath} exists...')
            return os.path.exists(self.filepath)  # (5)
    1 Import the base sensor class.
    2 Inherit the base class in your custom sensor.
    3 Add additional arguments and keep the arguments of the parent class.
    4 Override the poke method.
    5 Add the logic for checking the existence of the file.
  3. Call your custom sensor inside a DAG:

    from custom_sensor import MyCustomFileSensor  # (1)

    check_file = MyCustomFileSensor(  # (2)
        task_id='check_file',
        filepath='/opt/airflow/test_file.txt',
        poke_interval=10,
        timeout=60,
        dag=dag
    )
    1 Import the custom sensor class.
    2 Instantiate the custom sensor with the required parameters.
  4. Go to the Airflow web UI, trigger the DAG, and make sure that the custom sensor works.

    Airflow UI: DAG page

    The DAG page shows that the check_file task with the MyCustomFileSensor class has finished successfully.
