ADB-TO-ADB connector


Introduction

Based on the experience of our product development and support team, users who operate large volumes of company data often use several separate Greenplum clusters.

The reasons for this can vary. Some are organizational: different teams that own business data build their own data models and process them according to their needs. Others are technical: clusters are distributed across several data centers, and so on. However, sooner or later, the need to use data from "neighboring" clusters arises. These can be either one-off scenarios with single queries or the development and deployment of more complex ETL processes. Such mechanisms can also be implemented in different ways, each with its own advantages and disadvantages, depending on existing capabilities and limitations.

This article describes the details of our connector implementation for running so-called heterogeneous queries across different Arenadata DB and/or Greenplum clusters — a task that our development team was working on in 2023. This connector allows you to combine different ADB clusters in queries, and at the same time take advantage of establishing connections between segments.

Let’s talk about everything in order.

Existing approaches and solutions

The point of heterogeneous queries is to give users the tools to access data so that, from a data usage perspective, it appears to be located in the same data management system, and to do this as efficiently and transparently as possible.

What tools do we have at our disposal?

Platform Extension Framework (PXF)

The first thing that comes to mind is the PXF service, which is widespread among users. PXF is a framework for combining different data sources. Its advantages include:

  • A time-tested solution based on open-source code with the ability to customize it to suit your needs.

  • A set of connectors to popular data sources available out of the box (the Hadoop stack, data sources available via JDBC, cloud storages).

  • The ability to configure monitoring.

There are also a number of disadvantages:

  • The need to support a separate solution based on its own stack.

  • Resources are usually allocated on the same servers where the DBMS itself is deployed.

  • Multiple type conversions and transfers of the same data on the way from its representation in the DBMS to the types that PXF itself operates on.

Postgres_fdw extension

The postgres_fdw extension is a foundation or, you could say, a source of inspiration for many other existing connectors.

The connector is physically a PostgreSQL extension, which in the case of Greenplum is installed on the master host.

The extension was originally designed to work with a remote PostgreSQL instance and was modified in the Greenplum upstream to function in a distributed DBMS environment. Interaction goes through the standard protocol of the libpq library, which allows it to work transparently with Greenplum (in a "master-master" style, of course).

The "master-master" interaction in postgres_fdw

In the diagram above, the local cluster (indicated in blue) is represented by a master and a pair of segments, the remote cluster is represented by a master (indicated in green).

For a SELECT query, the interaction process schematically looks like this:

  1. A user creates a SELECT query.

  2. The query goes through the standard stages of parsing, analysis, and planning.

  3. The extension establishes a connection with the remote master, opens a transaction, and sends the query (with the caveat that not everything, for example not all filtering conditions (predicates), can be sent to the remote cluster).

  4. The response is processed and either returned to the user, or, in the case of more complex queries, the result is processed in some other way.

A distinctive feature of this approach is the work in the "master-master" style. In this case, segments are involved only in transferring data to/from master. So, there aren’t any interactions between segments of both clusters.

In the case of INSERT queries, everything still goes through the master, but the mechanism of prepared statements is used.

Processing INSERT in postgres_fdw

The advantages for those who decide to use the postgres_fdw extension to organize cluster communication are as follows:

  • Open-source solution — you can just compile and install it (with the CREATE EXTENSION statement on master).

  • Work in the "master-master" style is stable. Below I will explain what this means using greenplum_fdw as an example.

  • Newer versions of the connector that support the extended FDW interface can partially support push-down of more complex constructs such as joins. I will briefly explain this at the end of the article.

There are obvious disadvantages:

  • There is no support for parallel data transfer where segments connect to each other directly.

  • If you try to call DELETE or UPDATE for a table on a remote cluster, the connector will return an error.

Greenplum_fdw extension

From postgres_fdw let’s move on to the big guns: greenplum_fdw, which is included in the paid enterprise version of Greenplum. I want to note that in this case we were only able to test the beta version, and perhaps some problems inherent in the beta were fixed in subsequent versions.

What’s wrong with greenplum_fdw?

The first inconvenience is the rigid setting of the number of segments (workers, basically). For example, suppose the local cluster is deployed on three primary segments.

Local cluster

The remote Greenplum cluster has a different number of segments: four.

Remote cluster

If, when declaring a foreign table on the local cluster, you do not specify the required number of segments (or set it to a number that does not match the number of the remote cluster), then the query fails with an error.

Error when the number of segments does not match

Setting the required number of segments is not a big problem, although getting this information from the owners of another cluster may not always be convenient. Worse, if the cluster configuration changes and a reconfiguration is not performed, queries will stop working until ALTER SERVER is called on the local cluster and the required number of segments is configured.

A nastier situation may arise if we want to transfer some data from a local cluster to a remote one by inserting it into a foreign table.

Let’s say there is a table on the local cluster with 1100 records, and we request an insert into the table on the remote cluster.

For such an insert into a foreign table, the planner creates a plan that looks roughly like this. At the bottom is a Seq Scan plan node (a sequential read of the table’s data), which receives data from the local table. Then a Redistribute Motion moves the data between the segments, and finally each segment performs its insert separately.

Execution plan for INSERT

For experimental purposes, let’s add a NOTICE-level message at the transaction commit step on each segment.

The NOTICE messages display a transaction commit event for a given segment

As we can see below, the database managed to commit transactions for two segments, but let’s imagine that some error occurred with the third transaction.

The transaction for process with pid=114235 received an error

Although we ran a single INSERT, which users of a relational DBMS with ACID support expect to be atomic, we end up with 742 of the 1100 records in the remote table.

Violation of the atomicity principle in greenplum_fdw

Let’s figure out the reason for this situation.

Let’s model it as follows. In our case, we have a local cluster of three segments and a master and a remote cluster of a master and two segments.

An INSERT diagram for the scenario with an insertion error in greenplum_fdw

  1. An INSERT query comes from a user.

  2. It goes through the stages of query processing, establishes a connection, and opens individual transactions from segments.

  3. These transactions operate on a master of a remote cluster.

It is important to understand that this is how postgres_fdw is designed and implemented. It keeps an internal pool of connections, and a transaction begins when a connection is established. Each transaction is committed separately, with no information about the status of its neighbors. In our case, at some point one of the transactions fails, and we get the result mentioned above.
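The effect of uncoordinated commits can be reproduced with a small Python toy model. This is only a sketch and not greenplum_fdw code: the class names are hypothetical, and the split of the 1100 rows across three local segments (371, 371, 358) is an assumption chosen so that two committed batches add up to the 742 rows mentioned above.

```python
# Toy model: each local segment has its own connection to the remote master
# and commits on its own, without knowing what happened to its neighbors.

class RemoteTable:
    """Rows become visible only when a transaction commits."""
    def __init__(self):
        self.committed_rows = []


class SegmentTransaction:
    """One transaction opened by one local segment (hypothetical model)."""
    def __init__(self, table, rows, fail_on_commit=False):
        self.table = table
        self.pending = list(rows)
        self.fail_on_commit = fail_on_commit

    def commit(self):
        if self.fail_on_commit:
            self.pending.clear()  # only this batch is rolled back
            raise RuntimeError("commit failed on this connection")
        self.table.committed_rows.extend(self.pending)
        self.pending.clear()


def independent_commits(table, batches, failing_segment):
    """Commit every batch separately; nothing coordinates the outcome."""
    errors = []
    for seg_id, rows in enumerate(batches):
        txn = SegmentTransaction(table, rows, fail_on_commit=(seg_id == failing_segment))
        try:
            txn.commit()
        except RuntimeError as exc:
            errors.append((seg_id, str(exc)))
    return errors


# Assumed split of the 1100 source rows across three local segments.
batch_sizes = [371, 371, 358]
batches, start = [], 0
for size in batch_sizes:
    batches.append(list(range(start, start + size)))
    start += size

remote = RemoteTable()
errors = independent_commits(remote, batches, failing_segment=2)
print(len(remote.committed_rows), "rows committed,", len(errors), "segment failed")
```

In this model the first two batches stay committed even though the third one fails, which is exactly what a single atomic INSERT is not supposed to allow.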

As a result, the advantages of the beta version of greenplum_fdw include support for SELECT queries in MPP style, but possible problems with data consistency during insertion cast doubt on the possibility of using the connector as a bidirectional one.

At the same time, a user does not have the opportunity to control these risks: when choosing a "master to master" strategy for insertion, SELECT queries will also have a similar strategy. This will negate the benefit of using a connector that supports parallel work across segments.

Also, SELECT queries are quite strictly tied to configurations in terms of the number of segments.

Parallel retrieve cursors

To understand the internal structure of the adb_fdw connector, you need to be aware of the underlying mechanism of parallel cursors. Cursors are used to retrieve data row by row, in a procedural style. In this regard, parallel cursors are no different from their "ordinary" counterparts. They are declared on the query dispatcher (in Greenplum terms, GP_ROLE_DISPATCH), but endpoints (more about them later) are created on segments. Their placement depends on the query, or more precisely, on the result of query planning. It is also important to add that a separate backend process is used to service each endpoint, the point through which data is exchanged.

Let’s look at three simple examples of parallel cursor declarations that lead to different endpoint placement configurations.

Receive data from the master

This configuration of endpoints is driven by the need to collect data on the master, for example, as in this case for sorting (ORDER BY C1).

Collecting data on the master

Receive data from one of the segments

The second option is to receive data from a single segment. For example, the planner may determine from the distribution key that the plan has to be dispatched to one specific segment (WHERE C1=1).

Receiving data from one of the segments directly

Receive data from all segments

The third case is receiving data from all segments. For example, getting all columns without any filtering by WHERE conditions. Endpoints are created on each segment, and data can be returned directly from the segments.

Receiving data from all segments directly
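The three placement cases above can be summarized in a short Python sketch. It is a simplified model rather than planner code: the `CursorQuery` fields and the `endpoint_placement` function are hypothetical names, and the only Greenplum fact it relies on is that the master has content id -1.

```python
from dataclasses import dataclass
from typing import List, Optional

MASTER = -1  # content id of the master in Greenplum


@dataclass(frozen=True)
class CursorQuery:
    """Hypothetical summary of what the planner decided for a cursor query."""
    needs_gather_on_master: bool = False    # e.g. ORDER BY C1
    single_segment: Optional[int] = None    # e.g. WHERE C1=1 on the distribution key


def endpoint_placement(query: CursorQuery, segment_ids: List[int]) -> List[int]:
    """Return the content ids on which endpoints would be created."""
    if query.needs_gather_on_master:
        return [MASTER]                     # case 1: data is collected on the master
    if query.single_segment is not None:
        return [query.single_segment]       # case 2: one specific segment
    return list(segment_ids)                # case 3: every segment


segments = [0, 1, 2]
print(endpoint_placement(CursorQuery(needs_gather_on_master=True), segments))
print(endpoint_placement(CursorQuery(single_segment=1), segments))
print(endpoint_placement(CursorQuery(), segments))
```

Which segment a filter such as WHERE C1=1 maps to depends on the distribution hash, so the value 1 in the second call is purely illustrative.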

Let’s dive a little deeper into the mechanics of the endpoint process of segments. For example, we have a user session during which a query came to the master. It goes through the usual parsing and planning stage.

How parallel cursor endpoints work

The next step is that the plan or some part of the plan is dispatched to the segment and processed within the backend process.

In our case, since this is a request to create a parallel cursor for the underlying SELECT query, the backend creates the necessary wiring to serve the client’s data receiving channel.

To do this, in particular, a message queue is created in shared memory to exchange data with the separate backend process that is used by the client to receive data. For this purpose, the client starts a special utility connection, within which only the retrieval of parallel cursor data is allowed. In the diagram above, this connection is marked with the setting gp_retrieve_conn = true.
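The hand-off between the backend that executes the plan and the backend that serves the retrieve connection follows a producer/consumer pattern. The Python sketch below imitates it with a bounded in-process queue and a thread. It is an analogy only: the real mechanism is a message queue in shared memory between two separate processes, and all names here are hypothetical.

```python
import queue
import threading

END_OF_DATA = object()  # sentinel that marks the end of the result set


def executor_backend(rows, mq):
    """Plays the backend that runs the plan and pushes rows into the queue."""
    for row in rows:
        mq.put(row)          # blocks while the queue is full
    mq.put(END_OF_DATA)


def retrieve_backend(mq):
    """Plays the backend that serves the client's retrieve connection."""
    received = []
    while True:
        item = mq.get()      # blocks until a row is available
        if item is END_OF_DATA:
            break
        received.append(item)
    return received


mq = queue.Queue(maxsize=4)  # bounded, like a fixed-size message queue
rows = [(i, f"row-{i}") for i in range(10)]

producer = threading.Thread(target=executor_backend, args=(rows, mq))
producer.start()
result = retrieve_backend(mq)
producer.join()

print(result == rows)
```

The bounded queue means the producer cannot run arbitrarily far ahead of the consumer, so rows are generated at the pace at which the client retrieves them.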

The adb_fdw connector is based on this mechanism.

ADB-TO-ADB connector (adb_fdw)

To create competitive advantages at the current stage of development, some alternative technical solutions have been implemented in our adb_fdw connector.

First, let’s look at it from a user perspective.

The entry point to the external cluster is the SERVER definition, which points to the master of the remote cluster. Setting the mpp_execute option to master does not negate the benefits of MPP for executing SELECT queries: our connector determines the required configuration of remote segments itself. The master value is required so that insertion works only in the "master-master" mode.

You can also set the number of segments or allow the connector to automatically determine the number of segments based on the query plan on a remote cluster, which is quite convenient.

Next, as usual for FDW connectors, a custom user mapping is specified, and a foreign table is declared.

Declaration of SERVER, FOREIGN TABLE, and user mapping

What are the features of our connector?

SELECT queries

Processing SELECT queries in adb_fdw

The clusters have the same configuration as before: 3 local segments and 2 remote ones. A user makes a SELECT query for three columns without filters. A request to create a parallel cursor arrives at the remote master. The remote master creates endpoints and returns service information about them to the local master when it calls the gp_get_endpoints() function.

Since we learned that there are two endpoints, two worker processes are enough; they are started as backend processes on the local cluster (white rectangles inside green ones in the diagram above). The segments open a service connection, the so-called retrieve session, and begin to receive data directly from the remote segments in parallel, transferring it to the master, since in our case the query requires this.
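The idea of sizing the set of workers by the number of remote endpoints can be sketched in Python as follows. This is a hypothetical model, not the connector's scheduling code: the host names, ports, and endpoint names are invented, and the round-robin rule for the case with more endpoints than local segments is an assumption of this sketch.

```python
def assign_endpoints(endpoints, local_segments):
    """Map local segments to the remote endpoints they will read from.

    Only as many local segments as there are endpoints get a worker.
    If there were more endpoints than local segments, this sketch simply
    assigns them round-robin (an assumption, not documented behavior).
    """
    workers = {}
    for i, endpoint in enumerate(endpoints):
        seg = local_segments[i % len(local_segments)]
        workers.setdefault(seg, []).append(endpoint)
    return workers


# Invented endpoint descriptions, standing in for what the local master
# learns about the remote endpoints.
endpoints = [
    {"host": "remote-sdw1", "port": 6000, "endpoint": "endpoint_a"},
    {"host": "remote-sdw2", "port": 6000, "endpoint": "endpoint_b"},
]
local_segments = [0, 1, 2]

workers = assign_endpoints(endpoints, local_segments)
for seg, eps in workers.items():
    print("local segment", seg, "reads from", [e["endpoint"] for e in eps])
```

With two endpoints and three local segments, only two workers are started and the third local segment stays idle for this scan, which matches the scenario described above.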

INSERT queries

Currently, there is no reliable mechanism to ensure insert integrity during parallel connector operation. We can expect such support in future versions of the database kernel, but there is still no ready-made mechanism.

Processing INSERT queries in adb_fdw

In our connector, integrity has high priority, so the connector supports postgres_fdw-style insertion in the "master-master" mode.

Future plans

In recent versions of PostgreSQL and, as a result, in Greenplum 7, the FDW interface for accessing external data sources has been extended with several new functions and new parameters for existing functions.

The essence of these changes is to provide push-down support for both joins and aggregate functions. This takes such connectors to a new level in terms of the ability to build more advanced plans, reduce the amount of transferred data and, in general, speed up query execution.

This interesting and complex topic is the subject of a separate article. Now I would like to briefly touch on the difficulties and pitfalls of these expected features, which we plan to implement in the next versions of the connector as part of Arenadata DB version 7X.

Limited predicate push-down opportunities

Not all predicates can be transferred to a remote cluster. There are restrictions on the possibility of passing an expression with specified sorting rules (COLLATION). The motivation for this type of restriction is that you cannot rely on the same environment on a remote cluster for the non-default sorting rules to work exactly as they do on a local cluster.

With explicitly specified sorting rules (i.e., other than default collation with OID = 100), only expressions that are associated with a declaration for a foreign table (FOREIGN TABLE) are allowed to be sent to the remote cluster. Thus, if a sorting rule for a column is explicitly specified when declaring a foreign table, this condition is a safe condition to send. On the contrary, if some sorting rule other than default is set for an expression, this is considered unsafe and is processed locally. For example, for the following expression comparing a column of a foreign table with a constant for which a sorting rule is specified, the filter is applied locally, after reading the entire table:

adb=# explain select count(c1) from foreign_table as ft where ft.c1 like 'foo' COLLATE "POSIX";
QUERY PLAN
-------------------------------------------------------------------
 Aggregate  (cost=3897.97..3897.98 rows=1 width=8)
   ->  Foreign Scan on ft  (cost=100.00..3897.72 rows=100 width=16)
         Filter: (c1 ~~ 'foo'::text COLLATE "POSIX")
 Optimizer: Postgres-based planner
(4 rows)

In case of a default rule, push-down occurs to the remote cluster:

adb=# explain select count(ft.c1) from foreign_table ft where ft.c1 like 'foo';
                      QUERY PLAN
------------------------------------------------------
 Foreign Scan  (cost=1894.20..1894.23 rows=1 width=8)
   Relations: Aggregate on (public.ft)
 Optimizer: Postgres-based planner
(3 rows)

Also, expressions with volatile functions (VOLATILE) are not allowed to be sent to a remote cluster. Well-known examples of such functions are random() and clock_timestamp(). For such queries, the condition is processed locally:

adb=# explain select * from foreign_table as ft where ft.c1 >= random();
                          QUERY PLAN
---------------------------------------------------------------
 Foreign Scan on ft  (cost=100.00..4398.60 rows=33392 width=16)
   Filter: ((c1)::double precision >= random())
 Optimizer: Postgres-based planner
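The two rules above (collation and function volatility) can be expressed as a recursive check over an expression tree. The Python sketch below is a deliberately simplified model with hypothetical class names. It is not the connector's implementation, and it treats every non-IMMUTABLE function as unsafe, which agrees with the current_timestamp example later in the article.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Column:
    """Column of a foreign table; a collation declared on it is considered safe."""
    name: str
    collation: Optional[str] = None   # None stands for the default collation


@dataclass
class Const:
    """Constant, possibly with an explicit COLLATE clause."""
    value: object
    collation: Optional[str] = None


@dataclass
class Func:
    """Function or operator call with its volatility class."""
    name: str
    volatility: str                   # "immutable", "stable" or "volatile"
    args: list = field(default_factory=list)


def is_safe_to_push(expr) -> bool:
    """Return True if the expression may be sent to the remote cluster."""
    if isinstance(expr, Column):
        return True                   # collation comes from the foreign table declaration
    if isinstance(expr, Const):
        return expr.collation is None # explicit non-default collation is unsafe
    if isinstance(expr, Func):
        return expr.volatility == "immutable" and all(is_safe_to_push(a) for a in expr.args)
    raise TypeError(f"unsupported node: {expr!r}")


collated = Func("like", "immutable", [Column("c1"), Const("foo", collation="POSIX")])
plain = Func("like", "immutable", [Column("c1"), Const("foo")])
volatile = Func(">=", "immutable", [Column("c1"), Func("random", "volatile")])

print(is_safe_to_push(collated), is_safe_to_push(plain), is_safe_to_push(volatile))
```

The three sample expressions mirror the three EXPLAIN examples above: only the plain `like 'foo'` predicate passes the check.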

Joins push-down

The ability to delegate table joins to an external cluster (the so-called join push-down), if they appear in the query, looks extremely useful. Let’s start with the main restrictions:

  • The basic requirement is that the tables must be associated with the same external server (the SERVER definition).

  • Join push-down is currently only supported for inner (JOIN_INNER), left and right outer joins (JOIN_LEFT, JOIN_RIGHT), and full joins (JOIN_FULL). Further development involves SEMI- and ANTI- joins to support SQL constructs like NOT IN and ANY.

  • Inner and outer parts of a join clause must allow filter and join predicates push-down. If the predicate cannot be passed to the external cluster, then push-down is not possible.

For example, here push-down is possible:

adb=# explain select count(*) from ft_a inner join ft_b on ft_a.id = ft_b.id where ft_a.ts >= current_date::timestamp;
QUERY PLAN
--------------------------------------------------------------
 Foreign Scan  (cost=350.53..350.56 rows=1 width=8)
   Relations: Aggregate on ((public.ft_a) INNER JOIN (public.ft_b))
 Optimizer: Postgres-based planner
(3 rows)

In this case, the expression ft_a.ts >= current_date::timestamp can be converted to a constant, passed in this form to the remote cluster, and used there as a filter condition.

However, in the following query, the condition ft_a.ts >= current_timestamp cannot be passed, since the current_timestamp function is not IMMUTABLE (to be accurate, it is STABLE). This results in two separate Foreign Scan nodes, a local join, and a condition (Filter) applied to one of the Foreign Scan nodes:

adb=# explain select count(*) from ft_a inner join ft_b on ft_a.id = ft_b.id where ft_a.ts >= current_timestamp;
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 Aggregate  (cost=664.92..664.93 rows=1 width=8)
   ->  Hash Join  (cost=239.04..658.24 rows=2670 width=0)
         Hash Cond: (ft_a.id = ft_b.id)
         ->  Foreign Scan on ft_a  (cost=100.00..480.00 rows=3333 width=12)
               Filter: (ts >= '2024-04-07 16:49:38.585287+03'::timestamp with time zone)
         ->  Hash  (cost=129.03..129.03 rows=801 width=4)
               ->  Foreign Scan on ft_b  (cost=100.00..129.03 rows=801 width=4)
 Optimizer: Postgres-based planner
(8 rows)
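The join restrictions listed in this section can be combined into a single predicate. The Python sketch below is a hypothetical model of that decision. The `ForeignRel` class, the server name, and the boolean flags are illustrative only and say nothing about how the connector actually represents its planner state.

```python
from dataclasses import dataclass

# Join types for which push-down is currently supported, as listed above.
SUPPORTED_JOIN_TYPES = {"JOIN_INNER", "JOIN_LEFT", "JOIN_RIGHT", "JOIN_FULL"}


@dataclass
class ForeignRel:
    """One side of a join: a foreign table with its filter predicates."""
    table: str
    server: str
    predicates_pushable: bool   # can all of its filter predicates be sent remotely?


def can_push_join(join_type, outer, inner, join_conds_pushable=True):
    """Apply the three restrictions: join type, same server, pushable predicates."""
    return (
        join_type in SUPPORTED_JOIN_TYPES
        and outer.server == inner.server
        and outer.predicates_pushable
        and inner.predicates_pushable
        and join_conds_pushable
    )


ft_b = ForeignRel("ft_b", "remote_adb", predicates_pushable=True)

# ft_a.ts >= current_date::timestamp: the filter can be pushed down.
ft_a_date = ForeignRel("ft_a", "remote_adb", predicates_pushable=True)
# ft_a.ts >= current_timestamp: the filter has to stay local.
ft_a_now = ForeignRel("ft_a", "remote_adb", predicates_pushable=False)

print(can_push_join("JOIN_INNER", ft_a_date, ft_b))
print(can_push_join("JOIN_INNER", ft_a_now, ft_b))
print(can_push_join("JOIN_SEMI", ft_a_date, ft_b))
```

The first call corresponds to the plan with a single Foreign Scan, the second to the plan with a local Hash Join over two Foreign Scan nodes.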

Aggregates push-down

To push an aggregate function down to a remote cluster for execution, several conditions must be met:

  • If the selection uses a predicate that cannot be sent to a remote cluster, such aggregation can only be performed locally, since the predicate condition must be applied only during the selection process, and not after. For example, the condition ft_a.id >= random() does not allow push-down of the count(*) aggregate function, while ft_a.ts >= current_date::timestamp does.

  • This must be either a simple aggregate function, such as count, min, max, etc., or a function that supports partial aggregation. Also, from the execution context perspective of the partial aggregation function, this should be the initial phase of the partial aggregation.

  • For the second case of partial aggregation, the following conditions should be met:

    • The aggregate function must not be an ordered-set aggregate (Ordered-Set Aggregate Functions are not supported). Percentiles fall into this category because calculating them requires an ordered set of values.

    • An aggregate function must not have a final function definition (aggfinalfn). So, its use does not require a calculation phase for the resulting value.

    • If aggfinalfn is declared together with the aggcombinefn function, then only a certain set of such functions are supported (mainly variations of avg and sum).

  • Expressions with GROUPING SETS are not supported.

What is the reason for such restrictions? For a distributed DBMS, calculating simple aggregates is not particularly difficult. Partial values can be computed on the segments, and the final calculation can be done in the final function. It is easy to imagine such a division into subtasks for min, max, count, and other similar functions.

With the avg function, it is more interesting: for avg, when returning data from a segment, in addition to the calculated average value, the total number of rows for which this average was calculated is also required. This is the only way the final function can calculate the final value. For this reason, avg can be dispatched into segments as an array[count(), sum()] expression (for floating point numbers the expression is a little more complicated).
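A small Python example shows why avg needs both the count and the sum from every segment. The function names are hypothetical and the data is made up; the point is only the arithmetic of partial aggregation.

```python
def partial_avg(values):
    """Per-segment state: (count, sum), in the spirit of array[count(), sum()]."""
    return (len(values), sum(values))


def combine(states):
    """Merge the per-segment states into one (count, sum) pair."""
    total_count = sum(count for count, _ in states)
    total_sum = sum(total for _, total in states)
    return (total_count, total_sum)


def finalize(state):
    """Final step: turn (count, sum) into the average."""
    count, total = state
    return total / count if count else None


# Made-up rows stored on three segments.
segments = [[1, 2, 3], [10], [20, 30]]

states = [partial_avg(values) for values in segments]
global_avg = finalize(combine(states))

# Averaging the per-segment averages ignores how many rows each segment holds.
naive_avg = sum(sum(v) / len(v) for v in segments) / len(segments)

print(states)       # [(3, 6), (1, 10), (2, 50)]
print(global_avg)   # 11.0
print(naive_avg)
```

The combined result is 66 / 6 = 11.0, while the average of the per-segment averages (2, 10, and 25) is about 12.33, so returning only the average from each segment would give a wrong answer.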

Results

Compared to the alternatives considered, the adb_fdw connector has a number of advantages:

  • Like greenplum_fdw, our solution supports parallel work through segments in SELECT queries where the queries allow it, but does so in a more user-friendly way: there is no need to track the number of segments.

  • The "master-master" interaction with INSERT queries allows us to avoid the data integrity problems that can occur in greenplum_fdw.

  • In adb_fdw, the current lack of support for DELETE and UPDATE queries is handled more carefully.

Joins and aggregates push-down support in the next version of the adb_fdw connector will expand the scope of its application, reduce the amount of transmitted data, and increase the speed of interaction in the most common use cases.
