Рекомендации по настройке Kafka

ПРИМЕЧАНИЕ

Для получения информации об основных принципах работы в Kafka можно обратиться к статье Начало работы c Kafka.

Рекомендации по настройке брокеров Kafka

После установки ADS в конфигурационном файле /usr/lib/kafka/config/server.properties автоматически устанавливаются необходимые параметры для брокеров Kafka. Параметры, доступные к изменению при конфигурировании сервиса Kafka, описаны в статье Конфигурационные параметры ADS.

Если необходимая настройка отсутствует, для ее добавления следует воспользоваться строкой Add key,value в списке группы server.properties, где требуется написать ключ и значение в соответствующих полях. Другой вариант — в файле /usr/lib/kafka/config/server.properties добавить соответствующую строку.

Некоторые параметры могут быть установлены непосредственно в командной строке запуска скриптов при помощи опций. Эта установка переопределяет значения по умолчанию, приведенные в /usr/lib/kafka/config/server.properties.

Ниже приведены рекомендации для настройки некоторых параметров брокеров Kafka.

Настройка репликации топиков

Репликацию топиков определяют следующие параметры:

  • default.replication.factor — коэффициент репликации по умолчанию для автоматически создаваемых топиков.

    Высокий коэффициент репликации необходим для проектирования надежной системы. При коэффициенте репликации 2 и более Kafka реплицирует журнал для партиций каждого топика на нескольких серверах. Когда сервер выходит из строя, происходит автоматическое переключение на эти реплики, сообщения остаются доступными. Для эффективной работы системы рекомендуется установить коэффициент репликации не менее 3.

    Этот параметр может быть изменен в интерфейсе ADCM.

    Существует возможность установить коэффициент репликации для каждого топика отдельно.

    Пример установки коэффициента репликации для отдельного топика

    Необходимо при создании топика использовать опцию --replication-factor:

    $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic --replication-factor 3 --bootstrap-server hostname:9092

    При запросе информации для созданного топика можно увидеть, что топик имеет 3 реплики:

    $ /usr/lib/kafka/bin/kafka-topics.sh --describe --topic new-topic --bootstrap-server hostname:9092
    Topic: new-topic	TopicId: DuKhO8T3SUyhyivN3k5Lnw	PartitionCount: 1	ReplicationFactor: 3	Configs: unclean.leader.election.enable=false
    	Topic: new-topic	Partition: 0	Leader: 1002	Replicas: 1002,1001,1003	Isr: 1002,1001,1003

    При необходимости возможно изменить коэффициент репликации у существующей партиции.

    Пример изменения коэффициента репликации

    Для изменения необходимо:

    Создать собственный план переназначения вручную в JSON-файле:

    $ sudo vim increase-replication-factor.json

    Содержимое JSON-файла:

      {"version":1,
      "partitions":[{"topic":"new-topic","partition":0,"replicas":[1002,1001,1003]}]}

    где коэффициент репликации партиции 0 топика new-topic распространяется на брокеры 1002,1001,1003.

    Запустить процесс переназначения с использованием созданного JSON-файла с параметром --execute:

    $ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  • min.insync.replicas — минимальное количество синхронизированных реплик, необходимых для разрешения запросов производителя с установленным параметром request.required.acks, описанным ниже.

    Когда производитель устанавливает request.required.acks на all (или -1), min.insync.replicas указывает минимальное количество реплик, которые должны подтвердить запись, чтобы запись считалась успешной. Если этот минимум не может быть соблюден, то производитель вызовет ошибку (NotEnoughReplicas или NotEnoughReplicasAfterAppend).

    Этот параметр может быть задан в интерфейсе ADCM.

    При совместном использовании min.insync.replicas и request.required.acks позволяют обеспечить более высокие гарантии надежности.

    Успешный сценарий:

    • топик с коэффициентом репликации 3;

    • min.insync.replicas со значением 2;

    • request.required.acks со значением all (или -1).

      Такое сочетание параметров гарантирует, что производитель вызовет ошибку, если большинство реплик не получит запись.

      Существует возможность установить min.insync.replicas для каждого топика отдельно.

      Пример установки параметра для отдельного топика

      Необходимо при создании топика использовать опцию --config c указанием параметра min.insync.replicas:

      $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic1 --config min.insync.replicas=3 --bootstrap-server hostname:9092

