Greenplum vs Citus. Part 1

This article opens a series devoted to a comparative review of distributed DBMS technologies.

For this review, our team has selected a number of distributed DBMSs to compare in terms of how they implement similar features at different levels of detail, as well as to make a high-level comparison based on various criteria.

The choice of these DBMSs is determined by our current interests and comparison needs, although some of the selected technologies are more accurately classified as distributed SQL engines (for example, Apache Impala and Trino).

As our development team works on Greenplum, we will focus on comparing it with other database management systems. A distinguishing feature of these articles is a relatively deep dive into the details of the aspects being compared.

Objectives of this series of articles

  • To introduce Greenplum users to similar technologies. The article is aimed at experienced users, as the explanations will provide details on the technologies being compared; it is assumed that a reader is familiar with the implementation details of similar functionality in Greenplum.

  • To provide an idea of what underlies the technologies being compared and which features, advantages, and, possibly, disadvantages they have.

Completeness of coverage

  • I will highlight the aspects that interested me as an author who knows some parts of Greenplum and is exploring how similar parts are built in the technologies under discussion. I do not claim to be an expert in these technologies; please treat my conclusions as the result of research.

  • Where possible, I will provide comparative explanations of how similar concepts are implemented in the technology under consideration.

  • It didn’t seem necessary to reprint a readme from GitHub and other sources, but some basic concepts needed to be introduced. I will explain these concepts in my own words according to my understanding of the subject. For more details, please visit specialized resources.

Let’s start the series with a comparison of Greenplum with Citus.

Why Citus?

  1. Conceptually, Citus and Greenplum are quite close. Implementing Citus as a PostgreSQL extension, as opposed to the "patched" PostgreSQL in Greenplum, gives Citus some strengths, but at the same time limits its capabilities. We will discuss this further.

  2. Apparently, the developers of Citus initially focused on the problem of horizontally scaling existing PostgreSQL-based solutions for end users: add shards, create tables distributed across a cluster/shards, redistribute data between them, and get the benefits without migrating the existing solution to another technology stack. These ideas are visible in the types of Citus planners: from relatively simple ones that focus on redirecting queries to specific shards, to more advanced planners tailored for analytics. The more advanced Citus planners, aimed at analytical workloads, are interesting to compare with Greenplum in their common niche.

  3. Greenplum and Citus can be easily compared based on their similar concepts: distributed and reference (reference in Citus, replicated in Greenplum) tables, reading execution plans, and the nuances of the planner (planner/optimizer) and direct execution of queries (executor).

Citus from a user’s point of view

For a user from the PostgreSQL world, this technology is an extension (PostgreSQL extension) installed on individual instances of a PostgreSQL server. This extension provides the ability to transform individual PostgreSQL instances into a distributed cluster that will host parts of table data (shards), which the user can distribute based on a specific attribute (sharding key).

Citus also provides columnar storage, shipped as a separate extension. Users can manage all aspects of a Citus cluster through UDFs.

If you compare Citus with Greenplum in terms of user perception, some similarities can be seen, but the underlying approaches are, in fact, completely different.

Citus provides a distributed engine on top of initially self-contained PostgreSQL instances. Who administers these instances, and how, is largely outside the scope of Citus' interests and responsibilities.

Greenplum, similarly using separate instances on segments and master, still considers segment instances to be closed to some extent, but also bears greater responsibility for them (at least from the point of view of built-in HA).

In Citus, you can connect to any node and start working with distributed tables (due to metadata synchronization within 2PC). The exceptions are operations that can only be performed from the coordinator. These include various manipulations of Citus' own tables, for example, the creation of distributed tables, some ALTER operations, operations with schemas, etc.

In Greenplum, in terms of connections, we are limited to the master. The possibility of connecting, for example, to a separate segment within a special session does not count in this case.

From a user’s point of view, I would call this a strong feature of Citus. However, this approach and its specific implementation in Citus have a downside, but more on that later.

An undoubted advantage in terms of usage is the availability of different PostgreSQL versions for Citus. Because it is implemented as a PostgreSQL extension, it can run on recent versions of the core. Greenplum, by contrast, is currently based on PostgreSQL 12 (in the 7.x line).

Among the specifics of using Citus, a user also needs to know about service shard tables hidden from users, which are located in the same schemas as distributed tables. As a result, the pg_class system catalog contains noticeably more rows than are visible to users.

Our large corporate users may have hundreds of thousands of tables (mostly partitioned tables). Splitting tables into shards will further increase the number of files, as well as the total amount of metadata. In addition, in the general case, the extension has to hide these tables, including in pg_class. Moreover, it does this at a fairly low level when parsing the query tree, discarding such tables. The developers claim the following: "[…] so you might expect to see them when connecting to a worker node and running \d. While this was previously the case, it caused confusion among users and also breaks tools like pg_dump".

For example, for an initial partitioned table with two partitions, creating a distributed table with the default number of shards (32) will create 96 shard tables (distributed across the cluster nodes), 32 each for the parent table and its two partitions:

\dt
               List of relations
 Schema |       Name        | Type  |  Owner
--------+-------------------+-------+----------
 andrey | part_table_part01 | table | postgres
 andrey | part_table_part02 | table | postgres
(2 rows)
SELECT create_distributed_table('part_table', 'id');
 create_distributed_table
--------------------------

(1 row)
SELECT logicalrelid, COUNT(*) AS shard_table_count FROM pg_dist_shard GROUP BY logicalrelid;
   logicalrelid    | shard_table_count
-------------------+-------------
 part_table        |          32
 part_table_part01 |          32
 part_table_part02 |          32
(3 rows)

Citus architecture and key concepts

Citus works with several types of tables:

  • Tables managed by Citus. Citus recognizes a distributed table it manages by the presence of an entry in the pg_dist_partition internal metadata table (see an example of such an entry below) and then plans queries against such tables itself.

  • Original, so-called shell tables. A user can convert a PostgreSQL table to a Citus table by calling Citus extension functions (for example, create_distributed_table).

An example of an entry in pg_dist_partition:

SELECT logicalrelid, partmethod, partkey, colocationid FROM pg_dist_partition LIMIT 1;
 logicalrelid | partmethod |                                                         partkey                                                          | colocationid
--------------+------------+--------------------------------------------------------------------------------------------------------------------------+--------------
 dist_table   | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} |            1
(1 row)

In this case, the name of the logical table (former "regular" PostgreSQL local table) is contained in the logicalrelid field. For this table, the distribution is performed by hash (the partmethod column) and the partkey column contains service information on the distribution key.

In Greenplum, no such division exists. For append-optimized tables, there are so-called aux tables, but they are not hidden in any way and are visible to database users and administrators. As a result, there is no need for explicit transformation and implicit copying of the original data by calling any special functions.

Tables managed by Citus have the following types:

  • Shell table. If a user converts a regular PostgreSQL table to a Citus table, the original table becomes a shell table hidden from the user. This table, along with Citus metadata, will be used in planning to convert queries to local tables into a distributed plan that eventually works with Citus shard tables.

  • Shard table. Essentially, it is a regular PostgreSQL table that contains data from a shell table. Data from the shell table is copied at the stage of creating the Citus table. By default, the shell table is divided into 32 shard tables, but the number of shards can be changed. As mentioned earlier, shard tables are not visible to users (including in catalog queries), but there is a GUC parameter that controls this visibility: citus.override_table_visibility.

  • Distributed table. A table distributed by some sharding key. Choosing a sharding key is the main (and, in my opinion, limiting) element of effective work with Citus. A sharding key determines the placement of a record in a specific shard. Each shard has an associated range that determines which keys are placed on that shard (this information is contained in the pg_dist_shard service table, see the example below). There are several sharding schemes, but for brevity, we will consider only distribution by hash (hash-distributed tables). Shards of different tables that have the same hash range are placed on the same nodes (so-called co-located placement), which keeps table joins by key/hash local. This condition is also preserved when data is redistributed (rebalance).

    Data distribution by hash

    This shard placement allows Citus to implement the most efficient join scheme without redistributing table data and gives a user the most complete set of possible data operations within SQL queries. What happens when a user does not want to or cannot maintain this placement scheme in practice is covered in a separate section dedicated to the details of planning and executing queries. Note that a distribution key can consist of only one not-null column, since the create_distributed_table function takes the distribution_column argument, which implies a single column. Also, for tables that are logically related, you can specify one placement group.

  • Reference table. A table with a single shard that is replicated to all nodes in a cluster. As the name suggests, such tables are suitable for reference data (dimensions, in data warehouse terms). They are a direct analogue of replicated tables in Greenplum.

  • Local table. Not to be confused with a regular PostgreSQL table. To let a user run, from any node in the cluster, a query that uses a PostgreSQL table located on the coordinator node, Citus requires the table metadata to be available on the other nodes. Local tables are also used in scenarios with FK constraints between local tables and reference tables.

  • Single shard table. It is used for schema-based sharding and will not be discussed in detail in this article.
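
The hash-range placement described for distributed tables can be illustrated with a minimal Python sketch. It assumes the signed 32-bit hash space is cut into equal contiguous ranges, one per shard. The toy_hash function is only a stand-in and is not the hash Citus uses, and all names here are hypothetical.

```python
import zlib

INT32_MIN = -2**31
INT32_MAX = 2**31 - 1


def shard_ranges(shard_count):
    """Split the signed 32-bit hash space into shard_count contiguous ranges."""
    step = 2**32 // shard_count
    ranges = []
    for i in range(shard_count):
        lo = INT32_MIN + i * step
        # The last shard absorbs any remainder so the ranges cover the whole space.
        hi = INT32_MAX if i == shard_count - 1 else lo + step - 1
        ranges.append((lo, hi))
    return ranges


def toy_hash(key):
    """Stand-in hash (CRC32 shifted into the signed range); NOT the hash Citus uses."""
    return zlib.crc32(str(key).encode()) + INT32_MIN


def shard_index(key, ranges):
    """Return the index of the shard whose range contains the key's hash."""
    h = toy_hash(key)
    for i, (lo, hi) in enumerate(ranges):
        if lo <= h <= hi:
            return i
    raise ValueError("hash outside the covered space")


ranges = shard_ranges(32)
# Co-location: two tables that share the same ranges send equal keys to the same
# shard index, so a join on the distribution key needs no rows from other shards.
a_shard = shard_index(42, ranges)
b_shard = shard_index(42, ranges)
print(a_shard == b_shard)
```

Because both tables resolve a key through the same list of ranges, placing shards with the same index on the same node is enough to keep such joins local.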

Features of query planning

Citus implements a so-called multi-level approach to query planning. Citus integrates into the planning process through the hooks available to extensions: planner_hook, set_rel_pathlist_hook, get_relation_info_hook, set_join_pathlist_hook.

The entry point for planning from the Citus perspective is the distributed_planner implementation. For the Citus planner to be included in the process, at least one Citus table should be used in a query. Whether a table corresponds to this requirement is determined by the pg_dist_partition internal system table.

The planner is divided into several levels, which are activated depending on the structure (one could even say, on the conditional "complexity") of a query:

  • fast path planner

  • router planner

  • recursive planner

  • logical planner/optimizer

Each subsequent level implements more complex or higher-level (grouping, sorting, etc.) approaches to query planning.

Fast path planner

Fast path planner is limited to planning queries that essentially affect a single shard (or use reference tables).

In addition, a query:

  • should not be INSERT … SELECT;

  • should not use CTE, subqueries, or set operations such as UNION/UNION ALL;

  • should contain the shard selection predicate only in the WHERE clause, as the only comparison on the distribution key, in the dist_key = const form, where dist_key is the distribution key;

  • for newer versions of PostgreSQL, MERGE queries are subject to the same restrictions.

This planner type targets OLTP workloads, where a query can be redirected to a single shard. In this planning option, standard_planner is not called. According to the documentation, the developers wanted to avoid the overhead of planning with the standard PostgreSQL planner. This option is well suited for schemes that shard by some key for so-called multi-tenant applications, for example, when all operations with tables are limited to the shard that corresponds to a given user id.
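
As an illustration only, the restrictions listed above can be written as a small Python predicate. The QueryShape class and its fields are hypothetical summaries of a parsed query, not Citus data structures, and the real planner performs more checks than this sketch.

```python
from dataclasses import dataclass


@dataclass
class QueryShape:
    """Hypothetical summary of a parsed query; not a Citus data structure."""
    is_insert_select: bool = False
    has_cte: bool = False
    has_subquery: bool = False
    has_set_operation: bool = False   # UNION / UNION ALL
    dist_key_const_filters: int = 0   # "dist_key = const" predicates in WHERE
    other_dist_key_filters: int = 0   # any other comparison on dist_key


def fast_path_candidate(q: QueryShape) -> bool:
    """Apply only the restrictions listed in the text; the real planner checks more."""
    if q.is_insert_select or q.has_cte or q.has_subquery or q.has_set_operation:
        return False
    # Exactly one equality with a constant on the distribution key, nothing else on it.
    return q.dist_key_const_filters == 1 and q.other_dist_key_filters == 0


point_lookup = QueryShape(dist_key_const_filters=1)
with_subquery = QueryShape(has_subquery=True, dist_key_const_filters=1)
print(fast_path_candidate(point_lookup), fast_path_candidate(with_subquery))
```

A single-key lookup passes the check, while the same lookup wrapped around a subquery does not and has to be handled by one of the more advanced planners.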

Router planner

Router planner extends fast path planner with the ability to query multiple tables, but still within the boundaries of a single shard, according to filtering conditions on dist_key. As described, Citus stores the actual table data in separate shard tables, which are generally not visible to users. At this stage, the planner replaces the original distributed table visible to users with shard tables, taking into account their distribution across the nodes. This type of planner can already plan subqueries and CTE.

For example, when the a and b tables are both sharded by the c1 column, the query below can be planned as follows:

EXPLAIN (COSTS OFF, VERBOSE ON) SELECT a.c1,
  (SELECT COUNT(*) FROM b
   WHERE b.c1 = 1 AND b.c1 = a.c1)
FROM a WHERE a.c1 = 1;
                           QUERY PLAN
----------------------------------------------------------------
Custom Scan (Citus Adaptive)
   Output: remote_scan.c1, remote_scan.count
   Task Count: 1 (1)
   Tasks Shown: All
   ->  Task
         Query: SELECT c1, (SELECT count(*) AS count FROM public.b_102332 b WHERE ((b.c1 OPERATOR(pg_catalog.=) 1) AND (b.c1 OPERATOR(pg_catalog.=) a.c1))) AS count FROM public.a_102300 a WHERE (c1 OPERATOR(pg_catalog.=) 1)
         Node: host=citus_host port=5432 dbname=citus_db
         ->  Seq Scan on public.a_102300 a
               Output: a.c1, (SubPlan 1)
               Filter: (a.c1 = 1)
               SubPlan 1 (2)
                 ->  Aggregate
                       Output: count(*)
                       ->  Result
                             One-Time Filter: (a.c1 = 1)
                             ->  Seq Scan on public.b_102332 b (3)
                                   Output: b.c1, b.c2, b.c3
                                   Filter: (b.c1 = 1)

Points worth noting:

In the line marked with the number 3, you can see Seq Scan on the actual physical table public.b_102332 (shard data). As we mentioned earlier, this table is generally not visible to users, but it appears in the plan as one of 32 shards of the b table.

Since the data in the b table is sharded by the c1 column and the b.c1 = 1 condition is specified in the filter, the planner can use router planner (fast path planner cannot be used because the query contains a subquery) and redirects the query to only one shard.

In line 2, you can see that this part of the query is planned as a subplan (SubPlan 1) and, from the perspective of the actual execution of the query, can be performed by one task (line 1, Task Count: 1). We will talk about tasks (Task) in more detail in the section on the next type of planner — recursive planning.

If we compare the same query in Greenplum, we can see the following similarities and differences:

                          QUERY PLAN
----------------------------------------------------------------
 Gather Motion 32:1  (slice1; segments: 32) (1)
   Output: a.c1, (COALESCE((count()), '0'::bigint))
   ->  Result
         Output: a.c1, COALESCE((count()), '0'::bigint)
         ->  Result
               Output: (count()), a.c1
               ->  Hash Left Join (2)
                     Output: a.c1, (count())
                     Hash Cond: (a.c1 = b.c1)
                     ->  Seq Scan on tpcds.a
                           Output: a.c1
                           Filter: (a.c1 = 1)
                     ->  Hash (3)
                           Output: (count()), b.c1
                           ->  GroupAggregate
                                 Output: count(), b.c1 (4)
                                 Group Key: b.c1
                                 ->  Sort
                                       Output: b.c1
                                       Sort Key: b.c1
                                       ->  Seq Scan on tpcds.b (5)
                                             Output: b.c1
                                             Filter: (b.c1 = 1) (6)
 Optimizer: Pivotal Optimizer (GPORCA)

Greenplum does not treat this scenario as a case for redirection to a single segment: the plan includes Gather Motion 32:1 (line marked with the number 1), which collects data from the underlying nodes on all segments. Due to the filter (line 6), the amount of data can be reduced in this case, but formally the plan does not differ for any number of rows. The tables in Greenplum for this query are distributed by a.c1 and b.c1, which allows the planner, after reading data in the Seq Scan node (line 5), to immediately calculate the count() aggregate grouped by b.c1 (line 4), then build a hash table from this data (line 3), and perform a local (within the segment) Hash Left Join (line 2).

In fact, the Citus plan looks more efficient for this scenario, since the query is immediately redirected to the required shard (which holds 1/32 of the table data by default), without waiting for other nodes. Of course, this affects the execution time:

// Citus

$ time psql -d citus_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 = 1 AND b.c1 = a.c1) FROM a WHERE a.c1 = 1;'
real	0m0.577s
user	0m0.003s
sys	0m0.005s

// Greenplum

$ time psql -d greenplum_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 = 1 AND b.c1 = a.c1) FROM a WHERE a.c1 = 1;'
real	0m1.378s
user	0m0.001s
sys	0m0.004s

However, the situation changes significantly for the following query (here a more advanced recursive planner is used, which is discussed in the next section):

SELECT a.c1,
  (SELECT COUNT(*)
   FROM b
   WHERE b.c1 > 100 AND b.c1 = a.c1)
FROM a WHERE a.c1 > 19990000;
                           QUERY PLAN
----------------------------------------------------------------
Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=12) (actual time=18601.915..18603.189 rows=10000 loops=1)
   Task Count: 32
   Tuple data received from nodes: 117 kB
   Tasks Shown: One of 32
   ->  Task
         Tuple data received from node: 3852 bytes
         Node: host=citus_host port=5432 dbname=citus_db
         ->  Seq Scan on a_102300 a  (cost=0.00..3948635.00 rows=308 width=12) (actual time=187.565..18594.736 rows=321 loops=1)
               Filter: (c1 > 19990000)
               Rows Removed by Filter: 626133
               SubPlan 1 (1)
                 ->  Aggregate  (cost=12783.81..12783.82 rows=1 width=8) (actual time=57.572..57.572 rows=1 loops=321) (2)
                       ->  Seq Scan on b_102332 b  (cost=0.00..12783.81 rows=1 width=0) (actual time=57.549..57.567 rows=1 loops=321)
                             Filter: ((c1 > 100) AND (c1 = a.c1))
                             Rows Removed by Filter: 626453 (3)
             Planning Time: 0.261 ms
             Execution Time: 18595.255 ms
 Planning Time: 3.022 ms
 Execution Time: 18604.370 ms

Greenplum has a more advanced plan; look at the lines marked with the numbers 1 and 3:

                                  QUERY PLAN
------------------------------------------------------------------------------
Gather Motion 32:1  (slice1; segments: 32) (actual time=116.323..406.430 rows=1010000 loops=1)
   Output: a.c1, (COALESCE((count()), '0'::bigint))
   ->  Result (actual time=116.722..141.599 rows=31929 loops=1)
         Output: a.c1, COALESCE((count()), '0'::bigint)
         ->  Result (actual time=116.722..137.731 rows=31929 loops=1)
               Output: (count()), a.c1
               ->  Hash Left Join (actual time=116.721..134.325 rows=31929 loops=1) (1)
                     Output: a.c1, (count())
                     Hash Cond: (a.c1 = b.c1)
                     Executor Memory: 39454kB  Segments: 32  Max: 1248kB (segment 13)
                     work_mem: 39454kB  Segments: 32  Max: 1248kB (segment 13)  Workfile: (0 spilling)
                     Extra Text: (seg13)  Hash chain length 1.1 avg, 4 max, using 30053 of 262144 buckets.Hash chain length 4.0 avg, 13 max, using 8027 of 8192 buckets; total 8 expansions.
                     ->  Seq Scan on tpcds.a (actual time=40.173..44.500 rows=31929 loops=1)
                           Output: a.c1
                           Filter: (a.c1 > 19990000)
                     ->  Hash (actual time=76.270..76.270 rows=31929 loops=1)
                           Output: (count()), b.c1
                           ->  HashAggregate (actual time=65.918..69.498 rows=31929 loops=1)
                                 Output: count(), b.c1 (2)
                                 Group Key: b.c1 (3)
                                 Executor Memory: 55335kB  Segments: 32  Max: 1738kB (segment 0)
                                 Extra Text: (seg13)  Hash chain length 4.0 avg, 13 max, using 8027 of 8192 buckets; total 8 expansions.
                                 ->  Seq Scan on tpcds.b (actual time=54.383..59.257 rows=31929 loops=1)
                                       Output: b.c1
                                       Filter: ((b.c1 > 100) AND (b.c1 > 19990000)) (4)
 Planning time: 6.827 ms
   (slice0)    Executor memory: 151K bytes.
   (slice1)    Executor memory: 7933K bytes avg x 32 workers, 7941K bytes max (seg0).  Work_mem: 1248K bytes max.
 Memory used:  122880kB
 Optimizer: Pivotal Optimizer (GPORCA)
 Execution time: 464.998 ms

The first significant difference is that Greenplum calculated the COUNT aggregate and grouped the results by the b.c1 column (lines 2 and 3). The second difference is that GPORCA pushed the filter condition (b.c1 > 19990000) deeper into the plan (line 4), which significantly reduced the data selection from the b table. At the same time, Hash Left Join was used for joining (line 1) — actually, passing through the a table with the condition (a.c1 > 19990000), we get the desired result from the hash table (line 3).
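
The difference between the two strategies can be shown with a toy Python sketch. It is not how either planner is implemented. The lists stand in for the c1 columns of a and b, and the thresholds A_MIN and B_MIN are hypothetical replacements for the constants 19990000 and 100.

```python
from collections import Counter

a = [1, 2, 3, 5, 8]        # values of a.c1
b = [2, 2, 3, 8, 8, 8, 9]  # values of b.c1
A_MIN, B_MIN = 2, 0        # stand-ins for "a.c1 > 19990000" and "b.c1 > 100"


def correlated(a, b):
    """Scalar subquery evaluated per outer row: one full pass over b per row of a."""
    return [(x, sum(1 for y in b if y > B_MIN and y == x)) for x in a if x > A_MIN]


def decorrelated(a, b):
    """Aggregate b once, then probe the resulting hash table for each outer row."""
    # The join condition b.c1 = a.c1 lets the filter on a.c1 be applied to b as well.
    counts = Counter(y for y in b if y > B_MIN and y > A_MIN)
    # Left-join semantics: outer rows without a match get COALESCE(count, 0).
    return [(x, counts.get(x, 0)) for x in a if x > A_MIN]


print(correlated(a, b) == decorrelated(a, b))
```

Both functions return the same rows, but the first scans b once per qualifying row of a, while the second scans b once in total.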

What Citus does: the Seq Scan node on the a table calls the SubPlan 1 subplan (line 1) for each row, and the filter condition (c1 > 100) on the b table (line 3) leads to a full read of the b table (discarding the first 100 rows does not count). This happens 321 times, based on the value of the loops=321 counter (line 2), for each of the 32 shards.
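
A quick back-of-the-envelope check, using only the numbers from the Citus plan above for the single task shown, confirms that the repeated subplan accounts for almost all of the task time. It assumes the per-loop figures reported by EXPLAIN ANALYZE are representative averages.

```python
loops = 321                      # SubPlan executions in the shown task (loops=321)
ms_per_loop = 57.572             # actual time of the Aggregate node per loop
rows_removed_per_loop = 626453   # "Rows Removed by Filter" on b per loop
rows_kept_per_loop = 1           # rows=1 on the Seq Scan of b
task_ms = 18595.255              # Execution Time reported for the task

subplan_ms = loops * ms_per_loop
rows_read = loops * (rows_removed_per_loop + rows_kept_per_loop)
share = subplan_ms / task_ms

print(round(subplan_ms), rows_read, round(share, 3))
```

In other words, roughly 18.5 of the 18.6 seconds are spent re-reading the same shard of b, about 200 million row visits for a shard that holds only about 626 thousand rows.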

It is not difficult to imagine that this would lead to a significant difference in execution time. Even for a small number of rows in the a and b tables (21 million rows each), the difference in time was:

// Citus

$ time psql -d citus_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 >100 AND b.c1 = a.c1) FROM a WHERE a.c1 > 19990000;' >> /dev/null
real	0m16.556s
user	0m0.013s
sys	0m0.004s

// Greenplum

$ time psql -d greenplum_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 >100 AND b.c1 = a.c1) FROM a WHERE a.c1 > 19990000;' >> /dev/null
real	0m0.319s
user	0m0.002s
sys	0m0.004s

If tables are not sharded by the same column (in my case, I recreated the tables and sharded one of them by a.c1 and the second one by b.c2), then the attempt to execute such a query will fail:

SELECT a.c1,
  (SELECT COUNT(*) FROM b
   WHERE b.c1 = 1 AND b.c1 = a.c1)
FROM a
WHERE a.c1 = 1;
ERROR:  complex joins are only supported when all distributed tables are co-located and joined on their distribution columns

Greenplum, in turn, does not experience any difficulties here — as we said earlier, the plan will be generally similar to the original one (for the case of distribution by one column), but the Redistribute Motion node is added (the line marked with the number 1):

                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Gather Motion 32:1  (slice2; segments: 32)
   Output: a.c1, (COALESCE((count()), '0'::bigint))
   ->  Result
         Output: a.c1, COALESCE((count()), '0'::bigint)
         ->  Result
               Output: (count()), a.c1
               ->  Hash Left Join
                     Output: a.c1, (count())
                     Hash Cond: (a.c1 = b.c1)
                     ->  Seq Scan on public.a
                           Output: a.c1
                           Filter: (a.c1 = 1)
                     ->  Hash
                           Output: (count()), b.c1
                           ->  GroupAggregate
                                 Output: count(), b.c1
                                 Group Key: b.c1
                                 ->  Sort
                                       Output: b.c1
                                       Sort Key: b.c1
                                       ->  Redistribute Motion 32:32  (slice1; segments: 32) (1)
                                             Output: b.c1
                                             Hash Key: b.c1
                                             ->  Seq Scan on public.b
                                                   Output: b.c1
                                                   Filter: (b.c1 = 1)
 Optimizer: Pivotal Optimizer (GPORCA)
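
The effect of the Redistribute Motion node can be sketched in Python as follows. This is a simplified model: segment_for is a plain modulo used as a stand-in and is not Greenplum's real hash function, and four segments are used instead of 32 to keep the output readable.

```python
SEGMENTS = 4  # the plans above use 32; 4 keeps the output readable


def segment_for(value):
    """Stand-in placement function (plain modulo); NOT Greenplum's real hash."""
    return value % SEGMENTS


def redistribute(rows_by_segment, key):
    """Send every row to the segment chosen by `key`, as a Redistribute Motion would."""
    out = [[] for _ in range(SEGMENTS)]
    for rows in rows_by_segment:
        for row in rows:
            out[segment_for(row[key])].append(row)
    return out


# Table b is stored by c2, so rows with equal c1 may sit on different segments.
b_rows = [{"c1": 1, "c2": 0}, {"c1": 1, "c2": 1}, {"c1": 1, "c2": 2}, {"c1": 6, "c2": 3}]
b_by_c2 = [[] for _ in range(SEGMENTS)]
for row in b_rows:
    b_by_c2[segment_for(row["c2"])].append(row)

b_by_c1 = redistribute(b_by_c2, "c1")
# After the motion, all rows with c1 = 1 are on the segment that also holds a.c1 = 1,
# so the hash join can run locally on each segment.
print([len(rows) for rows in b_by_c1])
```

The price of this flexibility is the network transfer of the redistributed rows, which is why the filter on b.c1 is applied below the motion in the plan.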

To summarize the fast path and router planners:

  • The main limitations of these planners are that they can only plan queries within a specific shard, or they can execute a query individually on each shard and merge the results "at the top" of the plan. In fact, these queries can contain subqueries that can be executed independently on shards.

  • Complex joins (for example, outer joins) are not supported, and neither are subqueries with joins.

The next, more advanced planner, the recursive planner, removes these limitations.

Recursive planner

Recursive planner implements:

  • Ability to join tables that are defined as colocated according to their shard distribution keys.

  • Ability to plan subqueries/CTE separately (in isolation) so that manipulation of the results of these subqueries can be reduced to local operations within the shard.

This type of planner tries to consider each possible part of a query for its isolated execution in the pushdown version. The essence of this action is as follows: conditionally, the planner receives a query tree, which is analyzed in depth. During planning, all subqueries and CTE are checked for their ability to be executed directly on the worker nodes. If this is impossible for a given subquery/CTE, then the regular PostgreSQL planner (planner) is called for it. The result is added to the list of subquery plans. An important point is that these subplans are launched before the distributed plan is processed. The results of processing these subplans are written to temporary files on the disk. We will consider this process in detail a little later.

With this approach, developers solve the main problem — the ability to support most types of SQL queries (yes, there are exceptions: for example, the lack of support for GROUPING SETS, CUBE, ROLLUP).

By receiving and materializing the intermediate results of a subquery on each node/shard, Citus can attach this data to distributed tables using any key.

It is worth noting that, unlike Greenplum, recursive and modifying CTE are not supported unless a query is limited to a single shard:

WITH RECURSIVE h AS
(
  SELECT a.c1, a.c3, 0 AS level FROM a
  WHERE a.c2 IS NULL
  UNION ALL
  SELECT a.c1, a.c3, level + 1
  FROM a, h
  WHERE a.c1 = h.c1
)
SELECT * FROM h;
ERROR:  recursive CTEs are only supported when they contain a filter on the distribution column

Let’s look at an example of how this planner works on a couple of queries:

SELECT COUNT(*)
FROM
  (SELECT *
   FROM a
   ORDER BY a.c3
   LIMIT 10000000) AS foo;
                                        QUERY PLAN
------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual time=3399.533..3399.535 rows=1 loops=1) (1)
   ->  Distributed Subplan 3_1 (2)
         Subplan Duration: 38294.62 ms
         Intermediate Data Size: 238 MB
         Result destination: Write locally
         ->  Limit (actual time=22744.153..24169.593 rows=10000000 loops=1) (3)
               ->  Sort (actual time=22744.151..23669.158 rows=10000000 loops=1) (4)
                     Sort Key: remote_scan.c3
                     Sort Method: external merge  Disk: 430592kB
                     ->  Custom Scan (Citus Adaptive) (actual time=15420.113..17028.325 rows=20000000 loops=1)
                           Task Count: 32
                           Tuple data received from nodes: 210 MB
                           Tasks Shown: One of 32
                           ->  Task
                                 Tuple data received from node: 6713 kB
                                 Node: host=citus_host port=5432 dbname=citus_db
                                 ->  Limit (actual time=172.560..383.698 rows=624917 loops=1)
                                       ->  Sort (actual time=172.558..313.678 rows=624917 loops=1) (5)
                                             Sort Key: c3
                                             Sort Method: external merge  Disk: 13504kB
                                             ->  Seq Scan on a_102317 a (actual time=0.016..68.522 rows=624917 loops=1) (6)
                                     Planning Time: 0.045 ms
                                     Execution Time: 500.327 ms
         Planning Time: 0.000 ms
         Execution Time: 24575.273 ms
   Task Count: 1
   Tuple data received from nodes: 8 bytes
   Tasks Shown: All
   ->  Task
         Tuple data received from node: 8 bytes
         Node: host=citus_host port=5432 dbname=citus_db (7)
         ->  Aggregate (actual time=3357.556..3357.556 rows=1 loops=1)
               ->  Function Scan on read_intermediate_result intermediate_result (actual time=2233.802..2988.018 rows=10000000 loops=1) (8)
             Planning Time: 0.051 ms
             Execution Time: 3398.382 ms
 Planning Time: 1.654 ms
 Execution Time: 41694.215 ms

The recursive planner decides to plan the subquery (the line marked with the number 1) using a subplan (line 2), collect the result of executing the subplan on one node, and calculate the final aggregate function over the intermediate data (line 7).

I propose to analyze why the planner makes such decisions and how the intermediate storage of results is carried out.

According to the pushdown rules for subqueries, the presence of LIMIT requires that the subquery be planned as a separate subplan. If we read the plan from the bottom up, the rows from Seq Scan (line 6) are fed to the Sort node (line 5), and then the LIMIT condition is applied. These actions are performed in parallel on all shards of the table (32 by default). Thus, each shard outputs at most LIMIT N rows sorted by a.c3. However, to ensure that the user receives exactly the expected LIMIT N rows, the planner has to sort the data from all shards again before applying the final LIMIT condition (lines 3 and 4). Otherwise, the user might receive results that differ from expectations. These results are cached on the query coordinator node and become available to the rest of the plan through the pg_catalog.read_intermediate_result function. The mechanism of the read_intermediate_result function is described in the Query execution features section.
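The two-level Sort + Limit scheme can be illustrated with a minimal Python sketch. It is only a model of the idea, not Citus code: the function names shard_top_n and coordinator_top_n, the random data, and the limit value are assumptions made for the illustration.

```python
import random

def shard_top_n(rows, n):
    """Worker side: Sort + Limit on a single shard (plan lines 5 and 6)."""
    return sorted(rows)[:n]

def coordinator_top_n(shard_results, n):
    """Coordinator side: collect shard outputs, Sort again, then Limit (lines 3 and 4)."""
    collected = [row for result in shard_results for row in result]
    return sorted(collected)[:n]

random.seed(0)
# 32 shards with 1000 values of c3 each (hypothetical data).
shards = [[random.randrange(1_000_000) for _ in range(1000)] for _ in range(32)]
limit = 100

per_shard = [shard_top_n(rows, limit) for rows in shards]
final = coordinator_top_n(per_shard, limit)

# Concatenating per-shard outputs and cutting at `limit` without the second
# sort returns 100 rows, but not necessarily the 100 smallest ones.
naive = [row for result in per_shard for row in result][:limit]
```

Here final always equals the first 100 values of the fully sorted table, while naive is just the output of the first shard, which is why the coordinator cannot skip the second Sort.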

The Greenplum planner plans the query like this:

                      QUERY PLAN
------------------------------------------------------
Aggregate (actual time=4350.773..4350.773 rows=1 loops=1)
   ->  Limit (actual time=254.085..3491.350 rows=10000000 loops=1) (1)
         ->  Gather Motion 32:1  (slice1; segments: 32) (actual time=254.081..2457.549 rows=10000000 loops=1) (2)
               Merge Key: c3 (3)
               ->  Limit (actual time=240.614..352.447 rows=657684 loops=1)
                     ->  Sort (actual time=240.609..285.729 rows=657684 loops=1)
                           Sort Key: c3 (4)
                           Sort Method:  top-N heapsort  Memory: 1310496kB
                           ->  Seq Scan on a (actual time=0.078..110.649 rows=657714 loops=1)
 Planning time: 27.504 ms
   (slice0)    Executor memory: 2105K bytes.
   (slice1)    Executor memory: 40994K bytes avg x 32 workers, 40994K bytes max (seg0).  Work_mem: 40953K bytes max.
 Memory used:  122880kB
 Optimizer: Pivotal Optimizer (GPORCA)
 Execution time: 4400.946 ms

Here, it is worth paying attention to the Gather Motion 32:1 step and its Merge Key: c3 parameter (lines 2 and 3). The GPORCA planner, knowing that the rows from each segment arrive already ordered by a.c3 (line 4), can perform what is essentially an ordered merge of the pre-sorted streams (the merge step of a merge sort, not a join) and apply LIMIT to the resulting row set (line 1). This step formally corresponds to the Limit node and the Sort under it (lines 3 and 4 in the Citus plan).
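The idea of merging pre-sorted streams with an early stop can be sketched in Python as follows. This is an illustration of the technique only, not Greenplum code: gather_merge and counted are hypothetical names, and the three small lists stand in for segment outputs.

```python
import heapq
from itertools import islice

def counted(rows, counter):
    """Yield rows one by one and count how many were actually pulled."""
    for row in rows:
        counter[0] += 1
        yield row

def gather_merge(sorted_streams, limit):
    """Merge already sorted per-segment streams and stop after `limit` rows."""
    return list(islice(heapq.merge(*sorted_streams), limit))

# Each inner list is one segment's output, already sorted by the merge key.
segments = [[1, 4, 9, 12], [2, 3, 10], [5, 6, 7, 8]]
pulled = [0]
top5 = gather_merge([counted(rows, pulled) for rows in segments], 5)
print(top5)  # [1, 2, 3, 4, 5]
```

Because the merge is lazy, the consumer stops pulling rows as soon as the limit is reached, and nothing has to be sorted again or materialized on disk.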

The key difference between the plans is that Citus executes Distributed Subplan for each shard (line 2). Within this plan, the total result from the shards is sorted (line 4) and limited by the Limit node (line 3). The results are collected on the query coordinator and materialized as files, which are read by the read_intermediate_result function on the coordinator (line 8).

Greenplum does this on the fly within Gather Motion and shows better results in practice:

// Citus

$ time psql -d citus_db -c 'SELECT COUNT(*) FROM (SELECT * FROM a ORDER BY a.c3 LIMIT 10000000) AS foo;' >> /dev/null

real	0m40.617s
user	0m0.002s
sys	0m0.003s

// Greenplum

$ time psql -d greenplum_db -c 'SELECT COUNT(*) FROM (SELECT * FROM a ORDER BY a.c3 LIMIT 10000000) AS foo;' >> /dev/null

real	0m2.892s
user	0m0.003s
sys	0m0.004s

The same type of planner allows you to plan queries with HAVING in subqueries, but, unlike Greenplum, it cannot handle correlated subqueries:

SELECT foo.c1, foo.cnt, a.c3
FROM (SELECT b.c1, COUNT(*) AS cnt FROM b GROUP BY b.c1) foo
INNER JOIN a
  ON a.c1 = foo.c1
GROUP BY foo.c1, foo.cnt, a.c3
HAVING foo.cnt >= (SELECT COUNT(*) FROM b WHERE b.c1 >= foo.cnt);
ERROR:  Subqueries in HAVING cannot refer to outer query

Greenplum copes with this task without any problems:

                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
Gather Motion 32:1  (slice2; segments: 32)
   ->  HashAggregate
         Group Key: b.c1, (count()), a.c3
         ->  Hash Join
               Hash Cond: (b.c1 = a.c1)
               ->  Result
                     Filter: ((count()) >= COALESCE(((SubPlan 1)), '0'::bigint))
                     ->  Result
                           ->  HashAggregate
                                 Group Key: b.c1
                                 ->  Seq Scan on b
                           SubPlan 1  (slice2; segments: 32)
                             ->  Aggregate
                                   ->  Result
                                         Filter: (b_1.c1 >= (count()))
                                         ->  Materialize
                                               ->  Broadcast Motion 32:32  (slice1; segments: 32)
                                                     ->  Seq Scan on b b_1
               ->  Hash
                     ->  Seq Scan on a
 Optimizer: Pivotal Optimizer (GPORCA)

Even with fairly simple examples, Citus leaves me wondering what I need to do with the schema and data distribution to ensure that the query can be executed at all. The GPORCA planner seems more advanced in practice.

For example, for correlated subqueries:

CREATE TABLE sales(brand TEXT, size INT, price NUMERIC(10,2), store INT);
CREATE TABLE
SELECT COUNT(*)
FROM sales s1
WHERE  s1.size > 32 OR s1.price > (SELECT avg(s2.price) FROM sales s2 WHERE s2.brand = s1.brand);
 count
-------
     0
(1 row)
SELECT COUNT(*)
FROM sales s1
WHERE s1.store
IN (SELECT s2.store
        FROM sales s2
        WHERE s2.price = (SELECT min(s3.price) FROM sales s3 WHERE s3.brand = s1.brand));
ERROR:  complex joins are only supported when all distributed tables are co-located and joined on their distribution columns

For the first query, large tables can be partitioned by some fields in addition to sharding.

Logical Planner & Optimizer

A detailed discussion of this planning level is outside the scope of this article. The tasks at this level are partly similar to the tasks solved by grouping_planner in PostgreSQL: these are operations such as GROUP BY, ORDER BY, and LIMIT, as well as aggregate functions.

Join operations are also optimized at this level. The main goal of this optimization is to reduce the number of joins that lead to data redistribution. Apparently, due to the specifics of redistribution based on writing intermediate results to a file and the COPY protocol (more details later), cost-based estimates are not performed.

Query execution features

Citus builds its execution plan based on the implementation of the CustomScan node.

I propose to analyze the query execution process using a simple query that nevertheless touches on some interesting points, such as executing subplans, storing intermediate results in files, and broadcasting them to nodes.

First, the query itself and its plan:

SELECT COUNT(DISTINCT b.c2)
FROM (SELECT DISTINCT a.c2 FROM a)
AS foo, b
WHERE foo.c2 = b.c2;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Aggregate
   Output: count(DISTINCT remote_scan.count)
   ->  Custom Scan (Citus Adaptive) (1)
         Output: remote_scan.count
         ->  Distributed Subplan 24_1 (2)
               ->  HashAggregate (3)
                     Output: remote_scan.c2
                     Group Key: remote_scan.c2
                     ->  Custom Scan (Citus Adaptive) (4)
                           Output: remote_scan.c2
                           Task Count: 32
                           Tasks Shown: One of 32
                           ->  Task
                                 Query: SELECT DISTINCT c2 FROM public.a_102809 a WHERE true
                                 Node: host=citus_host port=5432 dbname=citus_db
                                 ->  HashAggregate
                                       Output: c2
                                       Group Key: a.c2
                                       ->  Seq Scan on public.a_102809 a
                                             Output: c1, c2, c3
         Task Count: 32
         Tasks Shown: One of 32 (5)
         ->  Task
               Query: SELECT worker_column_1 AS count FROM (SELECT b.c2 AS worker_column_1 FROM (SELECT intermediate_result.c2 FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(c2 integer)) foo, public.b_102841 b WHERE (foo.c2 OPERATOR(pg_catalog.=) b.c2)) worker_subquery GROUP BY worker_column_1
               Node: host=citus_host port=5432 dbname=citus_db
               ->  HashAggregate
                     Output: b.c2 (6)
                     Group Key: b.c2
                     ->  Hash Join
                           Output: b.c2  (7)
                           Hash Cond: (intermediate_result.c2 = b.c2)
                           ->  Function Scan on pg_catalog.read_intermediate_result intermediate_result
                                 Output: intermediate_result.c2
                                 Function Call: read_intermediate_result('24_1'::text, 'binary'::citus_copy_format)
                           ->  Hash
                                 Output: b.c2
                                 ->  Seq Scan on public.b_102841 b
                                       Output: b.c2

We already know that such a query leads to the creation and execution of a subplan (line marked 2), the use of the subplan results in the Hash Join (line marked 7), and the reading of intermediate results by the Function Scan on read_intermediate_result on one side of the join.

Citus hooks into the query execution process at the following extension points: ExecutorStart_hook, ExecutorRun_hook, ExplainOneQuery_hook, and ExecutorEnd_hook.

Query execution process

Let’s start our countdown from CitusExecutorRun, the callback function that implements ExecutorRun_hook. For a query like ours, the PostgreSQL engine calls the CitusExecutorRun function in ExecutorRun.

CitusExecutorRun, in addition to auxiliary Citus-specific tasks like skipping constraints check on the coordinator for ALTER TABLE queries (this is delegated to nodes), solves the main task — search for scan state nodes (CitusScanState) that were previously added by one of the planners.

CitusScanState is a "container" structure that includes:

  • CustomScan node state (CustomScanState);

  • pointer to the distributed execution plan (DistributedPlan);

  • tuple storage for the results of executing a distributed plan (Tuplestorestate);

  • pointer to the PreExecScan callback function (more on that later) and several other fields.

The plan (to be more precise, the PlanState nodes) is traversed in depth, and the found instances of CitusScanState are added to the list.
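The depth-first search for scan states can be sketched in Python as follows. The classes are toy stand-ins for the PostgreSQL and Citus C structures, and the function name find_citus_scan_states is an assumption made for the illustration, not the name used in the Citus sources.

```python
from dataclasses import dataclass, field

@dataclass
class PlanState:
    """Toy stand-in for a PostgreSQL PlanState node."""
    name: str
    children: list = field(default_factory=list)

@dataclass
class CitusScanState(PlanState):
    """Toy stand-in for the Citus scan state carrying a distributed plan."""
    distributed_plan: str = ""

def find_citus_scan_states(node, found=None):
    """Traverse the PlanState tree in depth and collect CitusScanState nodes."""
    found = [] if found is None else found
    if isinstance(node, CitusScanState):
        found.append(node)
    for child in node.children:
        find_citus_scan_states(child, found)
    return found

# Shape of the top-level plan of the query above: Aggregate -> Custom Scan.
plan = PlanState("Aggregate", [
    CitusScanState("Custom Scan (Citus Adaptive)", distributed_plan="plan with subplan 24_1"),
])
scan_states = find_citus_scan_states(plan)
```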

For our case, this list contains one Custom Scan (Citus Adaptive) node (line 1) and an associated CustomScanState instance. For each of the found nodes and the associated distributed execution plan for distributed tables, locks of all partitions are taken, and all subplans are executed, starting from the top one. In our case, this is the Distributed Subplan 24_1 subplan.

Receiving results on nodes is carried out through the DestReceiver instance. The coordinator node creates the so-called RemoteFileDestReceiver.

Its tasks include:

  • establishing connections with nodes using the libpq protocol;

  • sending the COPY command to these nodes (more on this later);

  • receiving tuples;

  • preparing tuples data for the COPY command and sending these rows to nodes (BroadcastCopyData).
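The receiver role described above can be modeled with a short Python sketch. It is a simplified illustration under several assumptions: BroadcastFileReceiver is a hypothetical class, in-memory buffers stand in for libpq connections with a running COPY command, and rows are serialized as tab-separated text rather than in the binary format used in the example query.

```python
import io

class BroadcastFileReceiver:
    """Toy model of a receiver that forwards every tuple to all nodes."""

    def __init__(self, node_names):
        # One in-memory buffer per node stands in for a connection
        # on which a COPY command has already been started.
        self.streams = {name: io.StringIO() for name in node_names}
        self.rows_sent = 0

    def receive(self, row):
        # Serialize the tuple once and broadcast the same data to every node.
        line = "\t".join(str(value) for value in row) + "\n"
        for stream in self.streams.values():
            stream.write(line)
        self.rows_sent += 1

    def shutdown(self):
        # Return what each node would have stored as its result file.
        return {name: s.getvalue() for name, s in self.streams.items()}

receiver = BroadcastFileReceiver(["node1", "node2"])
for row in [(1, "foo"), (2, "bar")]:
    receiver.receive(row)
files = receiver.shutdown()
```

After shutdown, both nodes hold identical copies of the subplan result, which is what later allows each of them to join against it locally.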

Once RemoteFileDestReceiver is initialized, Citus begins executing the subplan (line 2). For this purpose, the portal mechanism (Portal) is used. The previously planned part of the HashAggregate query is sent to the portal (line 3). The portal takes as one of its arguments an instance of DestReceiver, which is RemoteFileDestReceiver.

During the processing of the query by the portal, the subquery is planned again, and a regular call of ExecutorRun_hook occurs, which in turn invokes the CitusExecutorRun function. By analogy with the main query, it finds and executes the CustomScan node nested in the subquery (line 4).

Further, the plan goes to the input of the ExecutePlan PostgreSQL kernel function. Since the top-level node of the subplan is the HashAggregate node (line 3, our SELECT DISTINCT), control is transferred to the PostgreSQL kernel (the nodeAgg.c module), which in turn calls the underlying node, which is the Citus CustomScan node.

To perform scanning tasks (that is, directly retrieving rows), the CustomScan implementation uses the so-called AdaptiveExecutor. A full consideration of this module would take another article of similar size, so we will consider only the main tasks related to our query.

At the top level, the main task of AdaptiveExecutor is to execute a list of tasks. An example of a task would be a query to a shard (essentially, to a table). In our case, the planner defined 32 tasks (according to the number of shards in the table).

Also, AdaptiveExecutor creates a TupleStore to temporarily store obtained results.
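At this level of abstraction, executing a task list and collecting the results into a tuple store can be modeled as below. This is only a sketch: the thread pool is a stand-in chosen for brevity and says nothing about how AdaptiveExecutor actually schedules work over its connections, and run_task, execute_tasks, and the data are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def run_task(shard_rows):
    """Stand-in for one task: SELECT DISTINCT c2 on a single shard."""
    return sorted(set(shard_rows))

def execute_tasks(shards, max_workers=4):
    """Run one task per shard and append all results to a tuple store."""
    tuple_store = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for result in pool.map(run_task, shards):
            tuple_store.extend(result)
    return tuple_store

# 32 shards, each holding two different c2 values (hypothetical data).
shards = [[i % 5, (i + 1) % 5] for i in range(32)]
rows = execute_tasks(shards)
```

The tuple store still contains duplicates across shards, which is why the plan places a HashAggregate node above the Custom Scan.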

How the read_intermediate_result function works:

The task of read_intermediate_result is to present an intermediate result stored in the COPY format, obtained from a broadcast execution of a subplan or from shuffling (Repartitioning, more on this later), as a set of rows. The file with the intermediate results is parsed and converted into the column formats that the caller passed as parameters. For example, in the case of our query (line 5), read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(c2 integer), the first argument 24_1 is the identifier of the result row set, and the second argument specifies the format ('binary'). The identifier is then used to obtain the name of the file in the pgsql_job_cache directory.

Then, the ReadFileIntoTupleStore function converts rows from the COPY format (PostgreSQL implementation of the copy.c module is used) into Tuplestorestate. Further, the results are presented in the form of tuples.
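The general flow (result identifier, then file name, then typed rows) can be sketched in Python. This is a simplified model, not the Citus implementation: read_result_file and CASTS are hypothetical names, a temporary directory stands in for pgsql_job_cache, and the file is tab-separated text instead of the binary COPY format used in the query above.

```python
import os
import tempfile

# Only the column types needed for this sketch.
CASTS = {"integer": int, "text": str}

def read_result_file(directory, result_id, column_types):
    """Locate <result_id>.data and convert each line into a typed tuple."""
    path = os.path.join(directory, result_id + ".data")
    rows = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            fields = line.rstrip("\n").split("\t")
            rows.append(tuple(CASTS[t](v) for t, v in zip(column_types, fields)))
    return rows

with tempfile.TemporaryDirectory() as job_cache:
    # Pretend that a subplan has already written its result here.
    with open(os.path.join(job_cache, "24_1.data"), "w", encoding="utf-8") as f:
        f.write("10\n20\n30\n")
    rows = read_result_file(job_cache, "24_1", ["integer"])
```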

Where are intermediate results stored:

Intermediate results of query execution are stored in the base/pgsql_job_cache directory on each node. If these results are used in a distributed transaction, then this is the directory base/pgsql_job_cache/<user id>_<coordinator node id>_<transaction number>/. If a distributed transaction is not utilized, this path is base/pgsql_job_cache/<user id>_<process id>/.

An example of the location:

$ pwd
/var/lib/pgsql/14/data/base/pgsql_job_cache/10_0_237
$ ll
total 178660
-rw------- 1 postgres postgres 182944635 Aug 27 02:14 1_1.data
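The naming scheme described above can be written down as a small Python helper. The function intermediate_result_dir and its parameter names are hypothetical, and reading the directory name 10_0_237 from the listing as user 10, coordinator node 0, transaction 237 is my interpretation of the pattern.

```python
def intermediate_result_dir(user_id, process_id=None,
                            coordinator_id=None, transaction_number=None):
    """Build the directory path for intermediate results, relative to the data directory."""
    base = "base/pgsql_job_cache"
    if coordinator_id is not None and transaction_number is not None:
        # Results used within a distributed transaction.
        return f"{base}/{user_id}_{coordinator_id}_{transaction_number}"
    # No distributed transaction: the backend process id is used instead.
    return f"{base}/{user_id}_{process_id}"

print(intermediate_result_dir(10, coordinator_id=0, transaction_number=237))
# base/pgsql_job_cache/10_0_237
```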

Repartitioning

How does Citus make a join that is not based on a sharding key when the tables are not distributed using that key?

IMPORTANT

A prerequisite for such joins is that GUC citus.enable_repartition_joins is set to on. Otherwise, Citus raises the following error: ERROR: the query contains a join that requires repartitioning. By default, enable_repartition_joins is set to off.

Consider the following query for the case when the a table is distributed by the a.c1 key, and the b table is distributed by the b.c3 key (to simplify the analysis, we will first distribute each table across two shards):

SELECT a.c2, COUNT(*)
FROM a
INNER JOIN b ON a.c1 = b.c1
GROUP BY a.c2
LIMIT 100;
                                             QUERY PLAN
----------------------------------------------------------------------------------------------------
 Limit (actual time=88944.560..88944.563 rows=1 loops=1)
   Output: remote_scan.c2, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))
   ->  HashAggregate (actual time=88944.559..88944.560 rows=1 loops=1)
         Output: remote_scan.c2, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
         Group Key: remote_scan.c2
         Batches: 1  Memory Usage: 40kB
         ->  Custom Scan (Citus Adaptive) (actual time=88944.536..88944.539 rows=12 loops=1)
               Output: remote_scan.c2, remote_scan.count
               Task Count: 12
               Tuple data received from nodes: 132 bytes
               Tasks Shown: None, not supported for re-partition queries
               ->  MapMergeJob
                     Map Task Count: 2
                     Merge Task Count: 12
               ->  MapMergeJob
                     Map Task Count: 2
                     Merge Task Count: 12
 Planning Time: 3.422 ms
 Execution Time: 88944.605 ms

To perform such a join, the repartitioning mechanism is used. It is based on splitting the work into jobs with a single-level hierarchy, where a certain set of jobs depends on the main one. Essentially, this set of tasks is a variation of subplans whose results are redistributed between nodes according to the keys. The difference from ordinary subplans is that tasks have no stage of combining results (the merge stage), whereas the result of a subplan is always fully redistributed among the nodes (broadcast). Jobs are included in a distributed execution plan and executed in two stages.

At the first stage, each of the 4 shards of both tables receives a query of the following form (each shard is served over a separate libpq connection):

SELECT
	partition_index,
	'repartition_2198637379588_1' || '_' || partition_index::text , (1)
	rows_written
FROM pg_catalog.worker_partition_query_result(
	'repartition_2198637379588_1',
	'SELECT c2 AS column1, c1 AS column2 FROM public.a_102080 a WHERE true', (2)
	1,
	'hash',
	'{-2147483648,-1789569707,-1431655766,-1073741825,-715827884,-357913943,-2,357913939,715827880,1073741821,1431655762,1789569703}'::text[], (3)
	'{-1789569708,-1431655767,-1073741826,-715827885,-357913944,-3,357913938,715827879,1073741820,1431655761,1789569702,2147483647}'::text[], (4)
	true,
	true,
	true
)
WHERE rows_written > 0

The most important things in these queries:

  • The name of the file is subsequently formed from the line marked with number 1. This file will contain intermediate results of the query from line number 2.

  • Lines 3 and 4 contain the ranges of hash values according to which the task will be split into parts (Merge Task Count). You can see that the arrays partition_min_values (the first array argument) and partition_max_values (the second) contain 12 elements each.

Internally, this results in a call to the worker_partition_query_result function. The task of this function is to obtain the results of executing this query and write them to files local to the node according to the sharding scheme and the sharding key. DestReceiver instances are used to receive and write results to intermediate files. A separate receiver is created for each result file. The total number of files for one shard is set by the value of the Merge Task Count parameter, which equals 12 in our case. When the receivers are ready, the portal that executes the query is started (PortalRun).
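How a hash value is mapped to one of the 12 partitions can be sketched in Python using the two arrays from the query above. The helper names partition_index and partition_file_name are hypothetical, and the computation of the hash value itself is deliberately left out of the sketch.

```python
from bisect import bisect_right

# Range bounds copied from the worker_partition_query_result call above.
PARTITION_MIN = [-2147483648, -1789569707, -1431655766, -1073741825,
                 -715827884, -357913943, -2, 357913939, 715827880,
                 1073741821, 1431655762, 1789569703]
PARTITION_MAX = [-1789569708, -1431655767, -1073741826, -715827885,
                 -357913944, -3, 357913938, 715827879, 1073741820,
                 1431655761, 1789569702, 2147483647]

def partition_index(hash_value):
    """Return the index of the range that contains a 32-bit hash value."""
    index = bisect_right(PARTITION_MIN, hash_value) - 1
    assert PARTITION_MIN[index] <= hash_value <= PARTITION_MAX[index]
    return index

def partition_file_name(result_prefix, hash_value):
    """File name: prefix, underscore, partition index, plus the .data suffix."""
    return f"{result_prefix}_{partition_index(hash_value)}.data"

print(partition_file_name("repartition_2198637379588_1", 0))
# repartition_2198637379588_1_6.data
```

The ranges are contiguous and together cover the whole signed 32-bit space, so every row produced by the inner query lands in exactly one of the 12 files.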

After performing this operation for 1 of the 4 shards, the contents of the pgsql_job_cache directory for the corresponding distributed transaction will be something like this:

$ ll
total 56
drwx------ 2 citus citus 4096 sep 12 16:38 ./
drwx------ 3 citus citus 4096 sep 12 16:04 ../
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_0.data
-rw------- 1 citus citus 1653 sep 12 16:38 repartition_2198637379588_2_10.data
-rw------- 1 citus citus 1517 sep 12 16:38 repartition_2198637379588_2_11.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_1.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_2.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_3.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_4.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_5.data
-rw------- 1 citus citus 1262 sep 12 16:38 repartition_2198637379588_2_6.data
-rw------- 1 citus citus 1500 sep 12 16:38 repartition_2198637379588_2_7.data
-rw------- 1 citus citus 1347 sep 12 16:38 repartition_2198637379588_2_8.data
-rw------- 1 citus citus 1177 sep 12 16:38 repartition_2198637379588_2_9.data

Thus, for one table that participates in the repartitioning process, the number of intermediate files for this stage will be equal to Map Task Count (Table Shard Count) * Merge Task Count.
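As a worked example of this arithmetic, the following Python sketch enumerates the expected file names for one table. The name pattern (job identifier, map task number starting from 1, partition index) is inferred from the directory listings in this article, and expected_files is a hypothetical helper.

```python
def expected_files(job_id, map_task_count, merge_task_count):
    """List the intermediate files produced for one table at the first stage."""
    return [
        f"repartition_{job_id}_{map_task}_{partition}.data"
        for map_task in range(1, map_task_count + 1)
        for partition in range(merge_task_count)
    ]

files = expected_files(2198637379588, map_task_count=2, merge_task_count=12)
print(len(files))  # 24
```

With two tables in the join, the first stage therefore produces 2 * 24 = 48 files in total across the nodes.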

Note that for the same table, the directory contents on the 2 nodes look as follows:

  • node #1

    $ ll | grep 594_
    -rw------- 1 citus citus 1381 sep 12 17:15 repartition_2198637379594_1_0.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_10.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_11.data
    -rw------- 1 citus citus 1432 sep 12 17:15 repartition_2198637379594_1_1.data
    -rw------- 1 citus citus 1449 sep 12 17:15 repartition_2198637379594_1_2.data
    -rw------- 1 citus citus 1364 sep 12 17:15 repartition_2198637379594_1_3.data
    -rw------- 1 citus citus 1619 sep 12 17:15 repartition_2198637379594_1_4.data
    -rw------- 1 citus citus 1551 sep 12 17:15 repartition_2198637379594_1_5.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_6.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_7.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_8.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_9.data
  • node #2

    $ ll | grep 594_
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_0.data
    -rw------- 1 citus citus 1653 sep 12 17:16 repartition_2198637379594_2_10.data
    -rw------- 1 citus citus 1517 sep 12 17:16 repartition_2198637379594_2_11.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_1.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_2.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_3.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_4.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_5.data
    -rw------- 1 citus citus 1262 sep 12 17:16 repartition_2198637379594_2_6.data
    -rw------- 1 citus citus 1500 sep 12 17:16 repartition_2198637379594_2_7.data
    -rw------- 1 citus citus 1347 sep 12 17:16 repartition_2198637379594_2_8.data
    -rw------- 1 citus citus 1177 sep 12 17:16 repartition_2198637379594_2_9.data

Based on the file names and their content, we can conclude that the set of data does not intersect and each directory contains its own portion of data, which at the next stage is redistributed to make a local join using keys.

At the second stage, the fetch_intermediate_results function is used. Its task is to read the intermediate results from other nodes and save them as local intermediate results. Before the join itself, the data is copied to the local intermediate data directory so that the join can be performed locally. For example, after a call like this:

SELECT bytes
FROM fetch_intermediate_results(ARRAY['repartition_2198637379594_2_2']::text[],'localhost',8003) bytes;

For the table above, the contents of the intermediate directory will be:

$ ll | grep 594_
-rw------- 1 citus citus 1381 sep 12 17:15 repartition_2198637379594_1_0.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_10.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_11.data
-rw------- 1 citus citus 1432 sep 12 17:15 repartition_2198637379594_1_1.data
-rw------- 1 citus citus 1449 sep 12 17:15 repartition_2198637379594_1_2.data
-rw------- 1 citus citus 1364 sep 12 17:15 repartition_2198637379594_1_3.data
-rw------- 1 citus citus 1619 sep 12 17:15 repartition_2198637379594_1_4.data
-rw------- 1 citus citus 1551 sep 12 17:15 repartition_2198637379594_1_5.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_6.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_7.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_8.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_9.data
-rw------- 1 citus citus 1500 sep 12 17:34 repartition_2198637379594_2_2.data   <====== data of another shard

To connect to other nodes, the libpq protocol and the COPY command are used. At this stage, the join is actually performed using already local data.

Then, the read_intermediate_result function, already familiar to us, uses these results.
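Both stages can be put together in a small Python simulation of a repartition join. It is a model of the idea only: map_stage, merge_stage, and bucket_of are hypothetical names, zlib.crc32 with a modulo is a deterministic stand-in for the hash ranges used by Citus, and in-memory dictionaries replace the intermediate files and the fetch over libpq.

```python
import zlib
from collections import defaultdict

BUCKETS = 12  # corresponds to Merge Task Count in the plan above

def bucket_of(key):
    # Deterministic stand-in for the hash function; not the one Citus uses.
    return zlib.crc32(str(key).encode()) % BUCKETS

def map_stage(shards, key_of):
    """Stage 1: every shard splits its rows into BUCKETS partitions."""
    outputs = []
    for rows in shards:
        parts = defaultdict(list)
        for row in rows:
            parts[bucket_of(key_of(row))].append(row)
        outputs.append(parts)
    return outputs

def merge_stage(a_outputs, b_outputs, a_key, b_key):
    """Stage 2: merge task i gathers partition i of both tables and joins locally."""
    joined = []
    for i in range(BUCKETS):
        a_rows = [r for parts in a_outputs for r in parts.get(i, [])]
        b_by_key = defaultdict(list)
        for parts in b_outputs:
            for r in parts.get(i, []):
                b_by_key[b_key(r)].append(r)
        for a in a_rows:
            for b in b_by_key.get(a_key(a), []):
                joined.append((a, b))
    return joined

# Two shards per table; the join key c1 is the first field of every row.
a_shards = [[(1, "x"), (2, "y")], [(3, "z"), (4, "w")]]
b_shards = [[(2, 10), (3, 20)], [(3, 30), (5, 40)]]
first = lambda row: row[0]
joined = merge_stage(map_stage(a_shards, first), map_stage(b_shards, first), first, first)
```

Rows with equal keys always fall into the same bucket, so the per-bucket joins together produce the same result as a join over the full tables.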

Schematically, the described process looks as follows.

Repartitioning

Distributed transactions, locks, and isolation

This brings us to the interesting and complex topic of visibility of changes by different transactions, locks, and distributed transactions themselves. The topic is comprehensive, with a lot of nuances. As part of this article, I would like to show the main differences using a couple of queries as an example.

To begin with, a few words about the general architecture of Citus in this area:

Citus uses the standard libpq protocol and, accordingly, libpq connections for connections between nodes. Connections are cached and reused. To achieve a better parallelization when executing commands, multiple connections to one node are used, if possible.

An important nuance of this way of organizing data access is that, for example, in transactions involving several tables, after a write operation to a shard, all subsequent commands of the transaction that work with shards of this group must use the same connection.

Otherwise, other commands within the same transaction would not be able to see the data that has not yet been committed. A similar requirement arises when a write to a reference table is followed by reads from this table, including joins with distributed tables. In general, this is not always possible. The documentation provides an example of such a case when GUC citus.multi_shard_modify_mode is not set to sequential:

BEGIN; (1)
DELETE FROM dist_table; (2)
INSERT INTO reference_table VALUES (1,2);
SELECT * FROM dist_table JOIN reference_table ON (x = a); (3)
ERROR:  cannot perform query with placements that were modified over multiple connections

The DELETE operation (line marked 2) within a general multi-command transaction (line 1) affects multiple shards. If this operation is performed in parallel, multiple connections will be involved. Thus, given the requirement for reference tables, it becomes impossible to use one connection for the join command (line 3).

In general, Citus has to carefully handle connections in many aspects: on the one hand, control the number of parallel connections so as not to overload the nodes, on the other hand, comply with data visibility rules and avoid deadlocks. The last two aspects are covered by a separate Placement connection tracking module. Its tasks include tracking which groups of shards were affected by the DML/DDL/SELECT operations of a given transaction and deciding which connections to use for further operations.

The module follows these rules:

  • it is allowed to use different connections (i.e., parallelization of command execution becomes possible) for sequential (in terms of the order of expressions in the transaction) SELECT queries;

  • it is allowed to use different connections for DML queries after SELECT queries;

  • in other cases, only the same connection is allowed.
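The three rules can be encoded in a few lines of Python. This is only a restatement of the rules as listed here, not the logic of the Citus module: the function name and the string labels for access types are assumptions made for the illustration.

```python
def may_use_different_connection(previous_access, next_access):
    """Decide whether a command may run on a connection other than the one
    that previously accessed the same group of shards.

    Access types are 'SELECT', 'DML', or 'DDL'.
    """
    if previous_access == "SELECT" and next_access in ("SELECT", "DML"):
        return True
    # In all other cases the same connection must be reused.
    return False

print(may_use_different_connection("SELECT", "SELECT"))  # True
print(may_use_different_connection("DML", "SELECT"))     # False
```

Applied to the failing transaction above: the join follows a DELETE on the same shards, so it would have to reuse the connection that performed the DELETE, but the parallel DELETE was spread over several connections.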

For these purposes, Greenplum implements a mechanism for reading and writing processes within a group of related processes (gangs). When creating connections between QE and QD, the latter specifies a parameter that determines whether the process will be a reader or a writer. Only one writing process is allowed per group of related processes, and it will also perform writing tasks as part of a distributed transaction. Thus, only this process can change the state of the database and, as a rule, it is also responsible for managing locks within a transaction (LockHolder). To me, such a clear separation seems to be a more transparent approach.

Regarding protocols for interaction between nodes, in Greenplum, the libpq protocol is patched, in particular, for distributed processing (mainly changes concern the interaction of QD and QE). The interface for processing commands received via libpq has been expanded, for example, with commands such as:

  • 'M' — command for executing queries (CdbDispatchPlan) by segments (QE) planned on the master (QD);

  • 'T' — distributed transaction protocol control command (DtxProtocolCommand).

Several types of interconnect are available: tcp, udpifc, and ic proxy. The central elements of this interaction are the Motion nodes, including the Gather Motion, Redistribute Motion, and Broadcast Motion variations, which are responsible for transferring data between segments.

Thus, Citus relies entirely on standard libpq, while Greenplum uses its own variant of the libpq protocol to send queries to segments and the interconnect to move data. Having its own implementation of the protocol makes it harder to merge changes from upstream PostgreSQL, but this is a general disadvantage of Greenplum, since it is PostgreSQL significantly reworked for MPP.

If we compare the Motion nodes with their closest conceptual analogues (the fetch_intermediate_results, read_intermediate_result, and worker_partition_query_result implementations described above), Greenplum involves no transfer of data as files from node to node, no use of the COPY command, and so on. The motivation for the decision made by the creators of Citus is clear: maximum use of the tools available in PostgreSQL. The effectiveness of such an implementation is an open question. It seems to me that having Motion nodes send data directly "to the network" is a more efficient approach.

From my perspective, in terms of query execution parallelization, the closest analogues of Greenplum slices (independently executed parts of the plan, "bridged" by various kinds of Motion nodes) are Citus tasks (Tasks) and jobs (Jobs).

Transaction management

For writing transactions that affect shards on different nodes, Citus uses PostgreSQL implementation of two-phase commit (2PC) with some modifications for a distributed environment (related to HA and auto-recovery of 2PC transactions after a failure).

The approach itself and the features of its implementation in Citus are described in a lot of articles. It is based on the PREPARE TRANSACTION, COMMIT PREPARED, and ROLLBACK PREPARED statements that PostgreSQL offers for external transaction managers. I propose to compare and highlight the advantages of implementing a similar, but, in fact, more advanced mechanism in Greenplum.

In my opinion, the main drawback of the implementation of Citus distributed transactions is the lack of guarantees of compliance with visibility rules for committed transactions within the entire cluster.

To demonstrate the possible consequences of such a situation, I added a delay to the block of code that sends COMMIT PREPARED commands to the working nodes according to the 2PC protocol implementation. The sequence of queries is as follows:

BEGIN;
INSERT INTO a VALUES (1, 'foo', 1);
INSERT INTO a VALUES (2, 'foo', 2);
COMMIT;
SELECT * FROM a;
 c1 | c2  | c3
----+-----+----
  1 | foo |  1
  2 | foo |  2
(2 rows)

Then we insert two more rows in a second transaction:

BEGIN;
INSERT INTO a VALUES (3, 'foo', 3);
INSERT INTO a VALUES (4, 'foo', 4); (1)
COMMIT; (2)

Because we added an artificial (but entirely possible in reality) delay inside COMMIT (the line marked 2), between committing the two prepared transactions on the nodes, a parallel session can receive the following data:

SELECT * FROM a;
 c1 | c2  | c3
----+-----+----
  1 | foo |  1
  3 | foo |  3
  2 | foo |  2
(3 rows)

The problem concerns the row inserted in the line marked 1: in the parallel session, the SELECT query returned the row (3, 'foo', 3) but did not return (4, 'foo', 4).

Thus, the obvious conclusion follows: in terms of the consistency models of distributed systems, Citus as a distributed DBMS falls into the Read Uncommitted category.

The reason for this conclusion is that Citus has no concept of a single snapshot (meaning the snapshot used by the MVCC implementation in PostgreSQL) for distributed transactions.

Previously prepared transactions are committed on individual nodes at different times. As a result, a subsequent transaction can read data that has been applied on one node but not yet on another.

In fact, this violates the principle of atomicity — the user will see only part of the committed transaction data. In my view, this may be a significant disadvantage of Citus.
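The anomaly can be reproduced with a toy model. The Python sketch below is not Citus code: the `Node` class, its methods, and the placement of rows on `worker1` and `worker2` are hypothetical. It only assumes what is described above, namely that each node decides visibility from its own local commit state and that COMMIT PREPARED reaches the nodes at different times.

```python
class Node:
    """Toy worker node: rows become visible only after the local commit."""

    def __init__(self, name):
        self.name = name
        self.committed = []   # rows visible to any reader on this node
        self.prepared = {}    # transaction id -> rows waiting for COMMIT PREPARED

    def prepare(self, txid, rows):
        # Models PREPARE TRANSACTION: changes are stored but not yet visible.
        self.prepared[txid] = list(rows)

    def commit_prepared(self, txid):
        # Models COMMIT PREPARED: the rows become visible on this node only.
        self.committed.extend(self.prepared.pop(txid))

    def scan(self):
        return list(self.committed)


def cluster_select(nodes):
    """A reader without a cluster-wide snapshot: it just unions local scans."""
    rows = []
    for node in nodes:
        rows.extend(node.scan())
    return sorted(rows)


worker1, worker2 = Node("worker1"), Node("worker2")
nodes = [worker1, worker2]

# Phase 1: both workers prepare their part of the same distributed transaction.
worker1.prepare("tx1", [(3, "foo", 3)])
worker2.prepare("tx1", [(4, "foo", 4)])

# Phase 2 starts: only the first worker has received COMMIT PREPARED so far.
worker1.commit_prepared("tx1")
partial_view = cluster_select(nodes)

# The delayed COMMIT PREPARED finally reaches the second worker.
worker2.commit_prepared("tx1")
final_view = cluster_select(nodes)

print(partial_view)  # [(3, 'foo', 3)]
print(final_view)    # [(3, 'foo', 3), (4, 'foo', 4)]
```

The reader that runs between the two `commit_prepared` calls sees half of the transaction, which is the situation shown in the parallel session above.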

In Greenplum, this situation is not allowed. The system core tracks information about active distributed transactions. To do this, the distributed transaction identifier (gxid) is stored in the shared process memory of each backend, similar to the standard PostgreSQL local transaction identifiers (xid). This identifier is assigned if the transaction requires 2PC (i.e. the transaction is distributed). The query coordinator (QD) is responsible for generating and assigning the gxid value.

As mentioned above, Greenplum has expanded the set of commands that backends process, in particular with the 'M' and 'T' commands. The handlers of these commands also accept serialized distributed transaction information from the QD. A global (distributed) snapshot also includes this data.

Greenplum extends the PostgreSQL snapshot structure to store the global snapshot information. This snapshot is then used to check the visibility of rows. As stated earlier, only one process in a gang can be a writer, so the reader processes assume that the writer process received the snapshot. This also solves problems with the visibility of rows affected by the writing part of the transaction (i.e. something that Citus has to solve by manipulating which libpq connections can be used).

A simplified model for checking row visibility using global snapshots is as follows.

Simplified model for checking row visibility using global snapshots

If the xmin identifier of a row corresponds to a transaction that has been applied (we leave aside the visibility rules for not yet applied and frozen transactions in this case), Greenplum checks visibility taking into account the visibility snapshot of the distributed transaction.

Using the query example that was described earlier, Greenplum checks whether this transaction is distributed and whether it needs to be checked against a global visibility snapshot. In our case, the distributed transaction does not have a completed status, so this row is not visible to a query in another session.
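The check can be sketched in a few lines of Python. This is a heavily simplified illustration, not the Greenplum implementation: `GlobalSnapshot`, `row_is_visible`, the `local_to_gxid` mapping, and all identifier values are hypothetical names and numbers chosen for the example, and subtransactions, deleted rows, and frozen rows are ignored.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class GlobalSnapshot:
    """Toy global snapshot handed out by the coordinator (QD)."""
    xmax: int                       # first gxid not yet assigned at snapshot time
    in_progress: frozenset = field(default_factory=frozenset)

    def sees(self, gxid):
        # A distributed transaction is visible only if it had already
        # finished on the coordinator when the snapshot was taken.
        return gxid < self.xmax and gxid not in self.in_progress


def row_is_visible(row_xmin, locally_committed, local_to_gxid, snapshot):
    """Simplified visibility check for a row inserted by local xid `row_xmin`."""
    if row_xmin not in locally_committed:
        return False                # not applied on this segment at all
    gxid = local_to_gxid.get(row_xmin)
    if gxid is None:
        return True                 # purely local transaction, local rules suffice
    return snapshot.sees(gxid)      # distributed: defer to the global snapshot


# One distributed transaction (gxid 7) maps to local xid 100 on segment 1
# and local xid 205 on segment 2; only segment 1 has applied the commit so far.
seg1 = {"committed": {100}, "map": {100: 7}}
seg2 = {"committed": set(), "map": {205: 7}}

during_commit = GlobalSnapshot(xmax=8, in_progress=frozenset({7}))
visible_during = [
    row_is_visible(100, seg1["committed"], seg1["map"], during_commit),
    row_is_visible(205, seg2["committed"], seg2["map"], during_commit),
]

# The second segment applies the commit, and a later snapshot is taken.
seg2["committed"].add(205)
after_commit = GlobalSnapshot(xmax=8)
visible_after = [
    row_is_visible(100, seg1["committed"], seg1["map"], after_commit),
    row_is_visible(205, seg2["committed"], seg2["map"], after_commit),
]

print(visible_during)  # [False, False]
print(visible_after)   # [True, True]
```

In this model, the row on the first segment stays hidden even though it is already committed locally, because the global snapshot still lists its distributed transaction as in progress.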

Function evaluation

Citus

Citus follows these rules when executing functions and obtaining their results:

  • Stable and volatile functions are prohibited in RETURNING for INSERT queries.

  • For INSERT queries, values are calculated on the query coordinator, including those of volatile functions.

  • Volatile functions are prohibited in UPDATE and DELETE queries.

  • For SELECT queries, Citus does not evaluate function values on the coordinator.

This last point can lead to potential problems. For example, for the following query:

SELECT *
FROM d INNER JOIN r
  ON d.c1 = r.c1
WHERE r.c1 = ROUND(RANDOM());

Citus creates the following plan:

                                  QUERY PLAN
------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 32
   Tasks Shown: One of 32 (1)
   ->  Task
         Node: host=10.92.40.201 port=5432 dbname=postgres
         ->  Hash Join (2)
               Hash Cond: (d.c1 = r.c1)
               ->  Seq Scan on d_103166 d
               ->  Hash
                     ->  Seq Scan on r_103198 r (3)
                           Filter: ((c1)::double precision = round(random())) (4)

According to these rules, Citus evaluates the WHERE condition (line marked 4) separately on each shard (line marked 1). Thus, the following situation is theoretically possible:

                             | Shard 1  | Shard 2
-----------------------------+----------+----------
 Distributed table value (d) | { 0 }    | { 1 }
 Reference table value (r)   | { 0, 1 } | { 0, 1 }

Let’s say the distributed table d stores 2 rows. On the first shard Shard 1 the attribute value is 0, on the second shard Shard 2 the value is 1. The reference table stores the same 2 rows.

According to the execution plan, the select from table r is performed on each shard and keeps the rows that match the calculated value of round(random()), which is either 0 or 1. With some probability, the Seq Scan node (line 3) can therefore output the following tuples: Shard 1 gets only the row with key 1, and Shard 2 only the row with key 0.

However, the planner pushed the Hash Join down to each shard (line 2). Obviously, with this combination of keys, users are likely to get a result that they would never get from, for example, regular PostgreSQL!

The "superposition" of the r projections from both shards is the set { 1, 0 }, which would match rows 0 and 1 of the distributed table d by key. In our case, however, joining the results from the shards gave an empty set:

                                                       | Shard 1 | Shard 2
-------------------------------------------------------+---------+---------
 Distributed table value (d)                           | { 0 }   | { 1 }
 Reference table selected keys with random() selection | { 1 }   | { 0 }
 local d and r join result                             | { ∅ }   | { ∅ }
 after custom scan                                     |       { ∅ }
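The scenario from the table above can be replayed with a short Python sketch. It is a toy model rather than Citus code: the function names are hypothetical, and the keys that each shard's filter keeps are passed in explicitly instead of being drawn with random(), so that the example is deterministic.

```python
def local_join(d_rows, r_selected):
    """Hash Join on one node: keep the d keys that have a match in r."""
    r_keys = set(r_selected)
    return [key for key in d_rows if key in r_keys]


def per_shard_join(d_shards, r_selected_per_shard):
    """Each shard filters its own copy of r and joins locally;
    the coordinator only concatenates the per-shard results."""
    rows = []
    for d_rows, r_selected in zip(d_shards, r_selected_per_shard):
        rows.extend(local_join(d_rows, r_selected))
    return sorted(rows)


d_shards = [[0], [1]]      # distributed table d: one row per shard
r_selected = [[1], [0]]    # keys of r that each shard's filter happened to keep

distributed_result = per_shard_join(d_shards, r_selected)

# The same keys taken as the outcome of a single scan of r on one node.
all_d = [key for shard in d_shards for key in shard]
all_r = sorted({key for selected in r_selected for key in selected})
single_node_result = sorted(local_join(all_d, all_r))

print(distributed_result)  # []
print(single_node_result)  # [0, 1]
```

Together the shards selected both keys of r, yet neither shard paired a key with its matching row of d, so the concatenated result is empty.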

Greenplum

Greenplum, in turn, plans the query so that the result is identical to what undistributed storage would return. The important point is that the select with the volatile function random() against the replicated table (the analogue of the Citus reference table) is performed once within the query, and its result is distributed to the segments through Redistribute Motion (line marked 2).

                                     QUERY PLAN
------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)
   ->  Hash Join (1)
         Hash Cond: (d.c1 = r.c1)
         ->  Seq Scan on d
         ->  Hash
               ->  Redistribute Motion 1:3  (slice1; segments: 1) (2)
                     Hash Key: r.c1
                     ->  Result
                           ->  Seq Scan on r (3)
                                 Filter: ((c1)::double precision = round(random()))
 Optimizer: Postgres query optimizer

In this way, Greenplum ensures the correctness of the Hash Join (line 1), so the result corresponds to a possible result of the same select in an undistributed DBMS:

                             | Shard 1  | Shard 2
-----------------------------+----------+----------
 Distributed table value (d) | { 0 }    | { 1 }
 Replicated table value (r)  | { 0, 1 } | { 0, 1 }

Since the Seq Scan (line 3) is executed once, all segments work with the same selection result, and each segment can match its own keys against it and give the correct result:

                                                        | Shard 1 | Shard 2
--------------------------------------------------------+---------+---------
 Distributed table value (d)                            | { 0 }   | { 1 }
 Replicated table selected keys with random() selection |      { 0, 1 }
 local d and r join result                              | { 0 }   | { 1 }
 after gather motion                                    |      { 0, 1 }
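The effect of scanning once and then redistributing can be checked with a small Python sketch. It is a toy model, not Greenplum code: the function names are hypothetical, the hash is a plain modulo, and d is assumed to be distributed by c1 with the same hash that Redistribute Motion uses (the plan suggests this, since d is joined without a Motion of its own).

```python
from itertools import combinations


def redistribute(keys, segment_count):
    """Redistribute Motion: send each key to the segment chosen by its hash."""
    buckets = [[] for _ in range(segment_count)]
    for key in keys:
        buckets[key % segment_count].append(key)   # toy hash: key modulo segments
    return buckets


def gathered_join(d_keys, r_selected, segment_count):
    """Filter r once, redistribute the result, join per segment, gather."""
    d_segments = redistribute(d_keys, segment_count)      # how d is already stored
    r_segments = redistribute(r_selected, segment_count)  # Redistribute Motion 1:N
    rows = []
    for d_rows, r_rows in zip(d_segments, r_segments):
        r_keys = set(r_rows)
        rows.extend(key for key in d_rows if key in r_keys)  # local Hash Join
    return sorted(rows)                                       # Gather Motion


d_keys = [0, 1]
r_keys = [0, 1]

# Whatever subset of r the single filtered scan returns, the gathered result
# equals the join an undistributed DBMS would compute for that same subset.
checks = []
for size in range(len(r_keys) + 1):
    for r_selected in combinations(r_keys, size):
        expected = sorted(set(d_keys) & set(r_selected))
        checks.append(gathered_join(d_keys, r_selected, 2) == expected)

print(all(checks))                       # True
print(gathered_join(d_keys, [0, 1], 2))  # [0, 1]
```

Because the filter on r runs once and its output is routed by the join key, every selected key lands on the segment that holds the matching row of d.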

As a result, with such a combination of tables, there is some probability in Citus of obtaining select results that are incorrect from the point of view of consistency with an undistributed DBMS.

Conclusion of the first part

In this part, we touched on the Citus basic concepts from a user and architectural point of view. We looked at planners, nuances of their work, and compared them with similar parts of Greenplum.

In the next part, we will look at the process of rebalancing shards — a rather convenient solution from a practical point of view to the problem of horizontal DBMS scaling. We will also compare approaches to controlling resource allocation.

And for the best part, we will consider and analyze the results of the comparative TPC-DS tests Greenplum vs Citus.

Keep in touch!
