Примеры использования clickhouse-copier

При установке ADQM на все хосты с сервисом ADQMDB по умолчанию устанавливается пакет adqm-clickhouse-copier, который предоставляет функциональность clickhouse-copier, позволяющую копировать между кластерами разных топологий и перешардировать большие объемы данных. В этой статье описывается как создавать задания на копирование и как запускать clickhouse-copier для их выполнения, а также приводятся примеры использования этого инструмента.

ВНИМАНИЕ
Clickhouse-copier не работает с типами данных Variant, Dynamic и JSON.

Использование clickhouse-copier

Чтобы скопировать/перешардировать данные с помощью clickhouse-copier, необходимо выполнить следующие шаги.

Шаг 1. Настройка соединения с сервисом координации

Для выполнения одного и того же задания можно запустить несколько экземпляров clickhouse-copier на разных хостах. Чтобы синхронизировать процесс копирования между хостами, требуется сервис координации — ZooKeeper или ClickHouse Keeper. Создайте XML-файл с параметрами подключения к ZooKeeper/ClickHouse Keeper — скопируйте в него секцию zookeeper из конфигурационного файла config.xml. Опционально в него также можно добавить секцию logger с настройками логирования. Скопируйте этот файл на все хосты, где будет запускаться clickhouse-copier.

Пример конфигурации параметров соединения с кластером ZooKeeper

 

Файл keeper.xml, который используется в примерах данной статьи:

<clickhouse>
    <zookeeper>
        <node>
            <host>dev-adqm1.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <node>
            <host>dev-adqm2.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <node>
            <host>dev-adqm3.ru-central1.internal</host>
            <port>2181</port>
        </node>
        <root>/clickhouse</root>
    </zookeeper>
</clickhouse>

Шаг 2. Конфигурация заданий на копирование

Создайте XML-файл с описанием заданий на копирование для clickhouse-copier (например, task.xml).

Структура конфигурации в общем виде
<clickhouse>
    <remote_servers>
        <source_cluster>
            <shard>
                    <replica>
                        ...
                    </replica>
                    ...
            </shard>
            ...
        </source_cluster>

        <destination_cluster>
            ...
        </destination_cluster>
    </remote_servers>

    <max_workers>2</max_workers>

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>

    <tables>
        <!-- Задание на копирование одной таблицы -->
        <table_1>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>source_database</database_pull>
            <table_pull>test_table</table_pull>

            <cluster_push>destination_cluster</cluster_push>
            <database_push>destination_database</database_push>
            <table_push>test_table_copied</table_push>

            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/test_table_copied', '{replica}')
            PARTITION BY partition_key
            ORDER BY sorting_key
            </engine>

            <sharding_key>sharding_key_expr</sharding_key>

            <where_condition>where_condition_expr</where_condition>

            <enabled_partitions>
                <partition>'2024-02-01'</partition>
                <partition>'2024-03-02'</partition>
                ...
            </enabled_partitions>
        </table_1>

        <!-- Задание на копирование следующей таблицы -->
        <table_2>
            ...
        </table_2>
        ...
    </tables>
</clickhouse>
Параметры конфигурации
Параметр Описание

remote_servers

Описание исходного кластера и кластера назначения из секции remote_servers конфигурационного файла config.xml (source_cluster и destination_cluster в приведенной выше конфигурации — названия этих кластеров).

Для объединения хостов ADQM в логический кластер можно использовать ADCM, при этом описание кластера автоматически добавится в файл config.xml — подробнее в статье Конфигурирование логических кластеров в интерфейсе ADCM

max_workers

Максимальное количество экземпляров clickhouse-copier (workers), одновременно выполняющих задание на копирование. Если запустить больше экземпляров, лишние будут переведены в спящий режим

settings_pull, settings_push

Настройки, используемые для выборки данных (pull) из таблиц исходного кластера и вставки данных (push) в таблицы кластера назначения соответственно

settings

Общие настройки, используемые для операций выборки и вставки данных (перекрываются настройками settings_pull и settings_push соответственно)

tables

Описание заданий на копирование. Можно указать несколько заданий в одном узле ZooKeeper/ClickHouse Keeper — каждое задание для копирования одной таблицы (например, в приведенной выше конфигурации указано два задания — table_1 и table_2). Задания будут выполняться последовательно, то есть копирование следующей таблицы не начнется, пока не будет скопирована предыдущая. Названия заданий в конфигурации могут быть произвольными.

