Шардирование

Шардирование — это принцип проектирования базы данных, при котором части одной таблицы размещаются на разных шардах. Шард — узел кластера, который может состоять из одной или нескольких реплик. Реплики — это серверы, на которых дублируются данные в рамках шарда. Запрос на чтение или запись в шард может быть отправлен на любую его реплику, выделенного мастера нет.

Шардирование обычно используется в следующих случаях:

  • Увеличивается количество данных и частота запросов к базе данных.

  • Система требует все больше ресурсов, но вертикальное масштабирование кластера, которое предполагает обновление аппаратного обеспечения имеющихся серверов, не может быть использовано, так как существуют физические пределы — по количеству ядер на процессор, количеству процессоров, объему памяти и т.д.

 
Шардирование позволяет:

  • Преодолеть технические ограничения. Если данные не помещаются на одном сервере или текущая инфраструктура хранения данных работает на пределе возможностей оборудования, можно горизонтально масштабировать кластер, то есть добавить новые серверы к существующему стеку и распределить данные по нескольким шардам.

  • Повысить доступность. Шардирование позволяет смягчить последствия сбоев. Если данные хранятся на одном сервере, отказ этого сервера может привести к потере доступа ко всем данным. А отказ, например, одного шарда из пяти оставляет доступными 80% данных таблицы.

  • Увеличить скорость выполнения запросов. Запросы конкурируют с друг другом за вычислительные ресурсы серверов кластера, что может приводить к снижению скорости обработки запросов. В шардированном кластере, где запросы к одной и той же таблице могут выполняться параллельно, устраняется конкуренция за общие ресурсы, что позволяет сократить время обработки запроса.

Распределенные таблицы

Шардирование данных реализуется с помощью распределенных таблиц — таблиц на движке Distributed. Распределенная таблица не хранит данные. Данные хранятся в локальных таблицах, которые находятся на серверах каждого шарда. Распределенная таблица обеспечивает маршрутизацию запросов к локальным таблицам, другими словами, позволяет обрабатывать запросы распределённо на нескольких серверах.

Создание распределенной таблицы

Распределенную таблицу можно создавать где угодно, в том числе на серверах, не входящих в шарды, на которых хранятся таблицы с данными. Но стандартный способ — размещение распределенной таблицы на всех шардах, на которых размещаются таблицы с данными.

Пример запроса на создание распределенной таблицы на всех серверах кластера:

CREATE TABLE distributed_table ON CLUSTER cluster_name AS table_name
ENGINE = Distributed(cluster_name, database, table_name [, sharding_key][, policy_name]);

При использовании движка Distributed требуется передать следующие параметры:

  • имя кластера в конфигурационном файле сервера;

  • имя базы данных для таблицы, к которой будет обращаться распределенная таблица (имя базы данных должны быть одинаковым на всех серверах кластера);

  • имя таблицы, к которой будет обращаться распределенная таблица (имя таблицы должны быть одинаковым на всех серверах кластера);

  • не обязательно — ключ шардирования, на основе которого данные будут распределяться по шардам при выполнении запросов INSERT;

  • не обязательно — имя политики для хранения временных файлов при асинхронной отправке.

Конфигурация кластера

Движок Distributed требует описание кластера в конфигурационном файле каждого сервера (config.xml). Кластер, заданный в конфигурационном файле — это логическая сущность, объединяющая серверы. Один сервер может участвовать в нескольких логических кластерах. Таким образом обеспечивается гибкость распределения данных между серверами.

Кластеры задаются следующим образом:

<remote_servers>
    <cluster_name>
        <shard>
            <weight>2</weight>
            <internal_replication>false</internal_replication>
            <replica>
                <priority>1</priority>
                <host>host_01_1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>host_01_2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>1</weight>
            <internal_replication>false</internal_replication>
            <replica>
                <host>host_02_1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>host_02_2</host>
                <port>9000</port>
            </replica>
        </shard>
    </cluster_name>
</remote_servers>

В данном примере задан распределенный кластер с именем cluster_name, состоящий из двух шардов. Каждый шард содержит две реплики. В качестве параметров для каждой реплики указываются host (адрес удаленного сервера) и port (TCP-порт для межсерверного взамодействия, обычно 9000).

Кластер может включать один шард (в этом случае обработку запроса называют удалённой, а не распределённой) или несколько шардов. В каждом шарде можно указать от одной до произвольного числа реплик. Число реплик для каждого шарда в составе кластера может быть разным.

Для шарда могут быть заданы следующие необязательные параметры:

  • weight(по умолчанию, 1) — вес шарда при записи данных. Данные будут распределяться по шардам в количестве, пропорциональном весу шардов. Например, есть два шарда — у первого вес 2, у второго вес 1. В этом случае доля строк, отправляемых на первый шард, будет составлять 2/3, а на второй — 1/3.

  • internal_replication — определяет, как записывать данные в реплики:

    • true — данные записываются в первую доступную реплику. Этот вариант следует использовать, если данные записываются в реплицируемую таблицу, которая будет сама управлять репликацией данных. Данные, записанные на реплику, будут скопированы на другие реплики шарда в асинхронном режиме.

    • false (по умолчанию) — данные записываются на все реплики, то есть распределенная таблица занимается репликацией данных самостоятельно. Это менее надежно, чем использование реплицируемых таблиц, так как в этом случае идентичность реплик не контролируется (со временем данные на них могут начать немного различаться).

