Пользовательские функции и таблицы Executable

В данной статье описаны способы расширения встроенной функциональности ADQM/ClickHouse для выполнения специализированных задач:

Создание таблицы для выполнения примеров

 

Создайте таблицу sales с тестовыми данными, которые будут использоваться в примерах, приведенных в данной статье:

CREATE TABLE sales (sale_id UInt32, product_name String, product_price UInt32, quantity UInt8)
ENGINE = MergeTree() ORDER BY (sale_id);
INSERT INTO sales VALUES
    (1, 'product_1', 100, 15),
    (2, 'product_2', 200, 20),
    (3, 'product_3', 300, 5),
    (4, 'product_2', 200, 25),
    (5, 'product_1', 100, 50);
   ┌─sale_id─┬─product_name─┬─product_price─┬─quantity─┐
1. │       1 │ product_1    │           100 │       15 │
2. │       2 │ product_2    │           200 │       20 │
3. │       3 │ product_3    │           300 │        5 │
4. │       4 │ product_2    │           200 │       25 │
5. │       5 │ product_1    │           100 │       50 │
   └─────────┴──────────────┴───────────────┴──────────┘

Пользовательские функции SQL

Чтобы создать функцию из лямбда-выражения, используйте запрос CREATE FUNCTION:

CREATE FUNCTION <function_name> [ON CLUSTER <cluster_name>] AS (<parameter_0>, <parameter_1> ...) -> <expression>;

где:

  • <function_name> — имя функции, которое должно быть уникально среди всех пользовательских и системных функций;

  • <parameter_0>, <parameter_1> …​ — список параметров, в котором должны быть перечислены все используемые функцией переменные;

  • <expression> — выражение, состоящее из параметров функции, констант, операторов и вызовов других функций. Рекурсивные функции не допускаются.

Удалить пользовательскую функцию SQL можно запросом DROP FUNCTION <function_name>.

Пример

Создайте функцию, вычисляющую произведение аргументов:

CREATE FUNCTION sql_udf AS (a, b) -> a * b;

Убедитесь, что функция добавлена в список функций ADQM:

SELECT name, create_query FROM system.functions WHERE origin = 'SQLUserDefined';
   ┌─name────┬─create_query─────────────────────────────────┐
1. │ sql_udf │ CREATE FUNCTION sql_udf AS (a, b) -> (a * b) │
   └─────────┴──────────────────────────────────────────────┘

Используйте созданную функцию для расчета общей стоимости каждой продажи в тестовой таблице sales:

SELECT sale_id, product_price, quantity, sql_udf(product_price, quantity) AS total_price FROM sales;
   ┌─sale_id─┬─product_price─┬─quantity─┬─total_price─┐
1. │       1 │           100 │       15 │        1500 │
2. │       2 │           200 │       20 │        4000 │
3. │       3 │           300 │        5 │        1500 │
4. │       4 │           200 │       25 │        5000 │
5. │       5 │           100 │       50 │        5000 │
   └─────────┴───────────────┴──────────┴─────────────┘

Исполняемые пользовательские функции

Исполняемая пользовательская функция (user-defined function, UDF) вызывает внешнюю программу или скрипт для обработки данных. Чтобы создать такую функцию, необходимо выполнить следующие действия:

  1. Опишите конфигурацию функции в XML-файле:

    <functions>
        <function>
            <name>...</name>
            <type>executable</type>
            <return_type>...</return_type>
            <return_name>...</return_name>
            <argument>
                <type>...</type>
                <name>...</name>
            </argument>
            <argument>
                <type>...</type>
                <name>...</name>
            </argument>
            <format>...</format>
            <command>...</command>
        </function>
    </functions>
    Параметры конфигурации исполняемой пользовательской функции

    name

    Имя функции

    type

    Вариант запуска команды. Возможные значения:

    • executable — запускается одна команда;

    • executable_pool — создается пул команд.

    return_type

    Тип возвращаемого значения

    return_name

    Имя возвращаемого значения. Необходимо указать, если имя возвращаемого значения является частью сериализации для формата пользовательской функции (например, Native или JSONEachRow). Значение по умолчанию — result

    argument

    Описание аргумента, включающее его тип (type) и, опционально, имя (name). Имена аргументов необходимо указывать, если они являются частью сериализации для формата пользовательской функции (например, Native или JSONEachRow). Значение имени аргумента по умолчанию — c c номером аргумента. Каждый аргумент функции описывается отдельно

    format

    Формат, в котором аргументы передаются в команду

    command

    Имя скрипта для выполнения или команда, если значение параметра execute_direct — false. Команда должна читать аргументы из stdin и выводить результат в stdout. Обработка должна выполняться в цикле, то есть после обработки группы аргументов команда должна ожидать следующую группу

    lifetime

    Интервал перезагрузки функции в секундах. Если значение 0 (по умолчанию), функция не перезагружается

    execute_direct

    Если значение 1 (по умолчанию), выполняется поиск команды, указанной в command, в каталоге с файлами пользовательских скриптов, который определяется параметром user_scripts_path в конфигурации сервера. Дополнительные аргументы скрипта можно указать, разделяя их пробелами (например: <script_name> <arg1> <arg2>). Если execute_direct = 0, содержимое command передается как аргумент для bin/sh -c

    В конфигурации исполняемой пользовательской функции можно также дополнительно указать параметры выполнения скрипта, описание которых приведено ниже.

  2. В параметре user_defined_executable_functions_config конфигурации сервера (config.xml) укажите путь к XML-файлу с конфигурацией функции (путь можно указать абсолютным или относительно конфигурационного файла сервера). Конфигурации исполняемых пользовательских функций могут находиться в одном или нескольких XML-файлах — в последнем случае можно использовать wildcard-символы * и ?, чтобы указать путь к файлам, например:

    <user_defined_executable_functions_config>*_function.xml</user_defined_executable_functions_config>
  3. В параметре user_scripts_path конфигурации сервера укажите каталог, где будут храниться скрипты для исполняемых пользовательских функций (по умолчанию /var/lib/clickhouse/user_scripts), и поместите в этот каталог скрипт, указанный в конфигурации функции как command. Убедитесь, что:

    • На сервере ADQM установлено программное обеспечение, необходимое для запуска исполняемого скрипта.

    • На выполнение скрипта выдано разрешение.

