Как и зачем мы сделали Spark-коннектор к Greenplum

Всем привет! Меня зовут Андрей, я работаю системным архитектором в Arenadata. В этой статье расскажу, как и зачем мы сделали свой инструмент для обмена данными между Arenadata DB (аналитическая MPP-СУБД на базе Greenplum) и фреймворком для распределенной обработки данных Apache Spark (входит в экосистему Arenadata Hadoop).

spark connector dark
spark connector light

Поскольку Greenplum и Hadoop — это базовые элементы нашего стека, нам часто приходится сталкиваться с задачами обработки и передачи информации между ними. Ранее мы решали эту задачу частично с помощью Platform Extension Framework (PXF), но это лишает нас всех преимуществ, которые предлагает Spark, а они довольно существенны.

Мы решили написать ADB Spark Connector на базе HTTP-сервера, реализующего протокол gpfdist. Почему выбрали именно такой путь и на какой архитектуре в итоге остановились, расскажу ниже в статье.

Немного теории

Arenadata DB — это распределенная СУБД, построенная на базе Greenplum, которая относится к классу MPP-систем. MPP (massively parallel processing, массово-параллельные вычисления) — класс СУБД, распределяющих хранимые данные на множестве вычислительных узлов для параллельной обработки больших объемов данных. Каждый узел имеет собственное хранилище и вычислительные ресурсы, позволяющие выполнять часть общего запроса к базе. Логика разбиения таблицы на сегменты задается ключом (полем) дистрибуции. Для каждой отдельной колонки в таблице можно задать свой тип и уровень сжатия.

mpp dark
Архитектура MPP
mpp light
Архитектура MPP

MPP DB хорошо подходят в качестве аналитического хранилища данных, потому что:

  • масштабируются горизонтально;

  • хорошо держат OLAP-нагрузку;

  • позволяют загружать данные с большой скоростью.

Рассмотрим способы, с помощью которых можно обмениваться данными с ADB:

  1. C помощью JDBC-драйвера через мастер ADB. Этот способ доступен "из коробки" с помощью стандартного Spark JDBC-коннектора.

    Преимущества Недостатки

    Готовый Spark JDBC Source

    "Бутылочное горлышко" при выгрузке через единственную master-ноду

    Отсутствует необходимость поддержки новых версий API

    Простая возможность запуска кастомного SQL-кода

  2. C помощью COPY на мастере.

    Преимущества Недостатки

    Нативное представление в JDBC-драйвере Postgres

    "Бутылочное горлышко" при выгрузке через единственную master-ноду

    Master-нода берет на себя ответственность по распределению записей (согласно ключу распределения)

    Простая возможность запуска кастомного SQL-кода

  3. C помощью распределенного COPY.

    Преимущества Недостатки

    Возможность параллельной записи на сегменты

    Необходимость реализовать алгоритм распределения записей по сегментам согласно ключу распределения

    Необходимость следить за закрытием ресурсов на всех машинах Spark-воркеров

  4. С помощью gpfdist — HTTP-сервера для параллельного обмена данными с сегментами GP.

    Преимущества Недостатки

    Готовое решение от основного контрибьютора

    Необходимость устанавливать gpfdist на всех машинах Spark-воркеров

    Запись напрямую на сегменты, минуя мастер, что увеличивает пропускную способность чтения и записи

    Отсутствие контроля процесса загрузки записей

    Создание вручную именованных каналов

    Нет необходимости поддерживать свой код при изменении протокола

    Необходимость следить за закрытием ресурсов на всех машинах Spark-воркеров

  5. С помощью HTTP-сервера, реализующего протокол gpfdist (readable, writable).

    Преимущества Недостатки

    Параллельная запись на сегменты, минуя master-ноду

    Необходимость реализации протокола gpfdist, а также доработка при изменении для различных версий Greenplum

    Отсутствие необходимости создания и уничтожения ресурсов, в том числе именованных каналов на Spark-воркерах

    Полный контроль над управлением задачи перегрузки данных

    Возможность реализации алгоритмов гибкого партиционирования при считывании из Greenplum в Spark