Также можно настроить алгоритм балансировки нагрузки на реплики, то есть указать предпочтения, на какую из реплик в первую очередь отправлять запрос. Для этого можно задать параметр реплики priority в конфигурации кластера (меньшее значение — больший приоритет реплики, по умолчанию — 1) или использовать настройку load_balancing.

Конфигурирование кластера через ADCM

 
Сконфигурировать логический кластер ADQM можно в интерфейсе ADCM, используя параметры в секции Cluster configuration на странице конфигурирования сервиса ADQMDB. Для этого можно:

  • Указать значение Replication factor перед установкой ADQM — на основе указанного фактора репликации все хосты ADQM будут автоматически объединены в логический кластер.

  • Вручную добавить и настроить необходимые кластеры с помощью визуальных элементов параметра Cluster Configuration.

После установки ADQM или перезапуска сервиса ADQMDB описание кластера автоматически добавится в конфигурационный файл. Подробнее в статье Конфигурирование логических кластеров в интерфейсе ADCM.

Запись данных на кластер

Есть два способа записывать данные на кластер.

Запись напрямую в локальные таблицы

Вы можете самостоятельно определять, на какие серверы какие данные записывать, и выполнять запись непосредственно на каждый шард (то есть отправлять запросы INSERT напрямую в те таблицы, которые хранят данные). Это решение является наиболее гибким и оптимальным, поскольку вы можете использовать любую схему шардирования, отвечающую требованиям предметной области, и записывать данные на разные шарды полностью независимо.

Запись в распределенную таблицу

Вы можете отправлять запрос INSERT в распределенную таблицу. В этом случае таблица будет сама распределять вставляемые данные по серверам. Для этого необходимо задать ключ шардирования. Если шард всего один, то запись работает и без указания ключа шардирования, так как в этом случае он не имеет смысла.

Ключ шардирования может быть задан как произвольное выражение, возвращающее целое число. Например, в качестве ключа шардирования можно использовать:

  • столбец таблицы UserID — для распределения данных по идентификатору пользователя (данные одного пользователя будут расположены на одном шарде);

  • функцию toDayOfWeek — для распределения данных по шардам в зависимости от дня недели определенной даты (например, если в таблице есть поле saleDate, содержащее дату продажи, а кластер содержит 7 шардов);

  • выражение rand() — для случайного распределения данных.

Для выбора на какой шард записывается строка данных, используется остаток от деления ключа шардирования на суммарный вес шардов. Строка данных отправляется на шард, если остаток от деления входит в интервал от prev_weights до prev_weights + weight, где prev_weights - сумма весов шардов с меньшим номером, а weight — вес этого шарда. Например, есть 2 шарда — у одного вес 2, у другого вес 1. Сумма весов всех шардов 2 + 1 = 3. В этом случае возможные остатки от деления ключа шардирования на суммарный вес шардов — 0, 1 и 2. Данные будут записываться на первый шард, если остаток 0 или 1 (диапазон [0, 2)), и на второй шард — если остаток 2 (диапазон [2, 3)).

Простой остаток от деления является довольно ограниченным решением для шардирования и подходит не для всех случаев. Он подходит для среднего и большого объёма данных (десятки серверов), но не для очень больших объёмов данных (сотни серверов и больше). В последнем случае лучше использовать схему шардирования, учитывающую требования предметной области, и не использовать возможность записи в распределенные таблицы.

Чтение из распределенной таблицы

В отличие от запросов INSERT, запросы SELECT отправляютcя на все шарды кластера, вне зависимости от того, каким образом данные распределены по шардам (это поведение можно поменять с помощью настройки optimize_skip_unused_shards). При чтении для каждого из шардов выбирается одна из доступных реплик. Шарды обрабатывают запрос на чтение (максимум обработки выполняется на удалённых серверах, чтобы передавать меньше данных по сети) и отправляют частично агрегированные результаты в распределенную таблицу, которая объединит все полученные данные и отправит полный результат пользователю.

Перешардирование

Встроенной возможности автоматического перешардирования в ClickHouse нет. Но для перераспределения данных можно использовать утилиту clickhouse-copier, которая копирует данные из таблиц одного кластера (source cluster) в таблицы другого кластера (destination cluster). Для этого необходимо:

  1. Создать файл конфигурации заданий на копирование. В этом файле описать:

    • кластер с исходными данными и кластер с данными для перешардирования (в секции remote_servers);

    • задачи на шардирование (в секции tables);

    • новый ключ шардирования (в секции sharding_key).

  2. Создать файл конфигурации с параметрами соединения с ZooKeeper.

  3. Создать задачу в ZooKeeper.

  4. Запустить clickhouse-copier для копирования данных.

  5. Создать новую распределенную таблицу, переключиться со старого кластера на новый.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней