Выбор лидера в ZooKeeper
ПРИМЕЧАНИЕ
|
ZooKeeper — централизованный сервис, использующийся для хранения информации о конфигурации, обеспечения распределенной синхронизации брокеров Kafka.
Атомарная система обмена сообщениями, лежащая в основе ZooKeeper, основана на принципах алгоритма консенсуса.
Алгоритм консенсуса — совокупность принципов и правил, благодаря которым все участвующие в кластере узлы (ноды) автоматически приходят к консенсусу о текущем состоянии сети.
ZooKeeper реализует один из протоколов алгоритма консенсуса — ZAB (ZooKeeper Atomic BroadCast).
Протокол алгоритма консенсуса
Протокол ZAB гарантирует, что репликация ZooKeeper выполняется по порядку, а также отвечает за выбор ведущих узлов и восстановление любых отказавших узлов.
Ниже описаны основные положения ZAB.
Обмен сообщениями
В части обмена сообщениями ZAB использует следующие понятия:
-
Пакет (packet) — последовательность байтов, отправляемых по каналу FIFO.
-
Сообщение (message) — последовательность байтов, которая будет атомарно транслироваться на все серверы ZooKeeper. Сообщение помещается в предложение и согласовывается до того, как оно будет доставлено.
-
Предложение (proposal) — единица согласования. Предложения согласовываются путем обмена пакетами при наличии кворума серверов ZooKeeper.
Ниже показана модель обмена сообщениями, основанная на согласовании предложений, принятая в ZooKeeper.
Для связи с репликами при согласовании предложений используется модель, в которой для каждой новой записи требуется не менее трех транзакций: предложение (propose), подтверждение (ack) и фиксация (commit).
Принципы доставки сообщений
Протокол ZAB предполагает создание каналов FIFO (в каждой сессии сервер исполняет запросы клиента поочередно в порядке поступления) типа "точка-точка" между серверами. Для связи используется протокол TCP, обеспечивая следующие свойства:
-
Надежная доставка — если сообщение
m
доставляется одним сервером, оно в конечном итоге будет доставлено всеми серверами. -
Общий заказ — если сообщение
a
доставлено до сообщенияb
одним сервером, тоa
будет доставлено доb
всеми серверами. Если сообщенияa
иb
доставлены, то либоa
будет доставлено раньшеb
, либоb
будет доставлено раньшеa
. -
Причинный порядок — если сообщение
b
отправляется после того, как сообщениеa
было доставлено отправителемb
, сообщениеa
должно быть заказано доb
. Если отправитель отправляетc
после отправкиb
,c
должно быть заказано послеb
. -
Заказанная доставка — данные доставляются в том же порядке, в котором они были отправлены, и сообщение
m
доставляется только после того, как все сообщения, отправленные доm
, были доставлены. Следствием этого является то, что если сообщениеm
потеряно, все сообщения послеm
будут потеряны. -
Нет сообщения после закрытия — как только канал FIFO закрыт, от него не будут приниматься никакие сообщения.
Использование тайм-аутов обеспечивает достижения консенсуса при наличии сбоев.
Эпоха контроллера
Каждый кластер имеет один ведущий узел (Leader), а остальные узлы являются последователями (Follower).
Ниже приведена общая последовательность выбора лидера.
-
От сервера поступает предложение (propose) новой эпохи и каждому предложению присваивается свой zxid.
-
Каждый узел подтверждает (ack) предложение только в том случае, если он не знает ни одного другого лидера с более высоким номером эпохи в составе zxid, или, если эпоха такая же, с более высоким счетчиком транзакции. Прежде чем лидер будет выбран, он должен собрать голоса от кворума узлов. Кворумы для определения лидера должны иметь размер
(n/2+1)
, гдеn
— количество серверов, составляющих сервис ZooKeeper. -
После подтверждения кворума новый лидер совершает (commit) создание новой эпохи и и устанавливает следующий zxid для использования.
-
Последователь зафиксирует становление новой эпохи.
Ниже описаны все фазы эпохи контроллера.
Каждая эпоха состоит из трех фаз, и каждый узел может находиться в одной из этих трех фаз в любой момент времени:
-
Leader election — фаза выбора лидера по текущим конфигурациям кворума. В любой момент времени может быть не более одного лидера. Окончание фазы наступает после подтверждения новой эпохи всем последователями.
-
Synchronization — фаза синхронизации, во время которой новый лидер синхронизирует имеющиеся реплики с предыдущей эпохой и со всеми последователями (followers) как лидер. Окончание фазы наступает после того, как кворум последователей признал, что они синхронизированы с лидером.
-
Broadcast — фаза трансляции, нормальный режим работы, при котором лидер продолжает предлагать новые клиентские запросы. Окончание фазы наступает после сбоя лидера.
Выбор контроллера Kafka
Контроллер Kafka — брокер, который отвечает за:
-
ведение списка синхронизированных реплик (ISR);
-
выбор нового лидера журнала (партиции) из ISR, когда текущий лидер выходит из строя;
-
управление партициями;
-
назначение партиций пользователям.
Контроллером может являться только один брокер одновременно. Выбор контроллера выполняется автоматически при помощи ZooKeeper в следующей последовательности:
-
Каждый брокер пытается создать эфемерный znode с наименованием
/controller
(название может иным) в ZooKeeper. -
Первый брокер, создавший этот эфемерный znode, возьмет на себя роль контроллера, и каждый последующий запрос брокера будет получать сообщение
node already exists
. -
После того, как контроллер установлен, ему назначается "эпоха контроллера".
-
Текущая эпоха контроллера передается по всему кластеру, и если брокер получает запрос контроллера от более старой эпохи контроллера, он игнорируется.
-
Если происходит сбой контроллера, происходит новый выбор контроллера в соответствии с принципами выбора лидера протокола ZAB, описанного выше, и создается новая эпоха контроллера, которая передается в кластер.
Лидер реплицированного журнала Kafka
Лидер (Leader) журнала (партиции) — брокер, который работает с клиентами. Именно лидер работает с производителями сообщений. К лидеру осуществляют запросы последователи (followers) — брокеры, которые хранят реплику всех данных партиций. Потребители могут читать сообщения как с лидера, так и с последователей.
Алгоритм распределенного консенсуса ZAB также используется для согласования между последовательностью записи и чтения сообщений и управлением репликацией партиций в Kafka для обеспечения доступности и надежности системы.
Ниже описано, как в общем случае автоматически осуществляется выбор лидера и управление репликацией партиций в Kafka при помощи сервиса ZooKeeper.
-
После создания топика контроллер Kafka выбирает лидера партиции среди активных брокеров при помощи znode, созданной в ZooKeeper. Активными считаются брокеры, эфемерные znode которых созданы в ZooKeeper и которые посылают контрольные сигналы (heartbeat) на сервер ZooKeeper для поддержания сессии. Для отслеживания состояния брокеров нода контроллера подписывается (watches) на ноды брокеров.
-
Получив ID нового лидера, контроллер записывает информацию о новом лидере в хранилище ZooKeeper и отправляет данные о новом лидере каждому брокеру, который размещает реплику этой партиции.
-
Производитель (producer) подключается к ZooKeeper и получает ID лидера партиции, куда он будет записывать данные.
-
Производитель записывает сообщение в топик на лидера партиции.
-
Взаимодействие между лидером партиции и последователями разделяется на три фазы:
-
propose — лидер предлагает создать репликацию последователям.
-
ack — лидер получает ответ от последователей о создании реплики.
-
commit — лидер фиксирует создание ISR при получении сообщения о создании реплики от всех последователей.
Последователи, которые содержат реплики всех записанных сообщений — ISR-реплики — являются кандидатами в лидеры при отказе действующего лидера.
-
-
Лидер передает данные о записанной партиции (метаданные) и о созданных ISR в контроллер.
-
Контроллер записывает информацию о созданных ISR в хранилище ZooKeeper, а также метаданные партиции.
-
Потребитель (consumer) подключается к ZooKeeper и получает ID лидера или последователей для запрашиваемой партиции, a также данные о смещении (offset), с которого следует читать.
-
Потребитель запрашивает (pull) сообщения у лидера или последователей.
-
Лидер получает данные о смещении (offset) и передает их в ZooKeeper.
Ниже описано, как осуществляется выбор лидера в Kafka в случае отказа лидера партиции.
-
При отказе лидера его эфемерный znode удаляется.
-
Kонтроллер получает информацию от своей znode об отказе лидера.
-
Kонтроллер выбирает нового лидера из числа ISR-реплик.
-
Получив ID нового лидера, контроллер записывает информацию о новом лидере в хранилище ZooKeeper и отправляет данные о новом лидере каждому брокеру, который размещает реплику этой партиции.
Далее работа с репликами, производителем и потребителем производится также, как описано для общего случая выше.
Этот выбор лидера является "чистым" (clean) — он гарантирует отсутствие потери данных.
ПРИМЕЧАНИЕ
После отказов брокеров может произойти неравномерное распределение и большинство лидеров-реплик партиций окажутся на одном узле кластера. В этом случае можно вручную перебалансировать кластер при помощи скрипта /usr/lib/kafka/bin/kafka-leader-election.sh c указанием опции |
Ниже описано развитие событий в Kafka в случае отказа брокера-последователя.
-
Лидер ждет ответ о создании реплики (ack) от последователя по истечении заданного времени (время ответа определяется параметром Kafka-брокера
replica.lag.time.max.ms
, по умолчанию 30000 мс). -
При истечении времени лидер считает реплику несинхронизированной и удаляет ее из ISR.
-
Информация об удалении ISR поступает в контроллер и ZooKeeper. Данный последователь перестает быть кандидатом в лидеры партиции.
Далее работа с репликами, производителем и потребителем производится также, как описано для общего случая выше.
ВНИМАНИЕ
Если включить параметр Kafka-брокера |