Выбор основы для коннектора

Мы решили написать коннектор на основе HTTP-сервера, реализующего протокол gpfdist. Разработали две реализации протокола на базе:

  1. Akka-HTTP

  2. Finagle

Версия с Finagle показала себя лучше при наличии множества одновременных сессий от сегментов ADB, поэтому остановились на ней.

Теперь взглянем на эту задачу со стороны Apache Spark. Высокоуровневая архитектура процесса компиляции запросов Spark соответствует традиционному подходу СУБД, который заключается в преобразовании запроса в логический план, оптимизации логического плана в физический и его выполнении на рабочих узлах.

plan dark
Компиляция запросов в Spark
plan light
Компиляция запросов в Spark

Рассмотрим средства, предоставляемые в Spark для написания коннекторов к различным источникам данных.

Data Source API был представлен в Spark 1.3 вместе с абстракцией Dataframe. Со времени этого релиза Spark претерпел большие изменения. С версией 2.0 пришла новая абстракция DataSet, и в связи с этим API был переосмыслен. Ограничения API v1:

  • смешанное из API низкого и высокого уровня;

  • cложно делать Push-down операторы;

  • сложно передавать информацию о партициях;

  • нет поддержки транзакционной записи;

  • нет поддержки колоночного режима;

  • нет поддержки стриминга.

С версией 2.3 выпустили и новый API, известный как Data Source v2. В нем нет вышеперечисленных ограничений, и его характерной особенностью является переход Scala-трейтов к Java-интерфейсам для лучшей совместимости.

Рассмотрим подробней, что происходит с точки зрения Data Source API v2 на каждом из представленных этапов.

Источник данных представлен в логическом плане экземпляром DataSourceV2Relation. Во время планирования DataSourceV2Strategy преобразует отношение в экземпляр DataSourceV2ScanExec. Последний создает экземпляр DataSourceRDD, который используется для фактического параллельного вычисления результатов из нескольких разделов.

Логический план — на этом этапе цель состоит в том, чтобы создать DataSourceV2Relation из опций, предоставленных клиентом. Процесс инициируется загрузкой Dataset.

Физический план — на этом этапе цель состоит в том, чтобы преобразовать DataSourceV2Relation в DataSourceV2ScanExec. Процесс инициируется выполнением действия с Dataset, созданным на предыдущем этапе.

Выполнение запроса — на этом этапе цель состоит в том, чтобы получить данные из партиций RDD, созданного DataSourceV2ScanExec, и собрать строки из всех разделов.

На основании вышеизложенного, было принято решение реализовать коннектор с помощью Datasource API v2.

Архитектура

Каждое Spark-приложение состоит из управляющего процесса-драйвера и набора распределенных рабочих процессов-исполнителей. Общая компонентная диаграмма взаимодействия Spark-приложения и Greenplum-кластера приведена ниже.

arch dark
Схема взаимодействия Spark и Greenplum
arch light
Схема взаимодействия Spark и Greenplum

Чтение данных

Загрузка данных в Spark из Greenplum состоит из нескольких этапов:

  1. Инициализация Spark-драйвера.

  2. Spark-драйвер при помощи JDBC устанавливает соединение с мастером Greenplum для получения необходимых метаданных о кластере и таблице:

    • количество активных сегментов;

    • схема и тип таблицы;

    • ключ распределения таблицы;

    • план запроса.

  3. В зависимости от выбранной стратегии партиционирования (речь о которых пойдет далее) происходит вычисление количества партиций, а также генерация условий, используемых при загрузке данных с сегментов в партиции.

  4. На каждый Spark-исполнитель назначается задача по обработке данных и соответствующая ей партиция.

read dark
Алгоритм чтения данных
read light
Алгоритм чтения данных

