Конфигурация коннектора и эксплуатация

Системные требования

Для работы коннектора необходимо, чтобы все ноды кластера ADB имели доступ к TCP-порту 8123 на всех нодах кластера Clickhouse, через которые планируется производиться загрузка данных.

Модуль PXF

Для загрузки данных в Clickhouse необходимо создать внешнюю таблицу, указав в ее директиве LOCATION протокол PXF с профилем модуля TKH и его параметры.

Синтаксис

CREATE WRITABLE EXTERNAL TABLE <table_name> (
    { <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
    'pxf://<clickhouse_table_name>?<pxf_parameters><settings>'
)
FORMAT 'TEXT'
ENCODING 'UTF8';

Где <pxf_parameters>:

{ PROFILE=TKH | ACCESSOR=io.arenadata.tkh.TkhAccessor&RESOLVER=io.arenadata.tkh.TkhResolver }
[ &SERVER=<server_name> ]

Значение <server_name> и <settings> описаны далее.

Параметры модуля

  • URL – используется для указания нод, на которые равномерно производится загрузка данных.
    • URL ноды кластера Clickhouse или список нод, разделеных запятой ,
    • Clickhouse URL имеет синтаксис: <user>:<password>@<host>:<port>
    • Option: URL
    • Параметр конфигурации: clickhouse.dist.simple.url
    • Значение: String
  • Максимальная длина запроса (в строках) – когда длина достигнута, пакет формируется и кладется в очередь на отправку.
    • Option: req_length_max
    • Параметр конфигурации: clickhouse.request.length.max
    • Значение: Integer >= 0
    • Значение по умолчанию: 76800
  • Максимальный размер запроса (в байтах) – когда размер достигнут, пакет формируется и кладется в очередь на отправку.
    • Option: req_volume_max
    • Параметр конфигурации: clickhouse.request.volume.max
    • Значение: Integer >= 0
    • Значение по умолчанию: 52428800 (50Mib)
  • Число потоков для отправки – число потоков создается каждым экземпляром PXF (число экземпляров PXF в данный момент равно числу нод в кластере ADB).
    • Option: send_threads
    • Параметр конфигурации: clickhouse.send.threads
    • Значение: Integer >= 0
    • Значение по умолчанию: 2
  • Задержка между отправками в потоках – базовая задержка между двумя последовательными запросами к Clickhouse.
    • Option: send_delay
    • Параметр конфигурации: clickhouse.send.delay
    • Значение: Integer >= 0
    • Значение по умолчанию: 300
  • Множитель размера очереди для потоков – модель использует очередь для отправки запросов в отправляющие потоки. Максимальная длина очереди определяется как произведение двух переменных: число потоков для отправки и множитель размера очереди. Важно обратить внимание, что реальный размер очереди может превышать значение полученного произведения, однако, он никогда не превышает значения: число потоков для отправки умноженное на множитель размера очереди + 1.
    • Option: send_queue_sizeMultiplier
    • Параметр конфигурации: clickhouse.send.queue.sizeMultiplier
    • Значение: Double >= 0.0
    • Значение по умолчанию: 2.0
  • Время ожидание сети (мс) – время ожидания HTTP-запроса.
    • Option: net_timeout
    • Параметр конфигурации: clickhouse.network.timeout
    • Значение: Integer >= 0
    • Значение по умолчанию: 60000

Дополнительно

Для оптимальной производительности стоит указывать распределение внешней таблицы аналогично таблице-источнику, что позволит не перераспределять данные перед отправкой, а слать напрямую с сегментов. Этого можно достигнуть, создавая таблицу через LIKE source_table или явно указывая ту же самую колонку в директиве DISTRIBUTED BY.

Расширение

Расширение используется в случаях, когда необходимы дополнительные гарантии консистентности вставки. При этом необходимые настройки кластера Clickhouse автоматически проверяются при попытке вставки, используя процедуру расширения, но в частности требуется включенная опция insert_distributed_sync для распределенных таблиц.

Создание внешней таблицы

При создании внешней таблицы следует указывать имя промежуточной таблицы, которое зависит от id транзакции. Например, если в Clickhouse имя таблицы default.t, то в директиве LOCATION надо указать default.t_tmp_$. Также следует указывать полное имя таблицы вместе с именем базы данных.

Important

В параметрах внешней таблицы нельзя использовать директиву SERVER, хранящую название конфигурации PXF в локальных файлах (при этом не указывая параметр URL). Расширению требуются адреса хостов кластера Clickhouse, которые оно берет из параметра URL

Полное выражение для создания внешней таблицы выглядит следующим образом:

create writable external table ext_tct(a int)
location ('pxf://default.d_tct_tmp_$?profile=tkh&url=default@sdch1:8123')
format 'text' encoding 'utf8';

Где sdch1 – имя ноды кластера Clickhouse, на которой создана распределенная таблица default.d_tct.

Процедура и ее параметры

После создания внешней таблицы в нее можно выполнить вставку с помощью процедуры расширения:

select txn('insert into ext_tct select generate_series(1, 10000);');

Процедура определена следующим образом:

function txn(query text, http_port int default 8123, debug boolean default false, ending_pattern text default '_tmp_$')
returns void

Параметры процедуры:

  • query – запрос ADB, который вставляет данные во внешнюю таблицу. Запрос валидируется и разбирается для получения информации о кластере Clickhouse и его таблицах. Удобно использовать $$ insert into...$$ вместо кавычек (поскольку это позволяет использовать перевод строки и кавычки внутри $$);
  • http_port – общий для всех нод, вовлеченных в данный запрос, HTTP-порт ClickHouse. В данный момент нет возможности получить это значение через SQL (стоит указывать, если отличен от 8123);
  • debug – включает логирование в процедуре txn. Выводит информацию о всех внутренних операциях: создание промежуточных таблиц, валидация данных, переключение и удаление кусков данных;
  • ending_pattern – по умолчанию процедура txn создает промежуточный слой с очень близкой структурой к оригинальной топологии, но имена таблиц при этом имеют окончания, включающие номер транзакции. Этот параметр позволяет изменить шаблон для временных таблиц.

Очистка промежуточных таблиц

В случае обрыва транзакции может понадобиться очистка промежуточных таблиц на всех нодах кластера. Для этого можно использовать вспомогательные процедуры и запросы:

  1. Определение промежуточных таблиц:
select database || '.' || name as tbl from sys_tables('sdch1', 8123)
where database = 'default' and name like 'd_tct_tmp_%';
           tbl
-------------------------
 default.d_tct_tmp_34395
(1 row)
  1. Теперь, зная, что промежуточные таблицы созданы с номером транзакции 34395, можно удалить их на всех нодах кластера:
with ch(h, p) as (values ('sdch1', 8123), ('sdch2', null))
select drop_staging_tbl_based_on('default', 'd_tct', ch.h, ch.p, 'default', '', false, '_tmp_34395') from ch;