Для каждого задания нужно указать следующие параметры:

  • cluster_pull, database_pull, table_pull — исходный кластер (из секции remote_servers), база данных в нем и таблицы, которые необходимо скопировать;

  • cluster_push, database_push, table_push — кластер назначения (из секции remote_servers), база данных в нем и таблицы, в которые нужно вставить данные;

  • engine — движок таблицы назначения (если таблица в кластере назначения не найдена, она будет создана на основе описаний столбцов исходной таблицы и табличного движка, указанного в этом параметре);

  • sharding_key — ключ шардирования, который будет применяться при вставке данных в кластер назначения;

  • where_condition — необязательное выражение, по которому будут фильтроваться данные при их выборке с хостов исходного кластера;

  • enabled_partitions — партиции, которые должны быть скопированы (остальные партиции будут игнорироваться). Имена партиций должны иметь то же формат, что и столбец partition таблицы system.parts (текст в кавычках). Поскольку ключи партиционирования таблиц исходного кластера и кластера назначения могут быть разными, эти имена партиций определяют партиции в кластере назначения.

    Эта настройка является необязательной (если не указана, будут скопированы все партиции), но настоятельно рекомендуется указывать партиции явно. Если в кластере назначения уже есть некоторые готовые партиции, они будут удалены в начале копирования, так как будут считаться неполными данными от предыдущего копирования.

После завершения описания заданий на копирование загрузите его в ZooKeeper/ClickHouse Keeper — создайте znode (должен располагаться по определенному пути — <znode-path-to-task>/description) и сохраните в нем содержимое файла task.xml. Например, это можно сделать с помощью консоли ZooKeeper CLI, которая доступна на каждом хосте кластера ZooKeeper. Откройте консоль с помощью скрипта zkCli.sh и используйте команду create:

$ create /clickhouse/clickhouse ""
$ create /clickhouse/clickhouse/copier ""
$ create /clickhouse/clickhouse/copier/task ""
$ create /clickhouse/clickhouse/copier/task/description "$(cat <path-to>/task.xml)"

где <path-to> — путь к файлу task.xml.

Шаг 3. Запуск clickhouse-copier

Чтобы запустить экземпляр clickhouse-copier для выполнения задания на копирование, выполните следующую команду:

$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task

где:

  • config — путь к файлу keeper.xml с параметрами подключения к ZooKeeper/ClickHouse Keeper;

  • task-path — путь к znode, который используется для синхронизации процессов clickhouse-copier и хранения заданий на копирование данных.

Для этой команды также доступны следующие необязательные параметры:

  • daemon — запустить clickhouse-copier в режиме демона;

  • task-file — путь к файлу с конфигурацией задания для первоначальной загрузки в ZooKeeper/ClickHouse Keeper;

  • task-upload-force — принудительно загрузить task-file, даже если znode уже существует (по умолчанию — false);

  • base-dir — путь к логам и вспомогательным файлам на хосте, где запущен clickhouse-copier (если этот параметр не указан, подкаталоги для этих файлов создаются в каталоге, из которого был запущен clickhouse-copier).

Чтобы уменьшить сетевой трафик, рекомендуется запускать clickhouse-copier на том же хосте, где расположены исходные данные.

Пример 1. Шардирование таблицы

В этом примере показывается как шардировать таблицу — данные таблицы, реплицируемой на два хоста в рамках одного шарда, копируются в другой кластер и распределяются между двумя шардами, в каждом из которых две реплики.

Исходные данные

Создайте реплицируемую таблицу data_table1 (в базе данных default) в кластере source_cluster1, который включает один шард с двумя репликами (описание кластера приведено ниже):

CREATE TABLE data_table1 ON CLUSTER source_cluster1 (value UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1', '{replica}') ORDER BY value;

Сгенерируйте тестовые данные для таблицы (1 миллион строк):

INSERT INTO data_table1 SELECT number FROM numbers(1000000);

В примерах данной статьи кластер назначения называется default_cluster — пример логического кластера, который автоматически формируется при установке ADQM на четыре хоста с фактором репликации равным 2. Создайте в этом кластере новую базу данных test_database, куда clickhouse-copier будет копировать данные:

CREATE DATABASE test_database ON CLUSTER default_cluster;

Копирование

Создайте конфигурационный файл task1.xml, описывающий для clickhouse-copier задание копирования таблицы data_table1 кластера source_cluster1 в таблицу data_table1_copied кластера default_cluster.

task1.xml
<clickhouse>
    <remote_servers>
        <source_cluster1>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster1>
        <default_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm4.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </remote_servers>

    <max_workers>2</max_workers>

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>

    <tables>
        <table_local>
            <cluster_pull>source_cluster1</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>data_table1</table_pull>

            <cluster_push>default_cluster</cluster_push>
            <database_push>test_database</database_push>
            <table_push>data_table1_copied</table_push>

            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}')
            ORDER BY (value)
            </engine>

            <sharding_key>rand()</sharding_key>
        </table_local>
    </tables>
</clickhouse>

Загрузите конфигурацию task1.xml в znode /clickhouse/clickhouse/copier/task1/description кластера ZooKeeper, который указан в keeper.xml.

Из каталога, где находится файл keeper.xml, запустите clickhouse-copier:

$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task1

Проверка

В результате копирования в кластере default_cluster на всех хостах в базе данных test_database будет создана реплицированная таблица. Это можно проверить с помощью запроса:

SELECT create_table_query FROM system.tables where name = 'data_table1_copied' and database ='test_database';
┌─create_table_query────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE test_database.data_table1_copied (`value` UInt64)                                                                                │
│ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table1_copied', '{replica}') ORDER BY value SETTINGS index_granularity = 8192   │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Данные распределены примерно поровну между двумя шардами кластера default_cluster — в таблице ниже приведены запросы для проверки, которые можно выполнить на хостах каждого шарда.

