Выбор лидера в ZooKeeper

ПРИМЕЧАНИЕ
  • Концепции хранения и основные компоненты Kafka описаны в статье Концепции хранения в Kafka.

  • Концепции и основные компоненты ZooKeeper описаны в статье ZooKeeper.

ZooKeeper — централизованный сервис, использующийся для хранения информации о конфигурации, обеспечения распределенной синхронизации брокеров Kafka.

Атомарная система обмена сообщениями, лежащая в основе ZooKeeper, основана на принципах алгоритма консенсуса.

Алгоритм консенсуса — совокупность принципов и правил, благодаря которым все участвующие в кластере узлы (ноды) автоматически приходят к консенсусу о текущем состоянии сети.

ZooKeeper реализует один из протоколов алгоритма консенсуса — ZAB (ZooKeeper Atomic BroadCast).

Протокол алгоритма консенсуса

Протокол ZAB гарантирует, что репликация ZooKeeper выполняется по порядку, а также отвечает за выбор ведущих узлов и восстановление любых отказавших узлов.

Ниже описаны основные положения ZAB.

Обмен сообщениями

В части обмена сообщениями ZAB использует следующие понятия:

  • Пакет (packet) — последовательность байтов, отправляемых по каналу FIFO.

  • Сообщение (message) — последовательность байтов, которая будет атомарно транслироваться на все серверы ZooKeeper. Сообщение помещается в предложение и согласовывается до того, как оно будет доставлено.

  • Предложение (proposal) — единица согласования. Предложения согласовываются путем обмена пакетами при наличии кворума серверов ZooKeeper.

Ниже показана модель обмена сообщениями, основанная на согласовании предложений, принятая в ZooKeeper.

Модель обмена сообщениями
Модель обмена сообщениями
Модель обмена сообщениями
Модель обмена сообщениями

Для связи с репликами при согласовании предложений используется модель, в которой для каждой новой записи требуется не менее трех транзакций: предложение (propose), подтверждение (ack) и фиксация (commit).

Принципы доставки сообщений

Протокол ZAB предполагает создание каналов FIFO (в каждой сессии сервер исполняет запросы клиента поочередно в порядке поступления) типа "точка-точка" между серверами. Для связи используется протокол TCP, обеспечивая следующие свойства:

  • Надежная доставка — если сообщение m доставляется одним сервером, оно в конечном итоге будет доставлено всеми серверами.

  • Общий заказ — если сообщение a доставлено до сообщения b одним сервером, то a будет доставлено до b всеми серверами. Если сообщения a и b доставлены, то либо a будет доставлено раньше b, либо b будет доставлено раньше a.

  • Причинный порядок — если сообщение b отправляется после того, как сообщение a было доставлено отправителем b, сообщение a должно быть заказано до b. Если отправитель отправляет c после отправки b, c должно быть заказано после b.

  • Заказанная доставка — данные доставляются в том же порядке, в котором они были отправлены, и сообщение m доставляется только после того, как все сообщения, отправленные до m, были доставлены. Следствием этого является то, что если сообщение m потеряно, все сообщения после m будут потеряны.

  • Нет сообщения после закрытия — как только канал FIFO закрыт, от него не будут приниматься никакие сообщения.

Использование тайм-аутов обеспечивает достижения консенсуса при наличии сбоев.

Эпоха контроллера

Каждый кластер имеет один ведущий узел (Leader), а остальные узлы являются последователями (Follower).

Для гарантии общего порядка предложений используется идентификатор транзакции ZooKeeper - zxid.

Zxid состоит из двух частей: номера эпохи и счетчика транзанкции.

Zxid является 64-битным числом — старшие 32 бита для номера эпохи и младшие 32 бита для счетчика транзакции.

Новый номер эпохи представляет новый лидер. В результате каждой транзакции в эпоху каждого лидера присваивается свой уникальный zxid.

Ниже приведена общая последовательность выбора лидера.

