Обзор ADQM Spark 3 Connector

Arenadata QuickMarts (ADQM) — столбцовая система управления базами данных для онлайн-обработки запросов, которая основана на открытой (open-source) СУБД ClickHouse.

Apache Spark 3 — фреймворк с открытым исходным кодом для реализации распределенной обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Hadoop.

ADQM Spark 3 Connector предоставляет возможность высокоскоростного параллельного обмена данными между Apache Spark 3 и Arenadata QuickMarts.

Архитектура

Каждое приложение Spark 3 состоит из управляющего процесса — драйвера (driver) — и набора распределенных worker-процессов — исполнителей (executors). Схема взаимодействия представлена ниже.

Архитектура ADQM Spark 3 Connector
Архитектура ADQM Spark 3 Connector
Архитектура ADQM Spark 3 Connector
Архитектура ADQM Spark 3 Connector

Чтение данных

Для загрузки таблицы ADQM в Spark 3 на первичном этапе коннектор инициализирует драйвер через строку подключения, которая задана опцией spark.adqm.url. При этом если указана опция spark.adqm.cluster.name, драйвер, подключаясь к spark.adqm.url с помощью JDBC, также считывает метаинформацию о всех шардах и репликах указанного кластера ADQM, что позволяет в дальнейшем эффективно в параллель считывать данные с таблиц шардов по имеющимся исполнителям Spark 3.

Данные в ADQM хранятся на шардах. Приложение Spark 3, используя коннектор, может получать данные с каждого шарда в одну или несколько партиций Spark 3. В случае чтения из Distributed-таблиц, чтение происходит в параллель на каждом из шардов из соответствующей MergeTree-таблицы.

На текущий момент существует три режима партиционирования:

ПРИМЕЧАНИЕ

Информация справедлива для случаев, когда опция spark.adqm.cluster.name указана и не указана.

  • По shard_id. Данные из таблиц ADQM считываются и распределяются по партициям Spark 3 в соответствии с номером шарда. Количество партиций Spark 3 соответствует числу активных шардов ADQM. Этот режим партиционирования не требует дополнительных параметров и используется по умолчанию.

  • По указанному столбцу и указанному количеству партиций. Данные разбиваются по партициям Spark 3 согласно диапазонам значений указанного столбца на указанное число партиций. Количество партиций Spark 3 соответствует количеству диапазонов. Необходимо указать параметры spark.adqm.partition.column и spark.adqm.partition.count (см. Опции ADQM Spark 3 Connector). Данный режим имеет ограничения: в качестве столбца могут быть использованы только поля таблицы типа integer или date/time.

  • По указанному столбцу. Данные разбиваются по партициям Spark 3 в соответствии с уникальными значениями указанного столбца. Количество партиций Spark 3 соответствует количеству уникальных значений в указанном столбце. Необходимо установить параметр spark.adqm.partition.column (см. Опции ADQM Spark 3 Connector). Рекомендуется использовать данный режим при небольшом, ограниченном наборе значений.

При такой архитектуре на каждый исполнитель Spark 3 назначается задача по обработке данных и соответствующая ему партиция из ранее созданных. Обмен данными выполняется в параллельном режиме для каждой партиции с каждым шардом (если указана опция spark.adqm.cluster.name — с каждым шардом данного кластера, в ином случае — с шардами, указанными в опции spark.adqm.url).

Алгоритм чтения данных
Алгоритм чтения данных
Алгоритм чтения данных
Алгоритм чтения данных

Запись данных

Для загрузки таблицы из Spark 3 в ADQM на первичном этапе коннектор инициализирует драйвер через строку подключения spark.adqm.url. При этом если указана опция spark.adqm.cluster.name, драйвер, подключаясь к spark.adqm.url с помощью JDBC, также считывает метаинформацию о всех шардах и репликах указанного кластера ADQM. В зависимости от режима записи производится подготовка каждого шарда к загрузке данных (создание или очистка таблиц).

