Usage of clickhouse-copier
ADQM comes with the adqm-clickhouse-copier package, which is installed by default on all hosts with the ADQMDB service during ADQM installation. This package provides the clickhouse-copier tool, which allows you to copy large amounts of data between clusters of different topologies and to reshard data. This article describes how to create copy tasks and how to run clickhouse-copier to execute them, and provides some examples of using this tool.
How to use clickhouse-copier
To copy/reshard data via clickhouse-copier, follow the steps below.
Step 1. Configure connection to a coordination service
You can run multiple clickhouse-copier instances on different hosts to perform the same task. To synchronize the copy process across hosts, a coordination service is required — ZooKeeper or ClickHouse Keeper. Create an XML file with parameters for the connection to ZooKeeper/ClickHouse Keeper — copy the zookeeper section from the config.xml file. Optionally, you can also add the logger section with logging settings to it. Copy this file to all hosts where you will run clickhouse-copier.
Below is the keeper.xml file used in the examples of this article:
<clickhouse>
    <zookeeper>
        <node>
            <host>dev-adqm1.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <node>
            <host>dev-adqm2.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <node>
            <host>dev-adqm3.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <root>/clickhouse</root>
    </zookeeper>
</clickhouse>
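For example, you can distribute the file over SSH. The commands below are only a sketch: the host names are taken from the example configuration above, and the home directory is an arbitrary target; replace them with the hosts and directory from which you actually plan to run clickhouse-copier.
$ scp keeper.xml dev-adqm1.ru-central1.internal:~/
$ scp keeper.xml dev-adqm2.ru-central1.internal:~/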
Step 2. Configure copy tasks
Create an XML file with a description of copy tasks for clickhouse-copier (for example, task.xml).
<clickhouse>
    <remote_servers>
        <source_cluster>
            <shard>
                <replica>
                    ...
                </replica>
                ...
            </shard>
            ...
        </source_cluster>
        <destination_cluster>
            ...
        </destination_cluster>
    </remote_servers>
    <max_workers>2</max_workers>
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>
    <settings_push>
        <readonly>0</readonly>
    </settings_push>
    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>
    <tables>
        <!-- A task to copy one table -->
        <table_1>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>source_database</database_pull>
            <table_pull>test_table</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>destination_database</database_push>
            <table_push>test_table_copied</table_push>
            <engine>
                ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/test_table_copied', '{replica}')
                PARTITION BY partition_key
                ORDER BY sorting_key
            </engine>
            <sharding_key>sharding_key_expr</sharding_key>
            <where_condition>where_condition_expr</where_condition>
            <enabled_partitions>
                <partition>'2024-02-01'</partition>
                <partition>'2024-03-02'</partition>
                ...
            </enabled_partitions>
        </table_1>
        <!-- A task to copy the next table -->
        <table_2>
            ...
        </table_2>
        ...
    </tables>
</clickhouse>
Parameter | Description |
---|---|
remote_servers | Description of the source and destination clusters, in the same format as the remote_servers section of the config.xml file. You can combine ADQM hosts into a logical cluster via ADCM — the cluster description will be automatically added to config.xml. For more details, refer to the article Configure logical clusters in the ADCM interface |
max_workers | Maximum number of clickhouse-copier instances (workers) that can perform a copy task simultaneously. If you start more instances, the extra ones will sleep |
settings_pull, settings_push | Settings used to fetch (pull) data from tables of the source cluster and insert (push) data into tables of the destination cluster, respectively |
settings | Common settings used for fetch and insert operations (the values from settings_pull and settings_push override them) |
tables | Description of copy tasks. You can specify multiple tasks in the same ZooKeeper/ClickHouse Keeper node — one task to copy one table. For example, there are two tasks specified in the above configuration — table_1 and table_2. For each task, specify the source cluster, database, and table (cluster_pull, database_pull, table_pull), the destination cluster, database, and table (cluster_push, database_push, table_push), the engine of the destination table, and the sharding_key expression used to distribute data across shards of the destination cluster; the optional where_condition and enabled_partitions parameters allow you to copy only a subset of the data |
Once you have completed the task description, upload it to ZooKeeper/ClickHouse Keeper — create a znode with a specific path (<znode-path-to-task>/description) and save the contents of the task.xml file in it. For example, you can do this in the ZooKeeper command line interface (CLI), which is available on each host of a ZooKeeper cluster. Run the zkCli.sh script to open the ZooKeeper CLI and use the create command:
$ create /clickhouse/clickhouse ""
$ create /clickhouse/clickhouse/copier ""
$ create /clickhouse/clickhouse/copier/task ""
$ create /clickhouse/clickhouse/copier/task/description "$(cat <path-to>/task.xml)"
where <path-to> is the path to the task.xml file. Note that the first /clickhouse element of these paths is the root znode set in keeper.xml (<root>/clickhouse</root>); clickhouse-copier resolves its task path relative to this root, which is why Step 3 refers to the same task as /clickhouse/copier/task.
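To make sure the task description has been saved, you can read the znode back with the get command in the same ZooKeeper CLI; it should print the contents of task.xml:
$ get /clickhouse/clickhouse/copier/task/description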
Step 3. Run clickhouse-copier
To start a clickhouse-copier instance that will perform a copy task, run the following command:
$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task
where:
- config — path to the keeper.xml file with parameters for the connection to ZooKeeper/ClickHouse Keeper;
- task-path — path to a znode used to synchronize clickhouse-copier processes and store copy tasks.
The following optional parameters are also available for this command:
- daemon — start clickhouse-copier in daemon mode;
- task-file — path to a file with the task configuration for initial upload to ZooKeeper/ClickHouse Keeper;
- task-upload-force — force task-file to be uploaded even if a znode already exists (default is false);
- base-dir — path to logs and auxiliary files on a host where clickhouse-copier runs (if this parameter is omitted, subdirectories for these files are created in the directory from which clickhouse-copier was launched).
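For example, the following command starts clickhouse-copier in daemon mode and lets it upload the task configuration from a local file, so the manual znode creation from Step 2 can be skipped. This is a sketch: the task.xml location and the /var/log/clickhouse-copier directory are placeholders chosen for this illustration, and task-upload-force can be added if the description znode already exists:
$ clickhouse-copier --daemon --config keeper.xml --task-path /clickhouse/copier/task --task-file task.xml --base-dir /var/log/clickhouse-copier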
To reduce network traffic, it is recommended to run clickhouse-copier on the same host where the source data is located.
Example 1. Sharding a table
This example shows how to shard a table — copy data from a table replicated on two hosts within one shard to another cluster and distribute it between two shards with two replicas in each one.
Prepare source data
Create the data_table1 replicated table (within the default database) in the source_cluster1 cluster, which includes one shard with two replicas (this cluster is described in the configuration below):
CREATE TABLE data_table1 ON CLUSTER source_cluster1 (value UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1', '{replica}') ORDER BY value;
Generate test data for the table (1 million rows):
INSERT INTO data_table1 SELECT number FROM numbers(1000000);
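Optionally, make sure the source table contains the expected number of rows before copying; the query below should return the 1,000,000 rows generated above:
SELECT count() FROM data_table1;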
In the examples of this article, the destination cluster is default_cluster — a logical cluster that is automatically generated when you install ADQM on four hosts with the replication factor set to 2. In this cluster, create a new database (test_database) to which clickhouse-copier will copy data:
CREATE DATABASE test_database ON CLUSTER default_cluster;
Copy data
Create the task1.xml configuration file that describes a task for clickhouse-copier to copy the data_table1 table of the source_cluster1 cluster to the data_table1_copied table of the default_cluster cluster.
<clickhouse>
    <remote_servers>
        <source_cluster1>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster1>
        <default_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm4.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </remote_servers>
    <max_workers>2</max_workers>
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>
    <settings_push>
        <readonly>0</readonly>
    </settings_push>
    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>
    <tables>
        <table_local>
            <cluster_pull>source_cluster1</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>data_table1</table_pull>
            <cluster_push>default_cluster</cluster_push>
            <database_push>test_database</database_push>
            <table_push>data_table1_copied</table_push>
            <engine>
                ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}')
                ORDER BY (value)
            </engine>
            <sharding_key>rand()</sharding_key>
        </table_local>
    </tables>
</clickhouse>
Upload the task1.xml configuration to the /clickhouse/clickhouse/copier/task1/description znode of the ZooKeeper cluster specified in keeper.xml.
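For example, in the ZooKeeper CLI this looks the same as in Step 2 (the /clickhouse/clickhouse and /clickhouse/clickhouse/copier parent znodes are assumed to exist already from that step; create them first if they do not):
$ create /clickhouse/clickhouse/copier/task1 ""
$ create /clickhouse/clickhouse/copier/task1/description "$(cat <path-to>/task1.xml)"
where <path-to> is the path to the task1.xml file.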
Run clickhouse-copier from the directory where the keeper.xml file is located:
$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task1
Verify
As a result, a replicated table is created in the test_database database on all hosts of the default_cluster cluster. You can check this as follows:
SELECT create_table_query FROM system.tables WHERE name = 'data_table1_copied' AND database = 'test_database';
┌─create_table_query────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ CREATE TABLE test_database.data_table1_copied (`value` UInt64) │ │ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}') ORDER BY value SETTINGS index_granularity = 8192 │ └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Data is distributed about equally between two shards of the default_cluster cluster — the table below shows queries for verification that you can execute on hosts of each shard.
Query | Shard 1 | Shard 2 |
---|---|---|
SELECT count() FROM test_database.data_table1_copied; | ┌─count()─┐ │ 499045 │ └─────────┘ | ┌─count()─┐ │ 500955 │ └─────────┘ |
SELECT * FROM test_database.data_table1_copied ORDER BY value; | ┌─value─┐ │ 2 │ │ 4 │ │ 5 │ │ 7 │ │ 9 │ │ 10 │ │ ... | ┌─value─┐ │ 0 │ │ 1 │ │ 3 │ │ 6 │ │ 8 │ │ 11 │ │ ... |
To send SELECT queries to all shards at once, you can use a distributed table:
CREATE TABLE test_database.distr_table1 ON CLUSTER default_cluster AS test_database.data_table1_copied
ENGINE = Distributed(default_cluster, test_database, data_table1_copied, rand());
SELECT * FROM test_database.distr_table1 WHERE value < 10;
┌─value─┐
│     4 │
└───────┘
┌─value─┐
│     7 │
└───────┘
┌─value─┐
│     2 │
│     5 │
│     9 │
└───────┘
┌─value─┐
│     1 │
└───────┘
┌─value─┐
│     6 │
│     8 │
└───────┘
┌─value─┐
│     0 │
│     3 │
└───────┘
Example 2. Resharding
This example shows how to reshard data with clickhouse-copier — copy data distributed across three shards of the source_cluster2 cluster (one replica in each shard) to the default_cluster cluster with two shards and two replicas in each shard.
Prepare source data
Create the data_table2 table in the source_cluster2 cluster (in the default database):
CREATE TABLE data_table2 ON CLUSTER source_cluster2 (value UInt64) ENGINE = MergeTree() ORDER BY value;
Create a distributed table that will access the data_table2 tables and allow execution of distributed queries on multiple servers:
CREATE TABLE distr_table2 ON CLUSTER source_cluster2 AS default.data_table2
ENGINE = Distributed(source_cluster2, default, data_table2, rand());
Insert data into the data_table2 tables. To do this, send the INSERT query to the distributed table — it will distribute data randomly across shards:
INSERT INTO distr_table2 SELECT number FROM numbers(1000000);
The following table shows the data distribution between three shards.
Query | Shard 1 | Shard 2 | Shard 3 |
---|---|---|---|
SELECT count() FROM data_table2; | ┌─count()─┐ │ 332973 │ └─────────┘ | ┌─count()─┐ │ 334408 │ └─────────┘ | ┌─count()─┐ │ 332619 │ └─────────┘ |
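The per-shard counts add up to the 1,000,000 inserted rows. You can also confirm the total with a single query against the distributed table created above:
SELECT count() FROM distr_table2;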
Copy data
Create an XML configuration of the copy task for clickhouse-copier (the task2.xml file).
<clickhouse>
    <remote_servers>
        <source_cluster2>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster2>
        <default_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm4.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </remote_servers>
    <max_workers>2</max_workers>
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>
    <settings_push>
        <readonly>0</readonly>
    </settings_push>
    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>
    <tables>
        <table_local>
            <cluster_pull>source_cluster2</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>data_table2</table_pull>
            <cluster_push>default_cluster</cluster_push>
            <database_push>test_database</database_push>
            <table_push>data_table2_copied</table_push>
            <engine>
                ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table2_copied', '{replica}')
                ORDER BY (value)
            </engine>
            <sharding_key>rand()</sharding_key>
        </table_local>
    </tables>
</clickhouse>
Upload the task2.xml configuration to the /clickhouse/clickhouse/copier/task2/description znode.
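As before, you can do this in the ZooKeeper CLI:
$ create /clickhouse/clickhouse/copier/task2 ""
$ create /clickhouse/clickhouse/copier/task2/description "$(cat <path-to>/task2.xml)"
where <path-to> is the path to the task2.xml file.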
Run clickhouse-copier from the directory where keeper.xml is stored:
$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task2
Verify
Check that the data_table2_copied table has been added to the test_database database:
SHOW TABLES FROM test_database;
┌─name───────────────┐
│ data_table1_copied │
│ data_table2_copied │
└────────────────────┘
See how 1 million data rows from the source table were distributed between two shards of the destination cluster.
Query | Shard 1 | Shard 2 |
---|---|---|
SELECT count() FROM test_database.data_table2_copied; | ┌─count()─┐ │ 500171 │ └─────────┘ | ┌─count()─┐ │ 499829 │ └─────────┘ |
SELECT * FROM test_database.data_table2_copied ORDER BY value LIMIT 10; | ┌─value─┐ │ 3 │ │ 5 │ │ 7 │ │ 8 │ │ 11 │ │ 13 │ │ 14 │ │ 18 │ │ 20 │ │ 22 │ └───────┘ | ┌─value─┐ │ 0 │ │ 1 │ │ 2 │ │ 4 │ │ 6 │ │ 9 │ │ 10 │ │ 12 │ │ 15 │ │ 16 │ └───────┘ |
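As in the first example, you can create a distributed table on top of the copied data to query both shards at once. This sketch follows the same pattern as for data_table1_copied; the distr_table2_copied name is chosen here just for illustration:
CREATE TABLE test_database.distr_table2_copied ON CLUSTER default_cluster AS test_database.data_table2_copied
ENGINE = Distributed(default_cluster, test_database, data_table2_copied, rand());

SELECT count() FROM test_database.distr_table2_copied;
The count should match the 1 million rows copied from the source cluster.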