How and why we developed a Spark connector for Greenplum
Hello everyone! My name is Andrey and I work as a system architect at Arenadata. In this article, I will tell you how and why we developed our own tool for exchanging data between Arenadata DB (an analytical MPP DBMS based on Greenplum) and Apache Spark (a framework for distributed data processing that is part of the Arenadata Hadoop ecosystem).
Since Greenplum and Hadoop are the basic elements of our stack, we often have to deal with tasks of processing and transferring information between them. Previously, we partially solved this problem with the Platform Extension Framework (PXF), but that approach deprived us of the benefits that Spark offers, which are quite significant.
We decided to create ADB Spark Connector based on an HTTP server that implements the gpfdist protocol. In the article below, I will tell you why we chose this course and which architecture we eventually settled on.
A little theory
Arenadata DB is a distributed DBMS built on Greenplum, which belongs to the class of MPP systems. MPP (massively parallel processing) is a class of DBMS that distribute stored data across multiple computing nodes in order to process large amounts of data in parallel. Each node has its own storage and computing resources that allow it to execute a part of the overall database query. The logic for dividing a table into segments is specified by a distribution key (field). In addition, each individual column of a table can have its own compression type and level.
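To make the distribution key and per-column compression settings more concrete, below is a minimal sketch of a table definition issued over plain JDBC. The connection string, credentials, and the sales table are made up, and the DDL assumes Greenplum 6 syntax for an append-optimized, column-oriented table.

```scala
// Illustrative only: create a Greenplum table with a distribution key and
// per-column compression via plain JDBC (the PostgreSQL driver works for Greenplum).
import java.sql.DriverManager

object CreateDistributedTable {
  def main(args: Array[String]): Unit = {
    // Hypothetical connection parameters.
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://gp-master:5432/demo", "gpadmin", "secret")
    try {
      val ddl =
        """CREATE TABLE sales (
          |  id      bigint  ENCODING (compresstype=zstd, compresslevel=1),
          |  amount  numeric ENCODING (compresstype=zlib, compresslevel=5),
          |  sold_at date
          |)
          |WITH (appendoptimized=true, orientation=column)
          |DISTRIBUTED BY (id)""".stripMargin
      // The distribution key (id) determines which segment each row lands on.
      conn.createStatement().execute(ddl)
    } finally conn.close()
  }
}
```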
MPP databases are well suited as analytical data stores because they:
- scale horizontally;
- handle OLAP load well;
- allow you to load data at high speed.
Let’s look at the ways to exchange data with ADB:
- Using the JDBC driver through the ADB master — the standard Spark JDBC connector supports this method out of the box.

  | Benefits | Disadvantages |
  | --- | --- |
  | Ready-made Spark JDBC source | Bottleneck when loading through a single master node |
  | No need to support new API versions | |
  | Easy to run custom SQL code | |
- Using COPY on the master.

  | Benefits | Disadvantages |
  | --- | --- |
  | Native support in the Postgres JDBC driver | Bottleneck when loading through a single master node |
  | Master node is responsible for distributing records (according to the distribution key) | |
  | Easy to run custom SQL code | |
- Using distributed COPY.

  | Benefits | Disadvantages |
  | --- | --- |
  | Parallel writing to segments | Need to implement an algorithm for distributing records between segments according to the distribution key |
  | | Need to monitor the release of resources on all Spark worker machines |
- Using gpfdist — an HTTP server for parallel data exchange with Greenplum segments.

  | Benefits | Disadvantages |
  | --- | --- |
  | Ready-made solution from the main contributor | Need to install gpfdist on all Spark worker machines |
  | Writing directly to segments, bypassing the master, which increases read and write bandwidth | Lack of control over the process of loading records |
  | No need to maintain your code when the protocol changes | Named pipes have to be created manually |
  | | Need to monitor the release of resources on all Spark worker machines |
- Using our own HTTP server that implements the gpfdist protocol (for both readable and writable external tables); a sketch of the underlying external-table mechanics follows this list.

  | Benefits | Disadvantages |
  | --- | --- |
  | Writing in parallel to segments, bypassing the master | Need to implement the gpfdist protocol and adapt it for different versions of Greenplum |
  | No need to create and destroy resources (including named pipes) on Spark workers | |
  | Full control over data transfer task management | |
  | Ability to implement flexible partitioning algorithms when reading from Greenplum into Spark | |
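To show the mechanics behind the last option, here is a rough sketch of the SQL the connector asks Greenplum to run once its own HTTP endpoint (speaking the gpfdist protocol) is up on a Spark worker. Host names, ports, file paths, and the sales table are made up.

```scala
// A sketch of the external-table mechanics, assuming the connector already
// serves the gpfdist protocol on spark-worker-1:8080.
import java.sql.DriverManager

object ExternalTableSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://gp-master:5432/demo", "gpadmin", "secret")
    val st = conn.createStatement()
    try {
      // Reading from Greenplum into Spark: segments push their rows in parallel
      // to the connector's endpoint through a WRITABLE external table.
      st.execute(
        """CREATE WRITABLE EXTERNAL TABLE ext_sales_out (LIKE sales)
          |LOCATION ('gpfdist://spark-worker-1:8080/read-part-0')
          |FORMAT 'TEXT' (DELIMITER '|')""".stripMargin)
      st.execute("INSERT INTO ext_sales_out SELECT * FROM sales")

      // Writing from Spark into Greenplum: segments pull rows in parallel from
      // the connector's endpoint through a READABLE external table.
      st.execute(
        """CREATE READABLE EXTERNAL TABLE ext_sales_in (LIKE sales)
          |LOCATION ('gpfdist://spark-worker-1:8080/write-part-0')
          |FORMAT 'TEXT' (DELIMITER '|')""".stripMargin)
      st.execute("INSERT INTO sales SELECT * FROM ext_sales_in")
    } finally {
      st.close()
      conn.close()
    }
  }
}
```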
Selecting a base for the connector
We decided to create a connector based on an HTTP server that implements the gpfdist protocol and developed two implementations of the protocol based on:
- Akka-HTTP;
- Finagle.
The version with Finagle performed better when there were multiple simultaneous sessions from ADB segments, so we settled on it.
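For context, the sketch below shows the bare Finagle skeleton such an endpoint is built around; it is not the connector's actual code. The real gpfdist protocol exchanges additional headers (such as X-GP-SEGMENT-ID) and streams row data in both directions, which is omitted here.

```scala
// A bare-bones Finagle HTTP server: a skeleton sketch only.
import com.twitter.finagle.{Http, Service}
import com.twitter.finagle.http.{Request, Response, Status}
import com.twitter.util.{Await, Future}

object GpfdistEndpointSketch {
  def main(args: Array[String]): Unit = {
    val service = new Service[Request, Response] {
      def apply(req: Request): Future[Response] = {
        // A real handler would dispatch on the calling segment and either accept
        // uploaded rows (reads from Greenplum) or serve rows (writes to Greenplum).
        val segmentId = req.headerMap.get("X-GP-SEGMENT-ID").getOrElse("unknown")
        val rep = Response()
        rep.status = Status.Ok
        rep.contentString = s"handled request from segment $segmentId\n"
        Future.value(rep)
      }
    }
    // Each Spark executor starts one such server; its address ends up in the
    // LOCATION clause of a Greenplum external table.
    val server = Http.serve(":8080", service)
    Await.ready(server)
  }
}
```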
Now let’s look at this task from the Apache Spark side. The high-level architecture of Spark query processing follows the traditional DBMS approach: a query is converted into a logical plan, the logical plan is optimized and turned into a physical plan, and the physical plan is executed on the worker nodes.
Let’s look at the tools Spark provides for creating connectors to various data sources.
The Data Source API was introduced in Spark 1.3 along with the DataFrame abstraction. Since that release, Spark has undergone great changes. Version 2.0 introduced the new Dataset abstraction, and the API was rethought as a result. Limitations of API v1:
- a mix of low-level and high-level APIs;
- difficult to implement push-down operators;
- difficult to pass partitioning information;
- no support for transactional writes;
- no support for columnar reads;
- no support for streaming.
Version 2.3 introduced a new API known as Data Source API v2. It does not have the restrictions mentioned above, and its characteristic feature is the move from Scala traits to Java interfaces for better compatibility.
Let’s take a closer look at what happens at each of the presented stages in terms of Data Source API v2.
A data source is represented as a DataSourceV2Relation instance in a logical plan. During planning, DataSourceV2Strategy converts a relation to a DataSourceV2ScanExec instance. The latter creates a DataSourceRDD instance, which is used to actually compute results from multiple partitions in parallel.
Logical plan — at this stage, the goal is to create a DataSourceV2Relation instance from the options provided by the client. The process is initiated by loading a Dataset.
Physical plan — at this stage, the goal is to convert DataSourceV2Relation to DataSourceV2ScanExec. The process is initiated by performing an action on the Dataset created in the previous step.
Query execution — at this stage, the goal is to get data from the partitions of RDD created by DataSourceV2ScanExec and collect rows from all partitions.
Based on the above, we decided to implement the connector using Data Source API v2.
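To make these stages more tangible, below is a minimal read-only data source sketched against the Spark 2.4 flavour of Data Source API v2 (interface names differ slightly in 2.3 and were reworked again in 3.x). All class names are illustrative and do not come from the connector's sources: DataSourceV2Relation wraps the source and its options, DataSourceV2ScanExec ends up calling planInputPartitions, and each InputPartitionReader produces the rows of one RDD partition.

```scala
// A minimal read-only data source against the Spark 2.4 flavour of Data Source API v2.
import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Entry point: Spark instantiates this class for format("...") and asks it for a reader.
class DemoSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new DemoReader
}

// Becomes part of DataSourceV2Relation: exposes the schema and plans the partitions.
class DemoReader extends DataSourceReader {
  override def readSchema(): StructType = StructType(Seq(StructField("id", LongType)))

  // One InputPartition per Spark partition; a Greenplum-aware reader would create
  // one per segment or per generated predicate.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.singletonList[InputPartition[InternalRow]](new DemoPartition(0L, 10L))
}

// Serializable description of a partition, shipped to an executor.
class DemoPartition(from: Long, until: Long) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new DemoPartitionReader(from, until)
}

// Runs on the executor and produces the rows of one RDD partition.
class DemoPartitionReader(from: Long, until: Long) extends InputPartitionReader[InternalRow] {
  private var current = from - 1
  override def next(): Boolean = { current += 1; current < until }
  override def get(): InternalRow = InternalRow(current)
  override def close(): Unit = ()
}
```

With such a class on the classpath, spark.read.format(classOf[DemoSource].getName).load() returns a ten-row Dataset; the actual connector plugs its Greenplum-aware metadata, partitioning, and push-down logic into the same extension points.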
Architecture
Each Spark application consists of a controlling process (driver) and a set of distributed worker processes (executors). A general component diagram of the interaction between a Spark application and a Greenplum cluster is shown below.
Data reading
Loading data into Spark from Greenplum consists of several steps:
- The Spark driver is initialized.
- The Spark driver uses JDBC to establish a connection with the Greenplum master and obtain the necessary metadata about the cluster and the table:
  - number of active segments;
  - table schema and types;
  - table distribution key;
  - query plan.
- Depending on the selected partitioning strategy (which will be discussed later), the number of partitions is calculated and the conditions used when loading data into partitions from segments are generated.
- Each Spark executor is assigned a data processing task and a corresponding partition.
Data exchange is performed via the mechanism of writable external tables, with all segments transferring data in parallel. Currently, the following partitioning strategies are implemented (a usage sketch follows the list):
- According to gp_segment_id. The data is read and distributed across Spark partitions according to its distribution by segments in Greenplum.
- According to the specified column and the number of partitions. For the specified column, the minimum and maximum values are requested, and conditions for reading data from segments are generated. Thus, data from the corresponding ranges is loaded into the Spark partitions.
- According to the specified column. The data is divided according to the unique values of the specified column. In this case, the number of Spark partitions corresponds to the number of unique values of the specified column.
- According to the specified number of partitions. The data is divided by the default hash function into the specified number of partitions.
- According to the specified hash function and the number of partitions. Similar to the previous strategy, with the desired hash function specified.
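A hedged usage sketch of reading through the connector is shown below. The format name "adb" and the option keys are illustrative assumptions; the exact names are given in the connector documentation.

```scala
// Hypothetical usage of the connector for reading.
import org.apache.spark.sql.SparkSession

object ReadFromAdbExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("adb-read-example").getOrCreate()

    val sales = spark.read
      .format("adb")                                        // illustrative format name
      .option("url", "jdbc:postgresql://gp-master:5432/demo")
      .option("dbtable", "public.sales")                    // illustrative option key
      .option("partitionColumn", "sold_at")                 // partition by a column's value range
      .option("partitions", "8")                            // desired number of Spark partitions
      .load()

    // Each Spark partition now holds the rows matching one generated condition.
    println(sales.rdd.getNumPartitions)
  }
}
```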
Data writing
Loading data into Greenplum from Spark occurs in several stages:
- The Spark driver is initialized.
- The Spark driver uses JDBC to establish a connection with the Greenplum master.
- Depending on the writing mode, the Spark driver performs initialization actions for loading in Greenplum (creates or clears tables).
- Each Spark worker uploads data to Greenplum from its assigned partition.
Data exchange occurs via the mechanism of readable external tables, with all segments transferring data in parallel.
The following writing modes are currently supported (a usage sketch follows the list):
- overwrite — the target table is either dropped completely and recreated, or all of its data is cleared.
- append — data is appended to the target table.
- errorIfExists — data is written to a new table; the operation fails if the target table already exists.
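The sketch below shows how writing with an explicit save mode might look from the Spark side; as before, the format name and option keys are illustrative assumptions rather than the documented ones.

```scala
// Hypothetical usage of the connector for writing.
import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteToAdbExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("adb-write-example").getOrCreate()
    import spark.implicits._

    val df = Seq((1L, "alpha"), (2L, "beta")).toDF("id", "name")

    df.write
      .format("adb")                                        // illustrative format name
      .option("url", "jdbc:postgresql://gp-master:5432/demo")
      .option("dbtable", "public.names")
      .mode(SaveMode.Append)                                // or Overwrite / ErrorIfExists
      .save()
  }
}
```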
Feature summary
- Reading data from Greenplum.
- Writing data to Greenplum using various writing modes:
  - overwrite;
  - append;
  - errorIfExists.
- Automatic data schema generation.
- Flexible partitioning.
- Additional options for creating a target table in Greenplum.
- Support for push-down operators (illustrated after this list):
  - column pruning;
  - filter push-down.
- Extraction of additional metadata from Greenplum:
  - data distribution scheme;
  - statistics.
- Optimized execution of count expressions in a query.
- Execution of custom SQL queries through the Greenplum master.
- Support for batch mode when loading into Spark.
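The short sketch below illustrates how column pruning, filter push-down, and the optimized count path surface in ordinary DataFrame code; the format name and option keys remain illustrative.

```scala
// How push-down operators and the optimized count path show up in DataFrame code.
import org.apache.spark.sql.SparkSession

object PushdownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("adb-pushdown-example").getOrCreate()
    import spark.implicits._

    val sales = spark.read
      .format("adb")
      .option("url", "jdbc:postgresql://gp-master:5432/demo")
      .option("dbtable", "public.sales")
      .load()

    // Only the referenced columns are requested from Greenplum (column pruning),
    // and the predicate can be pushed down to the segments (filter push-down).
    val recent = sales.select("id", "amount").filter($"amount" > 100)
    recent.explain()

    // count() can be answered without transferring the full rows to Spark.
    println(sales.count())
  }
}
```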
Analogues
Currently, there is the Spark Greenplum connector from Pivotal. We have conducted a comparative analysis of the functionality and performance of the two connectors. The results are in the table below.
| Feature | Pivotal Spark Greenplum | Arenadata ADB Spark |
| --- | --- | --- |
| Supported Spark versions | | |
| Supported data types | | |
| Push-down filtering | + | + |
| Column pruning | + | + |
| Partitioning | 2 ways | 5 ways |
| Writing modes | | overwrite, append, errorIfExists |
| Batch mode support in Spark | — | + |
| Collecting statistics to build a query plan in Catalyst | — | + |
| Executing a custom SQL query through the master | — | + |
| Performance | 20x faster than JDBC | 20x faster than JDBC |
Thus, we have created a high-performance, bidirectional ADB Spark connector that also offers a number of functional advantages over its analogue from Pivotal.
A detailed description of the connector with examples of use can be found in the documentation.