Запрос Шард 1 Шард 2
SELECT count() FROM test_database.data_table1_copied;
┌─count()─┐
│  499045 │
└─────────┘
┌─count()─┐
│  500955 │
└─────────┘
SELECT * FROM test_database.data_table1_copied ORDER BY value ASC;
┌─value─┐
│     2 │
│     4 │
│     5 │
│     7 │
│     9 │
│    10 │
│    ...
┌─value─┐
│     0 │
│     1 │
│     3 │
│     6 │
│     8 │
│    11 │
│    ...

Чтобы отправлять запросы SELECT на все шарды сразу, можно использовать распределенную таблицу:

CREATE TABLE test_database.distr_table1 ON CLUSTER default_cluster AS test_database.data_table1_copied
ENGINE = Distributed(default_cluster, test_database, data_table1_copied, rand());
SELECT * FROM test_database.distr_table1 WHERE value < 10;
┌─value─┐
│     4 │
└───────┘
┌─value─┐
│     7 │
└───────┘
┌─value─┐
│     2 │
│     5 │
│     9 │
└───────┘
┌─value─┐
│     1 │
└───────┘
┌─value─┐
│     6 │
│     8 │
└───────┘
┌─value─┐
│     0 │
│     3 │
└───────┘

Пример 2. Перешардирование

В этом примере показывается как с помощью clickhouse-copier перешардировать данные — скопировать данные, распределенные по трем шардам кластера source_cluster2 (в каждом шарде одна реплика), в кластер default_cluster с двумя шардами (в каждом шарде две реплики).

Исходные данные

Создайте таблицу data_table2 в кластере source_cluster2 (в базе данных default):

CREATE TABLE data_table2 ON CLUSTER source_cluster2 (value UInt64) ENGINE = MergeTree() ORDER BY value;

Создайте распределенную таблицу, которая будет обращаться к таблице data_table2 и позволит выполнять запросы распределенно на нескольких серверах:

CREATE TABLE distr_table2 ON CLUSTER source_cluster2 AS default.data_table2
ENGINE = Distributed(source_cluster2, default, data_table2, rand());

Вставьте данные, направив запрос INSERT в распределенную таблицу, которая распределит данные по шардам случайным образом:

INSERT INTO distr_table2 SELECT number FROM numbers(1000000);

Примерное распределение данных по шардам приведено в следующей таблице.

Запрос Шард 1 Шард 2 Шард 3
SELECT count() FROM data_table2;
┌─count()─┐
│  332973 │
└─────────┘
┌─count()─┐
│  334408 │
└─────────┘
┌─count()─┐
│  332619 │
└─────────┘

Копирование

Составьте XML-конфигурацию задания на копирование данных для clickhouse-copier (файл task2.xml).

task2.xml
<clickhouse>
    <remote_servers>
        <source_cluster2>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>false</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster2>
        <default_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm1.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm2.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <weight>1</weight>
                <replica>
                    <host>dev-adqm3.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>dev-adqm4.ru-central1.internal</host>
                    <port>9000</port>
                </replica>
            </shard>
        </default_cluster>
    </remote_servers>

    <max_workers>2</max_workers>

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <connect_timeout>3</connect_timeout>
        <distributed_foreground_insert>1</distributed_foreground_insert>
    </settings>

    <tables>
        <table_local>
            <cluster_pull>source_cluster2</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>data_table2</table_pull>

            <cluster_push>default_cluster</cluster_push>
            <database_push>test_database</database_push>
            <table_push>data_table2_copied</table_push>

            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/data_table2_copied', '{replica}')
            ORDER BY (value)
            </engine>

            <sharding_key>rand()</sharding_key>
        </table_local>
    </tables>
</clickhouse>

Загрузите конфигурацию task2.xml в znode /clickhouse/clickhouse/copier/task2/description.

Запустите clickhouse-copier из каталога, где хранится keeper.xml:

$ clickhouse-copier --config keeper.xml --task-path /clickhouse/copier/task2

Проверка

Проверьте, что в базу данных test_database добавлена таблица data_table2_copied:

SHOW tables FROM test_database;
┌─name───────────────┐
│ data_table1_copied │
│ data_table2_copied │
└────────────────────┘

Посмотрите, как 1 миллион строк данных из исходной таблицы распределился по шардам кластера назначения.

Запрос Шард 1 Шард 2
SELECT count() FROM test_database.data_table2_copied;
┌─count()─┐
│  500171 │
└─────────┘
┌─count()─┐
│  499829 │
└─────────┘
SELECT * FROM test_database.data_table2_copied limit 10;
┌─value─┐
│     3 │
│     5 │
│     7 │
│     8 │
│    11 │
│    13 │
│    14 │
│    18 │
│    20 │
│    22 │
└───────┘
┌─value─┐
│     0 │
│     1 │
│     2 │
│     4 │
│     6 │
│     9 │
│    10 │
│    12 │
│    15 │
│    16 │
└───────┘
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней