Usage of clickhouse-copier

ADQM comes with the adqm-clickhouse-copier package, which is installed by default on all hosts with the ADQMDB service during ADQM installation. This package provides the functionality of clickhouse-copier allowing you to copy large amounts of data between clusters of different topologies and reshard data. This article describes how to create copy tasks and how to run clickhouse-copier to execute them, as well as provides some examples of using this tool.

How to use clickhouse-copier

To copy/reshard data via clickhouse-copier, follow the steps below.

Step 1. Configure connection to a coordination service

You can run multiple clickhouse-copier instances on different hosts to perform the same task. To synchronize the copy process across hosts, a coordination service is required — ZooKeeper or ClickHouse Keeper. Create an XML file with parameters for the connection to ZooKeeper/ClickHouse Keeper — copy the zookeeper section from the config.xml file. Optionally, you can also add the logger section with logging settings to it. Copy this file to all hosts where you will run clickhouse-copier.

Example of parameters configuring interaction with a ZooKeeper cluster

 

The keeper.xml file used in the examples within this article below:

<clickhouse>
    <zookeeper>
        <node>
            <host>dev-adqm1.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <node>
            <host>dev-adqm2.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <node>
            <host>dev-adqm3.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <root>/clickhouse</root>
    </zookeeper>
</clickhouse>

Step 2. Configure copy tasks

Create an XML file with a description of copy tasks for clickhouse-copier (for example, task.xml).

Сonfiguration template
<clickhouse>
    <remote_servers>
        <source_cluster>
            <shard>
                    <replica>
                        ...
                    </replica>
                    ...
            </shard>
            ...
        </source_cluster>

        <destination_cluster>
            ...
        </destination_cluster>
    </remote_servers>

    <max_workers>2</max_workers>

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>

    <tables>
        <!-- A task to copy one table -->
        <table_1>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>source_database</database_pull>
            <table_pull>test_table</table_pull>

            <cluster_push>destination_cluster</cluster_push>
            <database_push>destination_database</database_push>
            <table_push>test_table_copied</table_push>

            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/test_table_copied', '{replica}')
            PARTITION BY partition_key
            ORDER BY sorting_key
            </engine>

            <sharding_key>sharding_key_expr</sharding_key>

            <where_condition>where_condition_expr</where_condition>

            <enabled_partitions>
                <partition>'2024-02-01'</partition>
                <partition>'2024-03-02'</partition>
                ...
            </enabled_partitions>
        </table_1>

        <!-- A task to copy the next table -->
        <table_2>
            ...
        </table_2>
        ...
    </tables>
</clickhouse>
Configuration parameters
Parameter Description

remote_servers

Description of the source and destination clusters from the remote_servers section of the config.xml configuration file (source_cluster and destination_cluster in the above configuration are the names of these clusters).

You can combine ADQM hosts into a logical cluster via ADCM — the cluster description will be automatically added to config.xml. For more details, refer to the article Configure logical clusters in the ADCM interface

max_workers

Maximum number of clickhouse-copier instances (workers) possible to perform a copy task simultaneously. If you start more instances, the extra ones will sleep

settings_pull, settings_push

Settings used to fetch (pull) data from tables of the source cluster and insert (push) data into tables of the destination cluster, respectively

settings

Common settings used for fetch and insert operations (overlaid by the settings_pull and settings_push settings, respectively)

tables

Description of copy tasks. You can specify multiple tasks in the same ZooKeeper/ClickHouse Keeper node — one task to copy one table. For example, there are two tasks specified in the above configuration — table_1 and table_2 (task names in the configuration can be arbitrary). The tasks will be executed sequentially, that is, copying the next table will not start until the previous one is copied.

For each task, specify the following parameters:

  • cluster_pull, database_pull, table_pull — source cluster (from the remote_servers section), its database, and tables that should be copied;

  • cluster_push, database_push, table_push — destination cluster (from the remote_servers section), its database, and tables into which data should be inserted;

  • engine — engine of destination tables (if a table is not found in the destination cluster, it will be created based on columns of the source table and the table engine specified here);

  • sharding_key — sharding key used to insert data into the destination cluster;

  • where_condition — optional expression to filter data when pulling it from hosts of the source cluster;

  • enabled_partitions — partitions that should be copied (other partitions will be ignored). Partition names should have the same format as the partition column of the system.parts table (quoted text). As partition keys in the source and destination clusters can be different, these partition names specify destination partitions.

    This setting is optional (if it is not specified, all partitions will be copied), but it is strictly recommended to specify partitions explicitly. If there are already some ready partitions in the destination cluster, they will be removed at the start of the copying since they will be interpreted as incomplete data from the previous copying.

