Конфигурирование Producer

Далее приведены конфигурации Java-поставщика.

key.serializer – Класс Serializer для ключа, реализующего интерфейс org.apache.kafka.common.serialization.Serializer

  • TYPE – class
  • IMPORTANCE – high

value.serializer – Класс Serializer для значения, реализующего интерфейс org.apache.kafka.common.serialization.Serializer

  • TYPE – class
  • IMPORTANCE – high

acks – Количество подтверждений, которые поставщик требует от лидера перед рассмотрением запроса. Параметр контролирует устойчивость отправляемых записей. Возможны следующие настройки:

  • acks=0 – поставщик не ждет подтверждения с сервера. Запись немедленно добавляется в буфер сокета и считается отправленной. Данная настройка не гарантирует получение сервером записи, и конфигурация повторных попыток не вступает в силу (так как клиент обычно не знает о каких-либо сбоях). Смещение, возвращаемое для каждой записи, всегда равно “-1”.
  • acks=1 – лидер фиксирует запись в свой локальный журнал, но отвечает, не дожидаясь полного подтверждения от всех подписчиков. В этом случае лидер может выйти из строя сразу после подтверждения записи и до того, как подписчики реплицируют ее, тогда запись теряется
  • acks=all – лидер ожидает полного набора синхронизированных реплик для подтверждения записи. Данная настройка гарантирует, что запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается в строе. Это наивысшая гарантия. Эквивалентно настройке acks=-1
  • TYPE – string
  • DEFAULT – 1
  • VALID VALUES – [all, - 1, 0, 1]
  • IMPORTANCE – high

bootstrap.servers – Список пар хост/порт, используемых для установления первоначального подключения к платформе ADS. В дальнейшем клиент будет использовать все сервера, независимо от того, какие указаны в данном параметре – этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Параметр должен быть задан в формате “host1:port1, host2:port2,…” (через запятую и без пробелов). Поскольку данные сервера используются только для первоначального подключения с целью обнаружения полного набора в кластере (который может динамически меняться), списку необязательно содержать полный набор серверов (можно указать более одного, на случай отказа первого)

buffer.memory – Общий объем памяти в байтах, которую поставщик может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, поставщик блокирует параметр max.block.ms, после чего будет сделано исключение. Параметр должен соответствовать примерно общему объему памяти, которая используется поставщиком, но не полному объему, так как не вся память используется для буферизации. Некоторый дополнительный объем используется для сжатия (если оно включено), а также для поддержания запросов на лету

  • TYPE – long
  • DEFAULT – 33554432
  • VALID VALUES – [0,…]
  • IMPORTANCE – high

compression.type – Тип сжатия для всех данных, созданных поставщиком. По умолчанию используется значение “none” (без сжатия). Допустимые значения: “none”, “gzip”, “snappy” и “lz4”. Сжатие выполняется над полной партией данных, поэтому эффективность дозирования влияет на коэффициент сжатия (более многочисленное порционирование означает лучшее сжатие)

  • TYPE – string
  • DEFAULT – none
  • IMPORTANCE – high

retries – Установка значения больше нуля приводит к тому, что клиент переотправляет любую запись, передача которой завершается с временной ошибкой. Повторная попытка ничем не отличается от повторной отправки записи клиентом при получении ошибки. Повторная отправка данных без установки параметра max.in.flight.requests.per.connection в значение “1” потенциально может изменить порядок записей, так как если две партии данных отправляются в одну партицию, при этом первая партия не выполняется и повторно отправляется, а вторая выполняется успешно, то данные второго пакета появляются в партиции первыми

  • TYPE – int
  • DEFAULT – 0
  • VALID VALUES – [0,…,2147483647]
  • IMPORTANCE – high

ssl.key.password – Пароль закрытого ключа в файле хранилища ключей. Необязательный параметр для клиента

  • TYPE – password
  • DEFAULT – null
  • IMPORTANCE – high

ssl.keystore.location – Расположение файла хранилища ключей. Необязательный параметр для клиента, может использоваться для двусторонней аутентификации клиента

  • TYPE – string
  • DEFAULT – null
  • IMPORTANCE – high

ssl.keystore.password – Пароль хранилища для файла хранения ключей. Необязательный параметр для клиента, требуется только при настройке ssl.keystore.location

  • TYPE – password
  • DEFAULT – null
  • IMPORTANCE – high

ssl.truststore.location – Расположение файла хранилища trust store

  • TYPE – string
  • DEFAULT – null
  • IMPORTANCE – high

