Обзор ADQM Spark 3 Connector
Arenadata QuickMarts (ADQM) — столбцовая система управления базами данных для онлайн-обработки запросов, которая основана на открытой (open-source) СУБД ClickHouse.
Apache Spark 3 — фреймворк с открытым исходным кодом для реализации распределенной обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Hadoop.
ADQM Spark 3 Connector предоставляет возможность высокоскоростного параллельного обмена данными между Apache Spark 3 и Arenadata QuickMarts.
Архитектура
Каждое приложение Spark 3 состоит из управляющего процесса — драйвера (driver) — и набора распределенных worker-процессов — исполнителей (executors). Схема взаимодействия представлена ниже.
Чтение данных
Для загрузки таблицы ADQM в Spark 3 на первичном этапе коннектор инициализирует драйвер через строку подключения, которая задана опцией spark.adqm.url
. При этом если указана опция spark.adqm.cluster.name
, драйвер, подключаясь к spark.adqm.url
с помощью JDBC, также считывает метаинформацию о всех шардах и репликах указанного кластера ADQM, что позволяет в дальнейшем эффективно в параллель считывать данные с таблиц шардов по имеющимся исполнителям Spark 3.
Данные в ADQM хранятся на шардах. Приложение Spark 3, используя коннектор, может получать данные с каждого шарда в одну или несколько партиций Spark 3. В случае чтения из Distributed-таблиц, чтение происходит в параллель на каждом из шардов из соответствующей MergeTree-таблицы.
На текущий момент существует три режима партиционирования:
ПРИМЕЧАНИЕ
Информация справедлива для случаев, когда опция |
-
По shard_id. Данные из таблиц ADQM считываются и распределяются по партициям Spark 3 в соответствии с номером шарда. Количество партиций Spark 3 соответствует числу активных шардов ADQM. Этот режим партиционирования не требует дополнительных параметров и используется по умолчанию.
-
По указанному столбцу и указанному количеству партиций. Данные разбиваются по партициям Spark 3 согласно диапазонам значений указанного столбца на указанное число партиций. Количество партиций Spark 3 соответствует количеству диапазонов. Необходимо указать параметры
spark.adqm.partition.column
иspark.adqm.partition.count
(см. Опции ADQM Spark 3 Connector). Данный режим имеет ограничения: в качестве столбца могут быть использованы только поля таблицы типа integer или date/time. -
По указанному столбцу. Данные разбиваются по партициям Spark 3 в соответствии с уникальными значениями указанного столбца. Количество партиций Spark 3 соответствует количеству уникальных значений в указанном столбце. Необходимо установить параметр
spark.adqm.partition.column
(см. Опции ADQM Spark 3 Connector). Рекомендуется использовать данный режим при небольшом, ограниченном наборе значений.
При такой архитектуре на каждый исполнитель Spark 3 назначается задача по обработке данных и соответствующая ему партиция из ранее созданных. Обмен данными выполняется в параллельном режиме для каждой партиции с каждым шардом (если указана опция spark.adqm.cluster.name
— с каждым шардом данного кластера, в ином случае — с шардами, указанными в опции spark.adqm.url
).
Запись данных
Для загрузки таблицы из Spark 3 в ADQM на первичном этапе коннектор инициализирует драйвер через строку подключения spark.adqm.url
. При этом если указана опция spark.adqm.cluster.name
, драйвер, подключаясь к spark.adqm.url
с помощью JDBC, также считывает метаинформацию о всех шардах и репликах указанного кластера ADQM. В зависимости от режима записи производится подготовка каждого шарда к загрузке данных (создание или очистка таблиц).
На текущий момент поддерживаются следующие режимы записи:
-
append
. Дополнительные данные записываются в целевую таблицу. При указании опцииspark.adqm.cluster.name
осуществляется попытка записи в каждый из шардов кластера, в ином случае — только в те шарды, которые указаны вspark.adqm.url
. Если на каком-либо из шардов отсутствует таблица, в которую предполагается дозапись, процесс загрузки завершается ошибкой. В этом режиме на всех шардах должна быть предварительно создана целевая таблица. -
errorIfExists
. Если целевая таблица существует, появляется ошибка. В этом режиме предполагается, что целевой таблицы нет, она создается в процессе загрузки. Необходимо указать информацию о движке создаваемой целевой таблицы с помощью опцииspark.adqm.create.table.engine
. Если указана опцияspark.adqm.cluster.name
, то целевая таблица с указанным движком создается на каждом из шардов кластера, в ином случае — целевая таблица создается на каждом из шардов, заданных в опцииspark.adqm.url
. -
overwrite
. Данные в таблице перезаписываются. В зависимости отspark.adqm.table.truncate
целевая таблица либо удаляется полностью, либо в ней все данные очищаются перед процессом записи. В случае полного удаления таблицы процесс записи аналогичен режимуerrorIfExists
, в случае очистки — аналогичен режимуappend
.
Также доступен ряд опций spark.adqm.create.table
, позволяющих указать дополнительные настройки при создании целевой таблицы (см. Опции ADQM Spark 3 Connector).
Далее для каждой партиции Spark 3 создается задача по загрузке данных. Обмен данных выполняется в параллельном режиме с каждым шардом (если указана опция spark.adqm.cluster.name
— с каждым из шардов данного кластера, в ином случае — с шардами, указанными в опции spark.adqm.url
).
Поддерживаемые движки таблиц
Движок таблицы (table engine) — сущность ADQM, которая определяет, как данные хранятся и каким образом их записывать, откуда их читать и какой к ним имеется доступ, а также задает параметры запросов, индексов и репликации (см. Движки таблиц в документации ADQM).
ADQM Spark 3 Connector поддерживает следующие движки таблиц ADQM:
-
Движки семейства MergeTree. Предназначены для быстрой записи огромного объема данных по частям относительно заданных правил. Поддерживается основной движок MergeTree, а также его специальные версии:
-
AggregatingMergeTree;
-
CollapsingMergeTree;
-
VersionedCollapsingMergeTree;
-
ReplacingMergeTree;
-
SummingMergeTree;
-
ReplicatedMergeTree;
-
ReplicatedAggregatingMergeTree;
-
ReplicatedCollapsingMergeTree;
-
ReplicatedVersionedCollapsingMergeTree;
-
ReplicatedReplacingMergeTree;
-
ReplicatedSummingMergeTree.
-
-
Движок Distributed. Не хранит данные, а позволяет обрабатывать запросы распределенно на нескольких серверах, выполняя параллельное чтение автоматически (подробнее о создании и использовании таблиц типа Distributed в статье Шардирование документации ADQM).
Поддерживаемые типы данных
Соответствия типов данных при перемещениях данных представлены в таблицах ниже.
ADQM | Spark 3 |
---|---|
Date |
DateType |
DateTime |
TimestampType |
DateTime64 |
TimestampType |
Decimal |
DecimalType |
Decimal128 |
DecimalType |
Decimal256 |
DecimalType |
Decimal32 |
DecimalType |
Decimal64 |
DecimalType |
Float32 |
FloatType |
Float64 |
DoubleType |
IPv4 |
LongType |
Int16 |
IntegerType |
Int32 |
IntegerType |
Int64 |
LongType |
Int8 |
ShortType |
String |
StringType |
UInt16 |
IntegerType |
UInt32 |
LongType |
UInt64 |
LongType |
UInt8 |
ShortType |
UUID |
StringType |
float8 |
DoubleType |
Spark 3 | ADQM |
---|---|
BinaryType |
String |
BooleanType |
UInt8 |
ByteType |
Int8 |
DateType |
Date |
DecimalType |
Decimal |
DoubleType |
Float64 |
FloatType |
Float32 |
IntegerType |
Int32 |
LongType |
Int64 |
ShortType |
Int16 |
StringType |
String |
TimestampType |
DateTime64 |