Once you have completed the task description, upload it to ZooKeeper/ClickHouse Keeper — create a znode (a path to it should be specific — <znode-path-to-task>/description) and save the contents of the task.xml file in it. For example, you can do this in the ZooKeeper command line interface (CLI) which is available on each host of a ZooKeeper cluster. Run the zkCli.sh script to open ZooKeeper CLI and use the create command:

$ create /clickhouse/clickhouse ""
$ create /clickhouse/clickhouse/copier ""
$ create /clickhouse/clickhouse/copier/task ""
$ create /clickhouse/clickhouse/copier/task/description "$(cat <path-to>/task.xml)"

where <path-to> is the path to the task.xml file.

Step 3. Run clickhouse-copier

To start a clickhouse-copier instance that will perform a copy task, run the following command:

$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task

where:

  • config — path to the keeper.xml file with parameters for the connection to ZooKeeper/ClickHouse Keeper;

  • task-path — path to a znode used to synchronize clickhouse-copier processes and store copy tasks.

The following optional parameters are also available for this command:

  • daemon — start clickhouse-copier in daemon mode;

  • task-file — path to a file with the task configuration for initial upload to ZooKeeper/ClickHouse Keeper;

  • task-upload-force — force task-file to be uploaded even if a znode already exists (default is false);

  • base-dir — path to logs and auxiliary files on a host where clickhouse-copier runs (if this parameter is omitted, subdirectories for these files are created in the directory from which clickhouse-copier was launched).

To reduce network traffic, it is recommended to run clickhouse-copier on the same host where the source data is located.

Example 1. Sharding a table

This example shows how to shard a table — copy data from a table replicated on two hosts within one shard to another cluster and distribute it between two shards with two replicas in each one.

Prepare source data

Create the data_table1 replicated table (within the default database) in the source_cluster1 cluster, which includes one shard with two replicas (this cluster is described in the configuration below):

CREATE TABLE data_table1 ON CLUSTER source_cluster1 (value UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1', '{replica}') ORDER BY value;

Generate test data for the table (1 million rows):

INSERT INTO data_table1 SELECT number FROM numbers(1000000);

In the examples of this article, a destination cluster is default_cluster — an example of a logical cluster that is automatically generated when you install ADQM on four hosts with a replication factor set to 2. In this cluster, create a new database (test_database) to which clickhouse-copier will copy data:

CREATE DATABASE test_database ON CLUSTER default_cluster;

Copy data

Create the task1.xml configuration file that describes a task for clickhouse-copier to copy the data_table1 table of the source_cluster1 cluster to the data_table1_copied table of the default_cluster cluster.

task1.xml
<clickhouse>
    <remote_servers>
        <source_cluster1>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster1>
        <default_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm4.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </remote_servers>

    <max_workers>2</max_workers>

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>

    <tables>
        <table_local>
            <cluster_pull>source_cluster1</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>data_table1</table_pull>

            <cluster_push>default_cluster</cluster_push>
            <database_push>test_database</database_push>
            <table_push>data_table1_copied</table_push>

            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}')
            ORDER BY (value)
            </engine>

            <sharding_key>rand()</sharding_key>
        </table_local>
    </tables>
</clickhouse>

Upload the task1.xml configuration to the /clickhouse/clickhouse/copier/task1/description znode of the ZooKeeper cluster specified in keeper.xml.

Run clickhouse-copier from the directory where the keeper.xml file is located:

$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task1

Verify

As a result, a replicated table is created in the test_database database on all hosts of the default_cluster cluster. You can check this as follows:

SELECT create_table_query FROM system.tables where name = 'data_table1_copied' and database ='test_database';
┌─create_table_query────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE test_database.data_table1_copied (`value` UInt64)                                                                                │
│ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}') ORDER BY value SETTINGS index_granularity = 8192   │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Data is distributed about equally between two shards of the default_cluster cluster — the table below shows queries for verification that you can execute on hosts of each shard.

