Настройка и использование Kafka Tiered Storage
Обзор
Опция Tiered Storage в ADS может быть включена в конфигурационных параметрах сервиса Kafka одновременно только для одного из двух вариантов хранилищ: HDFS или S3.
Ниже подробно описаны шаги включения опции Tiered Storage.
Hadoop Distributed File System (HDFS)
Для данного примера в качестве хранилища используется сервис HDFS кластера Arenadata Hadoop (ADH).
Предварительные требования
Для включения опции Tiered Storage в Kafka использовано следующее окружение:
-
Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.6.2.2.b1.
-
Сервисы Kafka и ZooKeeper установлены в кластере ADS.
-
Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.3.6.2.b1.
-
Сервисы HDFS, Core configuration и ZooKeeper установлены в кластере ADH.
-
В HDFS необходимо создать директорию, путь и название которой совпадает с указываемым в качестве конфигурационного параметра storage.hdfs.root, и настроить права доступа:
$ sudo -u hdfs hdfs dfs -mkdir /kafka
$ sudo -u hdfs hdfs dfs -chown kafka:hadoop /kafka
Шаг 1. Импорт данных ADH
В интерфейсе ADCM откройте страницу Clusters и кликните по имени кластера ADS. Затем на открывшейся странице кластера перейдите на вкладку Import, выберите Cluster configuration напротив названия кластера ADH и нажмите Import.

