Настройка и использование Kafka Tiered Storage

Обзор

Опция Tiered Storage в ADS может быть включена в конфигурационных параметрах сервиса Kafka одновременно только для одного из двух вариантов хранилищ: HDFS или S3.

Ниже подробно описаны шаги включения опции Tiered Storage.

Hadoop Distributed File System (HDFS)

Для данного примера в качестве хранилища используется сервис HDFS кластера Arenadata Hadoop (ADH).

Предварительные требования

Для включения опции Tiered Storage в Kafka использовано следующее окружение:

$ sudo -u hdfs hdfs dfs -mkdir /kafka
$ sudo -u hdfs hdfs dfs -chown kafka:hadoop /kafka

Шаг 1. Импорт данных ADH

В интерфейсе ADCM откройте страницу Clusters и кликните по имени кластера ADS. Затем на открывшейся странице кластера перейдите на вкладку Import, выберите Cluster configuration напротив названия кластера ADH и нажмите Import.

Импорт данных ADH
Импорт данных ADH

Шаг 2. Настройка и включение Tiered Storage

  1. Откройте вкладку Services на странице кластера и кликните по имени сервиса Kafka в столбце Name.

    Переход к настройке сервиса
    Переход к настройке сервиса
  2. В открывшемся окне Primary configuration:

    • Переведите в активное состояние переключатель HDFS Tiered Storage и при необходимости измените значения параметров, установленные по умолчанию.

    • При необходимости раскройте группу Tiered Storage General и измените значения параметров, установленные по умолчанию.

      Настройка Tiered Storage
      Настройка Tiered Storage

      Описания параметров приведены в разделе Kafka статьи Конфигурационные параметры ADS.

  3. Нажмите Save и перезагрузите сервис Kafka при помощи действия Restart, нажав на иконку actions default dark actions default light в столбце Actions.

  4. Дождитесь завершения перезагрузки сервиса. Проанализируйте и исправьте ошибки в случае их возникновения на странице Jobs.