Выбор лидера в ZooKeeper
Выбор лидера в ZooKeeper
Выбор лидера в ZooKeeper
Выбор лидера в ZooKeeper
  1. От сервера поступает предложение (propose) новой эпохи и каждому предложению присваивается свой zxid.

  2. Каждый узел подтверждает (ack) предложение только в том случае, если он не знает ни одного другого лидера с более высоким номером эпохи в составе zxid, или, если эпоха такая же, с более высоким счетчиком транзакции. Прежде чем лидер будет выбран, он должен собрать голоса от кворума узлов. Кворумы для определения лидера должны иметь размер (n/2+1), где n — количество серверов, составляющих сервис ZooKeeper.

  3. После подтверждения кворума новый лидер совершает (commit) создание новой эпохи и и устанавливает следующий zxid для использования.

  4. Последователь зафиксирует становление новой эпохи.

Ниже описаны все фазы эпохи контроллера.

Эпоха контроллера
Эпоха контроллера
Эпоха контроллера
Эпоха контроллера

Каждая эпоха состоит из трех фаз, и каждый узел может находиться в одной из этих трех фаз в любой момент времени:

  • Leader election — фаза выбора лидера по текущим конфигурациям кворума. В любой момент времени может быть не более одного лидера. Окончание фазы наступает после подтверждения новой эпохи всем последователями.

  • Synchronization — фаза синхронизации, во время которой новый лидер синхронизирует имеющиеся реплики с предыдущей эпохой и со всеми последователями (followers) как лидер. Окончание фазы наступает после того, как кворум последователей признал, что они синхронизированы с лидером.

  • Broadcast — фаза трансляции, нормальный режим работы, при котором лидер продолжает предлагать новые клиентские запросы. Окончание фазы наступает после сбоя лидера.

Выбор контроллера Kafka

Контроллер Kafka — брокер, который отвечает за:

  • ведение списка синхронизированных реплик (ISR);

  • выбор нового лидера журнала (партиции) из ISR, когда текущий лидер выходит из строя;

  • управление партициями;

  • назначение партиций пользователям.

Контроллером может являться только один брокер одновременно. Выбор контроллера выполняется автоматически при помощи ZooKeeper в следующей последовательности:

  1. Каждый брокер пытается создать эфемерный znode с наименованием /controller (название может иным) в ZooKeeper.

  2. Первый брокер, создавший этот эфемерный znode, возьмет на себя роль контроллера, и каждый последующий запрос брокера будет получать сообщение node already exists.

  3. После того, как контроллер установлен, ему назначается "эпоха контроллера".

  4. Текущая эпоха контроллера передается по всему кластеру, и если брокер получает запрос контроллера от более старой эпохи контроллера, он игнорируется.

  5. Если происходит сбой контроллера, происходит новый выбор контроллера в соответствии с принципами выбора лидера протокола ZAB, описанного выше, и создается новая эпоха контроллера, которая передается в кластер.

Лидер реплицированного журнала Kafka

Лидер (Leader) журнала (партиции) — брокер, который работает с клиентами. Именно лидер работает с производителями сообщений. К лидеру осуществляют запросы последователи (followers) — брокеры, которые хранят реплику всех данных партиций. Потребители могут читать сообщения как с лидера, так и с последователей.

Алгоритм распределенного консенсуса ZAB также используется для согласования между последовательностью записи и чтения сообщений и управлением репликацией партиций в Kafka для обеспечения доступности и надежности системы.

Ниже описано, как в общем случае автоматически осуществляется выбор лидера и управление репликацией партиций в Kafka при помощи сервиса ZooKeeper.

