Конфигурация коннектора и эксплуатация¶
Системные требования¶
Для работы коннектора необходимо, чтобы все ноды кластера ADB имели доступ к TCP-порту 8123 на всех нодах кластера Clickhouse, через которые планируется производиться загрузка данных.
Модуль PXF¶
Для загрузки данных в Clickhouse необходимо создать внешнюю таблицу, указав в ее директиве LOCATION протокол PXF с профилем модуля TKH и его параметры.
Синтаксис¶
CREATE WRITABLE EXTERNAL TABLE <table_name> (
{ <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
'pxf://<clickhouse_table_name>?<pxf_parameters><settings>'
)
FORMAT 'TEXT'
ENCODING 'UTF8';
Где <pxf_parameters>
:
{ PROFILE=TKH | ACCESSOR=io.arenadata.tkh.TkhAccessor&RESOLVER=io.arenadata.tkh.TkhResolver }
[ &SERVER=<server_name> ]
Значение <server_name>
и <settings>
описаны далее.
Параметры модуля¶
- URL – используется для указания нод, на которые равномерно производится загрузка данных.
- URL ноды кластера Clickhouse или список нод, разделеных запятой
,
- Clickhouse URL имеет синтаксис:
<user>:<password>@<host>:<port>
- Option:
URL
- Параметр конфигурации:
clickhouse.dist.simple.url
- Значение: String
- URL ноды кластера Clickhouse или список нод, разделеных запятой
- Максимальная длина запроса (в строках) – когда длина достигнута, пакет формируется и кладется в очередь на отправку.
- Option:
req_length_max
- Параметр конфигурации:
clickhouse.request.length.max
- Значение: Integer >= 0
- Значение по умолчанию: 76800
- Option:
- Максимальный размер запроса (в байтах) – когда размер достигнут, пакет формируется и кладется в очередь на отправку.
- Option:
req_volume_max
- Параметр конфигурации:
clickhouse.request.volume.max
- Значение: Integer >= 0
- Значение по умолчанию: 52428800 (50Mib)
- Option:
- Число потоков для отправки – число потоков создается каждым экземпляром PXF (число экземпляров PXF в данный момент равно числу нод в кластере ADB).
- Option:
send_threads
- Параметр конфигурации:
clickhouse.send.threads
- Значение: Integer >= 0
- Значение по умолчанию: 2
- Option:
- Задержка между отправками в потоках – базовая задержка между двумя последовательными запросами к Clickhouse.
- Option:
send_delay
- Параметр конфигурации:
clickhouse.send.delay
- Значение: Integer >= 0
- Значение по умолчанию: 300
- Option:
- Множитель размера очереди для потоков – модель использует очередь для отправки запросов в отправляющие потоки. Максимальная длина очереди определяется как произведение двух переменных: число потоков для отправки и множитель размера очереди. Важно обратить внимание, что реальный размер очереди может превышать значение полученного произведения, однако, он никогда не превышает значения: число потоков для отправки умноженное на множитель размера очереди + 1.
- Option:
send_queue_sizeMultiplier
- Параметр конфигурации:
clickhouse.send.queue.sizeMultiplier
- Значение: Double >= 0.0
- Значение по умолчанию: 2.0
- Option:
- Время ожидание сети (мс) – время ожидания HTTP-запроса.
- Option:
net_timeout
- Параметр конфигурации:
clickhouse.network.timeout
- Значение: Integer >= 0
- Значение по умолчанию: 60000
- Option:
Дополнительно¶
Для оптимальной производительности стоит указывать распределение внешней таблицы аналогично таблице-источнику, что позволит не перераспределять данные перед отправкой, а слать напрямую с сегментов. Этого можно достигнуть, создавая таблицу через LIKE source_table
или явно указывая ту же самую колонку в директиве DISTRIBUTED BY.
Расширение¶
Расширение используется в случаях, когда необходимы дополнительные гарантии консистентности вставки. При этом необходимые настройки кластера Clickhouse автоматически проверяются при попытке вставки, используя процедуру расширения, но в частности требуется включенная опция insert_distributed_sync
для распределенных таблиц.
Создание внешней таблицы¶
При создании внешней таблицы следует указывать имя промежуточной таблицы, которое зависит от id транзакции. Например, если в Clickhouse имя таблицы default.t
, то в директиве LOCATION надо указать default.t_tmp_$
. Также следует указывать полное имя таблицы вместе с именем базы данных.
Important
В параметрах внешней таблицы нельзя использовать директиву SERVER, хранящую название конфигурации PXF в локальных файлах (при этом не указывая параметр URL). Расширению требуются адреса хостов кластера Clickhouse, которые оно берет из параметра URL
Полное выражение для создания внешней таблицы выглядит следующим образом:
create writable external table ext_tct(a int)
location ('pxf://default.d_tct_tmp_$?profile=tkh&url=default@sdch1:8123')
format 'text' encoding 'utf8';
Где sdch1
– имя ноды кластера Clickhouse, на которой создана распределенная таблица default.d_tct
.
Процедура и ее параметры¶
После создания внешней таблицы в нее можно выполнить вставку с помощью процедуры расширения:
select txn('insert into ext_tct select generate_series(1, 10000);');
Процедура определена следующим образом:
function txn(query text, http_port int default 8123, debug boolean default false, ending_pattern text default '_tmp_$')
returns void
Параметры процедуры:
query
– запрос ADB, который вставляет данные во внешнюю таблицу. Запрос валидируется и разбирается для получения информации о кластере Clickhouse и его таблицах. Удобно использовать$$ insert into...$$
вместо кавычек (поскольку это позволяет использовать перевод строки и кавычки внутри$$
);http_port
– общий для всех нод, вовлеченных в данный запрос, HTTP-порт ClickHouse. В данный момент нет возможности получить это значение через SQL (стоит указывать, если отличен от 8123);debug
– включает логирование в процедуре txn. Выводит информацию о всех внутренних операциях: создание промежуточных таблиц, валидация данных, переключение и удаление кусков данных;ending_pattern
– по умолчанию процедура txn создает промежуточный слой с очень близкой структурой к оригинальной топологии, но имена таблиц при этом имеют окончания, включающие номер транзакции. Этот параметр позволяет изменить шаблон для временных таблиц.
Очистка промежуточных таблиц¶
В случае обрыва транзакции может понадобиться очистка промежуточных таблиц на всех нодах кластера. Для этого можно использовать вспомогательные процедуры и запросы:
- Определение промежуточных таблиц:
select database || '.' || name as tbl from sys_tables('sdch1', 8123)
where database = 'default' and name like 'd_tct_tmp_%';
tbl
-------------------------
default.d_tct_tmp_34395
(1 row)
- Теперь, зная, что промежуточные таблицы созданы с номером транзакции 34395, можно удалить их на всех нодах кластера:
with ch(h, p) as (values ('sdch1', 8123), ('sdch2', null))
select drop_staging_tbl_based_on('default', 'd_tct', ch.h, ch.p, 'default', '', false, '_tmp_34395') from ch;