Query Shard 1 Shard 2
SELECT count() FROM test_database.data_table1_copied;
┌─count()─┐
│  499045 │
└─────────┘
┌─count()─┐
│  500955 │
└─────────┘
SELECT * FROM test_database.data_table1_copied ORDER BY value ASC;
┌─value─┐
│     2 │
│     4 │
│     5 │
│     7 │
│     9 │
│    10 │
│    ...
┌─value─┐
│     0 │
│     1 │
│     3 │
│     6 │
│     8 │
│    11 │
│    ...

To send SELECT queries to all shards at once, you can use a distributed table:

CREATE TABLE test_database.distr_table1 ON CLUSTER default_cluster AS test_database.data_table1_copied
ENGINE = Distributed(default_cluster, test_database, data_table1_copied, rand());
SELECT * FROM test_database.distr_table1 WHERE value < 10;
┌─value─┐
│     4 │
└───────┘
┌─value─┐
│     7 │
└───────┘
┌─value─┐
│     2 │
│     5 │
│     9 │
└───────┘
┌─value─┐
│     1 │
└───────┘
┌─value─┐
│     6 │
│     8 │
└───────┘
┌─value─┐
│     0 │
│     3 │
└───────┘

Example 2. Resharding

This example shows how to reshard data with clickhouse-copier — copy data distributed across three shards of the source_cluster2 cluster (one replica in each shard) to the default_cluster cluster with two shards and two replicas in each shard.

Prepare source data

Create the data_table2 table in the source_cluster2 cluster (in the default database):

CREATE TABLE data_table2 ON CLUSTER source_cluster2 (value UInt64) ENGINE = MergeTree() ORDER BY value;

Create a distributed table that will access the data_table2 tables and allow execution of distributed queries on multiple servers:

CREATE TABLE distr_table2 ON CLUSTER source_cluster2 AS default.data_table2
ENGINE = Distributed(source_cluster2, default, data_table2, rand());

Insert data to the data_table2 tables. To do this, send the INSERT query to a distributed table — it will distribute data randomly across shards:

INSERT INTO distr_table2 SELECT number FROM numbers(1000000);

The following table shows the data distribution between three shards.

Query Shard 1 Shard 2 Shard 3
SELECT count() FROM data_table2;
┌─count()─┐
│  332973 │
└─────────┘
┌─count()─┐
│  334408 │
└─────────┘
┌─count()─┐
│  332619 │
└─────────┘

Copy data

Create an XML configuration of the copy task for clickhouse-copier (the task2.xml file).

task2.xml
<clickhouse>
    <remote_servers>
        <source_cluster2>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster2>
        <default_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm4.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </remote_servers>

    <max_workers>2</max_workers>

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>

    <tables>
        <table_local>
            <cluster_pull>source_cluster2</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>data_table2</table_pull>

            <cluster_push>default_cluster</cluster_push>
            <database_push>test_database</database_push>
            <table_push>data_table2_copied</table_push>

            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table2_copied', '{replica}')
            ORDER BY (value)
            </engine>

            <sharding_key>rand()</sharding_key>
        </table_local>
    </tables>
</clickhouse>

Upload the task2.xml configuration to the /clickhouse/clickhouse/copier/task2/description znode.

Run clickhouse-copier from the directory where keeper.xml is stored:

$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task2

Verify

Check that the data_table2_copied table has been added to the test_database database:

SHOW tables FROM test_database;
┌─name───────────────┐
│ data_table1_copied │
│ data_table2_copied │
└────────────────────┘

See how 1 million data rows from the source table were distributed between two shards of the destination cluster.

Query Shard 1 Shard 2
SELECT count() FROM test_database.data_table2_copied;
┌─count()─┐
│  500171 │
└─────────┘
┌─count()─┐
│  499829 │
└─────────┘
SELECT * FROM test_database.data_table2_copied limit 10;
┌─value─┐
│     3 │
│     5 │
│     7 │
│     8 │
│    11 │
│    13 │
│    14 │
│    18 │
│    20 │
│    22 │
└───────┘
┌─value─┐
│     0 │
│     1 │
│     2 │
│     4 │
│     6 │
│     9 │
│    10 │
│    12 │
│    15 │
│    16 │
└───────┘
Found a mistake? Seleсt text and press Ctrl+Enter to report it