Пример

Данный пример показывает, как создать пользовательскую функцию, выполняющую Python-скрипт. Предварительно проверьте, что на сервере ADQM установлен Python и все необходимые библиотеки.

Опишите конфигурацию функции в файле /etc/clickhouse-server/UDFs.xml:

<functions>
    <function>
        <type>executable</type>
        <name>total_price_function</name>
        <return_type>UInt32</return_type>
        <return_name>total_price</return_name>
        <argument>
            <type>UInt32</type>
            <name>product_price</name>
        </argument>
        <argument>
            <type>UInt8</type>
            <name>quantity</name>
        </argument>
        <format>JSONEachRow</format>
        <command>total_price.py</command>
    </function>
</functions>

Укажите путь к файлу с конфигурацией исполняемой пользовательской функции в параметре user_defined_executable_functions_config файла config.xml:

<user_defined_executable_functions_config>UDFs.xml</user_defined_executable_functions_config>

В файле /var/lib/clickhouse/user_scripts/total_price.py напишите Python-скрипт:

#!/usr/bin/python3

import sys
import json

if __name__ == '__main__':
    for line in sys.stdin:
        data = json.loads(line)
        price = data['product_price']
        quantity = data['quantity']
        result = {'total_price': price * quantity}
        print(json.dumps(result), end='\n')
        sys.stdout.flush()

Дайте разрешение на выполнение скрипта:

$ sudo chmod +x /var/lib/clickhouse/user_scripts/total_price.py

Убедитесь, что функция добавлена в ADQM:

SELECT name FROM system.functions WHERE origin = 'ExecutableUserDefined';
   ┌─name─────────────────────┐
1. │ total_price_function     │
   └──────────────────────────┘

Выполните запрос на выборку данных из таблицы sales, используя функцию total_price_function для вычисления стоимости каждой продажи:

SELECT sale_id, product_price, quantity, total_price_function(product_price, quantity) AS total_price FROM sales;
   ┌─sale_id─┬─product_price─┬─quantity─┬─total_price─┐
1. │       1 │           100 │       15 │        1500 │
2. │       2 │           200 │       20 │        4000 │
3. │       3 │           300 │        5 │        1500 │
4. │       4 │           200 │       25 │        5000 │
5. │       5 │           100 │       50 │        5000 │
   └─────────┴───────────────┴──────────┴─────────────┘

Табличная функция executable

Табличная функция executable создает таблицу на основе выходных данных функции, определенной в пользовательском скрипте, который выводит строки в stdout.

Синтаксис табличной функции executable в общем виде:

executable(<script_name>, <format>, <table_structure>, [<input_query> ...] [,SETTINGS <parameter_name>=<value>, ...])

где:

  • <script_name> — имя файла исполняемого скрипта, сохраненного в каталоге, который определяется параметром user_scripts_path в конфигурации сервера (по умолчанию — /var/lib/clickhouse/user_scripts/). Скрипт может считывать данные из любого источника.

  • <format> — формат, в котором функция будет принимать данные.

  • <table_structure> — структура генерируемой таблицы (указывается в виде <column_name1> <data_type1>, <column_name2> <data_type2>, …​).

  • <input_query> — запрос (или набор запросов), результаты которого будут передаваться в stdin для чтения скриптом.

  • SETTINGS — дополнительные настройки выполнение скрипта.

Перед вызовом табличной функции executable убедитесь, что на сервере ADQM установлены все необходимые пакеты для запуска исполняемого скрипта, а также что на выполнения скрипта выдано разрешение.

ПРИМЕЧАНИЕ
  • Если необходимо многократно вызывать один и тот же скрипт с одними и теми же входными запросами, рассмотрите возможность использования табличного движка Executable или ExecutablePool.

  • Ключевым отличием между обычными исполняемыми пользовательскими функциями и табличной функцией executable или табличным движком Executable является то, что обычные исполняемые пользовательские функции не могут изменять количество строк (например, если входные данные содержат 100 строк, в результате вернется 100 строк), а при использовании в табличной функции executable или табличном движке Executable скрипт может выполнять любые преобразования данных, включая сложные агрегации.