Выбор лидера и работа с репликами
Выбор лидера и работа с репликами
Выбор лидера и работа с репликами
Выбор лидера и работа с репликами
  1. После создания топика контроллер Kafka выбирает лидера партиции среди активных брокеров при помощи znode, созданной в ZooKeeper. Активными считаются брокеры, эфемерные znode которых созданы в ZooKeeper и которые посылают контрольные сигналы (heartbeat) на сервер ZooKeeper для поддержания сессии. Для отслеживания состояния брокеров нода контроллера подписывается (watches) на ноды брокеров.

  2. Получив ID нового лидера, контроллер записывает информацию о новом лидере в хранилище ZooKeeper и отправляет данные о новом лидере каждому брокеру, который размещает реплику этой партиции.

  3. Производитель (producer) подключается к ZooKeeper и получает ID лидера партиции, куда он будет записывать данные.

  4. Производитель записывает сообщение в топик на лидера партиции.

  5. Взаимодействие между лидером партиции и последователями разделяется на три фазы:

    • propose — лидер предлагает создать репликацию последователям.

    • ack — лидер получает ответ от последователей о создании реплики.

    • commit — лидер фиксирует создание ISR при получении сообщения о создании реплики от всех последователей.

      Последователи, которые содержат реплики всех записанных сообщений — ISR-реплики — являются кандидатами в лидеры при отказе действующего лидера.

  6. Лидер передает данные о записанной партиции (метаданные) и о созданных ISR в контроллер.

  7. Контроллер записывает информацию о созданных ISR в хранилище ZooKeeper, а также метаданные партиции.

  8. Потребитель (consumer) подключается к ZooKeeper и получает ID лидера или последователей для запрашиваемой партиции, a также данные о смещении (offset), с которого следует читать.

  9. Потребитель запрашивает (pull) сообщения у лидера или последователей.

  10. Лидер получает данные о смещении (offset) и передает их в ZooKeeper.

Ниже описано, как осуществляется выбор лидера в Kafka в случае отказа лидера партиции.

Отказ лидера партиции
Отказ лидера партиции
Отказ лидера партиции
Отказ лидера партиции
  1. При отказе лидера его эфемерный znode удаляется.

  2. Kонтроллер получает информацию от своей znode об отказе лидера.

  3. Kонтроллер выбирает нового лидера из числа ISR-реплик.

  4. Получив ID нового лидера, контроллер записывает информацию о новом лидере в хранилище ZooKeeper и отправляет данные о новом лидере каждому брокеру, который размещает реплику этой партиции.

Далее работа с репликами, производителем и потребителем производится также, как описано для общего случая выше.

Этот выбор лидера является "чистым" (clean) — он гарантирует отсутствие потери данных.

ПРИМЕЧАНИЕ

После отказов брокеров может произойти неравномерное распределение и большинство лидеров-реплик партиций окажутся на одном узле кластера. В этом случае можно вручную перебалансировать кластер при помощи скрипта /usr/lib/kafka/bin/kafka-leader-election.sh c указанием опции --election-type со значением PREFERRED на хосте, где устновлен сервис Kafka.

Ниже описано развитие событий в Kafka в случае отказа брокера-последователя.

Отказ последователя
Отказ последователя
Отказ последователя
Отказ последователя
  1. Лидер ждет ответ о создании реплики (ack) от последователя по истечении заданного времени (время ответа определяется параметром Kafka-брокера replica.lag.time.max.ms, по умолчанию 30000 мс).

  2. При истечении времени лидер считает реплику несинхронизированной и удаляет ее из ISR.

  3. Информация об удалении ISR поступает в контроллер и ZooKeeper. Данный последователь перестает быть кандидатом в лидеры партиции.

Далее работа с репликами, производителем и потребителем производится также, как описано для общего случая выше.

ВНИМАНИЕ

Если включить параметр Kafka-брокера unclean.leader.election.enable, новые лидеры для партиций будут создаваться не из списка ISR. При этом будут потеряны все несинхронизированные сообщения, которые были отправлены старому лидеру. В случае, когда нет ISR, кроме ставшего недоступным лидера, и возврат лидера в сеть невозможен, это единственный вариант оставить топик доступным для потребителей. Для отдельного топика это можно выполнить при помощи скрипта /usr/lib/kafka/bin/kafka-configs.sh c использованием опции --add-config c указанием параметра unclean.leader.election.enable=true.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней