How and why we developed a Spark connector for Greenplum

Hello everyone! My name is Andrey and I work as a system architect at Arenadata. In this article, I will tell you how and why we developed our own tool for exchanging data between Arenadata DB (an analytical MPP DBMS based on Greenplum) and Apache Spark (a distributed data processing framework that is part of the Arenadata Hadoop ecosystem).

Since Greenplum and Hadoop are the basic elements of our stack, we often have to deal with tasks of processing and transferring data between them. Previously, we partially solved this problem with the Platform Extension Framework (PXF), but that approach deprives us of the quite significant benefits that Spark offers.

We decided to build the ADB Spark Connector around an HTTP server that implements the gpfdist protocol. In the article below, I will explain why we chose this approach and which architecture we eventually settled on.

A little theory

Arenadata DB is a distributed DBMS built on Greenplum, which belongs to the class of MPP systems. MPP (massively parallel processing) is a class of DBMS that distributes stored data across multiple computing nodes to process large amounts of data in parallel. Each node has its own storage and computing resources, which allow it to execute its part of the overall database query. The logic for dividing a table into segments is specified by a distribution key (field). For each individual column in a table, you can set its own compression type and level (a short DDL sketch is shown below).

MPP architecture
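
To make the distribution key and per-column compression concrete, here is a minimal DDL sketch executed over JDBC. The connection URL, credentials, table name, and column names are illustrative assumptions; the DISTRIBUTED BY clause sets the distribution key, and the ENCODING clause sets per-column compression for an append-optimized, column-oriented table.

```scala
import java.sql.DriverManager

// Illustrative sketch only: connection details and table/column names are assumptions.
object CreateDistributedTable extends App {
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://gp-master:5432/demo", "gpadmin", "password")
  try {
    val stmt = conn.createStatement()
    // DISTRIBUTED BY defines the distribution key; ENCODING sets per-column compression
    // (requires an append-optimized, column-oriented table).
    stmt.execute(
      """CREATE TABLE sales (
        |  id      bigint,
        |  payload text ENCODING (compresstype=zlib, compresslevel=5)
        |)
        |WITH (appendonly=true, orientation=column)
        |DISTRIBUTED BY (id)""".stripMargin)
    stmt.close()
  } finally {
    conn.close()
  }
}
```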

MPP databases are well suited for analytical data storage because they:

  • scale horizontally;

  • handle OLAP load well;

  • allow you to load data at high speed.

Let’s look at the ways to exchange data with ADB:

  1. Using the JDBC driver through the ADB master — the standard Spark JDBC connector supports this method out of the box.

    Benefits:
      • Ready Spark JDBC source.
      • No need to support new API versions.
      • Easy to run custom SQL code.

    Disadvantages:
      • Bottleneck when loading through a single master node.

  2. Using COPY on the master.

    Benefits:
      • Native representation in the Postgres JDBC driver.
      • The master node distributes records according to the distribution key.
      • Easy to run custom SQL code.

    Disadvantages:
      • Bottleneck when loading through a single master node.

  3. Using distributed COPY.

    Benefits:
      • Parallel writing to segments is possible.

    Disadvantages:
      • Need to implement an algorithm for distributing records between segments according to the distribution key.
      • Need to monitor the release of resources on all Spark worker machines.

  4. Using gpfdist — an HTTP server for parallel data exchange with GP segments.

    Benefits:
      • Ready-made solution from the main contributor.
      • Writing directly to segments, bypassing the master, which increases read and write bandwidth.
      • No need to maintain your own code when the protocol changes.

    Disadvantages:
      • Need to install gpfdist on all Spark worker machines.
      • Lack of control over the process of loading records.
      • Named pipes have to be created manually.
      • Need to monitor the release of resources on all Spark worker machines.

  5. Using a custom HTTP server that implements the gpfdist protocol (for readable and writable external tables). A sketch of this approach is shown right after the list.

    Benefits:
      • Parallel writing to segments, bypassing the master.
      • No need to create and destroy resources (including named pipes) on Spark workers.
      • Full control over data transfer task management.
      • Ability to implement flexible partitioning algorithms when reading from Greenplum into Spark.

    Disadvantages:
      • Need to implement the gpfdist protocol and adapt it for different versions of Greenplum.
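
For reference, the sketch below shows the external-table mechanics that both gpfdist and a gpfdist-compatible custom server rely on, driven over plain JDBC. The host names, port, and table/column names are assumptions for illustration: Greenplum segments pull the rows for the INSERT from the HTTP endpoint listed in the LOCATION clause in parallel, bypassing the master for the data itself.

```scala
import java.sql.DriverManager

// Illustrative sketch only: hosts, ports, and table/column names are assumptions.
object ExternalTableLoad extends App {
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://gp-master:5432/demo", "gpadmin", "password")
  try {
    val stmt = conn.createStatement()
    // A readable external table makes Greenplum segments fetch rows in parallel
    // from the gpfdist-compatible HTTP endpoint.
    stmt.execute(
      """CREATE READABLE EXTERNAL TABLE sales_ext (id bigint, payload text)
        |LOCATION ('gpfdist://spark-worker-1:8080/part-0')
        |FORMAT 'TEXT' (DELIMITER '|')""".stripMargin)
    // Segments stream the rows from the HTTP server and write them locally.
    stmt.execute("INSERT INTO sales SELECT * FROM sales_ext")
    stmt.execute("DROP EXTERNAL TABLE sales_ext")
    stmt.close()
  } finally {
    conn.close()
  }
}
```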

Selecting a base for the connector

We decided to create a connector based on an HTTP server that implements the gpfdist protocol and developed two implementations of the protocol based on:

  1. Akka-HTTP

  2. Finagle

The version with Finagle performed better when there were multiple simultaneous sessions from ADB segments, so we settled on it.
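
For context, here is a minimal sketch of a Finagle HTTP service of the kind such a server can be built on. It is not the connector's actual gpfdist implementation, just an illustration of the serving model; the port and response body are placeholders.

```scala
import com.twitter.finagle.{Http, Service}
import com.twitter.finagle.http.{Request, Response, Status}
import com.twitter.util.{Await, Future}

// Minimal Finagle HTTP service sketch. The real connector implements the gpfdist
// protocol (headers, chunked transfer, session handling) on top of a server like this.
object MinimalHttpServer extends App {
  val service = new Service[Request, Response] {
    def apply(req: Request): Future[Response] = {
      val resp = Response(req.version, Status.Ok)
      resp.contentString = "id|payload\n" // placeholder payload
      Future.value(resp)
    }
  }

  // Each Greenplum segment opens its own HTTP session against this endpoint.
  val server = Http.serve(":8080", service)
  Await.ready(server)
}
```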

Now let’s look at this task from the Apache Spark side. The high-level architecture of the Spark query compilation process follows the traditional DBMS approach that involves converting a query into a logical plan, optimizing the logical plan into a physical one, and executing it on the worker nodes.

Spark query compilation

Let’s look at the tools Spark provides for creating connectors to various data sources.

The Data Source API was introduced in Spark 1.3 along with the DataFrame abstraction. Since that release, Spark has undergone great changes. Version 2.0 brought the new Dataset abstraction, and the API was rethought as a result. Limitations of API v1:

  • a mix of low- and high-level APIs;

  • difficult to implement push-down of operators;

  • difficult to pass partitioning information;

  • no support for transactional writing;

  • no support for columnar reads;

  • no support for streaming.

With version 2.3, a new API (known as Data Source API v2) was released. It does not have the restrictions mentioned above, and its characteristic feature is the transition from Scala traits to Java interfaces for better compatibility.

Let’s take a closer look at what happens at each of the presented stages in terms of Data Source API v2.

A data source is represented as a DataSourceV2Relation instance in a logical plan. During planning, DataSourceV2Strategy converts a relation to a DataSourceV2ScanExec instance. The latter creates a DataSourceRDD instance, which is used to actually compute results from multiple partitions in parallel.

Logical plan — at this stage, the goal is to create a DataSourceV2Relation instance from the options provided by the client. The process is initiated by loading a Dataset.

Physical plan — at this stage, the goal is to convert DataSourceV2Relation to DataSourceV2ScanExec. The process is initiated by performing an action on the Dataset created in the previous step.

Query execution — at this stage, the goal is to get data from the partitions of RDD created by DataSourceV2ScanExec and collect rows from all partitions.

Based on the above, we decided to implement the connector using Data Source API v2.
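
To give a feel for these interfaces, below is a minimal read-only Data Source API v2 skeleton in the Spark 2.4 style. The class names are illustrative, and the hard-coded rows stand in for data that a real source would stream from Greenplum segments.

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Minimal Spark 2.4-style Data Source API v2 skeleton (read path only), for illustration.
class DemoSource extends DataSourceV2 with ReadSupport {
  // Logical planning: Spark wraps this reader in a DataSourceV2Relation.
  override def createReader(options: DataSourceOptions): DataSourceReader = new DemoReader
}

class DemoReader extends DataSourceReader {
  // Schema exposed to the logical plan.
  override def readSchema(): StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

  // Physical planning: DataSourceV2ScanExec turns these partitions into a DataSourceRDD.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Seq[InputPartition[InternalRow]](new DemoPartition(0), new DemoPartition(1)).asJava
}

class DemoPartition(id: Int) extends InputPartition[InternalRow] {
  // Query execution: each Spark task reads its assigned partition through this reader.
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      // Hard-coded rows stand in for data streamed from a Greenplum segment.
      private val rows = Iterator(InternalRow(id, UTF8String.fromString(s"row-from-partition-$id")))
      private var current: InternalRow = _

      override def next(): Boolean = {
        if (rows.hasNext) { current = rows.next(); true } else false
      }
      override def get(): InternalRow = current
      override def close(): Unit = ()
    }
}
```

With such a skeleton on the classpath, `spark.read.format(classOf[DemoSource].getName).load()` returns a two-partition DataFrame; in a connector like ours, planInputPartitions is where the partitioning strategies described later in the article come into play.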

Architecture

Each Spark application consists of a controlling process (driver) and a set of distributed worker processes (executors). A general component diagram of the interaction between a Spark application and a Greenplum cluster is shown below.

Scheme of interaction between Spark and Greenplum

Data reading

Loading data into Spark from Greenplum consists of several steps:

  1. The Spark driver is initialized.

  2. The Spark driver uses JDBC to establish a connection with the Greenplum master and obtain the necessary metadata about the cluster and the table (a sketch of such a metadata query follows this list):

    • number of active segments;

    • table schema and type;

    • table distribution key;

    • query plan.

  3. Depending on the selected partitioning strategy (discussed later), the number of partitions is calculated, and the conditions used to load data from segments into partitions are generated.

  4. Each Spark executor is assigned a data processing task and a corresponding partition.
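
As an illustration of step 2, the sketch below obtains the number of active primary segments over JDBC from the gp_segment_configuration catalog table. The connection details are assumptions; the other metadata, such as the schema and the distribution key, is collected from the catalog in a similar way.

```scala
import java.sql.DriverManager

// Illustrative sketch: the driver asks the Greenplum master for cluster metadata over JDBC.
object SegmentCount extends App {
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://gp-master:5432/demo", "gpadmin", "password")
  try {
    val stmt = conn.createStatement()
    // Active primary segments: content >= 0 excludes the master, role = 'p' keeps primaries.
    val rs = stmt.executeQuery(
      "SELECT count(*) FROM gp_segment_configuration WHERE content >= 0 AND role = 'p'")
    rs.next()
    println(s"Active primary segments: ${rs.getLong(1)}")
    rs.close(); stmt.close()
  } finally {
    conn.close()
  }
}
```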

Data reading algorithm

Data exchange is performed via the writable external table mechanism, with all segments transferring data simultaneously. Currently, the following partitioning strategies are implemented:

  • According to gp_segment_id. The data is read and distributed across Spark partitions according to its distribution by segments in Greenplum.

  • According to the specified column and the number of partitions. For the specified column, the minimum and maximum values are requested, and conditions for reading data from segments are generated; data from the corresponding ranges is thus loaded into the Spark partitions (a sketch of this range generation follows the list).

  • According to the specified column. The data is divided according to the unique values of the specified column. In this case, the number of Spark partitions corresponds to the number of unique values of the specified column.

  • According to the specified number of partitions. The data is divided according to the default hash function into the specified number of partitions.

  • According to the specified hash function and the number of partitions. Similar to the previous strategy, with the desired hash function specified.
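
A minimal sketch of the range-based strategy mentioned above: given the column's minimum and maximum values and the desired number of partitions, it generates one WHERE condition per Spark partition. The function name and the half-open ranges are illustrative choices, not necessarily how the connector formulates its predicates.

```scala
// Illustrative range partitioning: split [min, max] into N half-open ranges
// and emit one WHERE clause per Spark partition. Names and clause shape are assumptions.
object RangePartitioning {
  def rangeConditions(column: String, min: Long, max: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions > 0 && max >= min)
    val step = math.max(1L, (max - min + 1) / numPartitions)
    (0 until numPartitions).map { i =>
      val lower = min + i * step
      if (i == numPartitions - 1)
        s"$column >= $lower" // the last partition takes the remainder of the range
      else
        s"$column >= $lower AND $column < ${lower + step}"
    }
  }

  def main(args: Array[String]): Unit =
    rangeConditions("id", min = 1, max = 1000, numPartitions = 4).foreach(println)
  // id >= 1 AND id < 251
  // id >= 251 AND id < 501
  // id >= 501 AND id < 751
  // id >= 751
}
```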

Data writing

Loading data into Greenplum from Spark occurs in several stages:

  1. The Spark driver is initialized.

  2. The Spark driver uses JDBC to establish a connection with the Greenplum master.

  3. Depending on the writing mode, the Spark driver performs preparatory actions in Greenplum (creates or clears the target table).

  4. Each Spark worker uploads data to Greenplum from its assigned partition.

Data writing algorithm

Data exchange occurs via the readable external table mechanism, with all segments transferring data simultaneously.

The following writing modes are currently supported (a usage sketch follows the list):

  • overwrite — the target table is either dropped completely and recreated or cleared of all data.

  • append — data is added to the target table.

  • errorIfExists — writing to a new table; the operation fails if the target table already exists.
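
A hedged usage sketch for writing is shown below. The format name ("adb") and the option keys are assumptions made for illustration and may differ from the connector's documented API; the point is that the standard DataFrameWriter save modes map onto the behaviors listed above. Consult the documentation for the actual option names.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Usage sketch only: the format name and option keys below are assumptions,
// not the connector's documented API.
object WriteExample extends App {
  val spark = SparkSession.builder()
    .appName("adb-write-sketch")
    .master("local[*]")
    .getOrCreate()

  import spark.implicits._
  val df = Seq((1, "alpha"), (2, "beta")).toDF("id", "payload")

  df.write
    .format("adb")                                           // assumed short name of the data source
    .option("url", "jdbc:postgresql://gp-master:5432/demo")  // assumed option key
    .option("dbtable", "public.sales")                       // assumed option key
    .mode(SaveMode.Overwrite)  // also SaveMode.Append and SaveMode.ErrorIfExists
    .save()

  spark.stop()
}
```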

Feature summary

  • Reading data from Greenplum.

  • Writing data to Greenplum using various writing modes:

    • overwrite

    • append

    • errorIfExists

  • Automatic data schema generation.

  • Flexible partitioning.

  • Additional options for creating a target table in Greenplum.

  • Support for push-down operators (a sketch of the corresponding interfaces follows this feature list):

    • pruning columns;

    • push-down filters.

  • Extraction of additional metadata from Greenplum:

    • data distribution scheme;

    • statistics.

  • Optimized execution of count expressions in a query.

  • Execution of custom SQL queries through the Greenplum master.

  • Support for batch mode when loading into Spark.
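
For illustration, Data Source API v2 exposes column pruning and filter push-down through mixin interfaces on the reader. The sketch below (Spark 2.4 style, with an illustrative class name) shows how a reader can record the pruned schema and accept a subset of filters, leaving the rest for Spark to evaluate.

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.reader.{
  DataSourceReader, InputPartition, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Sketch of a Spark 2.4 reader with column pruning and filter push-down (illustrative only).
class PushDownReader(fullSchema: StructType)
  extends DataSourceReader
  with SupportsPushDownRequiredColumns
  with SupportsPushDownFilters {

  private var prunedSchema: StructType = fullSchema
  private var pushed: Array[Filter] = Array.empty

  // Catalyst tells the source which columns the query actually needs.
  override def pruneColumns(requiredSchema: StructType): Unit = prunedSchema = requiredSchema

  // Accept only the filters we can translate to SQL; return the rest for Spark to evaluate.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case _: EqualTo | _: GreaterThan => true
      case _                           => false
    }
    pushed = supported
    unsupported
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def readSchema(): StructType = prunedSchema

  // A real reader would build per-segment partitions that apply `pushed` in the generated SQL.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Seq.empty[InputPartition[InternalRow]].asJava
}
```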

Alternatives

Currently, there is an existing alternative: the Greenplum-Spark connector from Pivotal.

We have conducted a comparative analysis of the functionality and performance of the two connectors. The results are shown in the table below.

Comparison of ADB Spark and Pivotal Spark Greenplum connectors

| Feature | Pivotal Spark Greenplum | Arenadata ADB Spark |
| --- | --- | --- |
| Supported Spark versions | 2.3.X, 2.4.X, 3.0.X | 2.3.X, 2.4.X |
| Supported data types | numeric, string, date/time | numeric, string, date/time, intervals, arrays |
| Push-down filtering | + | + |
| Column pruning | + | + |
| Partitioning | 2 ways | 5 ways |
| Writing modes | Append, Overwrite, ErrorIfExists, Ignore | Append, Overwrite, ErrorIfExists |
| Batch mode support in Spark | — | + |
| Collecting statistics to build a query plan in Catalyst | — | + |
| Executing a custom SQL query through the master | — | + |
| Performance | 20x faster than JDBC | 20x faster than JDBC |

Thus, we have created a high-performance two-way ADB Spark connector that also offers a number of functional advantages over its counterpart from Pivotal.

A detailed description of the connector with examples of use can be found in the documentation.