Шаг 2. Настройка и включение Tiered Storage
-
Откройте вкладку Services на странице кластера и кликните по имени сервиса Kafka в столбце Name.
Переход к настройке сервиса -
В открывшемся окне Primary configuration:
-
Переведите в активное состояние переключатель HDFS Tiered Storage и при необходимости измените значения параметров, установленные по умолчанию.
-
При необходимости раскройте группу Tiered Storage General и измените значения параметров, установленные по умолчанию.
Настройка Tiered StorageОписания параметров приведены в разделе Kafka статьи Конфигурационные параметры ADS.
-
-
Нажмите Save и перезагрузите сервис Kafka при помощи действия Restart, нажав на иконку
в столбце Actions.
-
Дождитесь завершения перезагрузки сервиса. Проанализируйте и исправьте ошибки в случае их возникновения на странице Jobs.
Шаг 3. Проверка результатов
-
При включении опции Tiered Storage через интерфейс ADCM автоматически устанавливаются все необходимые параметры для работы с хранилищем:
-
В конфигурационном файле брокера Kafka (в директории /etc/kafka/conf/) устанавливаются параметры для компонентов Remote Manager, StorageBackend (определяющиеся автоматически для выбранного вида хранилища) и механизма chunking.
Ниже приведен пример измененного при помощи ADCM файла конфигурации.
Конфигурационный файл брокера Kafka server.properties# Managed by ADCM node.id=1 reserved.broker.max.id=5000 auto.create.topics.enable=False listeners=PLAINTEXT://:9092 log.dirs=/kafka-logs default.replication.factor=1 num.partitions=1 delete.topic.enable=true log.retention.hours=168 log.roll.hours=168 queued.max.requests=500 num.network.threads=3 num.io.threads=8 auto.leader.rebalance.enable=True unclean.leader.election.enable=False offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 zookeeper.connect=sov-ads-test-4.ru-central1.internal:2181/arenadata/cluster/148 zookeeper.connection.timeout.ms=30000 zookeeper.session.timeout.ms=30000 zookeeper.sync.time.ms=2000 zookeeper.set.acl=false log.cleaner.enable=True log.cleanup.policy=delete log.cleanup.interval.mins=10 log.cleaner.min.compaction.lag.ms=0 log.cleaner.delete.retention.ms=86400000 security.inter.broker.protocol=PLAINTEXT remote.log.metadata.manager.listener.name=PLAINTEXT remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager remote.log.storage.manager.class.path=/usr/lib/kafka/libs/tiered-storage/* remote.log.storage.system.enable=true rsm.config.chunk.size=4194304 rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache rsm.config.fetch.chunk.cache.path=/var/cache/kafka rsm.config.fetch.chunk.cache.prefetch.max.size=8388608 rsm.config.fetch.chunk.cache.size=1073741824 rsm.config.fetch.chunk.cache.retention.ms=600000 rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.hdfs.HdfsStorage rsm.config.storage.hdfs.core-site.path=/usr/lib/kafka/config/core-site.xml rsm.config.storage.hdfs.hdfs-site.path=/usr/lib/kafka/config/hdfs-site.xml rsm.config.storage.hdfs.root=/kafka rsm.config.storage.hdfs.upload.buffer.size=8192
-
В конфигурационном файле переменных окружения /etc/kafka/conf/kafka-env.sh устанавливается значение для параметра
HADOOP_CONF_DIR
— место, куда скопированы файлы конфигурации кластера ADH:export HADOOP_CONF_DIR="/usr/lib/kafka/config"
-
-
При включении опции Tiered Storage в папку /usr/lib/kafka/config/ копируются конфигурационные файлы ADH:
Simple Storage Service (S3)
Для данного примера в качестве сервера S3 используется хранилище MINIO.
Предварительные требования
Для включения опции Tiered Storage в Kafka использовано следующее окружение:
-
Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.6.2.2.b1.
-
Сервисы Kafka и ZooKeeper установлены в кластере ADS.
-
В облачном хранилище MINIO cоздан бакет для хранения данных.
ПРИМЕЧАНИЕ
В реализации Tiered Storage на основе сервера S3 перенос записей на remote-уровень контролируется при помощи специального топика
|
Шаг 1. Настройка и включение Tiered Storage
-
Откройте вкладку Services на странице кластера и кликните по имени сервиса Kafka в столбце Name.
Переход к настройке сервиса -
В открывшемся окне Primary configuration:
-
Для серверов S3, у которых ссылка на бакет задается не в формате FQDN, а в виде пути (например, для MINIO ссылка на бакет выглядит так: http://<s3hostname:port>/browser/<bucket.name>), установите параметр удаленного хранилища, который определяет соответствующий вид ссылки к бакету:
rsm.config.storage.s3.path.style.access.enabled=true
Для этого раскройте группу server.properties и, используя поле Add key,value, выберите Add property и введите наименование параметра и его значение.
-
Переведите в активное состояние переключатель S3 Tiered Storage и введите значения параметров для подключения к хранилищу S3.
ПРИМЕЧАНИЕЗначение параметра storage.s3.region не может быть пустым. Если на сервере S3 не предусмотрен такой параметр, установите для него любое значение, например,
none
. -
При необходимости раскройте группу Tiered Storage General и измените значения параметров, установленные по умолчанию.
Настройка Tiered StorageОписания параметров приведены в разделе Kafka статьи Конфигурационные параметры ADS.
-
-
Нажмите Save и перезагрузите сервис Kafka при помощи действия Restart, нажав на иконку
в столбце Actions.
-
Дождитесь завершения перезагрузки сервиса. Проанализируйте и исправьте ошибки в случае их возникновения на странице Jobs.
Шаг 2. Проверка результатов
При включении опции Tiered Storage через интерфейс ADCM в конфигурационном файле брокера Kafka (в директории /etc/kafka/conf/) автоматически устанавливаются все необходимые параметры для работы с хранилищем S3: параметры компонентов Remote Manager, StorageBackend (определяющиеся автоматически для выбранного вида хранилища) и механизма chunking.
Ниже приведен пример измененного при помощи ADCM файла конфигурации.
# Managed by ADCM
node.id=1
reserved.broker.max.id=5000
auto.create.topics.enable=False
listeners=PLAINTEXT://:9092
log.dirs=/kafka-logs
default.replication.factor=1
num.partitions=1
delete.topic.enable=true
log.retention.hours=168
log.roll.hours=168
queued.max.requests=500
num.network.threads=3
num.io.threads=8
auto.leader.rebalance.enable=True
unclean.leader.election.enable=False
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
zookeeper.connect=sov-ads-test-2.ru-central1.internal:2181/arenadata/cluster/154
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
zookeeper.set.acl=false
log.cleaner.enable=True
log.cleanup.policy=delete
log.cleanup.interval.mins=10
log.cleaner.min.compaction.lag.ms=0
log.cleaner.delete.retention.ms=86400000
rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.storage.s3.path.style.access.enabled=true
security.inter.broker.protocol=PLAINTEXT
remote.log.metadata.manager.listener.name=PLAINTEXT
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.storage.manager.class.path=/usr/lib/kafka/libs/tiered-storage/*
remote.log.storage.system.enable=true
rsm.config.chunk.size=4194304
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/var/cache/kafka
rsm.config.fetch.chunk.cache.prefetch.max.size=8388608
rsm.config.fetch.chunk.cache.size=1073741824
rsm.config.fetch.chunk.cache.retention.ms=600000
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.s3.endpoint.url=http://<s3hostname:port>
rsm.config.storage.s3.bucket.name=kafka
rsm.config.storage.aws.access.key.id=<access key>
rsm.config.storage.aws.secret.access.key=<secret key>
rsm.config.storage.s3.region=none
Использование Tiered Storage
При создании топика, сегменты которого должны быть перемещены в хранилище, необходимо включить для данного топика опцию Tiered Storage при помощи параметра remote.storage.enable.
Если для топика не указан параметр local.retention.ms, время локального хранения данных будет соответствовать времени, указанному для брокера.
Ниже приведен пример создания топика с указанием параметров, а также с указанием времени общего хранения и размера сегмента:
$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server ads-test-1.ru-central1.internal:9092 \
--config remote.storage.enable=true \
--config local.retention.ms=1000 \
--config retention.ms=300000 \
--config segment.bytes=200
После записи сообщений, если время локального хранения истекло и сегмент достиг указанного размера, файл появляется в директории хранилища. Данные сообщения могут быть прочитаны как обычно.
Ниже на рисунке показана директория с сегментами топика на remote-уровне в бакете хранилища MINIO.


Созданная в хранилище директория tieredTopic-BVqanvXiTsKvgy5XY371vQ имеет имя, состоящее из названия топика tieredTopic
, за которым следует тире и идентификатор топика.
Эта директория в свою очередь содержит папки, соответствующие номерам партиций. Папки партиций содержат по три файла для каждого сохраненного на remote-уровне сегмента:
-
00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.indexes — файл, содержащий индексы Kafka, которые связаны с каждым сегментом журнала.
-
00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.log — файл, являющийся сегментом журнала, содержит записи Kafka.
-
00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.rsm-manifest — файл, содержащий метаданные о сегменте журнала и индексах.
Такие файлы имеют название, состоящее из номера смещения и идентификатора сегмента журнала.