Гарантии доставки сообщений в Kafka

В данной статье описаны гарантии доставки сообщений при использовании семантик доставки, существующих в Kafka.

Основные проблемы доставки сообщений, возникающие при работе с Kafka:

  • потеря сообщений при различных сбоях (производителя, потребителя, системы обмена сообщениями, сети);

  • дублирование сообщений.

Компоненты Kafka, принимающие участие в последовательности записи и чтения сообщений, могут быть настроены так, чтобы обеспечить следующие семантики доставки:

  • At most once — сообщения будут обработаны один раз или не будут обработаны вообще (потеряны).

  • At least once — сообщения будут обработаны хотя бы один раз.

  • Exactly once — каждое сообщение будет обрабатано один и только один раз.

При использовании разных семантик доставки производительность системы может быть изменена. Более сильная гарантия может снизить производительность, а более слабая гарантия повысит производительность.

At most once

Cемантика доставки at most once обычно применяется в системах, где предпочтительнее потерять, а не доставить сообщение дважды. Обладает наименьшими "накладными расходами" (дополнительный объем памяти, увеличение скорости обработки, задержки в работе).

Принцип работы семантики доставки at most once показан на рисунке ниже.

At most once
At most once
At most once
At most once
  1. Допустим, что производитель Producer записывает сообщения пакетами (batch) в топик Kafka (размер пакета устанавливается параметром batch.size), не ожидая подтверждения записи (параметр acks = 0). Все сообщения считаются отправленными.

  2. Потребитель Consumer 1 читает сообщения также пакетами.

  3. На стороне потребителя Consumer 1 автоматически (параметр enable.auto.commit = true) выполняется коммит смещений (commit offset) всех сообщений, которые были прочитаны за определенный период времени (определяется параметром auto.commit.interval.ms).

  4. Если после чтения пакета сообщений происходит отказ потребителя, дальнейшее чтение выполняет новый потребитель Consumer 2.

  5. Новому потребителю назначается новый пакет сообщений, начиная с последнего зафиксированного смещения.

  6. Потребитель Consumer 2 делает коммит смещений и обрабатывает полученные сообщения.

В данной схеме потеря сообщений может произойти:

  • на стороне производителя, если отправка сообщений осуществлена с ошибкой;

  • на стороне брокера Kafka при отказе брокера;

  • на стороне потребителя, если коммит смещений сообщений выполнен до отказа потребителя и прочитанные сообщения обработаны не будут.

At least once

Cемантика доставки at least once обычно применяется в системах, где недопустимо терять даже одно сообщение. В случае сбоя некоторые сообщения могут быть обработаны дополнительно. Обладает вероятными "накладными расходами" в части работы производителя и брокера (дополнительный объем памяти, увеличение скорости обработки, задержки в работе).

Как правило, при использовании данной семантики автоматический коммит смещений отключают (параметр enable.auto.commit = false). При этом используется ручной коммит смещений после обработки соообщений, например, при помощи методов Consumer.commitSync (приложение блокируется до тех пор, пока брокер не ответит к запросу на фиксацию) или Сonsumer.commitAsync (приложение не блокируется), задаваемых в API класса KafkaConsumer.

Принцип работы семантики доставки at least once показан на рисунке ниже.

At least once
At least once
At least once
At least once

Описание семантики и возможной проблемы на стороне производителя или брокера:

  1. Допустим, производитель Producer записывает сообщение C со смещением 2 в топик Kafka и не получает подтверждение записи от лидера партиции (acks = 1).

  2. Производитель Producer осуществляет повторные попытки записи сообщения, пока не истекут значения параметров retries и delivery.timeout.ms.

  3. При успешной записи производитель получает подтверждение от брокера.

В данной схеме дублирование сообщений C может произойти, если после записи сообщения в топик случился отказ брокера или производителя. Результатом этого становятся два одинаковых сообщения, записанных под разными смещениями. Если производитель записывает сообщения пакетами и повторил попытку записи, дублироваться могут несколько сообщений за одну итерацию записи.