ssl.truststore.password – Пароль для файла хранилища trust store. При неустановленном пароле доступ к хранилищу есть, но осуществляется с отключенной проверкой надежности

  • TYPE – password
  • DEFAULT – null
  • IMPORTANCE – high

batch.size – При отправке нескольких записей в одну и ту же партицию поставщик пытается объединить их. Это помогает производительности как на клиенте, так и на сервере. Конфигурация управляет размером пакета в байтах. Пакетирование большего размера, чем задан в параметре, не осуществляется. В таком случае отправленные брокерам запросы содержат несколько пакетов (по одному для каждой партиции) с доступными для отправки данными. Небольшой размер пакета делает его менее востребованным и может снизить пропускную способность (нулевой размер пакета полностью отключает пакетирование). Очень большой размер пакета может использовать память расточительно, так как всегда выделяется буфер указанного размера пакета в ожидании дополнительных записей

  • TYPE – int
  • DEFAULT – 16384
  • VALID VALUES – [0,…]
  • IMPORTANCE – medium

client.id – Строка id для передачи на сервер при выполнении запросов. Целью является возможность отслеживания источника запросов за пределами ip/port, позволяя включать логическое имя приложения в журнал запросов на стороне сервера

  • TYPE – string
  • DEFAULT – “”
  • IMPORTANCE – medium

connections.max.idle.ms – Закрытие бездействующих соединений по истечению заданного периода. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 540000
  • IMPORTANCE – medium

linger.ms – Поставщик объединяет в один пакет все записи, поступающие между трансмиссиями запросов. Обычно это происходит, когда данные поступают быстрее, чем могут быть отправлены. Однако клиент может уменьшить количество запросов даже при умеренной загрузке. Это реализуется путем добавления небольшого промежутка времени искусственной задержки, то есть вместо немедленной отправки данных поставщик ждет до указанной отметки с целью пакетирования данных. Это можно рассматривать как аналог алгоритма Nagle в TCP. Параметр дает верхнюю границу задержки по времени для пакетной обработки. Но как только достигается установленный размер пакета данных batch.size для партиции, пакет немедленно отправляется (независимо от заданного параметра linger.ms). Однако, имея меньший объем байт пакета, чем в указаном параметре batch.size, осуществляется задержка в течение времени, заданного linger.ms, с целью ожидания появления новых данных. По умолчанию параметр linger.ms равен “0” (то есть без задержки). Например, установка “linger.ms=5” приведет к уменьшению количества отправленных запросов, но добавит до 5 мс задержки для данных, отправленных при отсутствии нагрузки. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 0
  • VALID VALUES – [0,…]
  • IMPORTANCE – medium

max.block.ms – Время блокировки ADSProducer.send() и ADSProducer.partitionsFor(). Данные методы могут быть заблокированы либо по причине заполненного буфера, либо из-за недоступности метаданных. Блокировка в предоставленных пользователем сериализаторах или разделителе не учитывается по таймауту данного параметра. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 60000
  • VALID VALUES – [0,…]
  • IMPORTANCE – medium

max.request.size – Максимальный размер запроса в байтах. Параметр ограничивает количество пакетов данных, которые поставщик отправляет в одном запросе во избежание отправки огромных запросов. Параметр также эффективно ограничивает максимальный размер пакета данных. При этом сервер имеет свой собственный предел размера пакета данных, который может отличаться от указанного

  • TYPE – int
  • DEFAULT – 1048576
  • VALID VALUES – [0,…]
  • IMPORTANCE – medium

partitioner.class – Класс Partitioner, реализующий интерфейс org.apache.kafka.clients.producer.Partitioner

  • TYPE – class
  • DEFAULT – org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • IMPORTANCE – medium

receive.buffer.bytes – Размер буфера приема TCP (SO_RCVBUF) при чтении данных. Если значение равно “-1”, используется ОС по умолчанию

  • TYPE – int
  • DEFAULT – 32768
  • VALID VALUES – [-1,…]
  • IMPORTANCE – medium

request.timeout.ms – Максимальное время ожидания клиентом ответа на запрос. Если ответ не получен до истечения установленного значения, клиент повторно отправляет запрос при необходимости. Значение параметра должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера), с целью сокращения возможного дублирования данных по причине излишних попыток поставщика. Указывается в миллисекундах

  • TYPE – int
  • DEFAULT – 30000
  • VALID VALUES – [0,…]
  • IMPORTANCE – medium

