Log compaction in Kafka
This article describes the existing log cleaner (compaction) policies in Kafka, the parameters they use, and how to configure them.
Log cleanup policy
log.cleanup.policy — the main parameter that determines the strategy for deleting old records.
The following are the possible log cleanup policy options:
- delete — deleting records when one of the specified thresholds is reached, or both at the same time:
  - message storage period;
  - volume of stored data (in bytes).
- compact — retaining only the latest value for each key.
- delete and compact — retaining only the latest values for each key, taking into account the storage period or data volume parameters.
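For illustration, a cleanup policy can be assigned to a topic at creation time via the cleanup.policy configuration parameter. In the sketch below the topic name, broker address, and policy value are placeholders, not part of this article's cluster setup.

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic example-topic \
    --config cleanup.policy=compact \
    --bootstrap-server localhost:9092

The combined delete and compact policy is expressed as a comma-separated list, for example --config cleanup.policy="compact,delete".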
The choice of log cleanup policy depends on the project requirements.
The figure below shows how the message queue differs after applying different log cleanup policies.
The following describes the storage algorithm, depending on the selected policy, for the message at offset 5:
- with the delete policy, the message is deleted because its storage time is longer than the specified value;
- with the compact policy, the message is kept in the queue, since it contains the latest value for the key2 key;
- with the delete and compact policy, the message is deleted due to expiration, even though the value for the key2 key was not overwritten.
All log cleanup policies are based on working with partition segments.
All log cleanup policies and the parameters used are discussed in detail below.
Delete
The delete policy is suitable for logs where only event data is required and messages are not overwritten.
When the delete policy is selected, messages are deleted according to the configured parameters:
- log.retention.hours — storage time for a message, after which it will be deleted. This parameter is set by default for all brokers and created topics (value 168). This value can also be changed for each topic individually via the retention.ms parameter.
- log.retention.bytes — log size in bytes, after reaching which the log will be deleted (default value is -1 — no limit). This value can also be set for each topic individually via the retention.bytes parameter.
It is also possible to set both parameters at once to simultaneously control the message lifetime and log volume.
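As a sketch of how both limits might be applied to an already existing topic (the topic name and values below are placeholders), the per-topic parameters can be changed with the kafka-configs.sh script shipped with Kafka:

$ /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name example-topic \
    --alter --add-config retention.ms=604800000,retention.bytes=1073741824

Here 604800000 ms corresponds to 7 days and 1073741824 bytes to 1 GiB; whichever limit is reached first triggers deletion.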
To control how often messages are deleted, use the log.cleanup.interval.mins parameter. It determines how often the log cleaner checks for logs to be deleted.
The message deletion algorithm distinguishes between the following types of log partition segments:
- Log segment — a segment where messages are stored and new messages are no longer written.
- Active log segment — the segment that is currently being written to. The active segment does not participate in the message deletion algorithm.
The diagram for deleting Kafka log messages with a retention time limit is shown in the figure below.
Messages are deleted as part of segments in the following sequence:
- Determining the message with the maximum timestamp (the newest one available) within a single segment.
- Determining the difference between the maximum timestamp (Tmax) and the current time (Tcurrent), and comparing the resulting value with the value specified in log.retention.hours (retention.ms for topics).
- If the condition Tcurrent - Tmax > log.retention is met for a segment, then the entire segment is deleted (see the sketch of this check below).
Thus, the total lifetime of a message may be greater than the value specified in log.retention.hours
and depends on the segment size value set via segment.bytes.
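The check from the last step of the sequence can be illustrated with a minimal shell sketch. The timestamps and retention value below are hypothetical, and the script only demonstrates the arithmetic, not how the broker actually evaluates segments.

# Illustration only: retention check for a single segment with hypothetical values.
RETENTION_MS=$((168 * 60 * 60 * 1000))   # log.retention.hours=168 expressed in milliseconds
T_MAX=1700000000000                      # maximum timestamp found in the segment, in milliseconds
T_CURRENT=$(($(date +%s) * 1000))        # current time, in milliseconds

if [ $((T_CURRENT - T_MAX)) -gt "$RETENTION_MS" ]; then
  echo "segment is eligible for deletion"
else
  echo "segment is kept"
fi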
Compact
Compact policy overview
The compact policy is typically used for logs containing messages in key/value format, for example, for databases that store frequently overwritten values for the same keys.
This policy is the default for the __consumer_offsets
Kafka service topic.
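On a running cluster this default can be checked with the kafka-configs.sh script (the broker address below is a placeholder); cleanup.policy=compact is expected among the topic-level settings in the output.

$ /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name __consumer_offsets --describe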
The log compaction algorithm distinguishes between the following types of log partition segments:
- Clean log segment — segments in which there are no duplicate keys, i.e. each segment contains only one value for each key — the last one at the time of the previous compaction. "Clean" segments may have gaps in the offset numbering due to previous compactions.
- Dirty log segment — segments that were written after the last "clean" segment. They may contain duplicate keys, as well as keys present in "clean" segments. "Dirty" segments begin at the cleaner point.
- Active log segment — the segment that is currently being written to. The active segment does not participate in the log compaction algorithm.
Start compaction
The moment compaction starts is regulated by the following broker parameters:
- log.cleaner.min.cleanable.ratio — ratio of the volume of "dirty" segments to the total volume of the log at which compaction starts (the default value is 0.5). This value can also be set for each topic individually via the min.cleanable.dirty.ratio parameter.
- log.cleaner.min.compaction.lag.ms — the minimum time during which a message remains uncompacted in the log (default is 0 — no limit). Compaction will not start until the specified time has elapsed, even if the min.cleanable.dirty.ratio parameter has reached the specified level. This value can also be set for each topic individually via the min.compaction.lag.ms parameter.
- log.cleaner.max.compaction.lag.ms — the maximum time during which a message can remain uncompacted in the log (the default value is 9223372036854775807 for all created topics). Compaction will begin after the specified time has elapsed, even if the min.cleanable.dirty.ratio parameter has not reached the specified level. This value can also be set for each topic individually via the max.compaction.lag.ms parameter.
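As a sketch (the topic name and values are placeholders), the per-topic counterparts of these parameters can be set with the kafka-configs.sh script:

$ /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name example-topic \
    --alter --add-config min.cleanable.dirty.ratio=0.3,min.compaction.lag.ms=60000

With these illustrative values, compaction of the topic starts when "dirty" segments make up 30% of the log, but no message is compacted earlier than one minute after it was written.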
Compaction algorithm
The basis of the compaction algorithm is the creation of a hash table as a result of scanning the "dirty" segments. The hash table records the last offset found for each key. Using this table, the cleanup threads then copy the entire log to a buffer, remove the offsets that are not in the hash table, and rewrite the remaining records into a new segment. This preserves the original offset of each event (possibly with gaps in the offsets).
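The idea of the key-to-latest-offset table can be shown with a minimal shell sketch. The "offset key" records below are hypothetical, and the script only demonstrates the selection logic, not the broker implementation.

#!/usr/bin/env bash
# Sketch only: keep the latest offset seen for each key, as the cleaner's hash table does.
declare -A latest_offset

while read -r offset key; do
  latest_offset["$key"]=$offset        # a later record overwrites the earlier one for the same key
done <<'EOF'
3 key1
4 key2
5 key2
6 key1
EOF

# A record survives compaction only if its offset is the latest one for its key.
for key in "${!latest_offset[@]}"; do
  echo "keep offset ${latest_offset[$key]} for $key"
done

For the sample records, the sketch keeps offsets 5 (key2) and 6 (key1), which mirrors how earlier values for a key are discarded during compaction.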
The size of the cleanup thread buffer and its fill factor are determined by the log.cleaner.dedupe.buffer.size and log.cleaner.io.buffer.load.factor broker configuration parameters.
The Kafka log compaction sequence is shown in the figure below.
- Scanning "dirty" segments from the cleaner point to the start of the active segment, creating a hash table with the latest offsets, and determining which messages will be deleted. The exact number of segments scanned per iteration depends on the memory size of the cleanup thread buffer.
- Scanning "clean" segments based on the last keys written to the hash table while scanning the "dirty" segments, and determining which messages will be deleted.
- Copying the retained offsets from both "dirty" and "clean" segments to new log segments. Depending on the segment size set via segment.bytes, messages can be copied into one or more new segments.
- Removing old segments and setting a new cleaner point at the last cleaned offset. This point marks the start of the first "dirty" segment; the active segment also becomes "dirty" after it is filled. The next stage of log compaction will start from this point.
Deleting using compaction
After a message with a key and a null value is written, the messages with this key will be removed from the log during compaction.
The sequence for deleting a log entry using compaction is shown in the figure below.
- Messages with a key and a null value are written to the "active" log segment. This type of record is called a tombstone.
- After the segment containing the tombstone is closed, it becomes a "dirty" segment. When it is compacted according to the algorithm described above, all previous messages with the same key as the tombstone are deleted, and the tombstone itself is marked as ready for deletion. The tombstone is assigned a deletion time determined by the log.cleaner.delete.retention.ms broker configuration parameter (this value can also be changed for each topic individually using the delete.retention.ms parameter). During this time, the tombstone remains available for reading.
- During subsequent compactions, all tombstones whose deletion time has already been reached are deleted.
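The time tombstones remain readable can be adjusted per topic. In the sketch below the topic name and the value (24 hours) are placeholders:

$ /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name example-topic \
    --alter --add-config delete.retention.ms=86400000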
Compaction guarantees and benefits
- Since compaction retains the most recent value for each key, it is ideal for backing up tables in various databases.
- Compacted topics continue to receive updates without increasing the volume of logs.
- Users reading compacted topics are guaranteed to see the most recent value for the desired key.
- Compaction never changes the order of messages.
- The message offset never changes; it is the permanent identifier of a record in the log.
- It is possible to configure the storage time for record deletion markers.
- Compaction does not block reading and writing of messages and can be configured to use a reasonable amount of memory so as not to impact producers and consumers.
Configure log cleanup
In Kafka, log cleaning is enabled via the log.cleaner.enable broker option, which is defined in the /usr/lib/kafka/config/server.properties configuration file.
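For example, the current value can be checked directly in that file; the path is the one given above, so adjust it if your installation differs. If the line is absent, the broker uses the default value.

$ grep -E '^log\.cleaner\.enable' /usr/lib/kafka/config/server.properties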
After adding and installing the Kafka service as part of an ADS cluster, the Log Cleaner can be enabled on the configuration page of the Kafka service via ADCM. Turn on the Log Cleaner toggle, save the changes by clicking Save, and apply the Restart action to Kafka.
Configure a cluster
Configuring compaction for the entire cluster, individual hosts (Kafka brokers), or groups of hosts can be done via the ADCM interface on the configuration page of the Kafka service. Available parameters are displayed in the Main and Log Cleaner nodes of the configuration tree. The Log Cleaner parameters are visible when the Log Cleaner toggle is on.
The following are the configurable log cleanup options available in ADCM, with a link to a description of how to use the option:
After changing the settings using the ADCM interface, restart the Kafka service. To do this, apply the Restart action by clicking in the Actions column.
Also, the necessary broker parameter can be set in the configuration file /usr/lib/kafka/config/server.properties.
Configure a topic
Individual topic configuration is performed via the command line. Use the kafka-topics.sh
script with the --config
option. This option overrides the configuration parameters when a new topic is created or for an existing topic.
$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic1 --config cleanup.policy=delete --config retention.ms=10000 --config retention.bytes=128000 --bootstrap-server localhost:9092
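For a topic that already exists, the same settings can be changed with the kafka-configs.sh script; the example below is only a sketch that modifies the retention time of the topic created above.

$ /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name new-topic1 \
    --alter --add-config retention.ms=20000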
It is also possible to configure log cleanup for a specific topic via the CMAK user interface. The interface becomes available after adding and installing the Kafka-Manager service in the ADS cluster.
The setting can be performed for a created topic or when creating a new topic.
Select Topic → List in the top menu and go to the list of topics. Select the desired topic from the list by clicking the title. On the topic page that opens, click Update Config.
Set the required cleanup parameters and click Update Config.
NOTE
For information about the basic principles of working with Kafka and Kafka-Manager services, refer to the Quick start with Kafka article.