Message delivery guarantees in Kafka

This article describes the message delivery guarantees provided by the delivery semantics available in Kafka.

The main message delivery problems that arise when working with Kafka:

  • loss of messages due to various failures (producer, consumer, messaging system, network);

  • duplication of messages.

Kafka components that participate in message writing and sequence reading can be configured to provide the following delivery semantics:

  • At most once — messages will be processed once or not processed at all (lost).

  • At least once — messages will be processed at least once.

  • Exactly once — every message will be processed once and only once.

The choice of delivery semantics affects system performance: a stronger guarantee may reduce performance, while a weaker guarantee improves it.

At most once

At most once delivery semantics is usually used in systems where it is preferable to lose a message rather than deliver it twice. It has the lowest overhead (additional memory consumption, processing time, and latency).

The operating principle of the at most once delivery semantics is shown in the figure below.

At most once

  1. Let us assume that Producer writes messages in batches to a Kafka topic (the batch size is set by the batch.size parameter), without waiting for write confirmation (acks = 0). All messages are considered sent.

  2. Consumer 1 also reads messages in batches.

  3. On the consumer side, Consumer 1 automatically (enable.auto.commit = true) commits the offsets of all messages that were read over a certain period of time (defined by the auto.commit.interval.ms parameter).

  4. If Consumer 1 fails after reading a message batch, further reading is performed by a new Consumer 2.

  5. The new consumer is assigned a new batch of messages starting from the last committed offset.

  6. Consumer 2 commits the offsets and processes the received messages.

In this scheme, message loss can occur:

  • on the producer end — if a message is sent with an error (with acks = 0 the producer never learns about it);

  • on the Kafka broker end — if the broker fails;

  • on the consumer end — if the message offsets are committed before processing and the consumer then fails, so the read messages are never processed.
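
The scheme above corresponds, for example, to the following configuration sketch of the Java client (a fragment, assuming the usual org.apache.kafka.clients imports; the broker address and group name are placeholders):

  // Producer: fire-and-forget (at most once), no acknowledgments, no retries
  Properties producerProps = new Properties();
  producerProps.put("bootstrap.servers", "localhost:9092"); // placeholder address
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  producerProps.put("acks", "0");    // do not wait for write confirmation
  producerProps.put("retries", "0"); // do not retry failed sends

  // Consumer: offsets are committed automatically, regardless of processing progress
  Properties consumerProps = new Properties();
  consumerProps.put("bootstrap.servers", "localhost:9092");
  consumerProps.put("group.id", "at-most-once-group"); // placeholder group name
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  consumerProps.put("enable.auto.commit", "true");      // automatic offset commit
  consumerProps.put("auto.commit.interval.ms", "5000"); // commit interval, ms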

At least once

At least once delivery semantics is used in systems where it is unacceptable to lose even a single message. In case of failure, some messages may be processed more than once. Possible overhead arises on the producer and broker side (additional memory consumption, processing time, and latency).

As a rule, when using this semantics, automatic offset commit is disabled (enable.auto.commit = false). Instead, offsets are committed manually after the messages are processed, for example, using the Consumer.commitSync (the application blocks until the broker responds to the commit request) or Consumer.commitAsync (the application does not block) methods of the KafkaConsumer class API.
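
A minimal sketch of such a consumer loop with manual offset commit (a fragment, assuming a KafkaConsumer<String, String> named consumer that is already configured with enable.auto.commit = false and subscribed to a topic; process() is a hypothetical application method):

  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord<String, String> record : records) {
          process(record); // hypothetical application-specific processing
      }
      // Commit offsets only after all polled messages have been processed.
      // commitSync() blocks until the broker responds; commitAsync() would not block.
      consumer.commitSync();
  }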

The operating principle of the at least once delivery semantics is shown in the figure below.

At least once

Description of semantics and possible problems on the end of the producer or broker:

  1. Let us assume that Producer writes a message C with offset 2 to the Kafka topic and does not receive confirmation of the write from the partition leader (acks = 1).

  2. Producer retries sending the message until the retries count is exhausted or the delivery.timeout.ms timeout expires.

  3. If the write is successful, Producer receives confirmation from the broker.

In this scheme, message C can be duplicated if the broker or producer fails after the message has been written to the topic but before the producer receives the acknowledgment: the retry results in two identical messages written at different offsets. If the producer writes messages in batches and retries the write, several messages may be duplicated during a single write iteration.

Description of semantics and possible problems on the consumer end:

  1. Consumer 1 reads recorded messages A and B from Kafka.

  2. Let us assume that Consumer 1 fails after processing message A and before processing message B, and both messages are in the same batch. Then, depending on the logic of the consumer application, the commit of offset A may not be executed.

  3. Since offsets A and B are not committed, the new consumer Consumer 2 reads the same messages.

  4. Upon successful message processing, Consumer 2 commits offsets A and B.

In this scheme, duplicate processing of message A can occur if the consumer fails after processing the message but before its offset is committed.

Exactly once

Exactly once semantics excludes the cases of message duplication that are possible with at least once delivery semantics.

For exactly once semantics, the following mechanisms are usually applied simultaneously:

  • idempotent producer;

  • transactions.

The mechanisms and their application in exactly once semantics are described below.

Idempotent producer

Idempotent producer is a producer that is assigned a producer identifier (PID), and the PID is included whenever that producer sends messages to the Kafka broker.

Every message from an idempotent producer receives an increasing sequence number, distinct from the offset. For each topic partition to which the producer sends messages, a separate sequence is maintained. The broker for each partition keeps track of the largest PID+sequence number combination recorded. If a message is received with a lower sequence number than the previous one recorded, it is ignored.

This way the message will only be recorded once, no matter how many attempts are made to send it.

To enable the idempotent producer feature, add the enable.idempotence property with the value true.

For the idempotent producer to work without errors, the following parameter values must also be set: acks = all, retries greater than 0, and max.in.flight.requests.per.connection no greater than 5.
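
A sketch of producer settings for idempotent writes (a fragment, assuming the usual Kafka Java client imports; the broker address is a placeholder):

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // placeholder address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("enable.idempotence", "true");                    // enable the idempotent producer
  props.put("acks", "all");                                   // required for idempotence
  props.put("retries", Integer.toString(Integer.MAX_VALUE));  // must be greater than 0
  props.put("max.in.flight.requests.per.connection", "5");    // must not exceed 5

  KafkaProducer<String, String> producer = new KafkaProducer<>(props);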

NOTE
  • An idempotent producer can be used as a component of a hybrid variant of the at least once semantics.

  • Idempotence is only preserved if message retries are performed by the producer's internal retry mechanism, as described for the at least once semantics, rather than by the application resending the message itself.

Transactions

Transaction — an atomic write to one or more Kafka topics and partitions. Either all messages included in the transaction are successfully written, or none of them are. If a transaction is aborted, none of its messages are readable by consumers.

Transactions in Kafka are based on the following components:

  • transactional.id — a unique identifier assigned to the transactional producer.

  • Transaction coordinator — module running inside each Kafka broker. Only one coordinator owns the registered transactional.id. For reliable operation of the coordinator, the Kafka replication protocol and the leader election process are used. The transaction coordinator is the only component that can read and write the transaction log.

  • Transaction log (__transaction_state) — internal Kafka topic that stores the latest transaction state. Possible transaction states:

    • ongoing;

    • prepare_commit;

    • committed.

    Each transaction coordinator manages some partitions in the transaction log and is a leader for them.

  • Epoch — a piece of metadata that counts how many times each transactional.id has been initialized; it is used to fence off obsolete producer instances.

  • Zombie instance — a producer instance that started writing messages and failed (or became unreachable) without completing the write.

Transactions are configured and used via the KafkaProducer class API and other transaction-related APIs.
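
A minimal sketch of a transactional producer that writes to two topics atomically (a fragment with simplified error handling, assuming the usual Kafka Java client imports; the broker address, transactional.id, and topic names are placeholders):

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");     // placeholder address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("transactional.id", "my-transactional-id"); // placeholder identifier
  // Setting transactional.id implies an idempotent producer (enable.idempotence = true)

  KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  producer.initTransactions();         // register with the transaction coordinator
  try {
      producer.beginTransaction();     // start the transaction
      producer.send(new ProducerRecord<>("topic-a", "key", "value-1"));
      producer.send(new ProducerRecord<>("topic-b", "key", "value-2"));
      producer.commitTransaction();    // both messages become visible atomically
  } catch (KafkaException e) {
      producer.abortTransaction();     // none of the messages become visible to consumers
  }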

Exactly once semantics

The main operating principle of exactly once semantics in Kafka is shown in the figure below.

Exactly once

Transaction recording sequence

  1. The transaction coordinator is located using the FindCoordinatorRequest request: the producer connects to any known broker in the cluster to find out where its coordinator is.

  2. Registration of the producer with the transaction coordinator is performed using a synchronous InitPidRequest request. In this case, the producer indicates its transactional.id.

    This request performs the following functions:

    • Return the same PID for one transactional.id to future instances of the producer.

    • Increment and return the PID epoch so that any previous producer "zombie instance" is blocked and cannot continue its transaction.

    • Complete (roll forward or roll back) any transaction left unfinished by a previous instance of the producer.

    • Create an entry (init) in the __transaction_state log for the given transactional.id if this is the first transaction for the producer.

  3. Starting a transaction is done by calling the producer.beginTransaction method. Once a transaction is started, all recorded messages will be part of the transaction. The __transaction_state log for this transactional.id changes the transaction state to ongoing.

  4. The consume → process → produce message cycle:

    • Register a new partition as part of a transaction using the AddPartitionsToTxnRequest request.

    • Write messages (including PID, epoch, and sequence number) to the user’s partition using the producer.send method.

    • Send the offsets of the messages read within the transaction, together with the consumer groupId, to the transaction coordinator using the producer.sendOffsetsToTransaction method, so that the addition of this partition to the transaction is registered in the transaction log.

    • Register offsets in the __consumer_offsets topic using the TxnOffsetCommitRequest request sent from the producer to the consumer coordinator (including PID and epoch). Writing offsets to the __consumer_offsets topic also becomes part of this transaction. The consumer coordinator verifies that the producer is not a "zombie instance". The recorded offsets are not visible to consumers until the transaction is committed.

  5. The transaction is committed using the commitTransaction method, which performs the following functions:

    • Write the prepare_commit message to the transaction log.

    • Write COMMIT markers to user topics, including the __consumer_offsets topic.

    • Write the committed message to the transaction log.

    • Open access for consumers to messages recorded in a topic.

Thus, the offsets of all messages read within one transaction are committed only after those messages have been processed.
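
A simplified sketch of this consume → process → produce cycle (assuming a transactional KafkaProducer named producer and a KafkaConsumer named consumer with enable.auto.commit = false; the topic names are placeholders and transform() is a hypothetical processing function):

  producer.initTransactions();
  consumer.subscribe(Collections.singletonList("input-topic")); // placeholder topic

  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
      if (records.isEmpty()) {
          continue;
      }
      producer.beginTransaction();
      try {
          Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
          for (ConsumerRecord<String, String> record : records) {
              // Write the processed message to the output topic within the transaction
              producer.send(new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
              // Remember the next offset to commit for the source partition
              offsets.put(new TopicPartition(record.topic(), record.partition()),
                          new OffsetAndMetadata(record.offset() + 1));
          }
          // Commit the consumed offsets as part of the same transaction
          producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
          producer.commitTransaction();
      } catch (KafkaException e) {
          producer.abortTransaction(); // neither the output messages nor the offsets become visible
      }
  }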

Reading transactional messages

The reading of transactional messages by the consumer is controlled by the isolation.level parameter:

  • If the parameter value is set to read_committed, the consumer buffers messages that have a PID until it reads a message with a COMMIT marker. After this, the messages will be delivered. Messages are read only up to the last stable offset, which is less than the offset of the first open transaction. read_committed consumers will not be able to read topics in their entirety while there are pending transactions.

  • If the parameter value is set to read_uncommitted, all messages are visible to consumers, even if they were part of an aborted transaction.
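
A sketch of a consumer configured to read only committed transactional messages (a fragment; the remaining properties are the same as in the earlier consumer examples):

  Properties props = new Properties();
  // ... bootstrap.servers, group.id, deserializers ...
  props.put("isolation.level", "read_committed");      // deliver only messages from committed transactions
  // props.put("isolation.level", "read_uncommitted"); // default: deliver all messages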

Guarantees of exactly once semantics can be provided if the application in which the consumer reads transactional messages from a Kafka topic then passes them (after processing) to another Kafka topic (for example, Kafka Streams applications). This is because the consumer offset is stored as a message in the topic, so offsets when reading messages can be written in the same transaction as the user topic messages. If the transaction is aborted, the last offset returns to the old value, and the created messages in the user topics will not be visible to consumers.
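
For example, in a Kafka Streams application this behavior is usually enabled with a single setting (a sketch; the application id and broker address are placeholders, and on older Kafka Streams versions the value "exactly_once" is used instead):

  Properties streamsProps = new Properties();
  streamsProps.put("application.id", "my-streams-app");        // placeholder
  streamsProps.put("bootstrap.servers", "localhost:9092");     // placeholder
  streamsProps.put("processing.guarantee", "exactly_once_v2"); // at_least_once is the default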

Possible "overhead" of exactly once semantics:

  • in the work of the producer and broker — the more messages in the transaction, the less resource loss (additional memory, increased processing speed, delays in work);

  • in the work of the consumer — reading a large transaction causes blocking of the read partition for consumers with the read_committed setting and loss of resources (memory for buffering, increased processing speed, delays in operation).