sasl.jaas.config – Параметры контекста входа JAAS для соединений SSL в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан по ссылке. Формат значения: “(=)*;”

  • TYPE – password
  • DEFAULT – null
  • IMPORTANCE – medium

sasl.kerberos.service.name – Имя принципала Kerberos, которое запускает ADS. Значение можно определить в конфигурации ADS JAAS либо в конфигурации ADS

  • TYPE – string
  • DEFAULT – null
  • IMPORTANCE – medium

sasl.mechanism – Механизм SASL для клиентских подключений. Может быть любой механизм, для которого обеспечивается безопасность. По умолчанию используется GSSAPI

  • TYPE – string
  • DEFAULT – GSSAPI
  • IMPORTANCE – medium

security.protocol – Протокол безопасности для связи между брокерами. Допустимые значения: “PLAINTEXT”, “SSL”, “SASL_PLAINTEXT”, “SASL_SSL”

  • TYPE – string
  • DEFAULT – PLAINTEXT
  • IMPORTANCE – medium

send.buffer.bytes – Размер буфера отправки TCP (SO_SNDBUF) при отправке данных. Если значение равно “-1”, используется ОС по умолчанию

  • TYPE – int
  • DEFAULT – 131072
  • VALID VALUES – [-1,…]
  • IMPORTANCE – medium

ssl.enabled.protocols – Список протоколов, включенных для соединений SSL

  • TYPE – list
  • DEFAULT – TLSv1.2,TLSv1.1,TLSv1
  • IMPORTANCE – medium

ssl.keystore.type – Формат файла хранилища ключей. Необязательный параметр для клиента

  • TYPE – string
  • DEFAULT – JKS
  • IMPORTANCE – medium

ssl.protocol – Протокол SSL для генерации SSLContext. Значение по умолчанию – “TLS”, что подходит для большинства случаев. Допустимыми значениями в последних JVM являются “TLS”, “TLSv1.1” и “TLSv1.2”. Протоколы “SSL”, “SSLv2” и “SSLv3” могут поддерживаться в более старых JVM, но их использование не рекомендуется из-за известных уязвимостей безопасности

  • TYPE – string
  • DEFAULT – TLS
  • IMPORTANCE – medium

ssl.provider – Имя поставщика безопасности для соединений SSL. Значением по умолчанию является поставщик безопасности по умолчанию для JVM

  • TYPE – string
  • DEFAULT – null
  • IMPORTANCE – medium

ssl.truststore.type – Формат файла хранилища trust store

  • TYPE – string
  • DEFAULT – JKS
  • IMPORTANCE – medium

enable.idempotence – При установленном значении “true” поставщик гарантирует, что ровно одна копия каждого сообщения записывается в поток. При значении “false” в поток могут быть записаны дубликаты сообщений при повторных попытках отправки данных поставщиком из-за сбоев брокера или по другим причинам. Данный параметр требует, чтобы свойство max.in.flight.requests.per.connection было меньше или равно “5”, повторные попытки более “0”, и acks установлены на “all”. Если перечисленные настройки явно не заданы пользователем, выбираются подходящие значения. При установке несовместимых значений, выдается ConfigException

  • TYPE – boolean
  • DEFAULT – false
  • IMPORTANCE – low

interceptor.classes – Список классов для использования в качестве интерсепторов. Реализация интерфейса org.apache.kafka.clients.producer.ProducerInterceptor позволяет перехватывать (и, возможно, видоизменять) записи, полученные поставщиком до их публикации в кластере ADS. По умолчанию интерсепторы не установлены

max.in.flight.requests.per.connection – Максимальное количество неподтвержденных запросов, отправляемых клиентом по одному соединению перед блокировкой. Если параметр имеет значение больше 1, то в случае сбоев существует риск переупорядочения данных из-за повторных попыток (если они включены)

  • TYPE – int
  • DEFAULT – 5
  • VALID VALUES – [1,…]
  • IMPORTANCE – low

metadata.max.age.ms – Период времени, после которого принудительно обновляются метаданные даже при отсутствии видимых изменений в лидере партиции с целью предварительного обнаружения новых брокеров или партиций. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 300000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

metric.reporters – Список классов для использования в качестве репортеров метрик. Реализация интерфейса org.apache.kafka.common.metrics.MetricsReporter позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен в реестр статистических данных JMX

metrics.num.samples – Количество выборок, поддерживаемых для вычисления метрик

  • TYPE – int
  • DEFAULT – 2
  • VALID VALUES – [1,…]
  • IMPORTANCE – low