Пример

Заполните файл /var/lib/clickhouse/user_scripts/add_tax.py текстом Python-скрипта, который будет добавлять налог к цене продукта:

#!/usr/bin/python3

import sys

if __name__ == '__main__':
     for line in sys.stdin:

           split_line = line.split()

           product = split_line[0]
           price = int(split_line[1])
           price_with_tax = price*1.2

           print(product + '\t' + str(price) + '\t' + str(price_with_tax) + '\n', end='')
           sys.stdout.flush()

Дайте разрешение на выполнение скрипта:

$ sudo chmod +x /var/lib/clickhouse/user_scripts/add_tax.py

Используйте табличную функцию executable, чтобы получить таблицу с ценами уникальных продуктов из таблицы продаж sales и вычислить цены с учетом налога с помощью скрипта add_tax.py:

SELECT * FROM executable(
    'add_tax.py',
    TabSeparated,
    'product  String, price UInt32, price_with_tax Float32',
    (SELECT DISTINCT ON (product_name) product_name, product_price FROM sales));

Результат:

   ┌─product───┬─price─┬─price_with_tax─┐
1. │ product_1 │   100 │            120 │
2. │ product_2 │   200 │            240 │
3. │ product_3 │   300 │            360 │
   └───────────┴───────┴────────────────┘

Табличные движки Executable и ExecutablePool

Табличные движки Executable и ExecutablePool позволяют создать таблицу, строки которой генерируются из указанного скрипта:

  • Executable — запускает скрипт при каждом запросе данных;

  • ExecutablePool — поддерживает пул постоянных процессов и берет процессы из пула для чтения данных.

Синтаксис запроса для создания таблицы на основе движка Executable или ExecutablePool в общем виде:

CREATE TABLE <table_name> (<column_name1> <data_type1>, <column_name2> <data_type2>, ...)
ENGINE = Executable|ExecutablePool(<script_name>, <format>, [<input_query> ...])
[SETTINGS <parameter_name>=<value>, ...];

где:

  • <script_name> — исполняемый скрипт, на основе выходных данных которого будут строиться данные таблицы <table_name> каждый раз, когда в нее будет направляться запрос SELECT. Скрипт может считывать данные из любого источника. Файл скрипта должен быть сохранен в каталоге, определяемом параметром конфигурации сервера user_scripts_path (по умолчанию — /var/lib/clickhouse/user_scripts/).

  • <format> — формат данных, в котором таблица будет принимать данные.

  • <input_query> — запрос (или набор запросов), результаты которого будут передаваться в stdin для чтения скриптом.

  • SETTINGS — дополнительные параметры выполнения скрипта.

Пример

Создайте таблицу типа Executable, которая будет заполняться данными, генерируемыми скриптом add_tax.py из примера выше:

CREATE TABLE prices_with_tax (product  String, price UInt32, price_with_tax Float32)
ENGINE = Executable('add_tax.py', TabSeparated, (SELECT DISTINCT ON (product_name) product_name, product_price FROM sales));

Запрос CREATE TABLE создает таблицу, но не вызывает скрипт. Скрипт вызывается запросом на выборку данных из таблицы:

SELECT * FROM prices_with_tax;
   ┌─product───┬─price─┬─price_with_tax─┐
1. │ product_1 │   100 │            120 │
2. │ product_2 │   200 │            240 │
3. │ product_3 │   300 │            360 │
   └───────────┴───────┴────────────────┘

Параметры выполнения скрипта

При создании исполняемой пользовательской функции, вызове табличной функции executable и создании таблиц на основе движков Executable/ExecutablePool (базовый синтаксис запросов приводится в соответствующих разделах выше) можно указать следующие параметры, регулирующие выполнение скрипта.

send_chunk_header

Контролирует, нужно ли отправлять количество строк в каждом блоке данных перед отправкой блока на обработку. Значение по умолчанию — false

command_termination_timeout

Максимальное время (в секундах), в течение которого команда должна завершиться после закрытия конвейера. Если команда не завершается, то процессу отправляется сигнал SIGTERM. Значение по умолчанию — 10

command_read_timeout

Время ожидания чтения данных из stdout команды (в миллисекундах). Значение по умолчанию — 10000

command_write_timeout

Время ожидания записи данных в stdin команды (в миллисекундах). Значение по умолчанию — 10000

pool_size

Размер пула процессов. Если значение 0, то ограничений по размеру пула нет. Значение по умолчанию — 16.

Параметр релевантен для:

  • исполняемой пользовательской функции типа executable_pool;

  • табличной функции executable;

  • табличного движка ExecutablePool.

max_command_execution_time

Максимальное время выполнения команды исполняемого скрипта для обработки блока данных (в секундах). Значение по умолчанию — 10

Параметр релевантен для:

  • исполняемой пользовательской функции типа executable_pool;

  • табличной функции executable;

  • табличного движка ExecutablePool.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней