Формат сообщений в Kafka

В данной статье описан формат сообщений в Kafka.

Сообщение (message), запись (record) — фиксация в системе произошедшего события, в том числе состояния обьекта, значения физической величины или любого другого параметра, который требует отслеживания, хранения или передачи в другую систему.

В процессе настройки компонентов, принимающих участие в записи и чтении сообщений, имеет значение понимание формата сообщений и настройка связанных с ним параметров.

Запись сообщений

Сообщения в Kafka сохраняются (буферизуются) в пакеты записей (record batch), а затем с учетом конфигураций производителя отправляются в брокер Kafka. Пакет может содержать от одного до нескольких сообщений. При этом сообщения одного пакета записываются в одну и ту же партицию.

Регулирование пакетной передачи сообщений выполняется при помощи параметров:

  • max.in.flight.requests.per.connection — максимальное количество неподтвержденных запросов (количество пакетов) на запись, которые могут быть отправлены одновременно (по умолчанию значение 5);

  • linger.ms — настройка времени искусственной задержки отправления пакетов для обьединения записей в пакет (по умолчанию значение 0);

  • batch.size — настройка верхней границы размера пакета в байтах (по умолчанию значение 16384).

Принцип пакетной обработки сообщений в Kafka показан на рисунке ниже.

Запись сообщений в Kafka
Запись сообщений в Kafka
Запись сообщений в Kafka
Запись сообщений в Kafka

Формат сообщения

Основой сообщения Kafka является пара ключ/значение (key/value). И ключ, и значение представляют собой массив байтов переменной длины. Значение может представлять собой любой массив данных, в том числе и определенное количество пар ключ/значение. Ключ может быть равен null или не указываться вообще.

И ключ, и значение сообщения могут быть сериализованы в одном из форматов: json, avro, protobuf. Сериализация выполняется при запуске экземпляра производителя.

При создании сообщения вместе с парой ключ/значение указываются следующие параметры:

  • наименование топика (topic), куда должно быть записано сообщение;

  • номер партиции (partition), в которую должно быть записано сообщение (может быть не указан);

  • временная метка (timestamp), указанная пользователем (может отсутствовать).

 

Исходя из указанных ключа и номера партиции, при создании сообщения определяется одна из стратегий записи сообщения в определенную партицию топика:

  • запись сообщения в указанную партицию, если данная партиция существует;

  • запись сообщений с одинаковым ключом в одну партицию, которая вычисляется при помощи хеш-функции ключа (если не указан номер партиции и присутствует ключ);

  • запись сообщения в партицию, выбранную циклическим способом (round-robin) (ни партиция, ни ключ не указаны).

Если при создании сообщения пользователь не установил временную метку, сообщение будет отмечено текущим временем.

При помощи заголовка записи (header) существует возможность добавить метаданные для каждого сообщения. Заголовок представляет собой пару ключ/значение, где ключ — всегда закодированная строка, а значение может быть любым массивом данных, в том числе и определенное количество пар ключ/значение.

Создание записи с указанием необходимых параметров Kafka выполняется c использованием класса ProducerRecord.

Ниже приведен дисковый формат записи и заголовка записи с указанием используемых типов данных для всех значений.

Дисковый формат записи
length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
Дисковый формат заголовка
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]

Формат пакета записей

После создания записи запускается экземпляр KafkaProducer, который выполняет:

  1. Сериализацию записей в соответствии с установленными параметрами сериализации key.serializer и value.serializer.

  2. Партиционирование записей в соответствии с выбранной стратегией.

  3. Буферизацию записей и сортировку по пакетам.

  4. Выполнение необходимого сжатия сообщений внутри пакета в соответствии с установленными параметрами сжатия compression.type.

Пакет записей содержит от одной до нескольких записей и заголовок (header) — раздел метаданных пакета записей. Заголовок всегда содержит 61 байт. Заголовок содержит данные о типе используемого сжатия, метку создания пакета, количество записей и другие метаданные.

Ниже приведен дисковый формат заголовка пакета записей с указанием используемых типов данных для всех значений.

Дисковый формат заголовка пакета записей
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]

Хранение записи на брокере

После записи на брокер Kafka сообщение сохраняется со всеми метаданными, которые были записаны в процессе записи сообщения, при этом:

  • Cообщение на брокере имеет временную метку исходя из настроенного параметра брокера log.message.timestamp.type или параметра топика message.timestamp.type.

  • Cжатие записанных сообщений на брокере выполняется исходя из настроенного параметра брокера compression.type или параметра топика compression.type. Cообщение в сжатом виде сохраняется в брокере и распаковывается только у потребителя.

  • Cообщение на брокере сохраняется в сериализованном виде исходя из параметров сериализации установленных производителем.

Класс RecordMetadata возвращает метаданные записи, успешно записанной на брокере.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней