User-defined functions and Executable tables

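This article describes ways to extend the built-in functionality of ADQM/ClickHouse to perform custom tasks: SQL user-defined functions, executable user-defined functions, the executable table function, and the Executable and ExecutablePool table engines.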

Create a table for test examples

 

Create the sales table with test data to be used in the examples this article provides:

CREATE TABLE sales (sale_id UInt32, product_name String, product_price UInt32, quantity UInt8)
ENGINE = MergeTree() ORDER BY (sale_id);
INSERT INTO sales VALUES
    (1, 'product_1', 100, 15),
    (2, 'product_2', 200, 20),
    (3, 'product_3', 300, 5),
    (4, 'product_2', 200, 25),
    (5, 'product_1', 100, 50);

SELECT * FROM sales;
   ┌─sale_id─┬─product_name─┬─product_price─┬─quantity─┐
1. │       1 │ product_1    │           100 │       15 │
2. │       2 │ product_2    │           200 │       20 │
3. │       3 │ product_3    │           300 │        5 │
4. │       4 │ product_2    │           200 │       25 │
5. │       5 │ product_1    │           100 │       50 │
   └─────────┴──────────────┴───────────────┴──────────┘

SQL user-defined functions

To create a function from a lambda expression, use the CREATE FUNCTION query:

CREATE FUNCTION <function_name> [ON CLUSTER <cluster_name>] AS (<parameter_0>, <parameter_1> ...) -> <expression>;

where:

  • <function_name> — function name that should be unique among all user-defined and system functions;

  • <parameter_0>, <parameter_1> …​ — list of parameters where all variables used by the function should be specified;

  • <expression> — expression consisting of function parameters, constants, operators, and calls to other functions. Recursive functions are not allowed.

To delete an SQL user-defined function, use the DROP FUNCTION <function_name> query.

Example

Create a function that multiplies arguments:

CREATE FUNCTION sql_udf AS (a, b) -> a * b;

Check that the function is added to the list of ADQM functions:

SELECT name, create_query FROM system.functions WHERE origin = 'SQLUserDefined';
   ┌─name────┬─create_query─────────────────────────────────┐
1. │ sql_udf │ CREATE FUNCTION sql_udf AS (a, b) -> (a * b) │
   └─────────┴──────────────────────────────────────────────┘

Use the created function to calculate the total cost for each sale in the sales table:

SELECT sale_id, product_price, quantity, sql_udf(product_price, quantity) AS total_price FROM sales;
   ┌─sale_id─┬─product_price─┬─quantity─┬─total_price─┐
1. │       1 │           100 │       15 │        1500 │
2. │       2 │           200 │       20 │        4000 │
3. │       3 │           300 │        5 │        1500 │
4. │       4 │           200 │       25 │        5000 │
5. │       5 │           100 │       50 │        5000 │
   └─────────┴───────────────┴──────────┴─────────────┘
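
As a cross-check of the result above, the same lambda can be mirrored outside the database. The sketch below is plain Python, not ADQM code: the sales list copies the test rows from this article, and sql_udf is a Python stand-in for the SQL function of the same name.

```python
# Python stand-in for: CREATE FUNCTION sql_udf AS (a, b) -> a * b;
sql_udf = lambda a, b: a * b

# (sale_id, product_price, quantity) copied from the sales test table
sales = [(1, 100, 15), (2, 200, 20), (3, 300, 5), (4, 200, 25), (5, 100, 50)]

# Apply the function to every row, as the SELECT query does
total_prices = [(sale_id, sql_udf(price, quantity)) for sale_id, price, quantity in sales]

for sale_id, total in total_prices:
    print(sale_id, total)
```

The printed totals should match the total_price column returned by the query.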

Executable user-defined functions

An executable user-defined function (UDF) calls an external program or script to process data. To create such a function, do the following:

  1. In an XML file, describe the function configuration:

    <functions>
        <function>
            <name>...</name>
            <type>executable</type>
            <return_type>...</return_type>
            <return_name>...</return_name>
            <argument>
                <type>...</type>
                <name>...</name>
            </argument>
            <argument>
                <type>...</type>
                <name>...</name>
            </argument>
            <format>...</format>
            <command>...</command>
        </function>
    </functions>
    Configuration parameters of an executable user-defined function

    name

    Function name

    type

    Command execution type. Possible values:

    • executable — a single command is started;

    • executable_pool —  a pool of processes is created.

    return_type

    Type of a returned value

    return_name

    Name of a returned value. It should be specified if a return value name is part of serialization for the user-defined function format (for example, Native or JSONEachRow). The default value is result

    argument

    Argument description that includes the type (type) and optionally name (name) of the argument. Argument names are required if they are part of serialization for the user-defined function format (for example, Native or JSONEachRow). The default argument name is c with an argument number. Each argument should be described in a separate argument tag

    format

    Format in which arguments are passed to a command

    command

    Name of the script to execute or, if execute_direct is set to 0, the shell command to run. The command should:

    • read arguments from stdin and output the result to stdout;

    • process arguments iteratively (i.e. after processing a group of arguments, it should wait for the next group).

    lifetime

    Function reload interval in seconds. If it is set to 0 (by default), the function is not reloaded

    execute_direct

    If 1 (by default), the script specified in command is searched for in the directory with user script files defined by the user_scripts_path parameter of the server configuration. Additional script arguments can be specified, separated by whitespace (for example: <script_name> <arg1> <arg2>). If execute_direct = 0, the command value is passed as an argument to /bin/sh -c

    In the configuration of an executable user-defined function, you can also specify additional script execution parameters described below.

  2. In the user_defined_executable_functions_config parameter of the server configuration (config.xml), specify the path to the file with the executable UDF configuration (the path can be specified as absolute or relative to the server configuration file). Configurations of executable user-defined functions can be located in one or more XML files — in the latter case, you can use the * and ? wildcards to specify the path to multiple files, for example:

    <user_defined_executable_functions_config>*_function.xml</user_defined_executable_functions_config>
  3. In the user_scripts_path parameter of the server configuration, specify the directory to store scripts for the executable user-defined functions (the default path is /var/lib/clickhouse/user_scripts), and place the script specified in the executable UDF configuration as command in this directory. Make sure that:

    • The software required to run the script is installed on the ADQM server.

    • Permission to execute the script has been granted.
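
The requirements on command listed above (read arguments from stdin, write results to stdout, keep processing until the input ends) can be sketched as a script template. This is a minimal sketch, assuming the TabSeparated format, where argument names are not part of serialization, and two integer arguments. The names process_rows and multiply are hypothetical and used for illustration only.

```python
import io


def multiply(a, b):
    # Hypothetical per-row logic; replace it with your own computation
    return a * b


def process_rows(in_stream, out_stream):
    # Read one row of tab-separated arguments per line until the input is closed
    for line in in_stream:
        a, b = line.rstrip('\n').split('\t')
        out_stream.write(str(multiply(int(a), int(b))) + '\n')
        # Flush after each row so that results are not held back in a buffer
        out_stream.flush()


# Local demonstration with in-memory streams;
# a real script would call process_rows(sys.stdin, sys.stdout) instead
demo_out = io.StringIO()
process_rows(io.StringIO('100\t15\n200\t20\n'), demo_out)
print(demo_out.getvalue(), end='')
```

Keeping the row logic in a separate function makes it possible to test the script without a running server.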

Example

This example shows how to create a user-defined function that executes a Python script. Before you begin, make sure that Python and all necessary libraries are installed on your ADQM server.

Describe the executable user-defined function configuration in the /etc/clickhouse-server/UDFs.xml file:

<functions>
    <function>
        <type>executable</type>
        <name>total_price_function</name>
        <return_type>UInt32</return_type>
        <return_name>total_price</return_name>
        <argument>
            <type>UInt32</type>
            <name>product_price</name>
        </argument>
        <argument>
            <type>UInt8</type>
            <name>quantity</name>
        </argument>
        <format>JSONEachRow</format>
        <command>total_price.py</command>
    </function>
</functions>
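
Before pointing the server at this file, it can be useful to check that the configuration is well-formed XML and lists the expected function and arguments. The following sketch uses only the Python standard library. The describe_functions helper is hypothetical, and the configuration is embedded as a string so that the snippet is self-contained (in practice, you would read it from the file). The check covers XML syntax only, not whether the server accepts the configuration.

```python
import xml.etree.ElementTree as ET

# Copy of the configuration above, embedded for a self-contained check
CONFIG = """
<functions>
    <function>
        <type>executable</type>
        <name>total_price_function</name>
        <return_type>UInt32</return_type>
        <return_name>total_price</return_name>
        <argument>
            <type>UInt32</type>
            <name>product_price</name>
        </argument>
        <argument>
            <type>UInt8</type>
            <name>quantity</name>
        </argument>
        <format>JSONEachRow</format>
        <command>total_price.py</command>
    </function>
</functions>
"""


def describe_functions(xml_text):
    # Parsing raises xml.etree.ElementTree.ParseError if the XML is malformed
    root = ET.fromstring(xml_text.strip())
    described = []
    for function in root.findall('function'):
        # Collect (name, type) pairs in the order the arguments are declared
        arguments = [
            (argument.findtext('name'), argument.findtext('type'))
            for argument in function.findall('argument')
        ]
        described.append({
            'name': function.findtext('name'),
            'return_type': function.findtext('return_type'),
            'arguments': arguments,
            'command': function.findtext('command'),
        })
    return described


summary = describe_functions(CONFIG)
print(summary)
```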

In the config.xml file, specify the path to the file with this function configuration in the user_defined_executable_functions_config parameter:

<user_defined_executable_functions_config>UDFs.xml</user_defined_executable_functions_config>

In the /var/lib/clickhouse/user_scripts/total_price.py file, write the Python script:

#!/usr/bin/python3

import sys
import json

if __name__ == '__main__':
    for line in sys.stdin:
        data = json.loads(line)
        price = data['product_price']
        quantity = data['quantity']
        result = {'total_price': price * quantity}
        print(json.dumps(result), end='\n')
        sys.stdout.flush()

Grant permission to execute the script:

$ sudo chmod +x /var/lib/clickhouse/user_scripts/total_price.py
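
Before querying through ADQM, the script logic can be exercised locally. The sketch below repeats the loop from total_price.py as a function (process is a hypothetical name) and feeds it two hand-written JSONEachRow lines in place of stdin. The exact layout of the rows the server sends may differ, so treat the input lines as an assumption.

```python
import io
import json


def process(in_stream, out_stream):
    # Same logic as total_price.py: one JSON object in, one JSON object out
    for line in in_stream:
        data = json.loads(line)
        result = {'total_price': data['product_price'] * data['quantity']}
        out_stream.write(json.dumps(result) + '\n')
        out_stream.flush()


# Hand-written input rows standing in for what the server writes to stdin
fake_stdin = io.StringIO(
    '{"product_price": 100, "quantity": 15}\n'
    '{"product_price": 300, "quantity": 5}\n'
)
fake_stdout = io.StringIO()
process(fake_stdin, fake_stdout)
print(fake_stdout.getvalue(), end='')
```

Each output object uses the total_price key, which matches the return_name value in the function configuration.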

Make sure the function is added to ADQM:

SELECT name FROM system.functions WHERE origin = 'ExecutableUserDefined';
   ┌─name─────────────────────┐
1. │ total_price_function     │
   └──────────────────────────┘

Run a query to select data from the sales table with the total price of each sale calculated via the total_price_function function:

SELECT sale_id, product_price, quantity, total_price_function(product_price, quantity) AS total_price FROM sales;
   ┌─sale_id─┬─product_price─┬─quantity─┬─total_price─┐
1. │       1 │           100 │       15 │        1500 │
2. │       2 │           200 │       20 │        4000 │
3. │       3 │           300 │        5 │        1500 │
4. │       4 │           200 │       25 │        5000 │
5. │       5 │           100 │       50 │        5000 │
   └─────────┴───────────────┴──────────┴─────────────┘

The executable table function

The executable table function creates a table from the output of a user script that prints rows to stdout.

The general syntax of the executable table function is:

executable(<script_name>, <format>, <table_structure>, [<input_query> ...] [,SETTINGS <parameter_name>=<value>, ...])

where:

  • <script_name> — name of an executable script file stored in the directory that is set in the user_scripts_path parameter of the server configuration (the default directory is /var/lib/clickhouse/user_scripts/). A script can read data from any source.

  • <format> — format in which the function will receive data.

  • <table_structure> — structure of a table to be generated (define it as <column_name1> <data_type1>, <column_name2> <data_type2>, …​).

  • <input_query> — query (or set of queries) that streams its results to stdin for the script to read.

  • SETTINGS — additional settings to manage the script execution.

Before you call the executable table function, make sure that your ADQM server has all packages necessary to run the executable script installed, and that permission to execute the script has been granted.

NOTE
  • If you are going to call the same script with the same input queries multiple times, consider using the Executable or ExecutablePool table engine.

  • The key difference between regular executable UDFs and the executable table function or Executable table engine is that a regular executable UDF cannot change the number of rows it returns (for example, if the input contains 100 rows, the result also contains 100 rows). A script used with the executable table function or Executable table engine can perform any data transformations, including complex aggregations.
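
To illustrate the last point, here is a sketch of a script body that returns fewer rows than it receives: it sums quantities per product. The aggregate function and the input layout (product name and quantity, tab-separated) are assumptions made for this illustration. Such logic would suit the executable table function or the Executable table engine, not an executable UDF.

```python
import io
from collections import defaultdict


def aggregate(in_stream, out_stream):
    # Accumulate the total quantity per product over the whole input
    totals = defaultdict(int)
    for line in in_stream:
        product, quantity = line.rstrip('\n').split('\t')
        totals[product] += int(quantity)
    # Emit one row per product only after all input has been read
    for product in sorted(totals):
        out_stream.write(product + '\t' + str(totals[product]) + '\n')
    out_stream.flush()


# Five input rows (product_name, quantity) copied from the sales test table
rows = io.StringIO(
    'product_1\t15\nproduct_2\t20\nproduct_3\t5\nproduct_2\t25\nproduct_1\t50\n'
)
out = io.StringIO()
aggregate(rows, out)
print(out.getvalue(), end='')
```

Five input rows produce three output rows, one per distinct product.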

Example

In the /var/lib/clickhouse/user_scripts/add_tax.py file, write a Python script that adds tax to a product price:

#!/usr/bin/python3

import sys

if __name__ == '__main__':
    for line in sys.stdin:
        # Each input row is in the TabSeparated format: <product>\t<price>
        split_line = line.rstrip('\n').split('\t')

        product = split_line[0]
        price = int(split_line[1])
        price_with_tax = price * 1.2

        # Output row: <product>\t<price>\t<price_with_tax>
        print(product + '\t' + str(price) + '\t' + str(price_with_tax))
        sys.stdout.flush()

Grant permission to execute the script:

$ sudo chmod +x /var/lib/clickhouse/user_scripts/add_tax.py

Use the executable table function to get a table with prices of unique products from the sales table and calculate these prices with tax using the add_tax.py script:

SELECT * FROM executable(
    'add_tax.py',
    TabSeparated,
    'product String, price UInt32, price_with_tax Float32',
    (SELECT DISTINCT ON (product_name) product_name, product_price FROM sales));

Result:

   ┌─product───┬─price─┬─price_with_tax─┐
1. │ product_1 │   100 │            120 │
2. │ product_2 │   200 │            240 │
3. │ product_3 │   300 │            360 │
   └───────────┴───────┴────────────────┘

The Executable and ExecutablePool table engines

The Executable or ExecutablePool table engine allows you to create a table whose rows are generated from your executable script:

  • Executable — runs a script on each data selection;

  • ExecutablePool — maintains a pool of persistent processes and takes processes from the pool for reads.

The basic syntax of a query to create a table based on the Executable or ExecutablePool engine is:

CREATE TABLE <table_name> (<column_name1> <data_type1>, <column_name2> <data_type2>, ...)
ENGINE = Executable|ExecutablePool(<script_name>, <format>, [<input_query> ...])
[SETTINGS <parameter_name>=<value>, ...];

where:

  • <script_name> — executable script whose output will be used to generate data for the <table_name> table each time a SELECT query is sent to it. The script can read data from any source. The script file should be saved in the directory defined by the user_scripts_path server configuration parameter (the default path is /var/lib/clickhouse/user_scripts/).

  • <format> — data format in which the table will accept data.

  • <input_query> — query (or set of queries) that streams its results to stdin for the script to read.

  • SETTINGS — additional script execution parameters.

Example

Create an Executable table that will be populated with data generated by the add_tax.py script from the example above:

CREATE TABLE prices_with_tax (product String, price UInt32, price_with_tax Float32)
ENGINE = Executable('add_tax.py', TabSeparated, (SELECT DISTINCT ON (product_name) product_name, product_price FROM sales));

The CREATE TABLE query creates a table but does not invoke the script. The script is called by a query to select data from the table:

SELECT * FROM prices_with_tax;
   ┌─product───┬─price─┬─price_with_tax─┐
1. │ product_1 │   100 │            120 │
2. │ product_2 │   200 │            240 │
3. │ product_3 │   300 │            360 │
   └───────────┴───────┴────────────────┘

Script execution parameters

When creating an executable user-defined function, calling the executable table function, or creating an Executable/ExecutablePool table (the basic syntax of the corresponding queries is provided in the sections above), you can specify the following parameters that control script execution.

send_chunk_header

Specifies whether to send the number of rows in each chunk of data before sending a chunk to process. The default value is false

command_termination_timeout

Maximum time (in seconds) during which a command should shut down after its pipe is closed. After that time, the SIGTERM signal is sent to the process executing the command. The default value is 10

command_read_timeout

Timeout to read data from command stdout (in milliseconds). The default value is 10000

command_write_timeout

Timeout to write data to command stdin (in milliseconds). The default value is 10000

pool_size

Process pool size. If the value is 0, there are no pool size restrictions. The default value is 16.

The parameter is relevant for:

  • executable user-defined function with type set to executable_pool;

  • executable table function;

  • ExecutablePool table engine.

max_command_execution_time

Maximum script command execution time for processing a block of data (in seconds). The default value is 10

The parameter is relevant for:

  • executable user-defined function with type set to executable_pool;

  • executable table function;

  • ExecutablePool table engine.
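
With send_chunk_header enabled, the row count of each chunk is sent before the chunk itself, so a script has to read that count first. The following is a minimal sketch of such a reader, assuming a text format with the count on its own line and one row per line after it (for example, TabSeparated with a single integer column). The process_chunks function and its per-row logic (doubling a number) are hypothetical.

```python
import io


def process_chunks(in_stream, out_stream):
    while True:
        # The first line of every chunk is assumed to hold its row count
        header = in_stream.readline()
        if not header:
            break  # end of input
        rows_in_chunk = int(header)
        # Read exactly as many rows as the header announced
        for _ in range(rows_in_chunk):
            value = int(in_stream.readline())
            out_stream.write(str(value * 2) + '\n')
        # Flush once per chunk
        out_stream.flush()


# Two chunks: the first has 2 rows, the second has 1 row
chunks = io.StringIO('2\n10\n20\n1\n30\n')
out = io.StringIO()
process_chunks(chunks, out)
print(out.getvalue(), end='')
```

A real script would pass sys.stdin and sys.stdout instead of the in-memory streams used here.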
