Use sensors in Airflow
In Airflow, a sensor is an operator that waits until a condition is met before letting the downstream tasks in a DAG continue.
The conditions depend on the type of sensor. Airflow provides standard sensors that, for example, allow a task to wait for a certain time to pass (TimeDeltaSensor), wait until a file appears in the file system (FileSensor), or wait until a task in another DAG has finished (ExternalTaskSensor). When a sensor’s condition is met, it can trigger downstream tasks.
Sensor parameters
Sensors have the following base parameters:
- poke_interval — the time that the sensor waits between checks. The default is 60 seconds.
- timeout — the maximum number of seconds a sensor is allowed to run for. The default timeout is 7 days, or 60 * 60 * 24 * 7 seconds, after which the sensor’s task is terminated. The value must be less than the schedule interval of your DAG to avoid hanging tasks.
- mode — the indicator of how a sensor task occupies a worker slot. Can have one of two values:
  - poke — the sensor takes up a worker slot until it gets completed;
  - reschedule — the sensor takes up a worker slot only during checks; in between each poke_interval, the scheduler releases the slot.
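As an illustration of how these parameters work together, the sketch below configures a FileSensor in the reschedule mode. It is only a sketch: the file path and task ID are hypothetical, and it assumes a DAG object named dag is defined elsewhere in the same file.

from airflow.sensors.filesystem import FileSensor

# A minimal sketch: check for a (hypothetical) file every 30 seconds,
# release the worker slot between checks, and give up after 10 minutes
wait_for_report = FileSensor(
    task_id='wait_for_report',
    filepath='/opt/airflow/reports/daily.csv',  # placeholder path
    fs_conn_id='fs_default',   # assumes the default filesystem connection exists
    poke_interval=30,          # check every 30 seconds
    timeout=600,               # fail the task after 10 minutes without the file
    mode='reschedule',         # free the worker slot in between checks
    dag=dag,                   # assumes a DAG object named dag
)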
Work with sensors
Sensor operators are derived from the BaseSensorOperator class in the airflow.sensors.base module and inherit its attributes. Airflow provides several sensors, which are described in the table below; some of them are also available through provider packages.
Name | Description |
---|---|
airflow.sensors.bash | Executes a bash command or a script that can be used as a custom sensor. The sensor succeeds when the command returns the exit code 0 |
airflow.sensors.date_time | Waits until the specified datetime. An advantage of this sensor is idempotence for the target datetime |
airflow.sensors.external_task | Waits for a task in another DAG to complete before getting itself completed |
airflow.sensors.filesystem | Waits for a file or folder to land in a filesystem. If the path given is a directory, this sensor returns true only if any files exist inside it |
airflow.sensors.python | Executes an arbitrary callable and waits for it to return True |
airflow.sensors.time_delta | Waits for a specified time period |
airflow.sensors.time_sensor | Waits until the specified time of the day |
airflow.sensors.weekday | Waits until the first specified day of the week. For example, if the execution day of the task falls on a Saturday and you pass FRIDAY, the task will wait until the next Friday |
A more detailed description of each sensor is available in the Airflow documentation.
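For instance, the external_task sensor can make one DAG wait for another. The snippet below is only a sketch: the external_dag_id and external_task_id values are hypothetical and must match DAG and task IDs that actually exist in your environment, and a DAG object named dag is assumed to be defined.

from airflow.sensors.external_task import ExternalTaskSensor

# A minimal sketch: wait until the task load_data in the (hypothetical) DAG etl_dag
# finishes. By default, the sensor looks for a run of that DAG with the same logical date
wait_for_etl = ExternalTaskSensor(
    task_id='wait_for_etl',
    external_dag_id='etl_dag',     # placeholder DAG ID
    external_task_id='load_data',  # placeholder task ID
    poke_interval=60,
    timeout=3600,
    dag=dag,                       # assumes a DAG object named dag
)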
Here’s an example of a possible sensor usage in a DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync (1)
dag = DAG(
    dag_id="sensor_example",
    schedule=None,
    start_date=datetime(2023, 6, 4),
)

time_sensor_example = TimeDeltaSensor( (2)
    task_id='time_sensor_example',
    dag=dag,
    delta=timedelta(minutes=1), (3)
    poke_interval=10, (4)
    timeout=30, (5)
    soft_fail=True, (6)
)
1 | Import the sensor operators. TimeDeltaSensorAsync is an async version of the TimeDeltaSensor operator. |
2 | Instantiate the sensor. If the sensor mode is not defined, it defaults to poke . |
3 | Specifies the time period to wait before the task can be marked as successful. |
4 | Defines how often the sensor checks the condition (in seconds). |
5 | Defines the maximum time (in seconds) the sensor is allowed to run. |
6 | Marks the sensor’s task as SKIPPED if it fails. |
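If none of the built-in conditions fit, the python sensor from the table above lets a task wait on an arbitrary callable without writing a new operator. A possible sketch, where the checked flag file is just a placeholder condition and dag is assumed to be the DAG defined above:

import os

from airflow.sensors.python import PythonSensor

def _file_flag_is_set():
    # Placeholder condition: replace with any check that returns True or False
    return os.path.exists('/opt/airflow/ready.flag')

wait_for_flag = PythonSensor(
    task_id='wait_for_flag',
    python_callable=_file_flag_is_set,
    poke_interval=15,
    timeout=300,
    dag=dag,
)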
Write a custom sensor
To create a custom sensor, you have to inherit from the BaseSensorOperator class and override its poke method in your Python code.
In this example, we will create a new sensor that checks the existence of a file.
- In the dags directory of your Airflow home, add a new Python file that will contain your custom sensor:
$ sudo vi /opt/airflow/dags/custom_sensor.py
- Add the code for your sensor:
import os
from airflow.sensors.base import BaseSensorOperator (1)

class MyCustomFileSensor(BaseSensorOperator): (2)
    def __init__(self, filepath, *args, **kwargs): (3)
        self.filepath = filepath
        super().__init__(*args, **kwargs)

    def poke(self, context): (4)
        print(f'checking if a file {self.filepath} exists...')
        return os.path.exists(self.filepath) (5)
1 | Import the base sensor class. |
2 | Inherit the base class in your custom sensor. |
3 | Add additional arguments and keep the arguments of the parent class. |
4 | Override the poke method. |
5 | Add the logic for checking the existence of the file. |
- Call your custom sensor inside a DAG (a complete example DAG file is sketched after this procedure):
from custom_sensor import MyCustomFileSensor (1)

check_file = MyCustomFileSensor( (2)
    task_id='check_file',
    filepath='/opt/airflow/test_file.txt',
    poke_interval=10,
    timeout=60,
    dag=dag
)
1 | Import the custom sensor class. |
2 | Instantiate the custom sensor with the required parameters. |
- Go to the Airflow web UI to trigger the DAG and make sure that the custom sensor is working.
Airflow UI: DAG page
The DAG page shows that the check_file task with the MyCustomFileSensor class has finished successfully.
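For reference, a complete DAG file that wires the custom sensor to a downstream task could look like the sketch below. The BashOperator task and its command are hypothetical additions used only to show the dependency; everything else repeats the steps above.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from custom_sensor import MyCustomFileSensor

dag = DAG(
    dag_id='custom_sensor_example',
    schedule=None,
    start_date=datetime(2023, 6, 4),
)

# Wait for the file, then process it with a (hypothetical) bash command
check_file = MyCustomFileSensor(
    task_id='check_file',
    filepath='/opt/airflow/test_file.txt',
    poke_interval=10,
    timeout=60,
    dag=dag,
)

process_file = BashOperator(
    task_id='process_file',
    bash_command='wc -l /opt/airflow/test_file.txt',  # placeholder command
    dag=dag,
)

check_file >> process_file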