Описание семантики и возможной проблемы на стороне потребителя:

  1. Потребитель Consumer 1 читает записанные сообщения A и B из Kafka.

  2. Представим, что случился отказ потребителя Consumer 1 после обработки A и до обработки В, и они находятся в одном пакете. Тогда в зависимости от логики приложения потребителя, коммит смещения A может быть не выполнен.

  3. Так как коммит смещений A и B не выполнен, чтение этих же сообщений выполняет новый потребитель Consumer 2.

  4. При успешной обработке сообщений Consumer 2 выполняет коммит смещений A и B.

В данной схеме дублирование обработки сообщений A может произойти, если после обработки сообщения случился отказ потребителя и не удалось выполнить коммит смещения.

Exactly once

Семантика exactly once предполагает отсутствие случаев дублирования сообщений, встречающихся в семантике доставки at least once.

Для семантики exactly once обычно одновременно применяются следующие механизмы:

Ниже описаны механизмы и их применение в семантике exactly once.

Идемпотентный производитель

Идемпотентный производитель — производитель, каждому из которых назначается идентификатор производителя (PID). PID включается каждый раз, когда данный производитель отправляет сообщения брокеру Kafka.

Каждое сообщение от идемпотентного производителя получает последовательно увеличивающийся порядковый номер (sequence), отличный от смещения.

Для каждой партиции топика, которой производитель отправляет сообщения, поддерживается отдельная последовательность. Брокер для каждой партиции отслеживает наибольшую записанную комбинацию PID+порядковый номер. Если получено сообщение с меньшим порядковым номером, чем предыдущее записанное, оно игнорируется.

Таким образом, сообщение будет записано только один раз, независимо от того, сколько попыток отправки было предпринято.

Идемпотентный производитель включается при помощи присвоения параметру enable.idempotence значения true.

Ниже описаны значения параметров, которые должны быть установлены, чтобы функция идемпотентного производителя работала без ошибок:

ПРИМЕЧАНИЕ
  • Идемпотентный производитель может быть использован в качестве компонента гибридного варианта семантики at least once.

  • Функция идемпотентного производителя может быть сохранена, только если повторная запись сообщения была выполнена при помощи внутренних настроек производителя, как описано в семантике at least once, а не внешнего приложения.

Транзакции

Транзакция — атомарная запись в один или несколько топиков и партиций Kafka. Все сообщения, включенные в транзакцию, будут успешно записаны, либо ни одно из них не будет записано. Если транзакция прерывается, все сообщения транзакции недоступны для чтения потребителям.

Работа транзакций в Kafka основана на следующих компонентах:

  • Идентификатор транзакции (transactional.id) — индивидуальный идентификатор, присваиваемый производителю транзакций.

  • Координатор транзакций (transaction coordinator) — модуль, работающий внутри каждого брокера Kafka. Только один координатор владеет зарегистрированным transactional.id. Для надежной работы координатора используется протокол репликации Kafka и процесс выбора лидера. Координатор транзакций — единственный компонент, который может читать и записывать журнал транзакций.

  • Журнал транзакций (__transaction_state) — внутренний топик Kafka, который хранит последнее состояние транзакции. Возможные состояния транзакций:

    • ongoing;

    • prepare_commit;

    • committed.

      Каждый координатор транзакций управляет некоторыми партициями в журнале транзакций, является лидером для них.

  • Эпоха (epoch) — часть метаданных, хранящая количество подключений каждого идентификатора транзакции.

  • Зомби-экземпляр — экземпляр производителя, который начал запись сообщений и потерпел неудачу, не завершив запись.

Настройка и работа в рамках транзакции происходит при помощи API класса KafkaProducer и других API, связанных с работой транзакций.

Семантика exactly once

Общий принцип семантики exactly once в Kafka показан на рисунке ниже.

Exactly once
Exactly once
Exactly once
Exactly once

