Гарантии доставки сообщений в Kafka
В данной статье описаны гарантии доставки сообщений при использовании семантик доставки, существующих в Kafka.
Основные проблемы доставки сообщений, возникающие при работе с Kafka:
-
потеря сообщений при различных сбоях (производителя, потребителя, системы обмена сообщениями, сети);
-
дублирование сообщений.
Компоненты Kafka, принимающие участие в последовательности записи и чтения сообщений, могут быть настроены так, чтобы обеспечить следующие семантики доставки:
-
At most once — сообщения будут обработаны один раз или не будут обработаны вообще (потеряны).
-
At least once — сообщения будут обработаны хотя бы один раз.
-
Exactly once — каждое сообщение будет обрабатано один и только один раз.
При использовании разных семантик доставки производительность системы может быть изменена. Более сильная гарантия может снизить производительность, а более слабая гарантия повысит производительность.
At most once
Cемантика доставки at most once обычно применяется в системах, где предпочтительнее потерять, а не доставить сообщение дважды. Обладает наименьшими "накладными расходами" (дополнительный объем памяти, увеличение скорости обработки, задержки в работе).
Принцип работы семантики доставки at most once показан на рисунке ниже.
-
Допустим, что производитель
Producer
записывает сообщения пакетами (batch) в топик Kafka (размер пакета устанавливается параметром batch.size), не ожидая подтверждения записи (параметр acks =0
). Все сообщения считаются отправленными. -
Потребитель
Consumer 1
читает сообщения также пакетами. -
На стороне потребителя
Consumer 1
автоматически (параметр enable.auto.commit =true
) выполняется коммит смещений (commit offset) всех сообщений, которые были прочитаны за определенный период времени (определяется параметром auto.commit.interval.ms). -
Если после чтения пакета сообщений происходит отказ потребителя, дальнейшее чтение выполняет новый потребитель
Consumer 2
. -
Новому потребителю назначается новый пакет сообщений, начиная с последнего зафиксированного смещения.
-
Потребитель
Consumer 2
делает коммит смещений и обрабатывает полученные сообщения.
В данной схеме потеря сообщений может произойти:
-
на стороне производителя, если отправка сообщений осуществлена с ошибкой;
-
на стороне брокера Kafka при отказе брокера;
-
на стороне потребителя, если коммит смещений сообщений выполнен до отказа потребителя и прочитанные сообщения обработаны не будут.
At least once
Cемантика доставки at least once обычно применяется в системах, где недопустимо терять даже одно сообщение. В случае сбоя некоторые сообщения могут быть обработаны дополнительно. Обладает вероятными "накладными расходами" в части работы производителя и брокера (дополнительный объем памяти, увеличение скорости обработки, задержки в работе).
Как правило, при использовании данной семантики автоматический коммит смещений отключают (параметр enable.auto.commit = false
). При этом используется ручной коммит смещений после обработки соообщений, например, при помощи методов Consumer.commitSync
(приложение блокируется до тех пор, пока брокер не ответит к запросу на фиксацию) или Сonsumer.commitAsync
(приложение не блокируется), задаваемых в API класса KafkaConsumer.
Принцип работы семантики доставки at least once показан на рисунке ниже.
Описание семантики и возможной проблемы на стороне производителя или брокера:
-
Допустим, производитель
Producer
записывает сообщениеC
со смещением2
в топик Kafka и не получает подтверждение записи от лидера партиции (acks =1
). -
Производитель
Producer
осуществляет повторные попытки записи сообщения, пока не истекут значения параметров retries и delivery.timeout.ms. -
При успешной записи производитель получает подтверждение от брокера.
В данной схеме дублирование сообщений C
может произойти, если после записи сообщения в топик случился отказ брокера или производителя. Результатом этого становятся два одинаковых сообщения, записанных под разными смещениями. Если производитель записывает сообщения пакетами и повторил попытку записи, дублироваться могут несколько сообщений за одну итерацию записи.
Описание семантики и возможной проблемы на стороне потребителя:
-
Потребитель
Consumer 1
читает записанные сообщенияA
иB
из Kafka. -
Представим, что случился отказ потребителя
Consumer 1
после обработкиA
и до обработкиВ
, и они находятся в одном пакете. Тогда в зависимости от логики приложения потребителя, коммит смещенияA
может быть не выполнен. -
Так как коммит смещений
A
иB
не выполнен, чтение этих же сообщений выполняет новый потребительConsumer 2
. -
При успешной обработке сообщений
Consumer 2
выполняет коммит смещенийA
иB
.
В данной схеме дублирование обработки сообщений A
может произойти, если после обработки сообщения случился отказ потребителя и не удалось выполнить коммит смещения.
Exactly once
Семантика exactly once предполагает отсутствие случаев дублирования сообщений, встречающихся в семантике доставки at least once.
Для семантики exactly once обычно одновременно применяются следующие механизмы:
Ниже описаны механизмы и их применение в семантике exactly once.
Идемпотентный производитель
Идемпотентный производитель — производитель, каждому из которых назначается идентификатор производителя (PID). PID включается каждый раз, когда данный производитель отправляет сообщения брокеру Kafka.
Каждое сообщение от идемпотентного производителя получает последовательно увеличивающийся порядковый номер (sequence), отличный от смещения.
Для каждой партиции топика, которой производитель отправляет сообщения, поддерживается отдельная последовательность. Брокер для каждой партиции отслеживает наибольшую записанную комбинацию PID+порядковый номер. Если получено сообщение с меньшим порядковым номером, чем предыдущее записанное, оно игнорируется.
Таким образом, сообщение будет записано только один раз, независимо от того, сколько попыток отправки было предпринято.
Идемпотентный производитель включается при помощи присвоения параметру enable.idempotence значения true
.
Ниже описаны значения параметров, которые должны быть установлены, чтобы функция идемпотентного производителя работала без ошибок:
-
max.in.flight.requests.per.connection — не более
5
; -
retries >
0
; -
acks =
all
.
ПРИМЕЧАНИЕ
|
Транзакции
Транзакция — атомарная запись в один или несколько топиков и партиций Kafka. Все сообщения, включенные в транзакцию, будут успешно записаны, либо ни одно из них не будет записано. Если транзакция прерывается, все сообщения транзакции недоступны для чтения потребителям.
Работа транзакций в Kafka основана на следующих компонентах:
-
Идентификатор транзакции (transactional.id) — индивидуальный идентификатор, присваиваемый производителю транзакций.
-
Координатор транзакций (transaction coordinator) — модуль, работающий внутри каждого брокера Kafka. Только один координатор владеет зарегистрированным
transactional.id
. Для надежной работы координатора используется протокол репликации Kafka и процесс выбора лидера. Координатор транзакций — единственный компонент, который может читать и записывать журнал транзакций. -
Журнал транзакций (
__transaction_state
) — внутренний топик Kafka, который хранит последнее состояние транзакции. Возможные состояния транзакций:-
ongoing
; -
prepare_commit
; -
committed
.Каждый координатор транзакций управляет некоторыми партициями в журнале транзакций, является лидером для них.
-
-
Эпоха (epoch) — часть метаданных, хранящая количество подключений каждого идентификатора транзакции.
-
Зомби-экземпляр — экземпляр производителя, который начал запись сообщений и потерпел неудачу, не завершив запись.
Настройка и работа в рамках транзакции происходит при помощи API класса KafkaProducer и других API, связанных с работой транзакций.
Семантика exactly once
Общий принцип семантики exactly once в Kafka показан на рисунке ниже.
Последовательность записи транзакций
-
Поиск координатора транзакций при помощи запроса FindCoordinatorRequest. Производитель подключается к любому известному брокеру кластера, чтобы узнать местоположение его координатора.
-
Регистрация производителя у координатора транзакций при помощи синхронного запроса InitPidRequest. При этом производитель указывает свой
transactional.id
.Данный запрос выполняет:
-
Возвращение одного и того же PID для одного
transactional.id
будущим экземплярам производителя. -
Увеличение и возвращение эпохи PID, при этом любой предыдущий "зомби-экземпляр" производителя блокируется и не может продолжить свою транзакцию.
-
Удаление любой транзакции, оставшейся не завершенной предыдущим экземпляром производителя.
-
Создание записи (
init
) в журнале__transaction_state
для заданногоtransactional.id
, если это первая транзакция для производителя.
-
-
Запуск транзакции при помощи вызова метода
producer.beginTransaction
. После запуска транзакции все записанные сообщения будут частью транзакции. В журнале__transaction_state
для этогоtransactional.id
меняется состояние транзакции наongoing
. -
Цикл чтение → преобразование → запись сообщений:
-
Регистрация новой партиции в рамках транзакции при помощи запроса AddPartitionsToTxnRequest.
-
Запись сообщений (включая PID, эпоху и порядковый номер) в партицию пользователя при помощи метода
producer.send
. -
Отправка смещений прочитанных сообщений в рамках транзакции с идентификатором
groupId
координатору транзакции при помощи методаproducer.sendOffsetsToTransaction
для регистрации добавления этой партиции в журнал транзакций. -
Регистрация смещений в топике
__consumer_offsets
при помощи запроса TxnOffsetCommitRequest производителя координатору потребителей (включая PID и эпоху). Запись смещений в топике__consumer_offsets
тоже становится частью данной транзакции. Координатор потребителей проверяет, что производитель не является "зомби-экземпляром". Записанные смещения не видны потребителям до подтверждения транзакции.
-
-
Подтверждение транзакции при помощи метода
commitTransaction
, который выполняет:-
Запись сообщения
prepare_commit
в журнал транзакций. -
Запись маркеров
COMMIT
в топики пользователя, в том числе и в топик__consumer_offsets
. -
Запись сообщения
committed
в журнал транзакций. -
Открытие доступа для потребителей к сообщениям, записанным в топик.
-
Таким образом, смещения всех прочитанных за одну транзакцию сообщений были записаны только после их обработки.
Чтение транзакционных сообщений
Чтение транзакционных сообщений потребителем регулируется параметром isolation.level:
-
Если параметру
isolation.level
присвоено значениеread_committed
, потребитель буферизует сообщения, имеющие PID, до тех пор, пока не прочитает сообщение с маркеромCOMMIT
. После этого сообщения будут доставлены. Cообщения считываются только до последнего стабильного смещения, которое меньше смещения первой открытой транзакции. Потребителиread_committed
не смогут читать топики полностью, пока есть незавершенные транзакции. -
Если параметру
isolation.level
присвоено значениеread_uncommitted
, все сообщения видны потребителям, даже если они были частью прерванной транзакции.
Вероятные "накладные расходы" семантики exactly once:
-
в части работы производителя и брокера — чем больше сообщений в транзакции, тем меньше потери ресурсов (дополнительный объем памяти, увеличение скорости обработки, задержки в работе);
-
в части потребителя — чтение большой транзакции вызывают блокировку читаемой партиции для потребителей с настройкой
read_committed
и потерю ресурсов (объем памяти на буферизацию, увеличение скорости обработки, задержки в работе).