Настройка партиций топиков

Параметр num.partitions определяет количество партиций по умолчанию для каждого создаваемого топика.

Разбиение топиков на партиции приводит к лучшей балансировке данных и способствует параллелизму потребителей. Для данных с ключами следует избегать изменения количества партиций в топике.

Этот параметр может быть изменен в интерфейсе ADCM.

Существует возможность установить количество партиций для каждого топика отдельно.

Пример установки количества партиций для отдельного топика

Необходимо при создании топика использовать опцию --partitions:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic topic1 --partitions 3 --bootstrap-server hostname:9092

Настройка политик хранения

Для оптимизации дискового пространства необходимо установить корректную политику хранения сообщений в топике.

Политику хранения определяют следующие параметры:

  • log.cleanup.policy — определяет политику хранения для использования в сегментах журнала. Значение по умолчанию действует для всех пользовательских топиков. Может быть изменен в интерфейсе ADCM. Возможные значения:

    • delete — отбрасывает старые сегменты топика, когда их время хранения или предельный размер были достигнуты;

    • compact — включает сжатие журнала, при котором сохраняется последнее значение для каждого ключа. Также можно указать обе политики в списке, разделенном запятыми (например, delete, compact). В этом случае старые сегменты будут отброшены в соответствии с конфигурацией времени хранения и размера, а оставшиеся сегменты будут сжаты.

      Политика compact имеет смысл для топиков, где важно последнее значение для каждого ключа в топике.

  • log.retention.ms — определяет максимальное время, в течение которого сообщения в топике будут храниться, прежде чем будут удалены старые сегменты топика, чтобы освободить место, если установлена политика хранения delete. Если установлено значение -1, ограничение по времени не применяется. Параметр может быть задан в интерфейсе ADCM.

  • log.retention.bytes — определяет максимальный размер партиции, после достижения которого будут удалены старые сегменты журнала, чтобы освободить место, если установлена политика хранения delete. По умолчанию нет ограничений по размеру, только ограничение по времени. Поскольку это ограничение применяется на уровне партиции, умножьте его на количество партиций, чтобы вычислить размер неудаляемой части топика в байтах. Параметр может быть задан в интерфейсе ADCM.

Существует возможность установить данные параметры для каждого топика отдельно.

Пример установки политики хранения для отдельного топика

Необходимо при создании топика использовать опцию --config c указанием параметров cleanup.policy, retention.ms, retention.bytes:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic new-topic1 --config cleanup.policy=delete --config retention.ms=10000 --config retention.bytes=128000 --bootstrap-server localhost:9092

Настройка корректного завершения работы

Настройка корректного завершения работы важна для надежности системы.

Параметр controlled.shutdown.enable со значением true включает управляемое отключение сервера. Параметр может быть задан в интерфейсе ADCM.

При выходе сервера из строя или намеренного отключения для обслуживания или изменения конфигурации кластер Kafka автоматически обнаружит любое отключение или сбой брокера и выберет новых лидеров для партиций на этой машине.

Когда сервер корректно останавливается, он использует две оптимизации:

  1. Синхронизация всех журналов с диском, чтобы избежать необходимости восстанавливать журналы при перезапуске (т.е. проверять контрольную сумму для всех сообщений в конце журнала). Восстановление журнала требует времени, поэтому это ускоряет преднамеренные перезапуски.

  2. Перенос всех партиций, для которых сервер является ведущим, на другие реплики перед выключением. Это ускорит передачу лидерства и сведет к минимуму время недоступности каждой партиции до нескольких миллисекунд.

Обратите внимание, что контролируемое завершение работы будет успешным только в том случае, если все партиции, размещенные на брокере, имеют реплики (т.е. параметр default.replication.factor имеет значение 2 и более).

Настройка баланса лидерства

Всякий раз, когда брокер останавливается или аварийно завершает работу, управление партициями этого брокера передается другим репликам. После перезапуска брокера он будет ведомым только для всех своих партиций, то есть не будет использоваться для операций чтения и записи клиента. Чтобы избежать этого дисбаланса, в Kafka есть понятие предпочтительных реплик. Если список реплик для партиции равен 1, 5, 9, то узел 1 предпочтительнее в качестве лидера по сравнению с узлом 5 или 9, поскольку он находится раньше в списке реплик.

