Message format in Kafka
This article describes the message format in Kafka.
A message (record) captures an event that has occurred in the system: the state of an object, the value of a physical quantity, or any other parameter that needs to be tracked, stored, or transmitted to another system.
When configuring the components that write and read messages, it is important to understand the message format and tune the parameters associated with it.
Writing messages
Messages in Kafka are stored (buffered) in a record batch and then sent to the Kafka broker based on producer configurations. A batch can contain one or more messages. All messages within one batch are written to the same partition.
The batch message processing is regulated via the following parameters:

- max.in.flight.requests.per.connection — the maximum number of unacknowledged requests (batches) per connection that can be sent simultaneously (default value is 5);
- linger.ms — the artificial delay before sending a batch, used to combine records into a batch (default value is 0);
- batch.size — the upper limit of the batch size in bytes (default value is 16384).
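As an illustration, the batching parameters above could be set on a producer configuration like this (a minimal sketch; the class name, broker address, and linger value are assumptions for the example, not values from this article):

```java
import java.util.Properties;

// Illustrative sketch of a producer configuration with explicit batching settings.
public class BatchConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        // Placeholder broker address
        props.put("bootstrap.servers", "localhost:9092");
        // Up to 5 unacknowledged batches per connection (the default)
        props.put("max.in.flight.requests.per.connection", "5");
        // Wait up to 10 ms to accumulate more records into a batch
        props.put("linger.ms", "10");
        // Cap each batch at 16384 bytes (the default)
        props.put("batch.size", "16384");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("batch.size")); // prints 16384
    }
}
```

Raising linger.ms trades a little latency for larger batches; with the default of 0, a batch is sent as soon as a sender thread is available.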
The principle of batch message processing in Kafka is shown in the figure below.
Message format
The core of a Kafka message is a key/value pair. Both the key and the value are variable-length byte arrays. The value can be an array of any data, including a set of key/value pairs. The key may be null or omitted entirely.
Both the message key and value can be serialized in one of the following formats: JSON, Avro, or Protobuf. The serializers are specified when the producer instance is created.
When creating a message, specify the following parameters along with the key/value pair:

- the name of the topic the message should be written to;
- the number of the partition the message should be written to (may be omitted);
- a user-specified timestamp (may be omitted).
Based on the key and the partition number specified when the message is created, one of the following strategies for writing the message to a specific partition of the topic is chosen:

- writing the message to the specified partition, if that partition exists;
- writing messages with the same key to the same partition, computed by hashing the key (if no partition number is specified but the key is present);
- writing the message to a partition selected in a round-robin fashion (if neither the partition nor the key is specified).
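The three strategies above can be sketched as follows (a simplified illustration: Kafka's default partitioner actually uses murmur2 hashing and per-topic state, while this sketch uses plain hashCode and a single static round-robin counter):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of Kafka's partition-selection strategies.
public class PartitionerSketch {
    private static final AtomicInteger counter = new AtomicInteger(0);

    public static int choosePartition(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            // Strategy 1: explicit partition number takes precedence
            return partition;
        }
        if (key != null) {
            // Strategy 2: same key -> same partition, via a hash of the key
            // (Kafka itself uses murmur2, not String.hashCode)
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        // Strategy 3: no partition and no key -> round-robin
        return counter.getAndIncrement() % numPartitions;
    }

    public static void main(String[] args) {
        // Same key always lands on the same partition
        System.out.println(choosePartition(null, "user-42", 6));
        System.out.println(choosePartition(null, "user-42", 6));
    }
}
```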
If the user did not set a timestamp when creating a message, the message will be marked with the current time.
Using the message header, it is possible to attach metadata to each message. A header is a key/value pair, where the key is always an encoded string and the value can be an array of any data, including a set of key/value pairs.
Creation of a record indicating the necessary Kafka parameters is performed via the ProducerRecord class.
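A record carries the topic name, an optional partition, an optional timestamp, and the key/value pair. The simplified stand-in below mirrors the shape of the ProducerRecord constructor; it is an illustrative class for this article, not the real kafka-clients one:

```java
// Illustrative stand-in mirroring the fields of Kafka's ProducerRecord.
public class RecordSketch {
    final String topic;        // required: the target topic
    final Integer partition;   // optional: explicit partition, may be null
    final Long timestamp;      // optional: user-set timestamp, may be null
    final String key;          // may be null
    final String value;

    public RecordSketch(String topic, Integer partition, Long timestamp,
                        String key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    public static void main(String[] args) {
        // Partition and timestamp left null: the partitioner and the
        // current time will fill them in, as described above.
        RecordSketch r = new RecordSketch("orders", null, null, "k1", "v1");
        System.out.println(r.topic);
    }
}
```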
Below is the disk format of the record and record header, indicating which data types are used for all values.
length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]

headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
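The varint and varlong fields above use zig-zag variable-length encoding, the same scheme as Protocol Buffers: the sign bit is folded into the low bit, then the value is emitted in 7-bit groups with a continuation bit. A minimal encoder sketch:

```java
import java.io.ByteArrayOutputStream;

// Minimal sketch of the zig-zag varint encoding used by Kafka record fields.
public class VarintSketch {
    public static byte[] encodeVarint(int value) {
        // Zig-zag: maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
        // so small magnitudes produce short encodings.
        int zigzag = (value << 1) ^ (value >> 31);
        long v = zigzag & 0xffffffffL;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7fL) != 0) {
            out.write((int) ((v & 0x7f) | 0x80)); // continuation bit set
            v >>>= 7;
        }
        out.write((int) v); // last byte, continuation bit clear
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 300 zig-zags to 600, which needs two bytes: 0xD8 0x04
        System.out.println(VarintSketch.encodeVarint(300).length); // prints 2
    }
}
```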
Record batch format
After the message is created, a KafkaProducer instance is launched, which performs:

- serialization of records in accordance with the specified key.serializer and value.serializer parameters;
- partitioning of records according to the selected strategy;
- buffering of records and batch sorting;
- compression of messages within the batch in accordance with the specified compression.type parameter.
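For example, with compression.type=gzip the batch payload is gzip-compressed before being sent. The effect can be reproduced with the JDK's own streams (a sketch of the compression step only, not actual producer code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of gzip compression as applied to a batch payload.
public class GzipSketch {
    public static byte[] compress(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static byte[] decompress(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "batch payload".getBytes();
        byte[] roundTrip = decompress(compress(payload));
        System.out.println(new String(roundTrip)); // prints batch payload
    }
}
```

Compressing the whole batch rather than individual messages is what makes batching pay off: repeated field names and similar values across records compress well together.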
A record batch contains one or more records and a header — the metadata section of the record batch. The header always occupies 61 bytes and contains the type of compression used, the batch creation timestamp, the number of records, and other metadata.
Below is the disk format of the record batch header, indicating the data types used for all values.
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
    bit 0~2: compression codec
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
Storing a record on a broker
After a message is written to the Kafka broker, it is stored with all the metadata supplied during the write, while:

- the message timestamp on the broker is determined by the log.message.timestamp.type broker parameter or the message.timestamp.type topic parameter;
- compression of stored messages is governed by the compression.type broker parameter or the compression.type topic parameter; the message is stored on the broker in compressed form and decompressed only on the consumer;
- the message is stored in serialized form, according to the serialization parameters set by the producer.
The metadata of a record successfully written to the broker is returned via the RecordMetadata class.