Примеры использования clickhouse-copier
При установке ADQM на все хосты с сервисом ADQMDB по умолчанию устанавливается пакет adqm-clickhouse-copier, который предоставляет функциональность clickhouse-copier, позволяющую копировать между кластерами разных топологий и перешардировать большие объемы данных. В этой статье описывается как создавать задания на копирование и как запускать clickhouse-copier для их выполнения, а также приводятся примеры использования этого инструмента.
ВНИМАНИЕ
Clickhouse-copier не работает с типами данных Variant, Dynamic и JSON.
|
Использование clickhouse-copier
Чтобы скопировать/перешардировать данные с помощью clickhouse-copier, необходимо выполнить следующие шаги.
Шаг 1. Настройка соединения с сервисом координации
Для выполнения одного и того же задания можно запустить несколько экземпляров clickhouse-copier на разных хостах. Чтобы синхронизировать процесс копирования между хостами, требуется сервис координации — ZooKeeper или ClickHouse Keeper. Создайте XML-файл с параметрами подключения к ZooKeeper/ClickHouse Keeper — скопируйте в него секцию zookeeper
из конфигурационного файла config.xml. Опционально в него также можно добавить секцию logger
с настройками логирования. Скопируйте этот файл на все хосты, где будет запускаться clickhouse-copier.
Файл keeper.xml, который используется в примерах данной статьи:
<clickhouse>
<zookeeper>
<node>
<host>dev-adqm1.ru-central1.internal</host>
<port>2181</port>
</node>
<node>
<host>dev-adqm2.ru-central1.internal</host>
<port>2181</port>
</node>
<node>
<host>dev-adqm3.ru-central1.internal</host>
<port>2181</port>
</node>
<root>/clickhouse</root>
</zookeeper>
</clickhouse>
Шаг 2. Конфигурация заданий на копирование
Создайте XML-файл с описанием заданий на копирование для clickhouse-copier (например, task.xml).
<clickhouse>
<remote_servers>
<source_cluster>
<shard>
<replica>
...
</replica>
...
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<max_workers>2</max_workers>
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<settings_push>
<readonly>0</readonly>
</settings_push>
<settings>
<connect_timeout>3</connect_timeout>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<tables>
<!-- Задание на копирование одной таблицы -->
<table_1>
<cluster_pull>source_cluster</cluster_pull>
<database_pull>source_database</database_pull>
<table_pull>test_table</table_pull>
<cluster_push>destination_cluster</cluster_push>
<database_push>destination_database</database_push>
<table_push>test_table_copied</table_push>
<engine>
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/test_table_copied', '{replica}')
PARTITION BY partition_key
ORDER BY sorting_key
</engine>
<sharding_key>sharding_key_expr</sharding_key>
<where_condition>where_condition_expr</where_condition>
<enabled_partitions>
<partition>'2024-02-01'</partition>
<partition>'2024-03-02'</partition>
...
</enabled_partitions>
</table_1>
<!-- Задание на копирование следующей таблицы -->
<table_2>
...
</table_2>
...
</tables>
</clickhouse>
Параметр | Описание |
---|---|
remote_servers |
Описание исходного кластера и кластера назначения из секции Для объединения хостов ADQM в логический кластер можно использовать ADCM, при этом описание кластера автоматически добавится в файл config.xml — подробнее в статье Конфигурирование логических кластеров в интерфейсе ADCM |
max_workers |
Максимальное количество экземпляров clickhouse-copier (workers), одновременно выполняющих задание на копирование. Если запустить больше экземпляров, лишние будут переведены в спящий режим |
settings_pull, settings_push |
Настройки, используемые для выборки данных (pull) из таблиц исходного кластера и вставки данных (push) в таблицы кластера назначения соответственно |
settings |
Общие настройки, используемые для операций выборки и вставки данных (перекрываются настройками |
tables |
Описание заданий на копирование. Можно указать несколько заданий в одном узле ZooKeeper/ClickHouse Keeper — каждое задание для копирования одной таблицы (например, в приведенной выше конфигурации указано два задания — Для каждого задания нужно указать следующие параметры:
|
После завершения описания заданий на копирование загрузите его в ZooKeeper/ClickHouse Keeper — создайте znode (должен располагаться по определенному пути — <znode-path-to-task>/description) и сохраните в нем содержимое файла task.xml. Например, это можно сделать с помощью консоли ZooKeeper CLI, которая доступна на каждом хосте кластера ZooKeeper. Откройте консоль с помощью скрипта zkCli.sh и используйте команду create
:
$ create /clickhouse/clickhouse ""
$ create /clickhouse/clickhouse/copier ""
$ create /clickhouse/clickhouse/copier/task ""
$ create /clickhouse/clickhouse/copier/task/description "$(cat <path-to>/task.xml)"
где <path-to>
— путь к файлу task.xml.
Шаг 3. Запуск clickhouse-copier
Чтобы запустить экземпляр clickhouse-copier для выполнения задания на копирование, выполните следующую команду:
$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task
где:
-
config
— путь к файлу keeper.xml с параметрами подключения к ZooKeeper/ClickHouse Keeper; -
task-path
— путь к znode, который используется для синхронизации процессов clickhouse-copier и хранения заданий на копирование данных.
Для этой команды также доступны следующие необязательные параметры:
-
daemon
— запустить clickhouse-copier в режиме демона; -
task-file
— путь к файлу с конфигурацией задания для первоначальной загрузки в ZooKeeper/ClickHouse Keeper; -
task-upload-force
— принудительно загрузитьtask-file
, даже если znode уже существует (по умолчанию —false
); -
base-dir
— путь к логам и вспомогательным файлам на хосте, где запущен clickhouse-copier (если этот параметр не указан, подкаталоги для этих файлов создаются в каталоге, из которого был запущен clickhouse-copier).
Чтобы уменьшить сетевой трафик, рекомендуется запускать clickhouse-copier на том же хосте, где расположены исходные данные.
Пример 1. Шардирование таблицы
В этом примере показывается как шардировать таблицу — данные таблицы, реплицируемой на два хоста в рамках одного шарда, копируются в другой кластер и распределяются между двумя шардами, в каждом из которых две реплики.
Исходные данные
Создайте реплицируемую таблицу data_table1
(в базе данных default
) в кластере source_cluster1
, который включает один шард с двумя репликами (описание кластера приведено ниже):
CREATE TABLE data_table1 ON CLUSTER source_cluster1 (value UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1', '{replica}') ORDER BY value;
Сгенерируйте тестовые данные для таблицы (1 миллион строк):
INSERT INTO data_table1 SELECT number FROM numbers(1000000);
В примерах данной статьи кластер назначения называется default_cluster
— пример логического кластера, который автоматически формируется при установке ADQM на четыре хоста с фактором репликации равным 2
. Создайте в этом кластере новую базу данных test_database
, куда clickhouse-copier будет копировать данные:
CREATE DATABASE test_database ON CLUSTER default_cluster;
Копирование
Создайте конфигурационный файл task1.xml, описывающий для clickhouse-copier задание копирования таблицы data_table1
кластера source_cluster1
в таблицу data_table1_copied
кластера default_cluster
.
<clickhouse>
<remote_servers>
<source_cluster1>
<shard>
<internal_replication>false</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm1.ru-central1.internal</host>
<port>9000</port>
</replica>
<replica>
<host>dev-adqm2.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
</source_cluster1>
<default_cluster>
<shard>
<internal_replication>true</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm1.ru-central1.internal</host>
<port>9000</port>
</replica>
<replica>
<host>dev-adqm2.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm3.ru-central1.internal</host>
<port>9000</port>
</replica>
<replica>
<host>dev-adqm4.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
</default_cluster>
</remote_servers>
<max_workers>2</max_workers>
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<settings_push>
<readonly>0</readonly>
</settings_push>
<settings>
<connect_timeout>3</connect_timeout>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<tables>
<table_local>
<cluster_pull>source_cluster1</cluster_pull>
<database_pull>default</database_pull>
<table_pull>data_table1</table_pull>
<cluster_push>default_cluster</cluster_push>
<database_push>test_database</database_push>
<table_push>data_table1_copied</table_push>
<engine>
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}')
ORDER BY (value)
</engine>
<sharding_key>rand()</sharding_key>
</table_local>
</tables>
</clickhouse>
Загрузите конфигурацию task1.xml в znode /clickhouse/clickhouse/copier/task1/description
кластера ZooKeeper, который указан в keeper.xml.
Из каталога, где находится файл keeper.xml, запустите clickhouse-copier:
$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task1
Проверка
В результате копирования в кластере default_cluster
на всех хостах в базе данных test_database
будет создана реплицированная таблица. Это можно проверить с помощью запроса:
SELECT create_table_query FROM system.tables where name = 'data_table1_copied' and database ='test_database';
┌─create_table_query────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ CREATE TABLE test_database.data_table1_copied (`value` UInt64) │ │ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}') ORDER BY value SETTINGS index_granularity = 8192 │ └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Данные распределены примерно поровну между двумя шардами кластера default_cluster
— в таблице ниже приведены запросы для проверки, которые можно выполнить на хостах каждого шарда.
Запрос | Шард 1 | Шард 2 |
---|---|---|
|
┌─count()─┐ │ 499045 │ └─────────┘ |
┌─count()─┐ │ 500955 │ └─────────┘ |
|
┌─value─┐ │ 2 │ │ 4 │ │ 5 │ │ 7 │ │ 9 │ │ 10 │ │ ... |
┌─value─┐ │ 0 │ │ 1 │ │ 3 │ │ 6 │ │ 8 │ │ 11 │ │ ... |
Чтобы отправлять запросы SELECT
на все шарды сразу, можно использовать распределенную таблицу:
CREATE TABLE test_database.distr_table1 ON CLUSTER default_cluster AS test_database.data_table1_copied
ENGINE = Distributed(default_cluster, test_database, data_table1_copied, rand());
SELECT * FROM test_database.distr_table1 WHERE value < 10;
┌─value─┐ │ 4 │ └───────┘ ┌─value─┐ │ 7 │ └───────┘ ┌─value─┐ │ 2 │ │ 5 │ │ 9 │ └───────┘ ┌─value─┐ │ 1 │ └───────┘ ┌─value─┐ │ 6 │ │ 8 │ └───────┘ ┌─value─┐ │ 0 │ │ 3 │ └───────┘
Пример 2. Перешардирование
В этом примере показывается как с помощью clickhouse-copier перешардировать данные — скопировать данные, распределенные по трем шардам кластера source_cluster2
(в каждом шарде одна реплика), в кластер default_cluster
с двумя шардами (в каждом шарде две реплики).
Исходные данные
Создайте таблицу data_table2
в кластере source_cluster2
(в базе данных default
):
CREATE TABLE data_table2 ON CLUSTER source_cluster2 (value UInt64) ENGINE = MergeTree() ORDER BY value;
Создайте распределенную таблицу, которая будет обращаться к таблице data_table2
и позволит выполнять запросы распределенно на нескольких серверах:
CREATE TABLE distr_table2 ON CLUSTER source_cluster2 AS default.data_table2
ENGINE = Distributed(source_cluster2, default, data_table2, rand());
Вставьте данные, направив запрос INSERT
в распределенную таблицу, которая распределит данные по шардам случайным образом:
INSERT INTO distr_table2 SELECT number FROM numbers(1000000);
Примерное распределение данных по шардам приведено в следующей таблице.
Запрос | Шард 1 | Шард 2 | Шард 3 |
---|---|---|---|
|
┌─count()─┐ │ 332973 │ └─────────┘ |
┌─count()─┐ │ 334408 │ └─────────┘ |
┌─count()─┐ │ 332619 │ └─────────┘ |
Копирование
Составьте XML-конфигурацию задания на копирование данных для clickhouse-copier (файл task2.xml).
<clickhouse>
<remote_servers>
<source_cluster2>
<shard>
<internal_replication>false</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm1.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm2.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm3.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
</source_cluster2>
<default_cluster>
<shard>
<internal_replication>true</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm1.ru-central1.internal</host>
<port>9000</port>
</replica>
<replica>
<host>dev-adqm2.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<weight>1</weight>
<replica>
<host>dev-adqm3.ru-central1.internal</host>
<port>9000</port>
</replica>
<replica>
<host>dev-adqm4.ru-central1.internal</host>
<port>9000</port>
</replica>
</shard>
</default_cluster>
</remote_servers>
<max_workers>2</max_workers>
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<settings_push>
<readonly>0</readonly>
</settings_push>
<settings>
<connect_timeout>3</connect_timeout>
<distributed_foreground_insert>1</distributed_foreground_insert>
</settings>
<tables>
<table_local>
<cluster_pull>source_cluster2</cluster_pull>
<database_pull>default</database_pull>
<table_pull>data_table2</table_pull>
<cluster_push>default_cluster</cluster_push>
<database_push>test_database</database_push>
<table_push>data_table2_copied</table_push>
<engine>
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table2_copied', '{replica}')
ORDER BY (value)
</engine>
<sharding_key>rand()</sharding_key>
</table_local>
</tables>
</clickhouse>
Загрузите конфигурацию task2.xml в znode /clickhouse/clickhouse/copier/task2/description
.
Запустите clickhouse-copier из каталога, где хранится keeper.xml:
$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task2
Проверка
Проверьте, что в базу данных test_database
добавлена таблица data_table2_copied
:
SHOW tables FROM test_database;
┌─name───────────────┐ │ data_table1_copied │ │ data_table2_copied │ └────────────────────┘
Посмотрите, как 1 миллион строк данных из исходной таблицы распределился по шардам кластера назначения.
Запрос | Шард 1 | Шард 2 |
---|---|---|
|
┌─count()─┐ │ 500171 │ └─────────┘ |
┌─count()─┐ │ 499829 │ └─────────┘ |
|
┌─value─┐ │ 3 │ │ 5 │ │ 7 │ │ 8 │ │ 11 │ │ 13 │ │ 14 │ │ 18 │ │ 20 │ │ 22 │ └───────┘ |
┌─value─┐ │ 0 │ │ 1 │ │ 2 │ │ 4 │ │ 6 │ │ 9 │ │ 10 │ │ 12 │ │ 15 │ │ 16 │ └───────┘ |