Monitoring queries in Greenplum
What is the article about?
Hello everyone. My name is Dmitry and I am a system architect at Arenadata. I am designing and developing ADB Control — a query monitoring system for Arenadata DB.
When working with databases, it is often necessary to monitor the execution of current queries, mostly for administration or analytics tasks. Monitoring tools that let you track and manage query execution help greatly here. I’ll talk about the challenges we faced when designing and implementing a query monitoring system for Arenadata DB.
Arenadata DB is an open-source analytical distributed MPP DBMS built on the Greenplum DBMS. Because of this, query monitoring becomes much more complicated: queries are executed on many segments, and the load on each segment depends on how evenly the data is distributed and on the types of operations performed on it. A direct analogue of ADB Control is the VMware Tanzu Greenplum Command Center, which is part of the paid version of the platform.
I’ll talk about the second version of ADB Control. The functional requirements in the new version, for the most part, remained the same, but the non-functional requirements were significantly increased. These requirements can be briefly expressed by the well-known triad: performance, fault tolerance, extensibility. This led to profound changes in the architecture — the system was essentially rewritten from scratch.
The result is a solution for detailed monitoring of a distributed data processing system.
Architecture analysis
First, we analyzed the system architecture of the first version of ADB Control. After studying how the system works and its code base, we compiled a system architecture diagram.
Where:
- Extensions — a set of Arenadata DB extensions (written in C) that collect metrics on query states, locks, spill files, etc.
- Agent — a service for receiving and processing metrics (written in C++). It reassembles messages that arrive as several UDP packets. A single agent instance is installed on each host (i.e. one agent processes metrics from all segments within one host).
- Router — a reverse proxy for routing UI requests and metrics.
- Web Server — the main application that is responsible for receiving, filtering, processing, and storing metrics. It also processes requests from the Web UI (which is a part of the Web Server) and sends updates to it using websockets.
- Database — a PostgreSQL instance that stores all query data.
The metrics processing flow is shown in the image below.
Simplified, the operation of the system can be described as follows:
- The Extensions module collects metrics of various types, generates messages in the protobuf format based on them, divides each message into parts no larger than a UDP datagram, and sends them to the agent on the local host via a UDS or UDP socket.
- Agent reconstructs protobuf messages from the received datagrams (a sketch of this reassembly is shown below) and forwards them to the Web Server via HTTP/1.1.
- Web Server processes each message individually:
  - sends the received metric to the Web UI;
  - writes new data to the database asynchronously.
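To make the split-and-reassemble step more concrete, here is a minimal sketch (in Java, close in spirit to the new agent described later) of how a message can be rebuilt from several datagrams. The field names messageId, partIndex, and partCount are hypothetical and do not reproduce the actual ADB Control protocol headers.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Illustrative reassembly of a message split across several UDP datagrams.
// messageId/partIndex/partCount are hypothetical fields used only to show the idea.
public class MessageAssembler {
    private final Map<Long, byte[][]> pending = new HashMap<>();

    /** Returns the complete message once all parts have arrived, otherwise null. */
    public byte[] accept(long messageId, int partIndex, int partCount, byte[] payload) {
        byte[][] parts = pending.computeIfAbsent(messageId, id -> new byte[partCount][]);
        parts[partIndex] = payload;
        for (byte[] part : parts) {
            if (part == null) {
                return null; // still waiting for missing datagrams
            }
        }
        pending.remove(messageId);
        int total = 0;
        for (byte[] part : parts) total += part.length;
        ByteBuffer message = ByteBuffer.allocate(total);
        for (byte[] part : parts) message.put(part);
        return message.array(); // ready to be parsed as a protobuf message
    }
}
```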
The main issues of this architecture:
- A single non-scalable application (Web Server) processes metrics coming from a horizontally scalable system. Moreover, the same application handles user requests and sends updates to the Web UI via websockets.
- Agent receives and parses packets in a single thread, which limits the performance of the agent itself and, because sending over the UDS socket is synchronous, of the segments on the host.
- Agent sends messages to the Web Server synchronously, so if the Web Server becomes unavailable, the agent’s internal message queues quickly fill up, which again affects agent and segment performance.
- Each metric generates a separate transaction when written to the database.
- If there are active user sessions, many metrics are sent to the Web UI via websockets one at a time.
- A significant part of the metrics is simply ignored on the Web Server side. For example, for query state messages only the states received from the coordinator segment are taken into account. During our tests it turned out that fewer than 10% of messages (82,963 out of 872,906) were actually processed. As a result, we get extra load on the network and on all system components.
There are other problems in this scheme as well, but solving them would not contribute much to the original goal.
A little about the extension implementation
I mentioned the Extensions component, but did not explain how it works and why it behaves the way it does.
Extensions are two Arenadata DB components: gpperfmon and gpadcc (Arenadata DB extension). They collect internal Arenadata DB telemetry, which includes:
- the query plan;
- query execution parameters in general;
- operation parameters of query plan nodes;
- database object locks;
- temporary query data spilled to disk, etc.
Metrics are collected using two approaches: push and pull models.
Hooks
Data collection in the push model is performed by hooks. In Arenadata DB (and PostgreSQL), a hook is a function pointer that extensions can override. The function it points to is called at specific points of query processing, where it can inspect the state and send the results outside.
Hooks have access to all memory of the process that executes a query. Moreover, this access is safe and does not require locking: a hook is called in the main execution thread. The call in the main thread allows receiving data immediately, at a known stage of query execution. However, this is also a disadvantage in the case of ADB Control: every instruction in a hook takes CPU time away from the query execution process. If implemented carelessly, hooks will slow down the very system they are intended to monitor.
In ADB Control, most parameters are collected by hooks in a push model. Therefore, the UDP protocol is used for interaction between Extensions and Agent: metrics can be sent without waiting for confirmation of their receipt from the Agent. This ensures that the maximum execution time of each hook is finite.
Some real PostgreSQL hooks that ADB Control uses in practice:
- ExecutorStart_hook — called when execution of an SQL statement starts (after planning) and allows changing how execution is started. ADB Control uses this hook to obtain the query plan and to determine the query’s unique identifier, which is used later on.
- ExecutorRun_hook — runs the query; called once. In ADB Control it is used to collect query statuses; in fact, query execution is considered started when ExecutorStart_hook is called.
- ExecutorEnd_hook — completes query execution. It is called only on normal completion, when no errors occurred during execution; on abnormal termination this hook is not called. In ADB Control it is also used to collect query statuses.
Periodic collection of metrics
The pull model means using a separate process that collects metrics on demand or periodically. For this, ADB Control uses a background worker — a separate process of an Arenadata DB instance that is started and stopped together with the main process of the DBMS.
A separate process can collect metrics asynchronously, with minimal impact on system operation. But there is a nuance: metrics collection may require locks (exclusive access to resources — for example, a variable). As a result, monitoring can completely paralyze the system. A disadvantage of the pull model is that parameters are not collected in real time, but with a delay (equal to the period of metrics collection or the delay between sending a request to collect metrics and the start of collection).
The background worker in ADB Control collects data (the list of locks and other information) with SQL queries that are executed periodically, with a user-defined period. For uniformity, the protocol for interaction with the Agent is the same as for hooks. A minimal illustration of this pull loop is sketched below.
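The actual collector is a background worker inside the DBMS written in C, but the pull model itself can be illustrated with a short Java sketch: a task runs on a schedule, executes an SQL query, and forwards the rows. The connection string and the use of pg_locks here are assumptions made only for the example.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustration of the pull model: a periodic task polls the database for locks.
public class LockPoller {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long periodSeconds = 15; // user-defined collection period
        scheduler.scheduleAtFixedRate(() -> {
            try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/adb");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT locktype, mode, granted FROM pg_locks")) {
                while (rs.next()) {
                    // In the real system the rows are packed into protobuf and sent to the agent.
                    System.out.printf("%s %s %s%n", rs.getString(1), rs.getString(2), rs.getBoolean(3));
                }
            } catch (Exception e) {
                e.printStackTrace(); // the poller must survive transient DB errors
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }
}
```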
Data
Next, we studied the composition and volume of the data that the monitoring tools take from Arenadata DB. Metrics can be transmitted via two types of sockets: UDS and UDP (the latter is used if the former is not available). Due to the size limit of a single UDP packet (just under 64 KB on Linux), a proprietary protocol is used to split messages into parts and then reassemble them.
To capture the payload, we disabled data transfer via UDS, ran the regression tests, and used the tcpdump utility to dump the message traffic on the master and on one segment host. The resulting files could then be used as data sources via udpreplay.
Analysis of the dumps showed the following:
- Not all metrics collected by the extensions are needed.
- The load is uneven over time.
- The average load was 1650 packets/sec or 1 Kbit/sec; the peak load reached 45000 packets/sec or 52 Kbit/sec.
- Some types of messages can be very large (about 50 MB; these are messages that carry a query plan as a formatted string).
This has unpleasant consequences:
- A large share of the metrics could be filtered out before sending, but this was not done.
- When sending via UDP, some packets are lost at the network level during peak loads.
- If a message does not fit into a single UDP packet and at least one of its parts is not delivered, the entire message cannot be reassembled, and the parts already received occupy memory for a long time.
- A message is written to the UDS socket synchronously. This means that if the agent takes a long time to process a packet or stops responding for some reason, all segments on the host wait for the send to complete, which slows down the database as a whole.
System requirements
Before you start designing a system, it is important to make a list of requirements that it must meet. Otherwise, you risk building something that will simply end up in a drawer. But requirements do not always mean restrictions — some of them can be regarded as relaxations, which in turn give more freedom in design and implementation. Let’s look at an example of one such property.
In our case, the monitoring system should be:
- fault-tolerant;
- available;
- high-performance;
- as simple as possible;
- easy to update;
- able to provide metrics in a mode as close to real time as possible.
Let me look at the last three points in more detail.
The complexity of a system covers many things: the total number of components (both internal and third-party), the interactions between them, the effort required to support it, and the almost inevitable changes in the infrastructure requirements for the system.
Updating the system should also be thought out in terms of data migration, downtime, the ability to return to a previous version, etc.
Since ADB Control is a monitoring system, it is also subject to timing requirements. In our case, they can be soft. A delay of 1 second in displaying metrics on the client is not critical for the user: there is no point in tracking very fast queries online (they can be viewed in the query history), and such a delay is insignificant for long queries. This relaxation gives much more freedom in implementation.
What have we done?
As a result, we drew up a concept for a new ADB Control architecture.
Components of the new architecture:
- JNI library — a native library for working with sockets.
- Extensions — the component with extensions; it has not changed since the previous version.
- Agent — a new application implemented in Java that receives and filters metrics from Extensions and balances the load by distributing metrics among Backend Servers based on the unique identifier of a query. Java was chosen for several reasons:
  - Backend Server is implemented in Java and the Agent is developed by the same team, so there is no need to switch context during development, which, among other things, simplifies the implementation of the interaction between Agent and Backend Server.
  - JRE is already available on Arenadata DB hosts because it powers PXF — a tool for importing/exporting data from external systems.
- Backend Server — a new horizontally scalable service for processing and aggregating metrics, also written in Java. It writes information about queries to the database and sends updates to the Web Server.
- Service Registry — a catalog of ADB Control services registered in the system.
- Web Server — the previous version of the Web Server, from which the functionality for processing metrics and writing them to the database was removed.
- Web UI — the client web application; it has not changed from the previous version.
- Database — the database component; it remains unchanged from the previous version.
In general, the sequence of receiving and processing metrics has not changed, but if we look at the details, there are quite a lot of distinctive features. Below we will look at some of them (a few illustrative sketches follow after the list):
- Agent processes received packets asynchronously: every incoming packet is placed into an internal queue, and messages from the queue are handled by a thread pool, which also increases agent performance.
- Work with sockets is implemented in a native library accessed via JNI. This preserves the protocol for interaction between Extensions and Agent which, as mentioned above, includes a self-written protocol layered on top of UDP. Experimentally, we found that calls via JNI handle this job about 20% better than a similar pure-Java implementation, since Java versions before 16 did not allow working with datagrams directly.
- Agent now filters out metrics that are not processed by the system, reducing the load on the network and on the other components.
- Messaging between Agent and Backend Server was replaced with gRPC, which made it possible not only to reuse the existing protocol buffer definitions, but also to get multiplexing, other HTTP/2 features, and asynchronous data sending out of the box.
- On the Agent side, we implemented load balancing across multiple Backend Server instances by sharding metrics on the unique query identifier. This keeps the data consistent when new Backend Server instances are added. It also removes the implicit bandwidth limit caused by multiplexing everything over a single TCP connection: with multiple Backend Servers, one TCP connection is created per instance, which increases the overall throughput.
- Metric processing in the Backend Server is now batched. A received message is added to an internal queue chosen by message type. A separate background process reads messages from all queues in batches and collapses the metrics: it builds a single insert/update statement from all metrics related to one query. The resulting metrics are then sent asynchronously to the Web Server and written to the database using JDBC batch processing.
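To illustrate the first point, here is a minimal sketch of an asynchronous pipeline with an internal queue and a thread pool. The class name, queue size, and pool size are assumptions made for the example, not the actual agent code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative agent pipeline: the socket reader only enqueues raw packets,
// and a worker pool parses, filters, and forwards them.
public class AgentPipeline {
    private final BlockingQueue<byte[]> packets = new ArrayBlockingQueue<>(100_000);
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    /** Called by the socket reader; never blocks on parsing or network I/O. */
    public void onPacket(byte[] datagram) {
        if (!packets.offer(datagram)) {
            // Queue is full: dropping a packet is preferable to stalling the segments.
        }
    }

    public void start() {
        for (int i = 0; i < 4; i++) {
            workers.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        byte[] datagram = packets.take();
                        // parse the datagram, reassemble the message, filter and forward it
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}
```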
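The JNI boundary from the second point can be sketched roughly as follows; the library name and method signatures are hypothetical, and the native C side is not shown.

```java
// Sketch of how a native socket library might be exposed to the agent via JNI.
public final class NativeSocket {
    static {
        System.loadLibrary("adcc_socket"); // library name is an assumption
    }

    // Opens the UDS/UDP socket on the native side and returns a handle.
    public static native long open(String path);

    // Implemented in C; receives the next datagram into the buffer and returns its length.
    public static native int receive(long handle, byte[] buffer);

    public static native void close(long handle);
}
```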
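Sharding metrics by query identifier, as described above, could look roughly like this; BackendClient here is a hypothetical stand-in for the generated gRPC stubs.

```java
import java.util.List;

// Illustrative routing of a metric to a Backend Server instance by query id.
// Sending all metrics of one query to the same instance keeps per-query aggregation consistent.
public class BackendRouter {
    private final List<BackendClient> backends; // one gRPC channel per instance

    public BackendRouter(List<BackendClient> backends) {
        this.backends = backends;
    }

    public void route(String queryId, Metric metric) {
        int index = Math.floorMod(queryId.hashCode(), backends.size());
        backends.get(index).sendAsync(metric); // asynchronous call to the chosen instance
    }

    // Hypothetical abstractions standing in for the generated gRPC stubs and messages.
    interface BackendClient { void sendAsync(Metric metric); }
    interface Metric {}
}
```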
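Finally, a sketch of the batched write path in the Backend Server: metrics are collapsed to one row per query and written with a single JDBC batch instead of one transaction per metric. The table, columns, and metric type are assumptions made for the example.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative batched write: collapse metrics per query, then one JDBC batch.
public class QueryStateWriter {

    public void flush(Connection conn, List<QueryStateMetric> batch) throws Exception {
        // Collapse: keep only the latest state per query identifier.
        Map<String, QueryStateMetric> latest = new LinkedHashMap<>();
        for (QueryStateMetric m : batch) {
            latest.put(m.queryId(), m);
        }
        String sql = "INSERT INTO query_state (query_id, state) VALUES (?, ?) "
                   + "ON CONFLICT (query_id) DO UPDATE SET state = EXCLUDED.state";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (QueryStateMetric m : latest.values()) {
                ps.setString(1, m.queryId());
                ps.setString(2, m.state());
                ps.addBatch();
            }
            ps.executeBatch(); // one round trip instead of one transaction per metric
        }
    }

    // Hypothetical metric type used only for this example.
    public interface QueryStateMetric {
        String queryId();
        String state();
    }
}
```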
Results
We have implemented a system that not only solves the performance problems of the previous version but is also ready to process much larger amounts of data. The first version could not handle the load when the GUC gp_enable_gpperfmon was enabled to collect information about the progress of query execution periodically (without this option, the query state arrives only when the query starts executing and when it completes). In the new version, even with a minimal system configuration, enabling gp_enable_gpperfmon does not lead to problems, which makes it possible to monitor the execution status of query nodes in near-online mode using a graphical representation.
Monitoring queries in a distributed system poses many challenges and requires difficult technical decisions. An additional complication is that such systems have to process large amounts of data coming from an entire cluster (and sometimes several clusters). But in general, such complex tasks are very interesting and allow us to keep improving the monitoring system, giving users different levels of detail and a better understanding of how the cluster operates. We have a large number of ideas that we are gradually implementing, improving the quality of ADB Control and building on our expertise in how Greenplum works.