Message format in Kafka

This article describes the message format in Kafka.

A message (record) captures an event that has occurred in the system. The event can be a change in the state of an object, the value of a physical quantity, or any other parameter that needs to be tracked, stored, or transmitted to another system.

When configuring the components that write and read messages, it is important to understand the message format and to set the parameters related to it correctly.

Recording messages

Before they are sent, messages are buffered by the producer in record batches. Each batch is then sent to the Kafka broker according to the producer configuration. A batch contains one or more messages, and all messages in a batch are written to the same partition.

Batch processing is controlled by the following producer parameters:

  • max.in.flight.requests.per.connection: the maximum number of unacknowledged requests that the producer can send over a single connection before blocking (default value is 5);

  • linger.ms: an artificial delay before a batch is sent, which lets more records be collected into it (default value is 0);

  • batch.size: the upper limit of the batch size in bytes (default value is 16384).

The principle of batch message processing in Kafka is shown in the figure below.

Recording messages in Kafka
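The batching behavior described above can be sketched as a per-partition accumulator. This is a simplified model and not the producer's actual implementation. The Accumulator and Batch classes are hypothetical. Sizes count raw payload bytes only, time is passed in explicitly, and max.in.flight.requests.per.connection is not modeled.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Batch:
    partition: int
    created_ms: int
    records: List[bytes] = field(default_factory=list)
    size: int = 0  # payload bytes only; real batches also count headers and overhead


class Accumulator:
    """Hypothetical per-partition batching model driven by batch.size and linger.ms."""

    def __init__(self, batch_size: int = 16384, linger_ms: int = 0) -> None:
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.open: Dict[int, Batch] = {}  # at most one open batch per partition
        self.ready: List[Batch] = []

    def append(self, partition: int, record: bytes, now_ms: int) -> None:
        batch = self.open.get(partition)
        # A record that would push the open batch past batch.size closes that batch.
        if batch is not None and batch.size + len(record) > self.batch_size:
            self.ready.append(self.open.pop(partition))
            batch = None
        if batch is None:
            batch = Batch(partition, created_ms=now_ms)
            self.open[partition] = batch
        batch.records.append(record)
        batch.size += len(record)

    def drain(self, now_ms: int) -> List[Batch]:
        # Open batches that have waited at least linger.ms are also ready to send.
        for partition in list(self.open):
            if now_ms - self.open[partition].created_ms >= self.linger_ms:
                self.ready.append(self.open.pop(partition))
        ready, self.ready = self.ready, []
        return ready
```

For example, take batch_size=10 and linger_ms=5, and append three 4-byte records to partition 0 at times 0, 1, and 2. The third record closes the first batch, which holds 8 bytes, so drain(3) returns that batch. The batch holding the third record is returned only by drain(7), once linger.ms has elapsed.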

Message format

The core of a Kafka message is a key/value pair. Both the key and the value are variable-length byte arrays. The value can hold any data, including a set of nested key/value pairs. The key may be null or omitted entirely.

The message key and value can be serialized in formats such as JSON, Avro, or Protobuf. Serialization is performed by the producer when a record is sent, using the serializers specified in the producer configuration.
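In the Java client, serialization is done by the classes named in key.serializer and value.serializer. As a language-neutral illustration, the hypothetical Python functions below act as a JSON serializer and deserializer. They turn an object into the byte array that Kafka actually stores, and back. Avro and Protobuf require third-party libraries, so they are not shown.

```python
import json
from typing import Any, Optional


def serialize_json(obj: Optional[Any]) -> Optional[bytes]:
    """Hypothetical JSON serializer: object -> UTF-8 byte array stored by Kafka."""
    if obj is None:
        return None  # a null key or value stays null
    # Compact separators and sorted keys give a stable byte representation.
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode("utf-8")


def deserialize_json(data: Optional[bytes]) -> Optional[Any]:
    """Inverse of serialize_json, as a consumer-side deserializer would do."""
    return None if data is None else json.loads(data.decode("utf-8"))
```

For example, serialize_json({"state": "on", "id": 42}) produces the bytes b'{"id":42,"state":"on"}'.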

When creating a message, specify the following parameters along with the key/value pair:

  • name of the topic where the message should be written;

  • number of the partition to write the message to (optional);

  • timestamp set by the user (optional).

The partition number and key specified when a message is created determine which strategy is used to choose the topic partition that the message is written to:

  • if a partition number is specified, the message is written to that partition, provided it exists;

  • if no partition number is specified but a key is present, the partition is calculated from a hash of the key, so messages with the same key are always written to the same partition;

  • if neither a partition number nor a key is specified, the partition is selected in round-robin fashion (since Kafka 2.4, the default partitioner uses a "sticky" variant that sends key-less records to one partition until its batch is sent, and then switches to another).
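A minimal Python sketch of these three strategies follows. The PartitionChooser class is hypothetical. It uses zlib.crc32 as a stand-in hash, whereas Kafka's default partitioner hashes the key with murmur2, so the partition numbers computed here will not match a real cluster. The key-less branch implements plain round-robin.

```python
import itertools
import zlib
from typing import Optional


class PartitionChooser:
    """Hypothetical model of the three partition-selection strategies."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        self._next = itertools.count()  # round-robin position for key-less records

    def choose(self, key: Optional[bytes], partition: Optional[int] = None) -> int:
        # 1. An explicit partition number is used as-is, provided it exists.
        if partition is not None:
            if not 0 <= partition < self.num_partitions:
                raise ValueError(f"partition {partition} does not exist")
            return partition
        # 2. A key is present: equal keys always hash to the same partition.
        #    zlib.crc32 is a stand-in here; Kafka's default partitioner uses murmur2.
        if key is not None:
            return zlib.crc32(key) % self.num_partitions
        # 3. Neither partition nor key: cycle through the partitions.
        return next(self._next) % self.num_partitions
```

With three partitions, four key-less calls in a row return 0, 1, 2, 0. Every call with the key b"user-42" returns the same partition.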

If the user did not set a timestamp when creating a message, the message will be marked with the current time.

Message headers make it possible to attach metadata to each message. A header is a key/value pair in which the key is always a string, and the value is a byte array that can hold any data, including a set of key/value pairs.

A record with the required Kafka parameters is created using the ProducerRecord class.
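In Java, the fullest ProducerRecord constructor takes the topic, partition, timestamp, key, value, and headers. The hypothetical Python dataclass below mirrors those fields to show which are required and which are optional. It also shows the default described above: a record without a user timestamp is stamped with the current time. It is an illustration only, not a Python Kafka API.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Record:
    """Hypothetical Python mirror of the fields carried by a ProducerRecord."""

    topic: str                           # required: topic to write to
    value: Optional[bytes]               # serialized value (may be null)
    key: Optional[bytes] = None          # serialized key (may be null)
    partition: Optional[int] = None      # optional: the partitioner decides if omitted
    timestamp_ms: Optional[int] = None   # optional: defaults to the current time
    headers: List[Tuple[str, bytes]] = field(default_factory=list)  # (string key, byte value)

    def __post_init__(self) -> None:
        if self.timestamp_ms is None:
            # No user-supplied timestamp: stamp the record with the current time in ms.
            self.timestamp_ms = int(time.time() * 1000)
```

For example, Record("sensor-readings", value=b'{"t":21.5}', key=b"sensor-7", headers=[("source", b"plant-1")]) leaves the partition to the partitioner and receives the current time as its timestamp.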

The on-disk format of a record and of a record header is shown below, with the data type of each field.

Disk record format
length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
Disk record header format
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
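To make the field list concrete, the sketch below encodes a single record in this layout. It is written from the format above rather than taken from Kafka's code, and the helper names are hypothetical. Kafka's varint and varlong types use zigzag encoding, so small negative numbers stay short. An example is the -1 written for a null key.

```python
from typing import List, Optional, Tuple


def encode_varint(n: int) -> bytes:
    """Zigzag-encode a signed integer, then write it as a base-128 varint."""
    # Zigzag: 0->0, -1->1, 1->2, -2->3, ... (same result for varint and varlong in range)
    v = (n << 1) ^ (n >> 63)
    out = bytearray()
    while v >= 0x80:
        out.append((v & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        v >>= 7
    out.append(v)
    return bytes(out)


def encode_bytes(data: Optional[bytes]) -> bytes:
    """Length-prefixed byte array; a null array is written as length -1."""
    if data is None:
        return encode_varint(-1)
    return encode_varint(len(data)) + data


def encode_record(offset_delta: int, timestamp_delta: int,
                  key: Optional[bytes], value: Optional[bytes],
                  headers: List[Tuple[str, Optional[bytes]]]) -> bytes:
    body = bytearray()
    body.append(0)                             # attributes: int8, bits 0~7 unused
    body += encode_varint(timestamp_delta)     # timestampDelta: varlong
    body += encode_varint(offset_delta)        # offsetDelta: varint
    body += encode_bytes(key)                  # keyLength + key
    body += encode_bytes(value)                # valueLen + value
    body += encode_varint(len(headers))        # number of headers
    for header_key, header_value in headers:
        key_bytes = header_key.encode("utf-8")
        body += encode_varint(len(key_bytes)) + key_bytes  # headerKeyLength + headerKey
        body += encode_bytes(header_value)                 # headerValueLength + Value
    # length: varint, the size of everything that follows it
    return encode_varint(len(body)) + bytes(body)
```

A record with key b"k", value b"v", and no headers encodes to 9 bytes: a length of 8, followed by the attributes, both deltas, the key, the value, and a header count of 0.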

Record batch format

After a record is created, it is sent using a KafkaProducer instance, which performs:

  1. Serialization of the record key and value according to the key.serializer and value.serializer parameters.

  2. Partitioning of records according to the selected strategy.

  3. Buffering of records and grouping them into batches.

  4. Compression of the messages within each batch according to the compression.type parameter.
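Step 4 can be illustrated with a short sketch that uses the standard-library gzip module for the gzip codec. In the record batch format, the batch header stays uncompressed and only the records that follow it are compressed. The chosen codec is recorded in bits 0~2 of the batch attributes field. The compress_records function and the CODECS table are hypothetical. The other codecs are omitted because they need third-party libraries.

```python
import gzip
from typing import Tuple

# Codec ids stored in bits 0~2 of the record batch attributes.
CODECS = {"none": 0, "gzip": 1, "snappy": 2, "lz4": 3, "zstd": 4}


def compress_records(records: bytes, compression_type: str) -> Tuple[bytes, int]:
    """Return the (possibly compressed) records section and the codec bits."""
    codec = CODECS[compression_type]
    if codec == 0:
        return records, codec
    if codec == 1:
        return gzip.compress(records), codec
    # snappy, lz4 and zstd need third-party libraries, so this sketch leaves them out.
    raise NotImplementedError(compression_type)
```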

A record batch contains one or more records and a header, which is the metadata section of the batch. The header is always 61 bytes long. It contains information such as the compression type, the base and maximum timestamps of the batch, the number of records, and other metadata.

The on-disk format of the record batch header is shown below, with the data type of each field.

Disk record batch header format
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
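The sketch below packs this header with Python's struct module and decodes the attributes flags. It also confirms the 61-byte figure. The fixed fields add up to 57 bytes, and the int32 record count that precedes records: [Record] brings the total to 61. BATCH_HEADER and decode_attributes are hypothetical names, and the packed values in the test are illustrative only.

```python
import struct

# Fixed-size part of a v2 record batch header: big-endian, no padding.
BATCH_HEADER = struct.Struct(
    ">"
    "q"  # baseOffset: int64
    "i"  # batchLength: int32
    "i"  # partitionLeaderEpoch: int32
    "b"  # magic: int8
    "I"  # crc: uint32
    "h"  # attributes: int16
    "i"  # lastOffsetDelta: int32
    "q"  # baseTimestamp: int64
    "q"  # maxTimestamp: int64
    "q"  # producerId: int64
    "h"  # producerEpoch: int16
    "i"  # baseSequence: int32
    "i"  # int32 count of records that prefixes records: [Record]
)


def decode_attributes(attributes: int) -> dict:
    """Split the int16 attributes field into the flags listed above."""
    return {
        "compression": attributes & 0x07,                      # bits 0~2
        "timestamp_type": (attributes >> 3) & 1,               # bit 3: 0 CreateTime, 1 LogAppendTime
        "is_transactional": bool((attributes >> 4) & 1),       # bit 4
        "is_control_batch": bool((attributes >> 5) & 1),       # bit 5
        "has_delete_horizon_ms": bool((attributes >> 6) & 1),  # bit 6
    }
```

A producerId, producerEpoch, and baseSequence of -1 indicate that no producer ID is set, as for a non-idempotent producer.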

Store a record on a broker

After a message is written to the Kafka broker, it is stored together with all the metadata added while it was being written.

The RecordMetadata class returns the metadata of a record successfully written to the broker.
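In the v2 format, each record stores only an offsetDelta and a timestampDelta. Absolute values are obtained by adding these deltas to the baseOffset and baseTimestamp of the batch header. The sketch below shows that arithmetic. RecordMetadataSketch and resolve_metadata are hypothetical names, and the timestamp calculation assumes the CreateTime timestamp type. The Java RecordMetadata class exposes the corresponding values through topic(), partition(), offset(), and timestamp().

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordMetadataSketch:
    """Hypothetical stand-in for the values reported by RecordMetadata."""
    topic: str
    partition: int
    offset: int
    timestamp: int


def resolve_metadata(topic: str, partition: int, base_offset: int,
                     base_timestamp: int, offset_delta: int,
                     timestamp_delta: int) -> RecordMetadataSketch:
    # Records carry deltas; the batch header carries the base values.
    return RecordMetadataSketch(
        topic=topic,
        partition=partition,
        offset=base_offset + offset_delta,
        timestamp=base_timestamp + timestamp_delta,  # assumes CreateTime
    )
```

For example, the fourth record (offsetDelta 3) of a batch whose baseOffset is 1000 has offset 1003.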