На текущий момент поддерживаются следующие режимы записи:

  • append. Дополнительные данные записываются в целевую таблицу. При указании опции spark.adqm.cluster.name осуществляется попытка записи в каждый из шардов кластера, в ином случае — только в те шарды, которые указаны в spark.adqm.url. Если на каком-либо из шардов отсутствует таблица, в которую предполагается дозапись, процесс загрузки завершается ошибкой. В этом режиме на всех шардах должна быть предварительно создана целевая таблица.

  • errorIfExists. Если целевая таблица существует, появляется ошибка. В этом режиме предполагается, что целевой таблицы нет, она создается в процессе загрузки. Необходимо указать информацию о движке создаваемой целевой таблицы с помощью опции spark.adqm.create.table.engine. Если указана опция spark.adqm.cluster.name, то целевая таблица с указанным движком создается на каждом из шардов кластера, в ином случае — целевая таблица создается на каждом из шардов, заданных в опции spark.adqm.url.

  • overwrite. Данные в таблице перезаписываются. В зависимости от spark.adqm.table.truncate целевая таблица либо удаляется полностью, либо в ней все данные очищаются перед процессом записи. В случае полного удаления таблицы процесс записи аналогичен режиму errorIfExists, в случае очистки — аналогичен режиму append.

Также доступен ряд опций spark.adqm.create.table, позволяющих указать дополнительные настройки при создании целевой таблицы (см. Опции ADQM Spark 3 Connector).

Далее для каждой партиции Spark 3 создается задача по загрузке данных. Обмен данных выполняется в параллельном режиме с каждым шардом (если указана опция spark.adqm.cluster.name — с каждым из шардов данного кластера, в ином случае — с шардами, указанными в опции spark.adqm.url).

Алгоритм записи данных
Алгоритм записи данных
Алгоритм записи данных
Алгоритм записи данных

Поддерживаемые движки таблиц

Движок таблицы (table engine) — сущность ADQM, которая определяет, как данные хранятся и каким образом их записывать, откуда их читать и какой к ним имеется доступ, а также задает параметры запросов, индексов и репликации (см. Движки таблиц в документации ADQM).

ADQM Spark 3 Connector поддерживает следующие движки таблиц ADQM:

  • Движки семейства MergeTree. Предназначены для быстрой записи огромного объема данных по частям относительно заданных правил. Поддерживается основной движок MergeTree, а также его специальные версии:

    • AggregatingMergeTree;

    • CollapsingMergeTree;

    • VersionedCollapsingMergeTree;

    • ReplacingMergeTree;

    • SummingMergeTree;

    • ReplicatedMergeTree;

    • ReplicatedAggregatingMergeTree;

    • ReplicatedCollapsingMergeTree;

    • ReplicatedVersionedCollapsingMergeTree;

    • ReplicatedReplacingMergeTree;

    • ReplicatedSummingMergeTree.

  • Движок Distributed. Не хранит данные, а позволяет обрабатывать запросы распределенно на нескольких серверах, выполняя параллельное чтение автоматически (подробнее о создании и использовании таблиц типа Distributed в статье Шардирование документации ADQM).

Поддерживаемые типы данных

Соответствия типов данных при перемещениях данных представлены в таблицах ниже.

Перемещение из ADQM в Spark 3
ADQM Spark 3

Date

DateType

DateTime

TimestampType

DateTime64

TimestampType

Decimal

DecimalType

Decimal128

DecimalType

Decimal256

DecimalType

Decimal32

DecimalType

Decimal64

DecimalType

Float32

FloatType

Float64

DoubleType

IPv4

LongType

Int16

IntegerType

Int32

IntegerType

Int64

LongType

Int8

ShortType

String

StringType

UInt16

IntegerType

UInt32

LongType

UInt64

LongType

UInt8

ShortType

UUID

StringType

float8

DoubleType

Перемещение из Spark 3 в ADQM
Spark 3 ADQM

BinaryType

String

BooleanType

UInt8

ByteType

Int8

DateType

Date

DecimalType

Decimal

DoubleType

Float64

FloatType

Float32

IntegerType

Int32

LongType

Int64

ShortType

Int16

StringType

String

TimestampType

DateTime64

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней