Message delivery guarantees in Kafka
This article describes the message delivery guarantees provided by the delivery semantics available in Kafka.
The main message delivery problems that arise when working with Kafka are:

- loss of messages due to various failures (producer, consumer, messaging system, network);
- duplication of messages.
Kafka components that participate in writing and reading messages can be configured to provide the following delivery semantics:

- At most once — messages are processed once or not at all (lost).
- At least once — messages are processed at least once.
- Exactly once — every message is processed once and only once.
The choice of delivery semantics affects system performance: a stronger guarantee tends to reduce performance, while a weaker guarantee improves it.
At most once
At most once delivery semantics is usually used in systems where it is preferable to lose a message rather than deliver it twice. It has the lowest overhead (additional memory consumption, processing time, and latency).
The operating principle of the at most once delivery semantics is shown in the figure below.
- Let us assume that Producer writes messages in batches to a Kafka topic (the batch size is set by the batch.size parameter), without waiting for write confirmation (acks=0). All messages are considered sent.
- Consumer 1 also reads messages in batches.
- On the consumer side, Consumer 1 automatically (enable.auto.commit=true) commits the offsets of all messages that were read over a certain period of time (defined by the auto.commit.interval.ms parameter).
- If Consumer 1 fails after reading a message batch, further reading is performed by a new Consumer 2.
- The new consumer is assigned a new message batch starting from the last committed offset.
- Consumer 2 commits the offsets and processes the received messages.
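As an illustration, below is a minimal sketch of producer and consumer settings matching this scheme. The broker address, group id, and batch size value are assumptions, not values prescribed by Kafka.

```java
import java.util.Properties;

// At-most-once sketch: fire-and-forget producer, auto-committing consumer.
public class AtMostOnceConfig {

    // The producer does not wait for write confirmation, so failed sends are lost silently.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "0");          // no write confirmation: all messages are considered sent
        props.put("batch.size", 16384);  // messages are written in batches
        return props;
    }

    // The consumer commits offsets periodically, possibly before the messages are processed.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "at-most-once-group");      // assumption
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");          // offsets are committed automatically
        props.put("auto.commit.interval.ms", "5000");     // commit period
        return props;
    }
}
```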
In this scheme, message loss can occur:

- on the producer side — if messages were sent with an error;
- on the Kafka broker side — if the broker fails;
- on the consumer side — if the message offsets are committed before the consumer fails, so the messages that were read are never processed.
At least once
At least once delivery semantics is used in systems where losing even a single message is unacceptable. In case of failure, some messages may be processed more than once. Possible overhead occurs in the work of the producer and broker (additional memory consumption, processing time, and latency).
As a rule, when using this semantics, automatic offset commit is disabled (enable.auto.commit=false). Instead, offsets are committed manually after messages are processed, for example, using the Consumer.commitSync (blocks the application until the broker responds to the commit request) or Consumer.commitAsync (does not block the application) methods of the KafkaConsumer class API.
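A minimal sketch of such a consumer, committing offsets synchronously after processing; the broker address, group id, topic name, and processing logic are assumptions:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "at-least-once-group");     // assumption
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");         // disable automatic offset commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));      // topic name is an assumption
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);                      // application-specific processing
                }
                // Commit only after the batch has been processed:
                // a crash before this line causes re-reading, not loss.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```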
The operating principle of the at least once delivery semantics is shown in the figure below.
Description of the semantics and possible problems on the producer or broker side:

- Let us assume that Producer writes a message C with offset 2 to the Kafka topic and does not receive confirmation of the write from the partition leader (acks=1).
- Producer retries sending the message until the retries and delivery.timeout.ms parameter values are exhausted.
- If the write is successful, Producer receives confirmation from the broker.
In this scheme, duplication of message C can occur if the broker or producer fails after the message has been written to the topic. This results in two identical messages written at different offsets. If the producer writes messages in batches and retries the write, multiple messages may be duplicated during a single write iteration.
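A producer configuration sketch matching this scheme; the broker address, topic name, and timeout value are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AtLeastOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");                   // wait for the partition leader's confirmation
        props.put("retries", Integer.MAX_VALUE);  // retry failed sends...
        props.put("delivery.timeout.ms", 120000); // ...until this timeout expires

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // If the leader's acknowledgment is lost, a retry may write
            // a duplicate of message C at a new offset.
            producer.send(new ProducerRecord<>("my-topic", "key", "C")); // topic is an assumption
            producer.flush();
        }
    }
}
```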
Description of the semantics and possible problems on the consumer side:

- Consumer 1 reads the recorded messages A and B from Kafka.
- Let us assume that Consumer 1 fails after processing A and before processing B, and that both messages are in the same batch. Then, depending on the logic of the consumer application, the commit of offset A may not be executed.
- Since offsets A and B are not committed, the new consumer Consumer 2 reads the same messages.
- Upon successful message processing, Consumer 2 commits offsets A and B.
In this scheme, duplicate processing of message A can occur if the consumer fails after processing the message but before its offset is committed.
Exactly once
Exactly once semantics eliminates the message duplication cases that occur with at least once delivery semantics.
For exactly once semantics, the following mechanisms are usually applied simultaneously:

- idempotent producer;
- transactions.

The mechanisms and their application in exactly once semantics are described below.
Idempotent producer
An idempotent producer is a producer that is assigned a producer identifier (PID); the PID is included every time that producer sends messages to the Kafka broker.
Every message from an idempotent producer receives an increasing sequence number, which is distinct from the offset. A separate sequence is maintained for each topic partition to which the producer sends messages. For each partition, the broker keeps track of the largest PID + sequence number combination it has recorded; a message that arrives with a sequence number lower than or equal to the recorded one is ignored.
This way the message will only be recorded once, no matter how many attempts are made to send it.
To enable the idempotent producer feature, set the enable.idempotence property to true.
The following parameter values must be set for the idempotent producer feature to work without errors (see the configuration sketch after this list):

- max.in.flight.requests.per.connection — no more than 5;
- retries — greater than 0;
- acks = all.
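A minimal configuration sketch with these settings; the broker address is an assumption:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class IdempotentProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");               // producer is assigned a PID
        props.put("max.in.flight.requests.per.connection", 5); // no more than 5
        props.put("retries", Integer.MAX_VALUE);               // must be > 0
        props.put("acks", "all");                              // required for idempotence

        // Retried sends carry the same PID and sequence number,
        // so the broker writes each message at most once.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send messages as usual via producer.send(...)
        }
    }
}
```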
Transactions
Transaction — an atomic write to one or more Kafka topics and partitions. Either all messages included in the transaction are successfully written, or none of them are. If a transaction is aborted, none of its messages are readable by consumers.
Transactions in Kafka are based on the following components:
- transactional.id — an individual identifier assigned to the transactional producer.
- Transaction coordinator — a module running inside each Kafka broker. Only one coordinator owns a registered transactional.id. For reliable operation of the coordinator, the Kafka replication protocol and the leader election process are used. The transaction coordinator is the only component that can read and write the transaction log.
- Transaction log (__transaction_state) — an internal Kafka topic that stores the latest transaction state. Possible transaction states:
  - ongoing;
  - prepare_commit;
  - committed.

  Each transaction coordinator manages some partitions of the transaction log and is the leader for them.
- Epoch — a piece of metadata that stores the number of connections for each transactional.id.
- Zombie instance — a producer instance that started writing messages and failed without completing the write.
Configuring and working with transactions is done using the KafkaProducer class API and other transaction-related APIs, as in the sketch below.
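A minimal setup sketch; the broker address and the transactional.id value are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class TransactionalSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // assumption
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "orders-processor-1"); // assumption; also enables idempotence

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Registers the producer with its transaction coordinator (InitPidRequest):
            // returns the PID, increments the epoch to fence zombie instances,
            // and resolves any transaction left unfinished by a previous instance.
            producer.initTransactions();
        }
    }
}
```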
Exactly once semantics
The main operating principle of exactly once semantics in Kafka is shown in the figure below.
Transaction recording sequence
- The transaction coordinator is located using the FindCoordinatorRequest request. The producer connects to any known broker in the cluster to find out the location of its coordinator.
- The producer is registered with the transaction coordinator using a synchronous InitPidRequest request, in which the producer specifies its transactional.id. This request performs the following functions:
  - Returns the same PID for a given transactional.id to future instances of the producer.
  - Increments and returns the PID epoch so that any previous "zombie instance" of the producer is fenced off and cannot continue its transaction.
  - Removes any transaction left unfinished by a previous instance of the producer.
  - Creates an entry (init) in the __transaction_state log for the given transactional.id if this is the first transaction for the producer.
- A transaction is started by calling the producer.beginTransaction method. Once a transaction is started, all messages written will be part of the transaction. In the __transaction_state log, the transaction state for this transactional.id changes to ongoing.
- Cycle consume → process → produce messages:
  - A new partition is registered as part of the transaction using the AddPartitionsToTxnRequest request.
  - Messages (including PID, epoch, and sequence number) are written to the user's partition using the producer.send method.
  - The offsets of the read messages, together with the groupId identifier, are sent within the transaction to the transaction coordinator using the producer.sendOffsetsToTransaction method, which registers the addition of this partition to the transaction log.
  - The offsets are registered in the __consumer_offsets topic using the TxnOffsetCommitRequest request sent from the producer to the consumer coordinator (including PID and epoch). Writing the offsets to the __consumer_offsets topic also becomes part of this transaction. The consumer coordinator verifies that the producer is not a "zombie instance". The recorded offsets are not visible to consumers until the transaction is committed.
- The transaction is committed using the commitTransaction method, which performs the following functions:
  - Writes the prepare_commit message to the transaction log.
  - Writes COMMIT markers to the user topics, including the __consumer_offsets topic.
  - Writes the committed message to the transaction log.
  - Opens access for consumers to the messages written in the topics.

Thus, the offsets of all messages read within one transaction are written only after those messages have been processed.
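Under the assumptions above, the whole sequence can be sketched as a consume → process → produce loop. The broker address, topic names, group id, and transactional.id below are illustrative:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ExactlyOnceLoop {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "localhost:9092"); // assumption
        cProps.put("group.id", "eos-group");               // assumption
        cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("enable.auto.commit", "false");         // offsets are committed via the transaction
        cProps.put("isolation.level", "read_committed");   // see only confirmed transactions

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "localhost:9092"); // assumption
        pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("transactional.id", "eos-processor-1"); // assumption

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            producer.initTransactions();                   // register with the transaction coordinator
            consumer.subscribe(List.of("input-topic"));    // topic names are assumptions
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();               // transaction state: ongoing
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The offsets of the read messages become part of the same transaction
                    // (written to __consumer_offsets via TxnOffsetCommitRequest).
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();          // prepare_commit, COMMIT markers, committed
                } catch (ProducerFencedException e) {
                    break; // a newer instance with the same transactional.id has taken over
                } catch (KafkaException e) {
                    producer.abortTransaction();           // aborted messages stay invisible to read_committed consumers
                }
            }
        }
    }
}
```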
Reading transactional messages
The reading of transactional messages by the consumer is controlled by the isolation.level parameter:
- If the parameter is set to read_committed, the consumer buffers messages that have a PID until it reads a message with a COMMIT marker; after that, the buffered messages are delivered. Messages are read only up to the last stable offset, which is less than the offset of the first open transaction. read_committed consumers cannot read topics in their entirety while there are pending transactions.
- If the parameter is set to read_uncommitted, all messages are visible to consumers, even if they were part of an aborted transaction.
Possible "overhead" of exactly once semantics:
-
in the work of the producer and broker — the more messages in the transaction, the less resource loss (additional memory, increased processing speed, delays in work);
-
in the work of the consumer — reading a large transaction causes blocking of the read partition for consumers with the
read_committed
setting and loss of resources (memory for buffering, increased processing speed, delays in operation).