Kafka configuration best practices

NOTE

For information about the basic principles of working with Kafka, refer to the Quick start with Kafka article.

Configuration best practices for Kafka brokers

After installing ADS, the necessary parameters for Kafka brokers are automatically set in the configuration file /usr/lib/kafka/config/server.properties. The ADS configuration parameters article describes the parameters that can be changed when configuring the Kafka service.

If a required parameter is missing, add it using the Add key,value line in the server.properties group list, entering the key and the value in the appropriate fields.

Another way is to add the appropriate line directly to the /usr/lib/kafka/config/server.properties file.
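
For example, a parameter can be appended to the file from the command line. A minimal sketch, using default.replication.factor purely as an illustration; note that changes made outside ADCM may be overwritten when the service configuration is reapplied:

$ echo 'default.replication.factor=3' | sudo tee -a /usr/lib/kafka/config/server.properties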

Some parameters can also be set directly on the command line as options of the scripts being run. Values set this way override the defaults specified in /usr/lib/kafka/config/server.properties.

Recommendations for configuring some parameters for Kafka brokers are given below.

Configure topic replication

Topic replication is determined by the following parameters:

  • default.replication.factor — default replication factor for automatically created topics.

    A high replication factor is essential for designing a reliable system. With a replication factor of 2 or more, Kafka replicates the log for each topic's partitions across multiple servers. When a server goes down, failover to these replicas is automatic and messages remain available. For reliable operation of the system, it is recommended to set the replication factor to at least 3.

    This parameter can be changed in the ADCM interface.

    It is possible to set the replication factor for each topic separately.

    Example of setting the replication factor for a single topic

    It is necessary to use the --replication-factor option when creating a topic:

    $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic --replication-factor 3 --bootstrap-server hostname:9092

    When requesting information for the created topic, you can see that the topic has 3 replicas:

    $ /usr/lib/kafka/bin/kafka-topics.sh --describe --topic new-topic --bootstrap-server hostname:9092
    Topic: new-topic	TopicId: DuKhO8T3SUyhyivN3k5Lnw	PartitionCount: 1	ReplicationFactor: 3	Configs: unclean.leader.election.enable=false
    	Topic: new-topic	Partition: 0	Leader: 1002	Replicas: 1002,1001,1003	Isr: 1002,1001,1003

    If necessary, it is possible to change the replication factor for a single partition.

    Example of changing the replication factor

    To change it, manually create a custom reassignment plan in a JSON file:

    $ sudo vim increase-replication-factor.json

    JSON file content:

      {"version":1,
      "partitions":[{"topic":"new-topic","partition":0,"replicas":[1002,1001,1003]}]}

    where partition 0 of the new-topic topic is assigned replicas on brokers 1002, 1001, and 1003.

    Run the reassignment using the prepared JSON file with the --execute option (completion can be checked afterwards; see the verification example below):

    $ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  • min.insync.replicas — the minimum number of in-sync replicas required to acknowledge producer requests sent with the request.required.acks setting described below.

    When the producer sets request.required.acks to all (or -1), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an error (NotEnoughReplicas or NotEnoughReplicasAfterAppend).

    This parameter can be set in the ADCM interface.

    When used together, min.insync.replicas and request.required.acks provide higher reliability guarantees.

    Success scenario:

    • topic with replication factor 3;

    • min.insync.replicas with value 2;

    • request.required.acks with value all (or -1).

      This combination of options ensures that the producer raises an error if a majority of replicas do not receive the write.

      It is possible to set min.insync.replicas for each topic separately.

      Example of setting a parameter for a single topic

      When creating a topic, use the --config option with the min.insync.replicas parameter:

      $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic1 --config min.insync.replicas=3 --bootstrap-server hostname:9092
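
After starting a reassignment with the --execute option as shown above, its completion can be checked with the same script. A minimal sketch, assuming the same JSON file:

$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify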

Configure topic partitions

The num.partitions parameter specifies the default number of partitions for each topic created.

Splitting topics into partitions leads to better data balancing and promotes consumer parallelism. For data with keys, you should avoid changing the number of partitions in a topic.

This setting can be changed in the ADCM interface.

It is possible to set the number of partitions for each topic separately.

Example of setting the number of partitions for a single topic

When creating a topic, use the --partitions option:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic topic1 --partitions 3 --bootstrap-server hostname:9092
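
If an existing topic was created with too few partitions, their number can later be increased (but not decreased) with the --alter option. A minimal sketch, assuming the topic1 topic created above; keep in mind the caveat above about topics with keyed data:

$ /usr/lib/kafka/bin/kafka-topics.sh --alter --topic topic1 --partitions 6 --bootstrap-server hostname:9092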

Configure retention policies

To optimize disk space, set the correct policy for storing messages in a topic.

The storage policy is defined by the following parameters:

  • log.cleanup.policy — defines the retention policy to use for log segments. The default value applies to all topics without a per-topic override. Can be changed in the ADCM interface. Possible values:

    • delete — deletes old topic segments when their retention time or size limit has been reached;

    • compact — enables log compaction, which keeps the latest value for each key. You can also specify both policies in a comma-separated list (e.g. delete,compact). In this case, old segments are discarded according to the retention time and size configuration, and the remaining segments are compacted.

      The compact policy makes sense for topics where the last value for each key in the topic is important.

  • log.retention.ms — specifies the maximum time that messages in a topic will be retained before old topic segments are deleted to make room if the retention policy is set to delete. If set to -1, no time limit is applied. The parameter can be set in the ADCM interface.

  • log.retention.bytes — specifies the maximum partition size after which old log segments will be deleted to make room if the storage policy is set to delete. There is no size limit by default, only a time limit. Since this limit is applied at the partition level, multiply it by the number of partitions to calculate the topic retention in bytes. The parameter can be set in the ADCM interface.

It is possible to set these parameters for each topic separately.

Example of setting a retention policy for a single topic

When creating a topic, use the --config option with parameters cleanup.policy, retention.ms, retention.bytes:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic1 --config cleanup.policy=delete --config retention.ms=10000 --config retention.bytes=128000 --bootstrap-server localhost:9092
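
For an existing topic, the same per-topic overrides can be changed without recreating the topic by using the kafka-configs.sh script. A minimal sketch, assuming the new-topic1 topic created above and illustrative values:

$ /usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name new-topic1 --add-config retention.ms=86400000,retention.bytes=1073741824

The overrides currently in effect for the topic can then be checked with the --describe option of the same script.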

Set up graceful shutdown

Graceful shutdown is important to system reliability.

The controlled.shutdown.enable parameter set to true enables a controlled shutdown of the server. The parameter can be set in the ADCM interface.

When a server goes down or is intentionally taken down for maintenance or a configuration change, the Kafka cluster will automatically detect any outage or broker failure and select new leaders for the partitions on that machine.

When the server gracefully stops, it uses two optimizations:

  1. It syncs all its logs to disk to avoid the need for log recovery on restart (i.e. validating the checksums of all messages in the tail of the log). Log recovery takes time, so skipping it speeds up intentional restarts.

  2. It migrates all partitions for which the server is the leader to other replicas before shutting down. This speeds up the transfer of leadership and reduces the downtime of each partition to a few milliseconds.

Note that a controlled shutdown will only succeed if all partitions hosted on the broker have replicas (i.e. default.replication.factor is set to 2 or greater).
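
A minimal sketch of checking the setting and stopping a broker cleanly with the script bundled in the Kafka distribution (the path follows the ADS layout used elsewhere in this article; if the broker is managed by ADCM, stop it from the ADCM interface instead):

$ grep controlled.shutdown.enable /usr/lib/kafka/config/server.properties
$ /usr/lib/kafka/bin/kafka-server-stop.sh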

Set up leadership balance

Whenever a broker stops or crashes, leadership of that broker's partitions is transferred to other replicas. After the broker restarts, it will only be a follower for all its partitions, meaning it will not be used for client reads and writes. To avoid this imbalance, Kafka has the concept of preferred replicas. If the replica list for a partition is 1, 5, 9, then node 1 is preferred as the leader over node 5 or 9 because it is earlier in the replica list.

The auto.leader.rebalance.enable parameter set to true enables automatic leader rebalancing (returning leadership to the preferred replica) in the background at regular intervals. The parameter can be changed in the ADCM interface.

If you set this parameter to false, you have to restore leadership to the recovered replicas manually after a server restart. To do this, run the command:

$ /usr/lib/kafka/bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
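
If only specific partitions need their preferred leader restored, the same script accepts --topic and --partition instead of --all-topic-partitions. A minimal sketch, assuming the new-topic topic used earlier:

$ /usr/lib/kafka/bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --topic new-topic --partition 0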

Configuration best practices for working with producers and consumers

Configure the producer to receive confirmation of a message record

The request.required.acks parameter specifies when a produce request is considered complete, in particular how many brokers must have logged the data and acknowledged it to the leader. Possible values:

  • 0 — the producer never waits for confirmation from the broker. This option provides the least latency, but guarantees the least reliability (some data will be lost if the server crashes).

  • 1 — the producer is acknowledged after the leader has received the data. This is the default value.

  • -1 — the producer receives confirmation after all in-sync replicas have received the data. This option provides the strongest durability guarantee.

Using the min.insync.replicas parameter described above together with request.required.acks provides higher reliability guarantees.

Parameter setting example

When writing messages to a topic, use the --request-required-acks option:

$ /usr/lib/kafka/bin/kafka-console-producer.sh --topic new-topic --request-required-acks -1 --bootstrap-server hostname:9092

Check the consumer position

Using the kafka-consumer-groups.sh script it is possible to see the position of the topic consumers.

Example of viewing the position of the topic consumers

Command to view information for a consumer group:

$ /usr/lib/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group new-group

The result shows the consumer's position in the consumer group named new-group and how far it lags behind the end of the new-topic topic log.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
new-group       new-topic       2          2               3               1               consumer-new-group-1-af831b5a-9111-4e54-a028-6a433bc71f7e /10.92.18.30    consumer-new-group-1
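
To find out which consumer groups exist before describing one, the same script can list them, and the --all-groups option describes every group at once. A minimal sketch:

$ /usr/lib/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
$ /usr/lib/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups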

Assign consumer groups to topics

Every message is delivered to each consumer group that subscribes to a topic or partition, but within a group it is delivered to only one consumer. Thus, all subscribing consumer groups receive the message, but within each group only one consumer processes it.

If you need to send a message to multiple consumers, you should assign them to different consumer groups.

Example of assigning a group to a consumer

When connecting to a topic, use the --group option:

$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic new-topic --from-beginning --group new-group --bootstrap-server hostname:9092

Secure environment

For information about configuring security in a Kafka environment, refer to Basic Operations → Kafka → Access management.

Hardware

Equipment

  • If possible, use separate servers or racks for Kafka brokers.

  • To apply the optimal replication factor, have at least 3 Kafka brokers in the system.

  • Provide HA (High Availability) by meeting the requirements described in the Hardware requirements article in accordance with the planned load.

Memory

  • Kafka relies on the file system to store and cache messages and uses relatively little heap space.

  • Kafka may need enough memory to buffer active reads and writes when running.

Processor

  • Enabling SSL channel security may increase CPU requirements (exact data depends on CPU type and JVM implementation).

  • When choosing between faster processors and more cores, choose more cores. The extra parallelism that multiple cores offer is preferable to a higher clock speed.

Data storage

  • It is not recommended to use a JBOD disk array because tiered storage requires a single mount point.

  • It is recommended to use multiple drives to maximize throughput.

  • It is not recommended to share the same drives used for Kafka data with application logs or other OS file system activity.

  • If you set up multiple data directories (see the log.dirs example below), the broker places each new partition in the directory that currently stores the fewest partitions. Each partition resides entirely in one of the data directories. If data is not balanced between partitions, this can lead to load imbalance between disks.

  • A RAID disk array can potentially do a better job of load balancing across disks because it balances the load at a lower level.

  • A RAID 10 disk array is recommended as the best option for most use cases. It provides improved read and write performance, data protection (ability to withstand disk failures), and fast recovery.

  • Network Attached Storage (NAS) is not recommended. NAS is often slower, shows higher latency with wider average latency variance, and is a single point of failure.
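
The multiple data directory layout mentioned in the list above is configured through the log.dirs broker parameter. A minimal sketch with hypothetical mount points (in ADS this parameter is normally managed through ADCM):

$ echo 'log.dirs=/data1/kafka-logs,/data2/kafka-logs' | sudo tee -a /usr/lib/kafka/config/server.properties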

Network

For network connection requirements, see the Network requirements article. Low latency ensures that nodes can communicate easily, while high throughput helps move and recover replicas. Modern data center networks (1 GbE, 10 GbE) are sufficient for the vast majority of clusters.

Software

Main guidelines

  • For software requirements, see the Software requirements article.

  • It is recommended to use XFS or ext4 file system to run Kafka.

  • Use the latest released version of Java to ensure that known security issues are resolved.

  • The recommended GC setup (tested on a large deployment with JDK 1.8 u5) is as follows (see below for an example of applying these options):

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
       -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
  • The guidelines below can help improve compatibility between Kafka and your operating system. For details on applying them, refer to the documentation for your specific distribution.
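
A minimal sketch of applying the GC options listed above when starting a broker with the stock scripts, which read the KAFKA_HEAP_OPTS and KAFKA_JVM_PERFORMANCE_OPTS environment variables (in ADS these settings are normally managed through ADCM, so this is illustrative):

$ export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
$ export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
$ /usr/lib/kafka/bin/kafka-server-start.sh /usr/lib/kafka/config/server.properties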

File handles

Kafka has the potential to require a relatively large number of available file handles at any given time.

Many modern Linux distributions come with only 1024 file descriptors allowed per process. This is not enough for Kafka.

You need to increase the number of file handles to at least 100,000, and possibly much more.

To change the open file limit, edit /etc/security/limits.conf, the file that sets resource limits for users logged in through PAM (Pluggable Authentication Modules). To do this, increase the value of the nofile parameter (the maximum number of open files) for both the hard and soft limit types for all users by adding the following lines to /etc/security/limits.conf:

*  hard  nofile  100000
*  soft  nofile  100000
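
To check the limit that is actually in effect, a quick sketch (the pgrep pattern for the broker process is an assumption and may differ in your installation):

$ ulimit -n
$ grep 'open files' /proc/$(pgrep -f kafka.Kafka)/limits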

Virtual memory

You may need to change the limit on the number of memory map areas per process (vm.max_map_count) to allow the required number of memory-mapped files to be handled.

To estimate the current number of memory maps, count the .index files in the Kafka data directory: .index files make up the majority of memory-mapped files.

You can count the .index files with the following command, run from the Kafka data directory:

$ find . -name '*index' | wc -l
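
The value currently in effect can be checked before changing it:

$ sysctl vm.max_map_count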

Set vm.max_map_count with sysctl -w for the current session and add the setting to /etc/sysctl.conf so that it persists across reboots. The minimum value for vm.max_map_count is the number of open files allowed by ulimit.

Set vm.max_map_count sufficiently higher than the number of .index files to account for broker segment growth.

$ sudo sysctl -w vm.max_map_count=262144
$ echo 'vm.max_map_count=262144' | sudo tee -a /etc/sysctl.conf
$ sudo sysctl -p