Обмен данными происходит с помощью механизма writable-внешних таблиц, одновременно для каждого сегмента. На текущий момент реализованы следующие стратегии партиционирования:

  • По gp_segment_id. Данные считываются и распределяются по партициям Spark в соответствии с их распределением по сегментам в Greenplum.

  • По указанной колонке и указанному количеству партиций. Для указанной колонки запрашиваются минимальное и максимальное значения и генерируются условия считывания данных из сегментов. Таким образом в Spark-партиции загружаются данные соответствующего диапазона.

  • Только по указанной колонке. Данные разбиваются в соответствии с уникальными значениями указанной колонки. В таком случае количество Spark-партиций соответствует количеству уникальных значений указанной колонки.

  • Только по указанному количеству партиций. Данные разбиваются согласно некоторой хеш-функции (по умолчанию) на указанное количество партиций.

  • По указанной хеш-функции и указанному количеству партиций. Аналогично предыдущему пункту, с указанием желаемой хеш-функции.

Запись данных

Выгрузка данных из Spark в Greenplum происходит в несколько этапов:

  1. Инициализация Spark-драйвера.

  2. Spark-драйвер при помощи JDBC устанавливает соединение с мастером Greenplum.

  3. В зависимости от режима записи Spark-драйвер производит инициализирующие действия (создание или очистка таблиц) в Greenplum для загрузки.

  4. Каждый из Spark-исполнителей выгружает данные из назначенной ему партиции в Greenplum.

write dark
Алгоритм записи данных
write light
Алгоритм записи данных

Обмен данными происходит с помощью механизма readable-внешних таблиц, одновременно для каждого сегмента.

На текущий момент поддерживаются следующие режимы записи:

  • Перезапись (overwrite) — либо целевая таблица удаляется полностью и пересоздается, либо происходит truncate.

  • Добавление (append) — данные добавляются в целевую таблицу.

  • Запись в новую таблицу — завершение с ошибкой, если целевая таблица уже существует (errorIfExists).

Резюмируем функциональные возможности

  • Чтение данных из Greenplum.

  • Запись данных в Greenplum с помощью различных режимов записи:

    • overwrite

    • append

    • errorIfExists

  • Автоматическое формирование схемы данных.

  • Гибкое партиционирование.

  • Дополнительные опции при создании целевой таблицы в Greenplum.

  • Поддержка push-down операторов:

    • отсекание колонок;

    • push-down фильтры.

  • Извлечение дополнительных метаданных из Greenplum:

    • схема распределения данных;

    • статистика.

  • Оптимизация выполнения count-выражений в запросе.

  • Выполнение произвольного SQL-запроса через мастер Greenplum.

  • Поддержка пакетного (batch) режима при загрузке в Spark.

Аналоги

На текущий момент существует Spark Greenplum-коннектор от Pivotal.

Мы провели сравнительный анализ функциональных возможностей и производительности двух коннекторов. Результаты — в таблице ниже.

Сравнение коннекторов ADB Spark и Pivotal Spark Greenplum
Характеристика Pivotal Spark Greenplum Arenadata ADB Spark

Поддерживаемые версии Spark

  • 2.3.X

  • 2.4.X

  • 3.0.X

  • 2.3.X

  • 2.4.X

Поддерживаемые типы данных

  • числовые

  • строковые

  • дата/время

  • числовые

  • строковые

  • дата/время

  • интервалы

  • массивы

Push-down фильтрация

+

+

Pruning column

+

+

Партиционирование

2 способа

5 способов

Режимы записи

  • Append

  • Overwrite

  • ErrorIfExists

  • Ignore

  • Append

  • Overwrite

  • ErrorIfExists

Поддержка batch-режима в Spark

 — 

+

Сбор статистики для построения плана запроса в Catalyst

 — 

+

Выполнение произвольного SQL-запроса через master

 — 

+

Производительность

20x быстрее, чем JDBC

20x быстрее, чем JDBC

Таким образом, нам удалось создать высокопроизводительный двунаправленный ADB Spark коннектор, который, в свою очередь, обладает рядом функциональных преимуществ относительно аналога от Pivotal.

Подробное описание коннектора с примерами использования можно посмотреть в документации.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней