Cruise Control overview
Architecture
Cruise Control is a tool for managing high-load Kafka clusters and clusters with a large number of hosts. It allows you to assess broker states and perform load balancing.
Working with Cruise Control in ADS is available starting with 3.9.0.1.b1.
The Metrics Reporter interface collects metrics from the Kafka broker via JMX and stores them in a topic defined by the metric.reporter.topic parameter.
Load Monitor collects metrics from the topic and creates Cluster Load Model. Analyzer uses this model to generate an optimization proposal based on the goals set by the user, while Anomaly Detector identifies anomalies that can be corrected during the cluster's self-healing.
Cluster Load Model is a software model that reflects the current distribution of replicas in the cluster, with granular load data for various resources.
The user can obtain a generated proposal with a GET request to the Cruise Control REST API and initiate a rebalance of the Kafka cluster with a POST request. Before launching the rebalance, the user can edit the goals and/or configure the cluster's self-healing if necessary.
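As a minimal sketch of these two calls, the snippet below only builds the request URLs; the host and port are hypothetical, and the proposals/rebalance endpoints with their dryrun and goals query parameters follow the upstream Cruise Control REST API, which may differ between versions:

```python
# Build the GET (proposals) and POST (rebalance) URLs described above.
# The base address is an assumption; adjust it to your deployment.
from urllib.parse import urlencode

BASE = "http://localhost:9090/kafkacruisecontrol"  # hypothetical address

def proposals_url():
    # GET request: fetch the current optimization proposal.
    return f"{BASE}/proposals"

def rebalance_url(goals=None, dryrun=True):
    # POST request: trigger a rebalance; dryrun=True only previews the plan
    # without moving any replicas.
    params = {"dryrun": str(dryrun).lower()}
    if goals:
        params["goals"] = ",".join(goals)
    return f"{BASE}/rebalance?" + urlencode(params)

print(proposals_url())
print(rebalance_url(goals=["RackAwareGoal"], dryrun=False))
```

Passing an edited goals list in the request is how the user overrides the configured goals for a single rebalance.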
Executor is responsible for implementing the optimization proposals received from the analyzer.
The load metrics processed by the monitor are also sent to the connected storage, where they are saved for cluster recovery. In ADS, these are special topics in the Kafka cluster (defined by the *.metric.sample.store.topic parameters). When Cruise Control is restarted, it loads metrics from this storage to populate Load Monitor.
Below are the detailed descriptions of the Cruise Control components.
Load Monitor
To create Cluster Load Model, Load Monitor uses up-to-date metric samples (collected by Metric Sampler) in conjunction with the cluster's metadata.
Metrics for all partitions of the Kafka cluster are assigned to special threads called Metric Fetcher Threads using Partition Assignor, which ensures:

- the same number of partition metrics for all threads;
- assignment of the partition metrics of one topic to one thread.
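The two properties above can be sketched as a greedy assignment: whole topics are placed on a single thread, largest first, always onto the currently least-loaded thread. This is an illustrative model of the stated behavior, not Cruise Control's actual Partition Assignor code:

```python
# Assign whole topics to fetcher threads while keeping the per-thread
# partition counts as even as possible (sketch of the properties above).

def assign_topics(topic_partitions, num_threads):
    """topic_partitions: dict of topic -> partition count."""
    threads = [{"topics": [], "partitions": 0} for _ in range(num_threads)]
    # Place the largest topics first, always onto the least-loaded thread.
    for topic, count in sorted(topic_partitions.items(),
                               key=lambda kv: -kv[1]):
        target = min(threads, key=lambda t: t["partitions"])
        target["topics"].append(topic)      # one topic -> exactly one thread
        target["partitions"] += count
    return threads

# Hypothetical topics and partition counts for illustration.
assignment = assign_topics({"orders": 12, "clicks": 8, "logs": 6, "audit": 2},
                           num_threads=2)
```

Here both threads end up with 14 partitions each, and no topic is split across threads.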
Threads are part of various Sampling Tasks.
There are three types of sampling tasks:
- Metric Sampling Task
- Bootstrap Task
- Linear Model Training Task
Tasks are coordinated using the Metric Fetcher Manager module. It creates a certain number of threads to execute each task.
All metric samples are organized by the Metric Sample Aggregator module. Each metric sample is placed into a load window according to its timestamp.
Load Window is the aggregated information about metrics for a specified time interval (time window). Such an interval (window) is defined by the partition.metrics.window.ms parameter (broker.metrics.window.ms for Kafka broker metrics).
The number of retained windows is determined by the num.partition.metrics.windows parameter (num.broker.metrics.windows for Kafka broker metrics).
For example, if the time window is 1 hour and the user has configured storage for 168 windows, Cruise Control will store hourly load information for 7 days.
The number of samples in a single load window is determined by the metric.sampling.interval.ms parameter.
For example, if a user specifies a time window of 1 hour and a sampling interval of 5 minutes, each window should contain 12 metric samples.
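Both worked examples above reduce to simple arithmetic; the values below are the ones used in the text, not defaults:

```python
# Reproduce the two arithmetic examples from the text.
HOUR_MS = 60 * 60 * 1000

window_ms = HOUR_MS                    # partition.metrics.window.ms: 1 hour
num_windows = 168                      # number of retained windows
sampling_interval_ms = 5 * 60 * 1000   # metric.sampling.interval.ms: 5 minutes

# How long hourly load information is kept: 168 windows x 1 h = 7 days.
retention_days = num_windows * window_ms / (24 * HOUR_MS)

# How many samples fit into one window: 60 min / 5 min = 12.
samples_per_window = window_ms // sampling_interval_ms

print(retention_days)       # 7.0
print(samples_per_window)   # 12
```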
Using the min.samples.per.partition.metrics.window parameter, the user can define the minimum number of samples each window must contain for those samples to be included in Cluster Load Model.
When requested, Metric Sample Aggregator returns all aggregated load-window data as a workload snapshot, which Load Monitor includes in Cluster Load Model.
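A simplified model of this windowing and filtering behavior: samples are bucketed by timestamp into fixed windows, and windows holding fewer than the minimum number of samples are excluded from the result (the behavior that min.samples.per.partition.metrics.window controls). This is a sketch, not Cruise Control's implementation:

```python
# Bucket (timestamp, value) samples into fixed windows and drop windows
# that do not reach the minimum sample count.
from collections import defaultdict

def aggregate(samples, window_ms, min_samples):
    """samples: list of (timestamp_ms, value). Returns window start -> mean."""
    windows = defaultdict(list)
    for ts, value in samples:
        # Align each sample to the start of its window.
        windows[(ts // window_ms) * window_ms].append(value)
    # Keep only windows with enough samples to be trustworthy.
    return {start: sum(vals) / len(vals)
            for start, vals in windows.items()
            if len(vals) >= min_samples}

# Two samples land in window 0; the lone sample in window 100 is dropped.
snapshot = aggregate([(0, 1.0), (10, 3.0), (100, 5.0)],
                     window_ms=50, min_samples=2)
```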
Analyzer
Analyzer generates proposals based on user-defined optimization goals and Cluster Load Model obtained from Load Monitor.
The proposal is an optimization plan for rebalancing the Kafka cluster. It describes a series of actions aimed at improving resource utilization in the cluster and balancing the load among brokers, such as partition reassignments and leadership changes.
Cruise Control allows you to set hard and soft goals.
A hard goal is a goal that must be achieved. Such goals should be placed in the hard.goals list.
A soft goal is a goal that may be left unachieved if doing so allows all hard goals to be reached.
There is a set of goals that the user can add to or remove from the goals list to get the expected balancing result. Users can also create their own goals and add them to Cruise Control.
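As a sketch of how these two lists relate, the snippet below partitions a configured goals list into hard and soft goals. The goal class names come from the table in this article; the check that every hard goal also appears in the goals list mirrors Cruise Control's configuration sanity checking, though the exact rule is an assumption here:

```python
# Hypothetical goal configuration for illustration.
goals = [
    "RackAwareGoal",
    "ReplicaCapacityGoal",
    "DiskCapacityGoal",
    "ReplicaDistributionGoal",
]
hard_goals = ["RackAwareGoal", "ReplicaCapacityGoal", "DiskCapacityGoal"]

def split_goals(goals, hard_goals):
    """Partition configured goals into hard and soft goals.

    Assumes every hard goal must also be present in the goals list.
    """
    missing = [g for g in hard_goals if g not in goals]
    if missing:
        raise ValueError(f"hard goals not in goals list: {missing}")
    soft = [g for g in goals if g not in hard_goals]
    return hard_goals, soft

hard, soft = split_goals(goals, hard_goals)
```

In this configuration only ReplicaDistributionGoal is soft, so the analyzer may sacrifice even replica counts to satisfy the three capacity and rack constraints.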
Below is a description of all existing goals.
| Goal | Description |
|---|---|
| BrokerSetAwareGoal | BrokerSet is a subset of brokers in the cluster. This goal limits the movement of replicas to within their BrokerSet |
| RackAwareGoal | Ensures that all replicas of each partition are assigned with regard to the rack |
| RackAwareDistributionGoal | Allows placing multiple replicas of a partition in one rack, provided that the replicas of each partition are evenly distributed across the racks |
| ReplicaCapacityGoal | Tries to ensure that the number of replicas on each broker in the cluster stays below the specified value |
| DiskCapacityGoal | Guarantees that the disk usage of each broker stays below the specified threshold |
| NetworkInboundCapacityGoal | Ensures that the inbound network utilization of each broker stays below the established threshold |
| NetworkOutboundCapacityGoal | Ensures that the outbound network utilization of each broker stays below the established threshold |
| CpuCapacityGoal | Ensures that the CPU load on each broker stays below the set threshold |
| ReplicaDistributionGoal | Tries to ensure that all brokers in the cluster have the same number of replicas |
| PotentialNwOutGoal | Ensures that the potential outbound network load on each broker (if all of its replicas became leaders) does not exceed the broker's outbound network bandwidth |
| DiskUsageDistributionGoal | Tries to keep the usage spread among all disks within a broker within a certain range |
| NetworkInboundUsageDistributionGoal | Tries to ensure that inbound network traffic is evenly distributed among all brokers |
| NetworkOutboundUsageDistributionGoal | Tries to ensure that outbound network traffic is evenly distributed among all brokers |
| CpuUsageDistributionGoal | Tries to ensure that CPU utilization is evenly distributed among all brokers |
| TopicReplicaDistributionGoal | Ensures an even distribution of the replicas of a single topic across the entire cluster |
| LeaderReplicaDistributionGoal | Ensures an even distribution of leader replicas across all brokers in the cluster |
| LeaderBytesInDistributionGoal | Tries to balance the inbound network traffic of partition leaders across the cluster |
| KafkaAssignerDiskUsageDistributionGoal | A goal for kafka-assigner mode that tries to distribute disk usage evenly across brokers |
| KafkaAssignerEvenRackAwareGoal | A goal for kafka-assigner mode that assigns the replicas of each partition with regard to the rack |
| PreferredLeaderElectionGoal | Tries to make the first replica in the replica list the leader for all partitions of the topic |
Anomaly Detector
Anomaly Detector detects various anomalies and, if self-healing is enabled (using the self.healing.enabled parameter), initiates an attempt to automatically fix certain types of failures. The goals used for self-healing are specified in the self.healing.goals list.
The anomalies are described below.
| Anomaly | Description |
|---|---|
| Broker failures | A non-empty broker fails or leaves the cluster, which leads to offline replicas and under-replicated partitions. Cruise Control sends a notification, and if self-healing is enabled for this type of anomaly, it initiates an operation to relocate all offline replicas to other functioning brokers in the cluster. Since brokers also leave the cluster during normal restarts and maintenance, the anomaly detector provides a configurable grace period before the notification is triggered and the cluster is healed |
| Goal violations | An optimization goal is violated. Cruise Control sends a notification, and if self-healing is enabled for this type of anomaly, it proactively tries to remedy the violation by automatically analyzing the load and executing optimization proposals |
| Disk failure | One of the non-empty disks fails. Note that this applies only to Kafka brokers running on JBOD. Cruise Control sends a notification, and if self-healing is enabled for this type of anomaly, it initiates an operation to move all offline replicas to other operational brokers in the cluster |
| Metric anomaly | One of the metrics collected by Cruise Control shows an anomaly (for example, a sharp increase in the log cleanup time metric). Cruise Control sends a notification. There is currently no standardized self-healing operation for metric anomalies, as different metric anomalies require different remediation methods. Users can define their own anomaly and remediation operation by implementing their own MetricAnomaly and MetricAnomalyFinder |
| Topic anomaly | One or several topics in the cluster violate user-defined properties (for example, some partitions are too large on disk). Cruise Control sends a notification. There is currently no standardized self-healing operation for topic anomalies, as different topic anomalies require different correction methods. Users can define their own anomaly and correction operation by implementing their own TopicAnomaly and TopicAnomalyFinder |
Executor
Executor is responsible for executing the optimization proposals received from Analyzer. It can be interrupted while executing proposals, and it ensures that the execution is resource-aware and does not overload any broker. Partition movements during rebalancing can be controlled with parameters such as num.concurrent.partition.movements.per.broker or max.num.cluster.partition.movements.
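The resource-aware throttling above can be sketched as batching: movements are released in rounds so that no broker participates in more than a fixed number of concurrent partition movements (the idea behind num.concurrent.partition.movements.per.broker). This is a simplified model, not the actual Executor logic:

```python
# Split proposed movements into batches so that within each batch no
# broker (as source or destination) exceeds max_per_broker concurrent
# partition movements.

def batch_movements(movements, max_per_broker):
    """movements: list of (source_broker, dest_broker). Returns batches."""
    batches, pending = [], list(movements)
    while pending:
        load, batch, rest = {}, [], []
        for src, dst in pending:
            # Admit a movement only if both brokers still have capacity.
            if (load.get(src, 0) < max_per_broker
                    and load.get(dst, 0) < max_per_broker):
                batch.append((src, dst))
                load[src] = load.get(src, 0) + 1
                load[dst] = load.get(dst, 0) + 1
            else:
                rest.append((src, dst))  # deferred to a later batch
        batches.append(batch)
        pending = rest
    return batches

# Three movements among brokers 1-3 with at most one movement per broker
# at a time are spread over three batches.
plan = batch_movements([(1, 2), (1, 3), (2, 3)], max_per_broker=1)
```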
NOTE
For information on the installation, configuration, and use of Cruise Control in ADS, refer to the Cruise Control usage example article.