metrics.recording.level – Самый высокий уровень записи для метрик

  • TYPE – string
  • DEFAULT – INFO
  • VALID VALUES – [INFO, DEBUG]
  • IMPORTANCE – low

metrics.sample.window.ms – Время ожидания вычисления метрик выборки. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 30000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

reconnect.backoff.max.ms – Максимальный период времени ожидания повторного подключения к брокеру при неоднократных сбоях соединения. Отсрочка на хост увеличивается экспоненциально для каждого последующего сбоя соединения, вплоть до установленного максимума. После расчета увеличения отсрочки к значению добавляется 20% случайного джиттера во избежание помех связи. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 1000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

reconnect.backoff.ms – Базовый период времени ожидания повторного подключения к хосту. Позволяет избегать многократного подключения к узлу в узком цикле. Данная отсрочка применяется ко всем попыткам подключения клиента к брокеру. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 50
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

retry.backoff.ms – Время ожидания перед повторной попыткой отправки неудавшегося запроса в партицию топика. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 100
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

sasl.kerberos.kinit.cmd – Путь команд Kerberos kinit

  • TYPE – string
  • DEFAULT – /usr/bin/kinit
  • IMPORTANCE – low

sasl.kerberos.min.time.before.relogin – Время ожидания авторизации потока между попытками обновления

  • TYPE – long
  • DEFAULT – 60000
  • IMPORTANCE – low

sasl.kerberos.ticket.renew.jitter – Процент случайного джиттера по отношению к времени возобновления

  • TYPE – double
  • DEFAULT – 0.05
  • IMPORTANCE – low

sasl.kerberos.ticket.renew.window.factor – Время ожидания авторизации потока до тех пор, пока не будет достигнут указанный коэффициент времени от последнего обновления до истечения срока действия тикета, и попытка возобновления тикета за этот период времени

  • TYPE – double
  • DEFAULT – 0.8
  • IMPORTANCE – low

ssl.cipher.suites – Список наборов шифров. Именованная комбинация аутентификации, шифрования, MAC и ключей обмена алгоритма для согласования параметров безопасности для сетевого подключения с использованием протокола TLS или SSL. По умолчанию поддерживаются все доступные варианты шифрования

  • TYPE – list
  • DEFAULT – null
  • IMPORTANCE – low

ssl.endpoint.identification.algorithm – Алгоритм идентификации конечных точек для валидации имени хоста сервера с использованием сертификата сервера

  • TYPE – string
  • DEFAULT – null
  • IMPORTANCE – low

ssl.keymanager.algorithm – Алгоритм службы управления ключами для SSL-соединений. Значением по умолчанию является алгоритм, настроенный для Java Virtual Machine

  • TYPE – string
  • DEFAULT – SunX509
  • IMPORTANCE – low

ssl.secure.random.implementation – Реализация SecureRandom PRNG, используемая для операций шифрования SSL

  • TYPE – string
  • DEFAULT – null
  • IMPORTANCE – low

ssl.trustmanager.algorithm – Алгоритм доверенной службы управления ключами для SSL-соединений. Значением по умолчанию является алгоритм, настроенный для Java Virtual Machine

  • TYPE – string
  • DEFAULT – PKIX
  • IMPORTANCE – low

transaction.timeout.ms – Максимальный интервал времени, который координатор транзакции ожидает для обновления статуса транзакции от поставщика перед тем, как будет прервана текущая транзакция. Если установленное значение больше, чем значение transaction.max.timeout.ms в настройках брокера, запрос завершается ошибкой InvalidTransactionTimeout

  • TYPE – int
  • DEFAULT – 60000
  • IMPORTANCE – low

transactional.id – Идентификатор транзакции. Параметр позволяет использовать семантику достоверности, которая охватывает несколько сессий поставщика, и позволяет гарантировать клиенту, что транзакции, использующие тот же TransactionalId, завершены до начала любых новых транзакций. Если TransactionalId не указан, то поставщик ограничивается идемпотентной доставкой. Важно, что параметр enable.idempotence должен быть включен при сконфигурированном TransactionalId. Значение по умолчанию “null”, что означает невозможность использования транзакций. Для транзакций требуется по меньшей мере три брокера по умолчанию, что является рекомендуемым параметром для продуктивной системы; для разработки можно изменить настройки в параметре брокера transaction.state.log.replication.factor

  • TYPE – string
  • DEFAULT – null
  • VALID VALUES – non-empty string
  • IMPORTANCE – low