Add custom operators and hooks in Airflow

Overview

Airflow provides standard operators that, for example, allow users to execute bash commands, call arbitrary Python functions, create a connection to an external app, or check for a specific condition.

The list of operators that Airflow provides out of the box is extensive, but in some cases you need to modify an existing operator or create your own.

This article describes how to write custom operators and hooks. To learn about custom sensors, see the Use sensors in Airflow article.

When creating any custom Airflow operator, you need to:

  1. Create a new Python file for the operator code.

  2. Inherit from an Airflow class and either rewrite or extend its functionality by overriding methods in the derived class:

    • In the constructor (the __init__ method), define the parameters of your operator.

    • In the execute method, provide the code to run when the runner calls the operator.

  3. Place the file in a directory that is on Airflow's PYTHONPATH. By default, these are the dags, plugins, and config directories.
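Step 3 works because Airflow puts those directories on Python's module search path (sys.path), so a DAG file can import a custom module like any other module. The sketch below reproduces only that mechanism with the standard library: it writes a throwaway module into a temporary directory, adds the directory to sys.path, and imports it. The module name demo_custom_module and the helper import_from_directory are hypothetical; Airflow performs the sys.path step for dags, plugins, and config itself.

```python
import importlib
import sys
import tempfile
from pathlib import Path
from types import ModuleType


def import_from_directory(directory: Path, module_name: str) -> ModuleType:
    """Put `directory` on sys.path and import `module_name` from it.

    Hypothetical helper that mimics what Airflow does for dags/plugins/config.
    """
    path_entry = str(directory)
    if path_entry not in sys.path:
        sys.path.insert(0, path_entry)
    # Make sure files written after interpreter start-up are visible to the importer.
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


with tempfile.TemporaryDirectory() as tmp:
    module_file = Path(tmp) / "demo_custom_module.py"
    module_file.write_text("GREETING = 'Hello from a module on sys.path'\n")
    module = import_from_directory(Path(tmp), "demo_custom_module")
    print(module.GREETING)
```

The module stays in sys.modules after the temporary directory is removed, which is enough for this demonstration.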

The examples below illustrate implementations of these steps for writing a custom operator and a hook.

Division of the logic between operators and hooks

 

An operator should contain the business logic of your task and any in-memory state. If your task needs to communicate with an external service (an API, a database, and so on), implement that communication in a hook.

A hook should not contain the logic of the operator but only expose methods that will then be used by an operator for interfacing with an external service.

By keeping the logic separated into two components, you will be able to build multiple operators that use the same hook without duplicating the code.
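To make the split concrete, here is a framework-free sketch in plain Python. KeyValueHook, CopyValueOperator, and UppercaseValueOperator are hypothetical names, not Airflow classes, and an in-memory dictionary stands in for the external service so the example runs without Airflow or a network. Both operators hold only task logic and delegate every read and write to the same hook.

```python
from typing import Dict, Optional


class KeyValueHook:
    """Hypothetical hook: the only place that talks to the external service.

    A plain dict stands in for a remote key-value store in this sketch.
    """

    def __init__(self, store: Optional[Dict[str, str]] = None) -> None:
        self._store: Dict[str, str] = dict(store or {})

    def get(self, key: str) -> Optional[str]:
        return self._store.get(key)

    def put(self, key: str, value: str) -> None:
        self._store[key] = value


class CopyValueOperator:
    """Hypothetical operator: copies a value from one key to another."""

    def __init__(self, hook: KeyValueHook, src: str, dst: str) -> None:
        self.hook = hook
        self.src = src
        self.dst = dst

    def execute(self, context: Optional[dict] = None) -> Optional[str]:
        value = self.hook.get(self.src)  # all I/O goes through the hook
        if value is not None:
            self.hook.put(self.dst, value)
        return value


class UppercaseValueOperator:
    """Hypothetical operator: upper-cases a stored value in place."""

    def __init__(self, hook: KeyValueHook, key: str) -> None:
        self.hook = hook
        self.key = key

    def execute(self, context: Optional[dict] = None) -> str:
        value = (self.hook.get(self.key) or "").upper()  # business logic lives here
        self.hook.put(self.key, value)
        return value


# Usage: both operators reuse one hook instead of duplicating access code.
hook = KeyValueHook({"greeting": "hello"})
CopyValueOperator(hook, src="greeting", dst="copy").execute()
print(UppercaseValueOperator(hook, key="copy").execute())  # HELLO
```

If the service's access details change, only KeyValueHook needs editing; neither operator does.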

Operators and hooks interaction path

Create a custom operator

Here’s an example of creating a custom operator based on the airflow.models.baseoperator.BaseOperator class:

  1. In the dags directory of your Airflow home, add a new Python file that will contain your custom operator:

    $ sudo vi /opt/airflow/dags/custom_operator.py
  2. Add the code for your operator:

    from airflow.models.baseoperator import BaseOperator  (1)
    
    class CustomOperator(BaseOperator):  (2)
        def __init__(self, name: str, **kwargs) -> None:  (3)
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):  (4)
            message = f"Hello, {self.name}!"
            print(message)
            return message
    1 Import the base operator class.
    2 Inherit the base class in your custom operator.
    3 Add new arguments and keep the arguments of the parent class.
    4 Override the execute method with the desired functionality.
  3. Call your custom operator inside a DAG:

    from custom_operator import CustomOperator  (1)
    
    print_message = CustomOperator(  (2)
        task_id='print_message',
        name='CustomOperator',
        dag=dag,  # the DAG object defined earlier in this DAG file
    )
    1 Import the custom operator class.
    2 Instantiate the custom operator as a task.
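The operator's two methods run at different times: Airflow calls the constructor when it parses the DAG file, and calls execute later, when the task runs, passing it a context dictionary. The sketch below models that flow without Airflow. BaseOperator here is a simplified stand-in, not the real airflow.models.baseoperator.BaseOperator, and run_task is a hypothetical helper playing the role of the runner. Like Airflow 2's BaseOperator with default settings, the stand-in rejects keyword arguments it does not recognize.

```python
from typing import Any, Dict


class BaseOperator:
    """Simplified stand-in for Airflow's BaseOperator (illustration only)."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        if kwargs:
            # Reject arguments that neither this class nor the subclass defines.
            raise TypeError(f"Invalid arguments for {type(self).__name__}: {sorted(kwargs)}")
        self.task_id = task_id

    def execute(self, context: Dict[str, Any]) -> Any:
        raise NotImplementedError("Subclasses must override execute()")


class CustomOperator(BaseOperator):
    def __init__(self, name: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)  # task_id and other base arguments go to the parent
        self.name = name

    def execute(self, context: Dict[str, Any]) -> str:
        message = f"Hello, {self.name}!"
        print(message)
        return message


def run_task(operator: BaseOperator) -> Any:
    """Hypothetical runner: builds a minimal context and calls execute()."""
    context = {"task_id": operator.task_id}
    return operator.execute(context)


task = CustomOperator(task_id="print_message", name="CustomOperator")  # parse time
result = run_task(task)  # run time: prints "Hello, CustomOperator!"
```

In real Airflow, the value returned by execute is pushed to XCom by default (the do_xcom_push argument), so downstream tasks can read it.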

Create a custom hook

Creating a custom hook differs from writing a custom operator in three ways:

  • The custom hook must inherit from a hook class, for example, the BaseHook class or any other existing hook.

  • You can include a .get_conn() method that wraps a call to the .get_connection() method to retrieve the details of an Airflow connection.

  • An Airflow connection must exist for the hook to use.
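The role of conn_name_attr and get_connection() can be sketched without Airflow. StubBaseHook is a hypothetical stand-in for airflow.hooks.base.BaseHook whose get_connection() looks up a plain dictionary instead of Airflow's connection storage; the CustomHook attributes mirror the example below. The lookup fails when no connection with the requested ID exists, which is why the connection has to be created first.

```python
from typing import Dict


class StubBaseHook:
    """Hypothetical stand-in for BaseHook with an in-memory connection store."""

    # Stands in for the Airflow metadata database and secrets backends.
    connections: Dict[str, dict] = {}

    @classmethod
    def get_connection(cls, conn_id: str) -> dict:
        try:
            return cls.connections[conn_id]
        except KeyError:
            raise LookupError(f"The conn_id `{conn_id}` isn't defined") from None


class CustomHook(StubBaseHook):
    conn_name_attr = "custom_conn_id"  # name of the attribute holding the ID
    default_conn_name = "custom_conn_default"

    def __init__(self, custom_conn_id: str = default_conn_name) -> None:
        self.custom_conn_id = custom_conn_id

    def get_conn(self) -> dict:
        # getattr reads the value of the attribute named by conn_name_attr.
        conn_id = getattr(self, self.conn_name_attr)
        return self.get_connection(conn_id)


# Usage: register a connection, then resolve it through the hook.
StubBaseHook.connections["custom_conn_default"] = {"conn_type": "http", "host": "example.com"}
print(CustomHook().get_conn())
```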

Below is an example of a custom hook class that retrieves the details of an Airflow connection to an external tool:

  1. In the dags directory of your Airflow home, add a new Python file that will contain your custom hook:

    $ sudo vi /opt/airflow/dags/custom_hook.py
  2. Add the code for your hook:

    from airflow.hooks.base import BaseHook (1)
    
    class CustomHook(BaseHook): (2)
    
        conn_name_attr = "custom_conn_id" (3)
        default_conn_name = "custom_conn_default" (4)
        conn_type = "http" (5)
        hook_name = "CustomHook" (6)
    
        def __init__(
            self, custom_conn_id: str = default_conn_name, *args, **kwargs
        ) -> None: (7)
            super().__init__(*args, **kwargs) (8)
            self.custom_conn_id = custom_conn_id (9)
            self.get_conn() (10)
    
        def get_conn(self):
            conn_id = getattr(self, self.conn_name_attr)
            conn = self.get_connection(conn_id)
            return conn
    1 Import the base hook class.
    2 Define the class to inherit from the base class.
    3 Specify the name of the attribute that stores the connection ID.
    4 Specify a default connection ID.
    5 Specify the connection type.
    6 Specify the name of the hook.
    7 Define the __init__() method.
    8 Initialize the parent hook.
    9 Store the connection ID in an instance attribute.
    10 Optionally, call the .get_conn() method upon initialization.
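  3. Import your custom hook in a DAG file and instantiate it inside a task, for example, in the execute method of an operator. This way, the connection lookup in __init__ runs when the task executes rather than every time the DAG file is parsed:

    from custom_hook import CustomHook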
  4. To use the custom hook, create an Airflow connection whose connection ID and type match the hook code (custom_conn_default and http in this example).

    Creating a new connection in the Airflow UI
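Besides the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (the connection ID in upper case) whose value is a connection URI, such as http://user:secret@example.com:8080. The sketch below shows which URI parts map to which connection fields. It uses only the standard library; SimpleConnection and connection_from_env are hypothetical helpers, not Airflow APIs, and Airflow's own Connection class handles more cases (for example, the URI path as the schema and query parameters as extras).

```python
import os
from dataclasses import dataclass
from typing import Optional
from urllib.parse import unquote, urlsplit


@dataclass
class SimpleConnection:
    """Hypothetical stand-in for a few fields of an Airflow connection."""

    conn_id: str
    conn_type: str
    host: Optional[str]
    login: Optional[str]
    password: Optional[str]
    port: Optional[int]


def connection_from_env(conn_id: str) -> SimpleConnection:
    """Read AIRFLOW_CONN_<CONN_ID> and split its URI into connection fields."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return SimpleConnection(
        conn_id=conn_id,
        conn_type=parts.scheme,
        host=parts.hostname,
        login=unquote(parts.username) if parts.username else None,
        password=unquote(parts.password) if parts.password else None,
        port=parts.port,
    )


# Usage: the variable name matches the hook's default connection ID.
os.environ["AIRFLOW_CONN_CUSTOM_CONN_DEFAULT"] = "http://user:secret@example.com:8080"
conn = connection_from_env("custom_conn_default")
print(conn.host, conn.port)  # example.com 8080
```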

For more information about the Airflow web interface, see the Airflow UI overview article.
