Leader election in ZooKeeper

NOTE
  • The concepts of storage and the main components of Kafka are described in the Storage concepts in Kafka article.

  • The concepts and the main components of ZooKeeper are described in the ZooKeeper article.

ZooKeeper is a centralized service used to store configuration information and to provide distributed synchronization of Kafka brokers.

The atomic messaging system at the heart of ZooKeeper is based on the principles of a consensus algorithm.

A consensus algorithm is a set of principles and rules by which all nodes participating in the cluster automatically come to an agreement on the current state of the network.

ZooKeeper implements one of the consensus algorithm protocols, ZAB (ZooKeeper Atomic Broadcast).

Consensus algorithm protocol

The ZAB protocol ensures that ZooKeeper replication occurs in order; it is also responsible for electing leader nodes and recovering failed nodes.

The main provisions of ZAB are described below.

Message exchange

In the messaging part, ZAB uses the following concepts:

  • Packet is a sequence of bytes sent over a FIFO channel.

  • Message is a sequence of bytes that will be atomically broadcast to all ZooKeeper servers. A message is placed into a proposal and agreed upon before it is delivered.

  • Proposal is a unit of agreement. Proposals are agreed upon by exchanging packets while there is a quorum of ZooKeeper servers.

The proposal approval messaging model used by ZooKeeper is shown below.

Messaging model

To communicate with replicas, proposal approval uses a model in which each new record requires at least three exchanges: a proposal (propose), an acknowledgment (ack), and a commit.
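The quorum rule behind this exchange can be illustrated with a minimal sketch (the ProposalTracker class below is hypothetical, not part of ZooKeeper): the leader may send the commit only after acknowledgments from a majority of servers.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not ZooKeeper code: tracks acks for a single proposal.
class ProposalTracker {
    private final int ensembleSize;                  // total number of ZooKeeper servers (n)
    private final Set<Integer> ackedServers = new HashSet<>();

    ProposalTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Register an ack packet from a server; return true once a quorum
    // (n/2 + 1 servers) has acknowledged, i.e. the leader may broadcast the commit.
    boolean ack(int serverId) {
        ackedServers.add(serverId);
        return ackedServers.size() >= ensembleSize / 2 + 1;
    }
}
```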

Message delivery principles

The ZAB protocol relies on point-to-point FIFO channels between servers (within each session, a server executes client requests one by one, in the order in which they are received). TCP is used for communication, providing the following properties:

  • Reliable delivery — if message m is delivered by one server, it will eventually be delivered by all servers.

  • Total order — if message a is delivered before message b by one server, then a will be delivered before b by all servers. If messages a and b are both delivered, then either a is delivered before b or b is delivered before a.

  • Causal order — if message b is sent after message a has been delivered by the sender of b, then a must be ordered before b. If the sender sends c after sending b, c must be ordered after b.

  • Ordered delivery — data is delivered in the same order as it was sent, and message m is only delivered after all messages sent before m have been delivered. A consequence is that if message m is lost, all messages sent after m will be lost.

  • No message after close — once a FIFO channel is closed, no messages will be received from it.

The use of timeouts ensures that consensus is reached in the presence of failures.

Controller epoch

Every cluster has one leader node and the remaining nodes are followers.

The ZooKeeper transaction ID (zxid) is used to guarantee the total order of proposals.

A zxid consists of two parts: the epoch number and the transaction counter. It is a 64-bit number, where the upper 32 bits hold the epoch number and the lower 32 bits hold the transaction counter.

A new epoch number corresponds to a new leader. Each transaction within a leader's epoch is assigned a unique zxid, as illustrated below.
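For illustration, the sketch below shows how the two parts can be extracted from a 64-bit zxid value (the class and method names are illustrative, not part of the ZooKeeper API):

```java
// Illustrative only: splitting a 64-bit zxid into its epoch and counter parts.
public class ZxidParts {

    static long epoch(long zxid) {
        return zxid >>> 32;            // upper 32 bits: epoch number
    }

    static long counter(long zxid) {
        return zxid & 0xFFFFFFFFL;     // lower 32 bits: transaction counter
    }

    public static void main(String[] args) {
        long zxid = (5L << 32) | 42L;  // epoch 5, transaction counter 42
        System.out.println(epoch(zxid) + " / " + counter(zxid));  // prints "5 / 42"
    }
}
```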

Below is the general sequence for a leader election.

Leader election in ZooKeeper
  1. A proposal (propose) for a new epoch arrives from a server, and each proposal is assigned its own zxid.

  2. Every node acknowledges (ack) the proposal only if it does not know of another leader with a higher epoch number in the zxid or, for the same epoch, with a higher transaction counter (see the sketch after this list). Before a leader can be chosen, it must collect votes from a quorum of nodes. The quorum required to elect a leader is (n/2 + 1), where n is the number of servers that make up the ZooKeeper service.

  3. After the quorum is confirmed, the new leader commits the creation of a new epoch and sets the next zxid to use.

  4. The followers record the formation of the new epoch.
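A minimal sketch of the ordering rule from step 2 is shown below. It is illustrative only (the Vote class is hypothetical, not ZooKeeper's actual election code): a candidate supersedes the currently known leader only if it carries a higher epoch or, for the same epoch, a higher transaction counter.

```java
// Hypothetical sketch of the zxid ordering rule used when acknowledging a proposed leader.
class Vote {
    final long epoch;    // epoch number proposed by the candidate
    final long counter;  // transaction counter within that epoch

    Vote(long epoch, long counter) {
        this.epoch = epoch;
        this.counter = counter;
    }

    // Returns true if the candidate should be acknowledged over the currently known leader:
    // a higher epoch wins; for equal epochs, the higher transaction counter wins.
    static boolean supersedes(Vote candidate, Vote current) {
        if (candidate.epoch != current.epoch) {
            return candidate.epoch > current.epoch;
        }
        return candidate.counter > current.counter;
    }
}
```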

Each phase of the controller epoch is described below.

Controller epoch

Each epoch consists of three phases, and each node can be in one of these three phases at any given time:

  • Leader election — the phase in which a leader is elected based on the current quorum configuration. There can be at most one leader at any given time. The phase ends after the new epoch has been confirmed by the followers.

  • Synchronization — the phase during which the new leader synchronizes the replicas it holds from the previous epoch with all the followers. The phase ends after a quorum of followers has acknowledged that they are in sync with the leader.

  • Broadcast — the broadcast phase, the normal mode of operation, in which the leader continues to propose new client requests. The phase ends when the leader fails.

Kafka controller election logic

The Kafka controller is a broker that is responsible for:

  • maintaining a list of synchronized replicas (ISR);

  • choosing a new log (partition) leader from the ISR when the current leader fails;

  • partition management;

  • assignment of partitions to users.

Only one broker can be a controller at a time. Controller selection is done automatically by ZooKeeper in the following sequence:

  1. Every broker tries to create an ephemeral znode called /controller (name may vary) in ZooKeeper.

  2. The first broker to create this ephemeral znode assumes the role of controller; every subsequent attempt by another broker receives a node already exists message (see the sketch after this list).

  3. Once a controller is established, it is assigned a "controller epoch".

  4. The current controller epoch is passed around the cluster, and if a broker receives a controller request with an older controller epoch, the request is ignored.

  5. If a controller fails, a new controller election occurs in accordance with the ZAB protocol leader election principles described above, and a new controller epoch is created and passed to the cluster.
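A minimal sketch of steps 1 and 2, using the ZooKeeper Java client, is shown below. It is illustrative only and omits the rest of Kafka's controller logic (controller epoch registration, watches, and the actual znode payload format):

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElectionSketch {

    // Try to become the controller by creating the ephemeral /controller znode.
    // Returns true if this broker won the election, false if the znode already exists.
    static boolean tryBecomeController(ZooKeeper zk, int brokerId)
            throws KeeperException, InterruptedException {
        byte[] data = ("{\"brokerid\":" + brokerId + "}").getBytes();  // simplified payload
        try {
            zk.create("/controller", data,
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;   // this broker is now the controller
        } catch (KeeperException.NodeExistsException e) {
            return false;  // another broker created the znode first ("node already exists")
        }
    }
}
```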

Kafka replicated log leader

The leader of a log (partition) is a broker that works with clients. It is the leader that accepts messages from producers. Followers (brokers that store replicas of the partition) send requests to the leader to replicate its data. Consumers can read messages from both the leader and the followers.

The ZAB distributed consensus algorithm is also used in Kafka to agree on the order of message writes and reads and to manage partition replication, for availability and reliability of the system.

The following describes how, in the general case, leader election and partition replication management are performed automatically in Kafka using the ZooKeeper service.

Electing a leader and working with replicas
  1. After a topic is created, the Kafka controller chooses a partition leader from among the active brokers using a znode created in ZooKeeper. Active brokers are those whose ephemeral znodes exist in ZooKeeper and which send control signals (heartbeats) to the ZooKeeper server to maintain the session. To track the status of brokers, the controller subscribes to (watches) the broker znodes.

  2. Having received the ID of the new leader, the controller writes information about the new leader to the ZooKeeper storage and sends the data about the new leader to each broker that hosts a replica of this partition.

  3. The producer connects to ZooKeeper and receives the ID of the partition leader where it will write data.

  4. The producer writes a message to the topic on the partition leader.

  5. The interaction between the leader of the partition and the followers is divided into three phases:

    • propose — the leader proposes that the followers create replicas.

    • ack — the leader receives a response from the followers about the creation of a replica.

    • commit — the leader commits the creation of an ISR when it receives a replica creation message from all followers.

      Followers that contain replicas of all recorded messages — ISR replicas — are candidate leaders if the incumbent leader fails.

  6. The leader sends data about the written partition (metadata) and about the created ISRs to the controller.

  7. The controller writes information about the created ISRs to the ZooKeeper repository, as well as partition metadata.

  8. The consumer connects to ZooKeeper and gets the ID of the leader or followers for the requested partition, as well as information about the offset from which to read.

  9. The consumer requests (pulls) messages from the leader or the followers.

  10. The leader receives the offset data and passes it to ZooKeeper.
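After this sequence completes, each partition has a leader and an ISR list recorded in ZooKeeper. The sketch below (Kafka Admin client 3.1 or later; the topic name and broker address are placeholders) shows how to inspect the resulting leader and ISR:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class ShowPartitionLeaders {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Leader and ISR are part of the topic metadata maintained by the controller
            TopicDescription topic = admin.describeTopics(Collections.singleton("my-topic"))
                                          .allTopicNames().get().get("my-topic");
            topic.partitions().forEach(p ->
                System.out.printf("partition %d: leader=%s, isr=%s%n",
                                  p.partition(), p.leader(), p.isr()));
        }
    }
}
```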

The following describes how leader election is performed in Kafka in the event of a failure of the partition leader.

Partition leader failure
  1. If the leader fails, its ephemeral znode is deleted.

  2. The controller learns about the failure of the leader from the deletion of its znode (a minimal watch sketch is shown after this list).

  3. The controller selects a new leader from among the ISR replicas.

  4. Having received the ID of the new leader, the controller records the information about the new leader in the ZooKeeper storage and sends the data about the new leader to each broker that hosts a replica of this partition.
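The watch mechanism from steps 1 and 2 can be sketched as follows (illustrative only, not Kafka's controller code; /brokers/ids/<id> is the path of a broker's ephemeral znode):

```java
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class BrokerFailureWatchSketch {

    // Set a watch on the broker's ephemeral znode; ZooKeeper fires the watch
    // once, when the znode changes or disappears.
    static void watchBroker(ZooKeeper zk, int brokerId) throws Exception {
        String path = "/brokers/ids/" + brokerId;
        zk.exists(path, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                // The broker session expired and its ephemeral znode was removed;
                // at this point the controller would elect a new leader from the ISR.
                System.out.println("Broker " + brokerId + " failed: " + event.getPath());
            }
        });
    }
}
```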

Further work with replicas, producer, and consumer is performed in the same way as described for the general case above.

This leader election is clean — it guarantees no data loss.

NOTE

After broker failures, the distribution of leaders can become uneven, with most partition replica leaders ending up on the same cluster node. In this case, you can manually rebalance the cluster using the /usr/lib/kafka/bin/kafka-leader-election.sh script with the --election-type option set to PREFERRED on the host where the Kafka service is installed.
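The same preferred leader election can also be triggered programmatically through the Kafka Admin client, as in the sketch below (the broker address, topic name, and partition number are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class PreferredLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Move leadership of my-topic/0 back to its preferred (first-listed) replica
            admin.electLeaders(ElectionType.PREFERRED,
                               Collections.singleton(new TopicPartition("my-topic", 0)))
                 .partitions().get();
        }
    }
}
```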

The following describes what happens in Kafka when a follower broker fails.

Follower failure
  1. The leader waits for an acknowledgment (ack) of replica creation from the follower for a specified time. This time is determined by the Kafka broker parameter replica.lag.time.max.ms (30000 ms by default); the configured value can be checked with the sketch below.

  2. When the time expires, the leader broker considers the replica to be out of sync and removes it from the ISR.

  3. Information about the removal from the ISR is recorded by the controller broker and in ZooKeeper. This follower ceases to be a candidate for the role of partition leader.

Further work with replicas, producer, and consumer is performed in the same way as described for the general case above.
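The effective value of replica.lag.time.max.ms can be checked through the Kafka Admin client, as in the sketch below (the broker address and broker ID are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ShowReplicaLagTime {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1"); // broker ID placeholder
            Config config = admin.describeConfigs(Collections.singleton(broker))
                                 .all().get().get(broker);
            // The time after which a non-responding follower is dropped from the ISR
            System.out.println(config.get("replica.lag.time.max.ms").value());
        }
    }
}
```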

CAUTION

If you enable the Kafka broker option unclean.leader.election.enable, new partition leaders can be elected from outside the ISR list. This loses any messages on the old leader that were not yet replicated to the new leader. When there are no in-sync replicas other than the leader that has become unavailable, and the leader cannot be returned to the network, this is the only way to keep the topic available to consumers. For an individual topic, this can be done with the /usr/lib/kafka/bin/kafka-configs.sh script using the --add-config option with the unclean.leader.election.enable=true parameter.
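For illustration, the same per-topic setting can also be applied through the Kafka Admin client (a sketch; the topic name and broker address are placeholders):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            AlterConfigOp enable = new AlterConfigOp(
                    new ConfigEntry("unclean.leader.election.enable", "true"),
                    AlterConfigOp.OpType.SET);
            // Equivalent to kafka-configs.sh --alter --add-config unclean.leader.election.enable=true
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Collections.singletonMap(topic, Collections.singletonList(enable));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}
```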