Параметр auto.leader.rebalance.enable со значением true включает автоматическую балансировку лидера (возврат лидерства предпочтительной реплике) в фоновом режиме через регулярные промежутки времени. Параметр может быть изменен в интерфейсе ADCM.

Если для этого параметра установить значение false, необходимо вручную восстановить лидерство в восстановленных репликах после перезагрузки сервера. Для этого необходимо выполнить команду:

$ /usr/lib/kafka/bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions

Рекомендации по работе с производителями и потребителями

Настройка получения производителем подтверждения записи сообщения

Параметр request.required.acks определяет, когда запрос на производство считается выполненным. В частности, сколько других брокеров должны зафиксировать данные в своем журнале и подтвердить это лидеру. Возможные значения:

  • 0 — производитель никогда не ждет подтверждения от брокера. Этот вариант обеспечивает наименьшую задержку, но гарантирует наименьшую надежность (некоторые данные будут потеряны при сбое сервера).

  • 1 — производитель получает подтверждение после того, как ведущая реплика получит данные. Это значение по умолчанию.

  • -1 — производитель получает подтверждение после того, как все синхронизированные реплики получили данные. Этот вариант обеспечивает наилучшую устойчивость.

Совместное использование параметров min.insync.replicas, описанного выше, и request.required.acks позволяют обеспечить более высокие гарантии надежности.

Пример установки параметра

Необходимо при записи сообщений в топика использовать опцию --request-required-acks:

$ bin/kafka-console-producer.sh --topic new-topic --request-required-acks -1 --bootstrap-server hostname:9092

Проверка позиции потребителя

При необходимости при помощи скрипта kafka-consumer-groups.sh возможно увидеть позицию потребителей топика.

Пример просмотра позиции потребителей топика

Команда для просмотра информации для группы потребителей:

$ /usr/lib/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group new-group

В результате показана позиция потребителя в группе потребителей с именем new_group, а также то, насколько он отстает от конца журнала топика new-topic.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
new-group       new-topic       2          2               3               1               consumer-new-group-1-af831b5a-9111-4e54-a028-6a433bc71f7e /10.92.18.30    consumer-new-group-1

Назначение топикам групп потребителей

Каждое сообщение отправляется каждой группе потребителей, подписавшейся на топик или партицию, но внутри группы оно отправляется только одному потребителю. Таким образом, все группы потребителей, подписавшиеся на топик, получают сообщения, но только один потребитель в группе потребителей получает сообщение из партиции.

Если необходимо передать сообщение нескольким потребителям, следует назначить им разные группы потребителей.

Пример назначения группы потребителю

Необходимо во время подключения к топику использовать опцию --group:

$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic new-topic --from-beginning --group new-group --bootstrap-server hostname:9092

Настройка безопасной среды

Для получения информации о настройке безопасности в среде Kafka можно перейти к разделу Базовые операцииKafkaУправление доступом.

Аппаратное обеспечение

Общие рекомендации при выборе оборудования

  • При возможности использовать для брокеров Kafka отдельные сервера или стойки.

  • Для применения оптимального коэффициента репликации иметь в системе не менее 3 брокеров Kafka.

  • Обеспечить HA (High Availability) в соответствии с требованиями, описанными в статье Аппаратные требования к серверам согласно планируемой нагрузке.

Рекомендации при выборе памяти

  • Kafka при работе может нуждаться в большом обьеме файловой системы для хранения и кеширования сообщений и мало использует heap-пространство.

  • Kafka при работе может нуждаться в достаточной памяти для буферизации активных операций чтения и записи.

Рекомендации при выборе процессоров

  • Включение защиты канала по протоколу SSL может увеличить требования к процессору (точные данные зависят от типа ЦП и реализации JVM).

  • При выборе процессора между более быстрыми процессорами или большим количеством ядер, следует выбрать большее количество ядер. Дополнительный параллелизм, который предлагает несколько ядер, предпочтительнее более высокой тактовой частоты.