Шаг 3. Проверка результатов

  • При включении опции Tiered Storage через интерфейс ADCM автоматически устанавливаются все необходимые параметры для работы с хранилищем:

    • В конфигурационном файле брокера Kafka (в директории /etc/kafka/conf/) устанавливаются параметры для компонентов Remote Manager, StorageBackend (определяющиеся автоматически для выбранного вида хранилища) и механизма chunking.

      Ниже приведен пример измененного при помощи ADCM файла конфигурации.

      Конфигурационный файл брокера Kafka server.properties
      # Managed by ADCM
      
      node.id=1
      reserved.broker.max.id=5000
      auto.create.topics.enable=False
      listeners=PLAINTEXT://:9092
      log.dirs=/kafka-logs
      default.replication.factor=1
      num.partitions=1
      delete.topic.enable=true
      log.retention.hours=168
      log.roll.hours=168
      queued.max.requests=500
      num.network.threads=3
      num.io.threads=8
      auto.leader.rebalance.enable=True
      unclean.leader.election.enable=False
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      
      zookeeper.connect=sov-ads-test-4.ru-central1.internal:2181/arenadata/cluster/148
      zookeeper.connection.timeout.ms=30000
      zookeeper.session.timeout.ms=30000
      zookeeper.sync.time.ms=2000
      zookeeper.set.acl=false
      
      log.cleaner.enable=True
      log.cleanup.policy=delete
      log.cleanup.interval.mins=10
      log.cleaner.min.compaction.lag.ms=0
      log.cleaner.delete.retention.ms=86400000
      
      
      security.inter.broker.protocol=PLAINTEXT
      
      
      
      
      remote.log.metadata.manager.listener.name=PLAINTEXT
      
      remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
      remote.log.storage.manager.class.path=/usr/lib/kafka/libs/tiered-storage/*
      remote.log.storage.system.enable=true
      rsm.config.chunk.size=4194304
      rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
      rsm.config.fetch.chunk.cache.path=/var/cache/kafka
      rsm.config.fetch.chunk.cache.prefetch.max.size=8388608
      rsm.config.fetch.chunk.cache.size=1073741824
      rsm.config.fetch.chunk.cache.retention.ms=600000
      rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.hdfs.HdfsStorage
      rsm.config.storage.hdfs.core-site.path=/usr/lib/kafka/config/core-site.xml
      rsm.config.storage.hdfs.hdfs-site.path=/usr/lib/kafka/config/hdfs-site.xml
      rsm.config.storage.hdfs.root=/kafka
      rsm.config.storage.hdfs.upload.buffer.size=8192
    • В конфигурационном файле переменных окружения /etc/kafka/conf/kafka-env.sh устанавливается значение для параметра HADOOP_CONF_DIR — место, куда скопированы файлы конфигурации кластера ADH:

      export HADOOP_CONF_DIR="/usr/lib/kafka/config"
  • При включении опции Tiered Storage в папку /usr/lib/kafka/config/ копируются конфигурационные файлы ADH:

Simple Storage Service (S3)

Для данного примера в качестве сервера S3 используется хранилище MINIO.

Предварительные требования

Для включения опции Tiered Storage в Kafka использовано следующее окружение:

  • Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.6.2.2.b1.

  • Сервисы Kafka и ZooKeeper установлены в кластере ADS.

  • В облачном хранилище MINIO cоздан бакет для хранения данных.

ПРИМЕЧАНИЕ

В реализации Tiered Storage на основе сервера S3 перенос записей на remote-уровень контролируется при помощи специального топика __remote_log_metadata, который по умолчанию имеет значение фактора репликации 3. Это требует не менее трех брокеров Kafka в кластере для работы топика. Если количество брокеров меньше или необходимо по другой причине изменить фактор репликации для топика __remote_log_metadata, это можно выполнить на этапе настройки Kafka, указав новый параметр в группе server.properties:

rlmm.config.remote.log.metadata.topic.replication.factor=1

Шаг 1. Настройка и включение Tiered Storage

  1. Откройте вкладку Services на странице кластера и кликните по имени сервиса Kafka в столбце Name.

    Переход к настройке сервиса
    Переход к настройке сервиса
  2. В открывшемся окне Primary configuration:

    • Для серверов S3, у которых ссылка на бакет задается не в формате FQDN, а в виде пути (например, для MINIO ссылка на бакет выглядит так: http://<s3hostname:port>/browser/<bucket.name>), установите параметр удаленного хранилища, который определяет соответствующий вид ссылки к бакету:

      rsm.config.storage.s3.path.style.access.enabled=true

      Для этого раскройте группу server.properties и, используя поле Add key,value, выберите Add property и введите наименование параметра и его значение.

    • Переведите в активное состояние переключатель S3 Tiered Storage и введите значения параметров для подключения к хранилищу S3.

      ПРИМЕЧАНИЕ

      Значение параметра storage.s3.region не может быть пустым. Если на сервере S3 не предусмотрен такой параметр, установите для него любое значение, например, none.

    • При необходимости раскройте группу Tiered Storage General и измените значения параметров, установленные по умолчанию.

      Настройка Tiered Storage
      Настройка Tiered Storage

      Описания параметров приведены в разделе Kafka статьи Конфигурационные параметры ADS.

  3. Нажмите Save и перезагрузите сервис Kafka при помощи действия Restart, нажав на иконку actions default dark actions default light в столбце Actions.

  4. Дождитесь завершения перезагрузки сервиса. Проанализируйте и исправьте ошибки в случае их возникновения на странице Jobs.

Шаг 2. Проверка результатов

При включении опции Tiered Storage через интерфейс ADCM в конфигурационном файле брокера Kafka (в директории /etc/kafka/conf/) автоматически устанавливаются все необходимые параметры для работы с хранилищем S3: параметры компонентов Remote Manager, StorageBackend (определяющиеся автоматически для выбранного вида хранилища) и механизма chunking.

Ниже приведен пример измененного при помощи ADCM файла конфигурации.

Конфигурационный файл брокера Kafka server.properties
# Managed by ADCM

node.id=1
reserved.broker.max.id=5000
auto.create.topics.enable=False
listeners=PLAINTEXT://:9092
log.dirs=/kafka-logs
default.replication.factor=1
num.partitions=1
delete.topic.enable=true
log.retention.hours=168
log.roll.hours=168
queued.max.requests=500
num.network.threads=3
num.io.threads=8
auto.leader.rebalance.enable=True
unclean.leader.election.enable=False
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connect=sov-ads-test-2.ru-central1.internal:2181/arenadata/cluster/154
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
zookeeper.set.acl=false

log.cleaner.enable=True
log.cleanup.policy=delete
log.cleanup.interval.mins=10
log.cleaner.min.compaction.lag.ms=0
log.cleaner.delete.retention.ms=86400000

rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.storage.s3.path.style.access.enabled=true

security.inter.broker.protocol=PLAINTEXT

remote.log.metadata.manager.listener.name=PLAINTEXT

remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.storage.manager.class.path=/usr/lib/kafka/libs/tiered-storage/*
remote.log.storage.system.enable=true
rsm.config.chunk.size=4194304
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/var/cache/kafka
rsm.config.fetch.chunk.cache.prefetch.max.size=8388608
rsm.config.fetch.chunk.cache.size=1073741824
rsm.config.fetch.chunk.cache.retention.ms=600000
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.s3.endpoint.url=http://<s3hostname:port>
rsm.config.storage.s3.bucket.name=kafka
rsm.config.storage.aws.access.key.id=<access key>
rsm.config.storage.aws.secret.access.key=<secret key>
rsm.config.storage.s3.region=none

Использование Tiered Storage

При создании топика, сегменты которого должны быть перемещены в хранилище, необходимо включить для данного топика опцию Tiered Storage при помощи параметра remote.storage.enable.

Если для топика не указан параметр local.retention.ms, время локального хранения данных будет соответствовать времени, указанному для брокера.

Ниже приведен пример создания топика с указанием параметров, а также с указанием времени общего хранения и размера сегмента:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server ads-test-1.ru-central1.internal:9092 \
--config remote.storage.enable=true \
--config local.retention.ms=1000 \
--config retention.ms=300000 \
--config segment.bytes=200

После записи сообщений, если время локального хранения истекло и сегмент достиг указанного размера, файл появляется в директории хранилища. Данные сообщения могут быть прочитаны как обычно.

Ниже на рисунке показана директория с сегментами топика на remote-уровне в бакете хранилища MINIO.

Директория с сообщениями топика на remote-уровне
Директория с сообщениями топика на remote-уровне
Директория с сообщениями топика на remote-уровне
Директория с сообщениями топика на remote-уровне

Созданная в хранилище директория tieredTopic-BVqanvXiTsKvgy5XY371vQ имеет имя, состоящее из названия топика tieredTopic, за которым следует тире и идентификатор топика. Эта директория в свою очередь содержит папки, соответствующие номерам партиций. Папки партиций содержат по три файла для каждого сохраненного на remote-уровне сегмента:

  • 00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.indexes — файл, содержащий индексы Kafka, которые связаны с каждым сегментом журнала.

  • 00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.log — файл, являющийся сегментом журнала, содержит записи Kafka.

  • 00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.rsm-manifest — файл, содержащий метаданные о сегменте журнала и индексах.

Такие файлы имеют название, состоящее из номера смещения и идентификатора сегмента журнала.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней