Leader election in ZooKeeper
NOTE
ZooKeeper is a centralized service used to store configuration information and to provide distributed synchronization of Kafka brokers.
The atomic messaging system at the heart of ZooKeeper is based on the principles of a consensus algorithm.
A consensus algorithm is a set of principles and rules by which all nodes participating in the cluster automatically reach agreement on the current state of the network.
ZooKeeper implements one of the consensus algorithm protocols: ZAB (ZooKeeper Atomic Broadcast).
Consensus algorithm protocol
The ZAB protocol ensures that ZooKeeper replication occurs in order, and is also responsible for electing leader nodes and recovering failed nodes.
The main provisions of ZAB are described below.
Message exchange
In the messaging part, ZAB uses the following concepts:
- Packet is a sequence of bytes sent over a FIFO channel.
- Message is a sequence of bytes that is atomically broadcast to all ZooKeeper servers. A message is placed into a proposal and agreed upon before it is delivered.
- Proposal is a unit of agreement. Proposals are agreed upon by exchanging packets when there is a quorum of ZooKeeper servers.
The proposal approval messaging model used by ZooKeeper is described below.
To communicate with replicas, proposal approval uses a model in which each new record requires at least three messages: a proposal (propose), an acknowledgment (ack), and a commit.
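The three-message exchange can be sketched as follows. This is an illustrative model of the message flow for one write, not ZooKeeper's actual implementation; the function and participant names are invented for the example.

```python
# Illustrative sketch: the message trace produced by one proposal round.
# The leader proposes, each follower acknowledges, the leader then commits.

def replicate(followers: list) -> list:
    """Return the (sender, message_type) trace for one proposal round."""
    trace = [("leader", "propose")]        # 1. leader sends the proposal
    for follower in followers:
        trace.append((follower, "ack"))    # 2. each follower acknowledges
    trace.append(("leader", "commit"))     # 3. leader commits after the acks
    return trace

trace = replicate(["follower-1", "follower-2"])
# Every new record involves at least the three message types:
assert [kind for _, kind in trace] == ["propose", "ack", "ack", "commit"]
```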
Message delivery principles
The ZAB protocol involves the creation of point-to-point FIFO channels between servers (within each session, a server executes client requests one by one, in the order they are received). TCP is used for communication and provides the following properties:
- Reliable delivery — if message `m` is delivered by one server, it will eventually be delivered by all servers.
- Total order — if message `a` is delivered before message `b` by one server, then `a` will be delivered before `b` by all servers. If messages `a` and `b` are delivered, then either `a` is delivered before `b` or `b` is delivered before `a`.
- Causal order — if message `b` is sent after message `a` was delivered by the sender of `b`, then `a` must be ordered before `b`. If a sender sends `c` after sending `b`, then `c` must be ordered after `b`.
- Ordered delivery — data is delivered in the same order as it was sent, and message `m` is delivered only after all messages sent before `m` have been delivered. One implication of this is that if message `m` is lost, all messages sent after `m` are lost as well.
- No message after close — once a FIFO channel is closed, no more messages will be received from it.
The use of timeouts ensures that consensus is reached in the presence of failures.
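The ordered-delivery property above can be sketched as a small model. This is an illustrative simulation, not ZooKeeper code: a receiver accepts messages strictly in sequence, so a gap caused by one lost message discards everything sent after it.

```python
# Illustrative sketch of ordered delivery over a FIFO channel: messages
# are accepted in sequence, and the first lost message ends the stream.

def deliver_in_order(sent: list, lost: set) -> list:
    """Deliver messages by sequence number, stopping at the first gap."""
    delivered = []
    for seq in sent:
        if seq in lost:
            break                # gap in the FIFO stream: nothing later arrives
        delivered.append(seq)
    return delivered

# Messages 1..5 are sent; message 3 is lost, so 4 and 5 are lost with it.
assert deliver_in_order([1, 2, 3, 4, 5], lost={3}) == [1, 2]
```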
Controller epoch
Every cluster has one leader node and the remaining nodes are followers.
Below is the general sequence for a leader election.
- A proposal (propose) of a new epoch arrives from a server; each proposal is assigned its own zxid.
- Each node acknowledges (ack) the proposal only if it does not know of another leader with a higher epoch number in the zxid or, if the epochs are equal, with a higher transaction counter. Before a leader can be chosen, it must collect votes from a quorum of nodes. The quorum needed to determine the leader is `(n/2 + 1)`, where `n` is the number of servers that make up the ZooKeeper service.
- After the quorum is confirmed, the new leader commits the creation of the new epoch and sets the next zxid to use.
- Each follower records the formation of the new epoch.
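The vote rule and quorum check above can be sketched in a few lines. This is a schematic model of the comparison logic, not ZooKeeper's actual election code; the function names are invented for the example.

```python
# Illustrative sketch: a node acks a proposed leader only if the candidate's
# (epoch, counter) pair from the zxid is not older than the best one it has
# already seen, and a leader needs a quorum of n//2 + 1 votes.

def acks_proposal(known: tuple, proposed: tuple) -> bool:
    """Ack iff proposed (epoch, counter) >= the best (epoch, counter) seen."""
    return proposed >= known   # tuple comparison: epoch first, then counter

def has_quorum(votes: int, n_servers: int) -> bool:
    """A quorum is (n/2 + 1) of the n servers in the ensemble."""
    return votes >= n_servers // 2 + 1

# Epoch 3 / counter 7 beats epoch 3 / counter 5, but loses to epoch 4.
assert acks_proposal((3, 5), (3, 7))
assert not acks_proposal((4, 0), (3, 7))
# A 5-server ensemble needs 3 votes.
assert has_quorum(3, 5) and not has_quorum(2, 5)
```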
Each phase of the controller epoch is described below.
Each epoch consists of three phases, and each node can be in one of these three phases at any given time:
- Leader election — the phase in which a leader is elected based on the current quorum configuration. There can be at most one leader at any given time. The phase ends after the new epoch is confirmed by all followers.
- Synchronization — the phase during which the new leader synchronizes the replicas from the previous epoch across all of its followers. The phase ends after a quorum of followers has acknowledged that they are in sync with the leader.
- Broadcast — the broadcast phase, the normal mode of operation in which the leader continues to propose new client requests. The phase ends when the leader fails.
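The three phases can be modeled as a tiny state machine. This is a schematic sketch of the phase transitions described above, not ZooKeeper's implementation: a node moves from election through synchronization to broadcast, and a leader failure restarts the cycle.

```python
# Illustrative state machine for the three epoch phases.

PHASES = ["leader_election", "synchronization", "broadcast"]

def next_phase(phase: str, leader_failed: bool = False) -> str:
    """Advance a node to its next phase; a leader failure restarts election."""
    if leader_failed:
        return "leader_election"                 # failure ends the epoch
    i = PHASES.index(phase)
    return PHASES[min(i + 1, len(PHASES) - 1)]   # broadcast is steady state

assert next_phase("leader_election") == "synchronization"
assert next_phase("synchronization") == "broadcast"
assert next_phase("broadcast") == "broadcast"
assert next_phase("broadcast", leader_failed=True) == "leader_election"
```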
Kafka controller election logic
A Kafka controller is a broker that is responsible for:
- maintaining the list of in-sync replicas (ISR);
- choosing a new log (partition) leader from the ISR when the current leader fails;
- partition management;
- assignment of partitions to brokers.
Only one broker can be the controller at a time. Controller election is performed automatically by ZooKeeper in the following sequence:
- Every broker tries to create an ephemeral znode called `/controller` (the name may vary) in ZooKeeper.
- The first broker to create this ephemeral znode assumes the role of controller, and every subsequent attempt by another broker receives a `node already exists` message.
- Once a controller is established, it is assigned a controller epoch.
- The current controller epoch is passed around the cluster, and if a broker receives a controller request with an older controller epoch, the request is ignored.
- If the controller fails, a new controller election occurs in accordance with the ZAB leader election principles described above, and a new controller epoch is created and passed around the cluster.
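The race for the `/controller` znode and the epoch fencing can be sketched with an in-memory stand-in for ZooKeeper. The `FakeZk` class and function names below are invented for the example; a real deployment would use a ZooKeeper client and a live ensemble.

```python
# Illustrative sketch: the first broker to create the /controller znode
# becomes the controller; later attempts fail, and requests carrying an
# older controller epoch are ignored by brokers (epoch fencing).

class FakeZk:
    """In-memory stand-in for ZooKeeper's ephemeral znode semantics."""

    def __init__(self):
        self.znodes = {}       # path -> owning broker ID
        self.epoch = 0         # current controller epoch

    def try_become_controller(self, broker_id: int) -> bool:
        if "/controller" in self.znodes:
            return False       # "node already exists"
        self.znodes["/controller"] = broker_id
        self.epoch += 1        # a new controller starts a new epoch
        return True

def accept_request(current_epoch: int, request_epoch: int) -> bool:
    """Brokers drop controller requests from an older controller epoch."""
    return request_epoch >= current_epoch

zk = FakeZk()
assert zk.try_become_controller(1)       # broker 1 wins the race
assert not zk.try_become_controller(2)   # broker 2: node already exists
assert zk.epoch == 1
assert not accept_request(current_epoch=2, request_epoch=1)  # stale, ignored
```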
Kafka replicated log leader
The leader of a log (partition) is the broker that works with clients; it is the leader that message producers write to. Followers, the brokers that store replicas of the partition, send replication requests to the leader. Consumers can read messages both from the leader and from the followers.
The ZAB distributed consensus approach is also used in Kafka to agree on the order of message writes and reads and to manage partition replication, for availability and reliability of the system.
The following describes, in general terms, how leader election and partition replication management in Kafka are performed automatically using the ZooKeeper service.
- After a topic is created, the Kafka controller chooses a partition leader from among the active brokers using a znode created in ZooKeeper. Active brokers are those whose ephemeral znodes exist in ZooKeeper and that send control signals (heartbeats) to the ZooKeeper server to maintain the session. To track broker status, the controller subscribes (watches) to the broker znodes.
- Having received the ID of the new leader, the controller writes information about the new leader to the ZooKeeper storage and sends the data about the new leader to each broker that hosts a replica of this partition.
- The producer connects to ZooKeeper and receives the ID of the partition leader to which it will write data.
- The producer writes a message to the topic on the partition leader.
- The interaction between the partition leader and the followers is divided into three phases:
  - propose — the leader proposes replication to the followers;
  - ack — the leader receives responses from the followers about the creation of replicas;
  - commit — the leader commits the creation of the ISR when it receives a replica creation message from all followers.

  Followers that contain replicas of all recorded messages — the ISR replicas — are candidate leaders if the incumbent leader fails.
- The leader sends data about the written partition (metadata) and about the created ISR to the controller.
- The controller writes information about the created ISR, as well as the partition metadata, to the ZooKeeper repository.
- The consumer connects to ZooKeeper and gets the ID of the leader or of the followers for the requested partition, as well as information about the offset from which to read.
- The consumer requests (pulls) messages from the leader or the followers.
- The leader receives the offset data and passes it to ZooKeeper.
The following describes how leader election is performed in Kafka in the event of a partition leader failure.
- If the leader fails, its ephemeral znode is deleted.
- The controller learns of the leader failure from its watch on that znode.
- The controller selects a new leader from among the ISR replicas.
- Having received the ID of the new leader, the controller records the information about the new leader in the ZooKeeper storage and sends the data about the new leader to each broker that hosts a replica of this partition.
Further work with replicas, the producer, and the consumer proceeds in the same way as described for the general case above.
This leader election is clean — it guarantees no data loss.
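The clean election step can be sketched as follows. This is illustrative logic, not Kafka's source code: a new leader is chosen only from the ISR, so every committed message is already present on the new leader and nothing is lost.

```python
# Illustrative sketch of a clean leader election: pick the first live
# replica that is still in the ISR; if none qualifies, a clean election
# cannot proceed (choosing an out-of-sync replica could lose data).

def elect_leader(replicas, isr, failed):
    """Return the first live in-sync replica, or None if there is none."""
    for broker in replicas:
        if broker != failed and broker in isr:
            return broker
    return None  # only out-of-sync replicas remain

# Replicas on brokers 1, 2, 3; broker 3 fell out of sync earlier.
assert elect_leader([1, 2, 3], isr={1, 2}, failed=1) == 2
assert elect_leader([1, 2, 3], isr={1}, failed=1) is None
```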
NOTE
After broker failures, the distribution of leaders can become uneven, with most partition replica leaders ending up on the same cluster node. In this case, you can manually rebalance the cluster using the /usr/lib/kafka/bin/kafka-leader-election.sh script.
The following describes how events develop in Kafka in the event of a follower broker failure.
- The leader waits for the replica creation response (ack) from the follower for a limited time (the timeout is determined by the Kafka broker parameter `replica.lag.time.max.ms`, 30000 ms by default).
- When the time expires, the leader broker considers the replica out of sync and removes it from the ISR.
- Information about the removal from the ISR is recorded by the controller broker and in ZooKeeper. This follower ceases to be a candidate for partition leader.
Further work with replicas, the producer, and the consumer proceeds in the same way as described for the general case above.
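The ISR-shrink check can be sketched in a few lines. `replica.lag.time.max.ms` is a real Kafka broker setting; the surrounding code is an illustrative model, not the broker's implementation.

```python
# Illustrative sketch: a follower whose last ack is older than the lag
# limit is considered out of sync and is dropped from the ISR.

REPLICA_LAG_TIME_MAX_MS = 30_000  # Kafka default for replica.lag.time.max.ms

def shrink_isr(isr, last_ack_ms, now_ms):
    """Keep only the followers that acked within the allowed lag window."""
    return {f for f in isr
            if now_ms - last_ack_ms[f] <= REPLICA_LAG_TIME_MAX_MS}

isr = {"broker-1", "broker-2", "broker-3"}
last_ack = {"broker-1": 95_000, "broker-2": 99_000, "broker-3": 40_000}
# At t = 100 s, broker-3 has not acked for 60 s and leaves the ISR.
assert shrink_isr(isr, last_ack, now_ms=100_000) == {"broker-1", "broker-2"}
```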
CAUTION
If you enable the Kafka broker option