Хранение данных

  • Не рекомендуется использовать массив дисков JBOD, так как для многоуровневого хранилища требуется одна точка монтирования.

  • Рекомендуется использовать несколько дисков, чтобы максимизировать пропускную способность.

  • Не рекомендуется использовать одни и те же диски, используемые для данных Kafka, совместно с журналами приложений или другой активностью файловой системы ОС.

  • Если вы настраиваете несколько каталогов данных, брокер помещает новую партицию по пути с наименьшим количеством сохраненных партиций. Каждая партиция будет полностью находиться в одном из каталогов данных. Если данные не сбалансированы между партициями, это может привести к дисбалансу нагрузки между дисками.

  • Массив дисков RAID потенциально может лучше справляться с балансировкой нагрузки между дисками, потому что балансирует нагрузку на более низком уровне.

  • Массив дисков RAID 10 рекомендуется как лучший вариант для большинства случаев использования. Он обеспечивает улучшенную производительность чтения и записи, защиту данных (способность выдерживать сбои диска) и быстрое восстановление.

  • Основным недостатком RAID является то, что он уменьшает доступное дисковое пространство. Другим недостатком является стоимость операций ввода-вывода при восстановлении массива при сбое диска. Стоимость восстановления относится к RAID в целом, с учетом нюансов между различными версиями.

  • Не рекомендуется использовать сетевые хранилища (Network Attached Storage, NAS). NAS часто медленнее, показывает большие задержки с более широким отклонением средней задержки и является единой точкой отказа.

Cеть

Требования к сетевым подключениям приведены в статье Аппаратные требования к сети. Низкая задержка гарантирует, что узлы могут легко обмениваться данными, а высокая пропускная способность помогает перемещать и восстанавливать сегменты. Современные сети центров обработки данных (1 GbE, 10 GbE) достаточны для подавляющего большинства кластеров.

Программное обеспечение

Общие рекомендации

  • Требования к программному обеспечению приведены в статьье Программные требования.

  • Для запуска Kafka рекомендуется использовать файловую систему XFS или ext4.

  • Используйте последнюю выпущенную версию Java, чтобы убедиться, что известные проблемы безопасности устранены.

  • Рекомендуемая настройка GC (проверенная на большом развертывании с JDK 1.8 u5) выглядит следующим образом:

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
       -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
  • Приведенные ниже рекомендации могут быть полезны для улучшения совместимости между Kafka и вашей операционной системой. Для лучшего применения рекомендаций необходимо обратиться к документации по конкретному дистрибутиву.

Файловые дескрипторы

Kafka в любой момент потенциально может требовать относительно большого количества доступных файловых дескрипторов.

Многие современные дистрибутивы Linux поставляются только с 1024 файловыми дескрипторами, разрешенными для каждого процесса. Это недостаточно для Kafka.

Необходимо увеличить количество файловых дескрипторов как минимум до 100,000, а возможно, и намного больше.

Для изменения лимита открытых файлов необходимо отредактировать /etc/security/limits.conf — файл, устанавливающий лимиты ресурсов для пользователей, вошедших в систему через PAMPrivileged Access Management (PAM). Для этого требуется для всех групп пользователей увеличить значение параметра nofile — максимальное число открытых файлов для типов hard и soft, добавив в /etc/security/limits.conf строки:

*  hard  nofile  100000
*  soft  nofile  100000

Виртуальная память

Может потребоваться изменить предельный размер виртуальной памяти (vm.max_map_count), позволяющий обрабатывать необходимое количество функций mmap.

Чтобы рассчитать текущее количество mmap, необходимо узнать количество файлов .index в каталоге данных Kafka. Файлы .index представляют собой большинство файлов с отображением памяти.

Подсчитать файлы .index можно при помощи этой команды:

$ find . -name '*index' | wc -l

Установите vm.max_map_count в файле /etc/sysctl.conf для сеанса. Это установит текущее количество отображаемых в память файлов. Минимальное значение лимита mmap (vm.max_map_count) — это количество открытых файлов ulimit.

Вы должны установить vm.max_map_count большим, чем количество файлов .index, чтобы учесть рост сегмента брокера.

$ sysctl -w vm.max_map_count=262144
$ echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
$ sysctl -p
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней