Конфигурирование Streams

Далее приведены конфигурации для клиентской библиотеки ADS Streams.

application.id – Идентификатор приложения потоковой обработки. Должен быть уникальным в пределах платформы ADS. Используется как: 1) префикс идентификатора клиента по умолчанию, 2) идентификатор группы для управления членством, 3) префикс топика изменений в журнале

  • TYPE – string
  • IMPORTANCE – high

bootstrap.servers – Список пар хост/порт, используемых для установления первоначального подключения к платформе ADS. В дальнейшем клиент будет использовать все сервера, независимо от того, какие указаны в данном параметре – этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Параметр должен быть задан в формате “host1:port1, host2:port2,…” (через запятую и без пробелов). Поскольку данные сервера используются только для первоначального подключения с целью обнаружения полного набора в кластере (который может динамически меняться), списку необязательно содержать полный набор серверов (можно указать более одного, на случай отказа первого)

  • TYPE – list
  • IMPORTANCE – high

replication.factor – Коэффициент репликации для топиков журнала изменений и репартицирования топиков, созданных приложением потоковой обработки

  • TYPE – int
  • DEFAULT – 1
  • IMPORTANCE – high

state.dir – Местоположение каталога хранилища state store

  • TYPE – string
  • DEFAULT – /tmp/kafka-streams
  • IMPORTANCE – high

cache.max.bytes.buffering – Максимальный объем памяти в байтах, используемый для буферизации всех потоков

  • TYPE – long
  • DEFAULT – 10485760
  • VALID VALUES – [0,…]
  • IMPORTANCE – medium

client.id – Строка префикса ID, используемая для идентификаторов клиента внутреннего потребителя и поставщика (восстановления потребителя); шаблон “-StreamThread–”

  • TYPE – string
  • DEFAULT – “”
  • IMPORTANCE – medium

default.deserialization.exception.handler – Класс обработки исключений, реализующий интерфейс org.apache.kafka.streams.errors.DeserializationExceptionHandler

  • TYPE – class
  • DEFAULT – org.apache.kafka.streams.errors.LogAndFailExceptionHandler
  • IMPORTANCE – medium

default.key.serde – Класс сериализатора/десериализатора по умолчанию для ключа, реализующего интерфейс org.apache.kafka.common.serialization.Serde

  • TYPE – class
  • DEFAULT – org.apache.kafka.common.serialization.Serdes$ByteArraySerde
  • IMPORTANCE – medium

default.production.exception.handler – Класс обработки исключений, реализующий интерфейс org.apache.kafka.streams.errors.ProductionExceptionHandler

  • TYPE – class
  • DEFAULT – org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
  • IMPORTANCE – medium

default.timestamp.extractor – Класс выделения временных меток по умолчанию, реализующий интерфейс org.apache.kafka.streams.processor.TimestampExtractor

  • TYPE – class
  • DEFAULT – org.apache.kafka.streams.processor.FailOnInvalidTimestamp
  • IMPORTANCE – medium

default.value.serde – Класс сериализатора/десериализатора по умолчанию для значения, реализующего интерфейс org.apache.kafka.common.serialization.Serde

  • TYPE – class
  • DEFAULT – org.apache.kafka.common.serialization.Serdes$ByteArraySerde
  • IMPORTANCE – medium

num.standby.replicas – Число резервных реплик для каждой задачи

  • TYPE – int
  • DEFAULT – 0
  • IMPORTANCE – medium

num.stream.threads – Количество потоков для выполнения потоковой обработки

  • TYPE – int
  • DEFAULT – 1
  • IMPORTANCE – medium

processing.guarantee – Гарантия на обработку. Возможные значения: “at_least_once” (по умолчанию) и “exact_once”. Для обработки “exact_once” требуется по меньшей мере три брокера по умолчанию, что является рекомендуемой настройкой для продуктивной среды; для разработки можно изменить параметр, при этом переустановив настройку брокера transaction.state.log.replication.factor

  • TYPE – string
  • DEFAULT – at_least_once
  • VALID VALUES – [at_least_once, exactly_once]
  • IMPORTANCE – medium

security.protocol – Протокол безопасности для связи между брокерами. Допустимые значения: “PLAINTEXT”, “SSL”, “SASL_PLAINTEXT”, “SASL_SSL”

  • TYPE – string
  • DEFAULT – PLAINTEXT
  • IMPORTANCE – medium

application.server – Пара “host:port”, указывающая на встроенную конечную точку пользователя, которая может использоваться для обнаружения местоположений хранилищ state stores в рамках одного приложения

  • TYPE – string
  • DEFAULT – “”
  • IMPORTANCE – low

buffered.records.per.partition – Максимальное количество записей в буфере для каждой партиции

  • TYPE – int
  • DEFAULT – 1000
  • IMPORTANCE – low

commit.interval.ms – Частота сохранения положения процессора. Если параметр processing.guarantee установлен на “exactly_once”, значение по умолчанию равно “100”, иначе (при “at_least_once”) значение по умолчанию равно “30000”. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 30000
  • IMPORTANCE – low

connections.max.idle.ms – Закрытие бездействующих соединений по истечению заданного периода. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 540000
  • IMPORTANCE – low

key.serde – Сериализатор/десериализатор для ключа, реализующего интерфейс org.apache.kafka.common.serialization.Serde. Данная конфигурация устарела, вместо нее используется default.key.serde

  • TYPE – class
  • DEFAULT – null
  • IMPORTANCE – low

metadata.max.age.ms – Период времени, после которого принудительно обновляются метаданные даже при отсутствии видимых изменений в лидере партиции с целью предварительного обнаружения новых брокеров или партиций. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 300000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

metric.reporters – Список классов для использования в качестве репортеров метрик. Реализация интерфейса org.apache.kafka.common.metrics.MetricsReporter позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен в реестр статистических данных JMX

  • TYPE – list
  • DEFAULT – “”
  • IMPORTANCE – low

metrics.num.samples – Количество выборок, поддерживаемых для вычисления метрик

  • TYPE – int
  • DEFAULT – 2
  • VALID VALUES – [1,…]
  • IMPORTANCE – low

metrics.recording.level – Самый высокий уровень записи для метрик

  • TYPE – string
  • DEFAULT – INFO
  • VALID VALUES – [INFO, DEBUG]
  • IMPORTANCE – low

metrics.sample.window.ms – Время ожидания вычисления метрик выборки. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 30000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

partition.grouper – Класс Partition grouper, реализующий интерфейс org.apache.kafka.streams.processor.PartitionGrouper

  • TYPE – class
  • DEFAULT – org.apache.kafka.streams.processor.DefaultPartitionGrouper
  • IMPORTANCE – low

poll.ms – Время блокировки ожидания ввода. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 100
  • IMPORTANCE – low

receive.buffer.bytes – Размер буфера приема TCP (SO_RCVBUF) при чтении данных. Если значение равно “-1”, используется ОС по умолчанию

  • TYPE – int
  • DEFAULT – 32768
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

reconnect.backoff.max.ms – Максимальный период времени ожидания повторного подключения к брокеру при неоднократных сбоях соединения. Отсрочка на хост увеличивается экспоненциально для каждого последующего сбоя соединения, вплоть до установленного максимума. После расчета увеличения отсрочки к значению добавляется 20% случайного джиттера во избежание помех связи. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 1000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

reconnect.backoff.ms – Базовый период времени ожидания повторного подключения к хосту. Позволяет избегать многократного подключения к узлу в узком цикле. Данная отсрочка применяется ко всем попыткам подключения клиента к брокеру. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 50
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

request.timeout.ms – Максимальное время ожидания клиентом ответа на запрос. Если ответ не получен до истечения установленного значения, клиент повторно отправляет запрос при необходимости. Указывается в миллисекундах

  • TYPE – int
  • DEFAULT – 40000
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

retries – Установка значения больше нуля приводит к тому, что клиент переотправляет любую запись, передача которой завершается с временной ошибкой

  • TYPE – int
  • DEFAULT – 0
  • VALID VALUES – [0,…,2147483647]
  • IMPORTANCE – low

retry.backoff.ms – Время ожидания перед повторной попыткой отправки неудавшегося запроса в партицию топика. Позволяет избежать неоднократной отправки запросов в сжатом цикле. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 100
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

rocksdb.config.setter – Класс или имя класса установщика конфигурации базы данных Rocks, реализующий интерфейс org.apache.kafka.streams.state.RocksDBConfigSetter

  • TYPE – class
  • DEFAULT – null
  • IMPORTANCE – low

send.buffer.bytes – Размер буфера отправки TCP (SO_SNDBUF) при отправке данных. Если значение равно “-1”, используется ОС по умолчанию

  • TYPE – int
  • DEFAULT – 131072
  • VALID VALUES – [0,…]
  • IMPORTANCE – low

state.cleanup.delay.ms – Время ожидания перед удалением state каталогов после перемещения партиции. Удаляются только state каталоги, которые не были изменены. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 600000
  • IMPORTANCE – low

timestamp.extractor – Класс выделения временных меток, реализующий интерфейс org.apache.kafka.streams.processor.TimestampExtractor. Данная конфигурация устарела, вместо нее используется default.timestamp.extractor

  • TYPE – class
  • DEFAULT – null
  • IMPORTANCE – low

value.serde – Класс сериализатора/десериализатора для значения, реализующего интерфейс org.apache.kafka.common.serialization.Serde. Данная конфигурация устарела, вместо нее используется ddefault.value.serde

  • TYPE – class
  • DEFAULT – null
  • IMPORTANCE – low

windowstore.changelog.additional.retention.ms – Добавление maintainMs с целью исключения риска преждевременного удаления данных из журнала. Позволяет осуществлять отставание часов. По умолчанию устанавливается 1 день. Указывается в миллисекундах

  • TYPE – long
  • DEFAULT – 86400000
  • IMPORTANCE – low

zookeeper.connect – Соединение строки для управления топиками ADS через Zookeeper. Конфигурация устарела и игнорируется, поскольку Streams API больше не использует Zookeeper

  • TYPE – string
  • DEFAULT – “”
  • IMPORTANCE – low