Конфигурация коннектора и эксплуатация ======================================== Системные требования --------------------- Для работы коннектора необходимо, чтобы все ноды кластера **ADB** имели доступ к TCP-порту *8123* на всех нодах кластера **Clickhouse**, через которые планируется производиться загрузка данных. Модуль PXF ----------- Для загрузки данных в **Clickhouse** необходимо создать внешнюю таблицу, указав в ее директиве *LOCATION* протокол *PXF* с профилем модуля *TKH* и его параметры. Синтаксис ^^^^^^^^^^ :: CREATE WRITABLE EXTERNAL TABLE ( { [, ...] | LIKE } ) LOCATION ( 'pxf://?' ) FORMAT 'TEXT' ENCODING 'UTF8'; Где ````: :: { PROFILE=TKH | ACCESSOR=io.arenadata.tkh.TkhAccessor&RESOLVER=io.arenadata.tkh.TkhResolver } [ &SERVER= ] Значение ```` и ```` описаны далее. Параметры модуля ^^^^^^^^^^^^^^^^^^ + **URL** -- используется для указания нод, на которые равномерно производится загрузка данных. + URL ноды кластера Clickhouse или список нод, разделеных запятой ``,`` + Clickhouse URL имеет синтаксис: ``:@:`` + Option: ``URL`` + Параметр конфигурации: ``clickhouse.dist.simple.url`` + Значение: String + **Число потоков для отправки** -- число потоков создается каждым экземпляром *PXF* (число экземпляров *PXF* в данный момент равно числу нод в кластере **ADB**). + Option: ``send_threads`` + Параметр конфигурации: ``clickhouse.send.threads`` + Значение: Integer >= 0 + Значение по умолчанию: *2* + **Задержка между отправками в потоках** -- базовая задержка между двумя последовательными запросами к **Clickhouse**. + Option: ``send_delay`` + Параметр конфигурации: ``clickhouse.send.delay`` + Значение: Integer >= 0 + Значение по умолчанию: *300* + **Множитель размера очереди для потоков** -- модель использует очередь для отправки запросов в отправляющие потоки. Максимальная длина очереди определяется как произведение двух переменных: *число потоков для отправки* и *множитель размера очереди*. Важно обратить внимание, что реальный размер очереди может превышать значение полученного произведения, однако, он никогда не превышает значения: *число потоков для отправки* умноженное на *множитель размера очереди + 1*. + Option: ``send_queue_sizeMultiplier`` + Параметр конфигурации: ``clickhouse.send.queue.sizeMultiplier`` + Значение: Double >= 0.0 + Значение по умолчанию: *2.0* + **Время ожидание сети (мс)** -- время ожидания HTTP-запроса. + Option: ``net_timeout`` + Параметр конфигурации: ``clickhouse.network.timeout`` + Значение: Integer >= 0 + Значение по умолчанию: *60000* Дополнительно ^^^^^^^^^^^^^^^ Для оптимальной производительности стоит указывать распределение внешней таблицы аналогично таблице-источнику, что позволит не перераспределять данные перед отправкой, а слать напрямую с сегментов. Этого можно достигнуть, создавая таблицу через ``LIKE source_table`` или явно указывая ту же самую колонку в директиве *DISTRIBUTED BY*. Расширение ----------- Расширение используется в случаях, когда необходимы дополнительные гарантии консистентности вставки. При этом необходимые настройки кластера **Clickhouse** автоматически проверяются при попытке вставки, используя процедуру расширения, но в частности требуется включенная опция ``insert_distributed_sync`` для распределенных таблиц. Создание внешней таблицы ^^^^^^^^^^^^^^^^^^^^^^^^^ При создании внешней таблицы следует указывать имя промежуточной таблицы, которое зависит от id транзакции. Например, если в **Clickhouse** имя таблицы ``default.t``, то в директиве *LOCATION* надо указать ``default.t_tmp_$``. Также следует указывать полное имя таблицы вместе с именем базы данных. .. important:: В параметрах внешней таблицы нельзя использовать директиву *SERVER*, хранящую название конфигурации PXF в локальных файлах (при этом не указывая параметр *URL*). Расширению требуются адреса хостов кластера Clickhouse, которые оно берет из параметра *URL* Полное выражение для создания внешней таблицы выглядит следующим образом: :: create writable external table ext_tct(a int) location ('pxf://default.d_tct_tmp_$?profile=tkh&url=default@sdch1:8123') format 'text' encoding 'utf8'; Где ``sdch1`` -- имя ноды кластера **Clickhouse**, на которой создана распределенная таблица ``default.d_tct``. Процедура и ее параметры ^^^^^^^^^^^^^^^^^^^^^^^^^ После создания внешней таблицы в нее можно выполнить вставку с помощью процедуры расширения: :: select txn('insert into ext_tct select generate_series(1, 10000);'); Процедура определена следующим образом: :: function txn(query text, http_port int default 8123, debug boolean default false, ending_pattern text default '_tmp_$') returns void Параметры процедуры: + ``query`` -- запрос ADB, который вставляет данные во внешнюю таблицу. Запрос валидируется и разбирается для получения информации о кластере Clickhouse и его таблицах. Удобно использовать ``$$ insert into...$$`` вместо кавычек (поскольку это позволяет использовать перевод строки и кавычки внутри ``$$``); + ``http_port`` -- общий для всех нод, вовлеченных в данный запрос, HTTP-порт ClickHouse. В данный момент нет возможности получить это значение через SQL (стоит указывать, если отличен от *8123*); + ``debug`` -- включает логирование в процедуре *txn*. Выводит информацию о всех внутренних операциях: создание промежуточных таблиц, валидация данных, переключение и удаление кусков данных; + ``ending_pattern`` -- по умолчанию процедура *txn* создает промежуточный слой с очень близкой структурой к оригинальной топологии, но имена таблиц при этом имеют окончания, включающие номер транзакции. Этот параметр позволяет изменить шаблон для временных таблиц. Очистка промежуточных таблиц ----------------------------- В случае обрыва транзакции может понадобиться очистка промежуточных таблиц на всех нодах кластера. Для этого можно использовать вспомогательные процедуры и запросы: 1. Определение промежуточных таблиц: :: select database || '.' || name as tbl from sys_tables('sdch1', 8123) where database = 'default' and name like 'd_tct_tmp_%'; tbl ------------------------- default.d_tct_tmp_34395 (1 row) 2. Теперь, зная, что промежуточные таблицы созданы с номером транзакции *34395*, можно удалить их на всех нодах кластера: :: with ch(h, p) as (values ('sdch1', 8123), ('sdch2', null)) select drop_staging_tbl_based_on('default', 'd_tct', ch.h, ch.p, 'default', '', false, '_tmp_34395') from ch;