Последовательность записи транзакций

  1. Поиск координатора транзакций при помощи запроса FindCoordinatorRequest. Производитель подключается к любому известному брокеру кластера, чтобы узнать местоположение его координатора.

  2. Регистрация производителя у координатора транзакций при помощи синхронного запроса InitPidRequest. При этом производитель указывает свой transactional.id.

    Данный запрос выполняет:

    • Возвращение одного и того же PID для одного transactional.id будущим экземплярам производителя.

    • Увеличение и возвращение эпохи PID, при этом любой предыдущий "зомби-экземпляр" производителя блокируется и не может продолжить свою транзакцию.

    • Удаление любой транзакции, оставшейся не завершенной предыдущим экземпляром производителя.

    • Создание записи (init) в журнале __transaction_state для заданного transactional.id, если это первая транзакция для производителя.

  3. Запуск транзакции при помощи вызова метода producer.beginTransaction. После запуска транзакции все записанные сообщения будут частью транзакции. В журнале __transaction_state для этого transactional.id меняется состояние транзакции на ongoing.

  4. Цикл чтение → преобразование → запись сообщений:

    • Регистрация новой партиции в рамках транзакции при помощи запроса AddPartitionsToTxnRequest.

    • Запись сообщений (включая PID, эпоху и порядковый номер) в партицию пользователя при помощи метода producer.send.

    • Отправка смещений прочитанных сообщений в рамках транзакции с идентификатором groupId координатору транзакции при помощи метода producer.sendOffsetsToTransaction для регистрации добавления этой партиции в журнал транзакций.

    • Регистрация смещений в топике __consumer_offsets при помощи запроса TxnOffsetCommitRequest производителя координатору потребителей (включая PID и эпоху). Запись смещений в топике __consumer_offsets тоже становится частью данной транзакции. Координатор потребителей проверяет, что производитель не является "зомби-экземпляром". Записанные смещения не видны потребителям до подтверждения транзакции.

  5. Подтверждение транзакции при помощи метода commitTransaction, который выполняет:

    • Запись сообщения prepare_commit в журнал транзакций.

    • Запись маркеров COMMIT в топики пользователя, в том числе и в топик __consumer_offsets.

    • Запись сообщения committed в журнал транзакций.

    • Открытие доступа для потребителей к сообщениям, записанным в топик.

Таким образом, смещения всех прочитанных за одну транзакцию сообщений были записаны только после их обработки.

Чтение транзакционных сообщений

Чтение транзакционных сообщений потребителем регулируется параметром isolation.level:

  • Если параметру isolation.level присвоено значение read_committed, потребитель буферизует сообщения, имеющие PID, до тех пор, пока не прочитает сообщение с маркером COMMIT. После этого сообщения будут доставлены. Cообщения считываются только до последнего стабильного смещения, которое меньше смещения первой открытой транзакции. Потребители read_committed не смогут читать топики полностью, пока есть незавершенные транзакции.

  • Если параметру isolation.level присвоено значение read_uncommitted, все сообщения видны потребителям, даже если они были частью прерванной транзакции.

Гарантии семантики exactly once могут быть обеспечены, если приложение, в составе которого потребитель читает транзакционные сообщения из топика Kafka, после обработки передает эти сообщения в другой топик Kafka (например, приложения Kafka Streams). Это связано с тем, что смещение потребителя сохраняется в виде сообщения в топике, поэтому смещения при чтении сообщений могут быть записаны в той же транзакции, что и сообщения топиков пользователя. В случае прерывания транзакции последнее смещение возвращается к старому значению, а созданные сообщения в топиках пользователя не будут видны потребителям.

Вероятные "накладные расходы" семантики exactly once:

  • в части работы производителя и брокера — чем больше сообщений в транзакции, тем меньше потери ресурсов (дополнительный объем памяти, увеличение скорости обработки, задержки в работе);

  • в части потребителя — чтение большой транзакции вызывают блокировку читаемой партиции для потребителей с настройкой read_committed и потерю ресурсов (объем памяти на буферизацию, увеличение скорости обработки, задержки в работе).

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней