Configure and use Kafka Tiered Storage

Overview

The Tiered Storage option in ADS can be enabled in the configuration parameters of the Kafka service at a time for only one of two storage options: HDFS or S3.

Below are detailed steps to enable the Tiered Storage option.

Hadoop Distributed File System (HDFS)

For this example, the HDFS service of the Arenadata Hadoop (ADH) cluster is used as storage.

Prerequisites

To enable the Tiered Storage option in Kafka, the following environment was used:

  • The ADS cluster is installed according to the Online installation guide. The minimum version of ADS is 3.6.2.2.b1.

  • Kafka and ZooKeeper services are installed in the ADS cluster.

  • The ADH cluster is installed according to the Online installation guide. The minimum version of ADH is 3.3.6.2.b1.

  • HDFS, Core configuration, and ZooKeeper services are installed in the ADH cluster.

  • In HDFS, you need to create a directory whose path and name matches that specified as the storage.hdfs.root configuration parameter and configure access rights:

$ sudo -u hdfs hdfs dfs -mkdir /kafka
$ sudo -u hdfs hdfs dfs -chown kafka:hadoop /kafka

Step 1. Import ADH data

In the ADCM interface, open the Clusters page and click on the ADS cluster name. Then on the cluster page that opens, go to the Import tab, select Cluster configuration next to the ADH cluster name, and click Import.

Import ADH data
Import ADH data

Step 2. Configure and enable Tiered Storage

  1. Open the Services tab on the cluster page and click on the Kafka service name in the Name column.

    Switch to the service configuration
    Switch to the service configuration
  2. In the Primary configuration window that opens:

    • Set the HDFS Tiered Storage switch to active and change the default settings as necessary.

    • If necessary, expand the Tiered Storage General group and change the default settings.

      Tiered Storage configuration
      Tiered Storage configuration

      Parameter descriptions are provided in the Kafka section of the ADS configuration parameters article.

  3. Click Save and restart the Kafka service using the Restart action by clicking actions default dark actions default light in the Actions column.

  4. Wait until the service restarts. Analyze and correct errors if they occur on the Jobs page.

Step 3. Check the results

  • When you enable the Tiered Storage option via the ADCM interface, all the necessary parameters for working with HDFS storage are automatically set:

    • In the Kafka broker configuration file (in the /etc/kafka/conf/ directory), parameters are set for Remote Manager components, StorageBackend (defined automatically for the selected storage type), and the chunking mechanism.

      Below is an example of a configuration file modified by ADCM.

      Kafka broker configuration file server.properties
      # Managed by ADCM
      
      node.id=1
      reserved.broker.max.id=5000
      auto.create.topics.enable=False
      listeners=PLAINTEXT://:9092
      log.dirs=/kafka-logs
      default.replication.factor=1
      num.partitions=1
      delete.topic.enable=true
      log.retention.hours=168
      log.roll.hours=168
      queued.max.requests=500
      num.network.threads=3
      num.io.threads=8
      auto.leader.rebalance.enable=True
      unclean.leader.election.enable=False
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      
      zookeeper.connect=sov-ads-test-4.ru-central1.internal:2181/arenadata/cluster/148
      zookeeper.connection.timeout.ms=30000
      zookeeper.session.timeout.ms=30000
      zookeeper.sync.time.ms=2000
      zookeeper.set.acl=false
      
      log.cleaner.enable=True
      log.cleanup.policy=delete
      log.cleanup.interval.mins=10
      log.cleaner.min.compaction.lag.ms=0
      log.cleaner.delete.retention.ms=86400000
      
      
      security.inter.broker.protocol=PLAINTEXT
      
      
      
      
      remote.log.metadata.manager.listener.name=PLAINTEXT
      
      remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
      remote.log.storage.manager.class.path=/usr/lib/kafka/libs/tiered-storage/*
      remote.log.storage.system.enable=true
      rsm.config.chunk.size=4194304
      rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
      rsm.config.fetch.chunk.cache.path=/var/cache/kafka
      rsm.config.fetch.chunk.cache.prefetch.max.size=8388608
      rsm.config.fetch.chunk.cache.size=1073741824
      rsm.config.fetch.chunk.cache.retention.ms=600000
      rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.hdfs.HdfsStorage
      rsm.config.storage.hdfs.core-site.path=/usr/lib/kafka/config/core-site.xml
      rsm.config.storage.hdfs.hdfs-site.path=/usr/lib/kafka/config/hdfs-site.xml
      rsm.config.storage.hdfs.root=/kafka
      rsm.config.storage.hdfs.upload.buffer.size=8192
    • In the environment variables configuration file /etc/kafka/conf/kafka-env.sh sets the value for the HADOOP_CONF_DIR parameter — the location where the ADH cluster configuration files are copied:

      export HADOOP_CONF_DIR="/usr/lib/kafka/config"
  • When the Tiered Storage option is enabled, the ADH configuration files are copied to the /usr/lib/kafka/config/ folder:

Simple Storage Service (S3)

For this example, the MINIO storage is used as the S3 server.

Prerequisites

To enable the Tiered Storage option in Kafka, the following environment was used:

  • The ADS cluster is installed according to the Online installation guide. The minimum version of ADS is 3.6.2.2.b1.

  • Kafka and ZooKeeper services are installed in the ADS cluster.

  • A bucket for storing data was created in the MINIO cloud storage.

NOTE

In the Tiered Storage implementation based on the S3 server, the transfer of records to the remote level is controlled using a special __remote_log_metadata topic, which by default has a replication factor value of 3. This requires at least three Kafka brokers in the cluster for the topic to work. If the number of brokers is smaller or it is necessary for another reason to change the replication factor for the __remote_log_metadata topic, this can be done at the Kafka settings stage by specifying a new parameter in the server.properties group:

rlmm.config.remote.log.metadata.topic.replication.factor=1

Step 1. Configure and enable Tiered Storage

  1. Open the Services tab on the cluster page and click on the Kafka service name in the Name column.

    Switch to the service configuration
    Switch to the service configuration
  2. In the Primary configuration window that opens:

    • For S3 servers where the link to the bucket is specified not in the FQDN format, but as a path (for example, for MINIO, the link to the bucket looks like this: http://<s3hostname:port>/browser/<bucket.name>), set the remote storage parameter, which determines the appropriate type of a link to the bucket:

      rsm.config.storage.s3.path.style.access.enabled=true

      To do this, expand the server.properties group and, using the Add key,value field, select Add property and enter the name of the parameter and its value.

    • Set the S3 Tiered Storage switch to active and enter the parameter values ​​for connecting to S3 storage.

      NOTE

      The value of the storage.s3.region parameter cannot be empty. If the S3 server does not provide such a parameter, set it to any value, for example, none.

    • If necessary, expand the Tiered Storage General group and change the default settings.

      Tiered Storage configuration
      Tiered Storage configuration

      Parameter descriptions are provided in the Kafka section of the ADS configuration parameters article.

  3. Click Save and restart the Kafka service using the Restart action by clicking actions default dark actions default light in the Actions column.

  4. Wait until the service restarts. Analyze and correct errors if they occur on the Jobs page.

Step 2. Check the results

When you enable the Tiered Storage option via the ADCM interface, in the Kafka broker configuration file (in the /etc/kafka/conf/ directory), all the necessary parameters for working with S3 storage are automatically set: parameters of the Remote Manager components, StorageBackend (detected automatically for the selected storage type), and the chunking mechanism.

Below is an example of a configuration file modified by ADCM.

Kafka broker configuration file server.properties
# Managed by ADCM

node.id=1
reserved.broker.max.id=5000
auto.create.topics.enable=False
listeners=PLAINTEXT://:9092
log.dirs=/kafka-logs
default.replication.factor=1
num.partitions=1
delete.topic.enable=true
log.retention.hours=168
log.roll.hours=168
queued.max.requests=500
num.network.threads=3
num.io.threads=8
auto.leader.rebalance.enable=True
unclean.leader.election.enable=False
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connect=sov-ads-test-2.ru-central1.internal:2181/arenadata/cluster/154
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
zookeeper.set.acl=false

log.cleaner.enable=True
log.cleanup.policy=delete
log.cleanup.interval.mins=10
log.cleaner.min.compaction.lag.ms=0
log.cleaner.delete.retention.ms=86400000

rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.storage.s3.path.style.access.enabled=true

security.inter.broker.protocol=PLAINTEXT

remote.log.metadata.manager.listener.name=PLAINTEXT

remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.storage.manager.class.path=/usr/lib/kafka/libs/tiered-storage/*
remote.log.storage.system.enable=true
rsm.config.chunk.size=4194304
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/var/cache/kafka
rsm.config.fetch.chunk.cache.prefetch.max.size=8388608
rsm.config.fetch.chunk.cache.size=1073741824
rsm.config.fetch.chunk.cache.retention.ms=600000
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.s3.endpoint.url=http://<s3hostname:port>
rsm.config.storage.s3.bucket.name=kafka
rsm.config.storage.aws.access.key.id=<access key>
rsm.config.storage.aws.secret.access.key=<secret key>
rsm.config.storage.s3.region=none

Use Tiered Storage

When creating a topic whose segments are to be moved to storage, you should enable the Tiered Storage option for this topic using the remote.storage.enable parameter.

If the local.retention.ms parameter is not specified for the topic, the local data storage time will correspond to the time specified for broker.

Below is an example of creating a topic indicating the parameters, as well as indicating total storage time and segment size:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server ads-test-1.ru-central1.internal:9092 \
--config remote.storage.enable=true \
--config local.retention.ms=1000 \
--config retention.ms=300000 \
--config segment.bytes=200

After message recording, if the local storage time has expired and the segment has reached the specified size, the file appears in the HDFS storage directory. These messages can be read as usual.

The figure below shows a directory with topic segments at the remote level in the MINIO storage bucket.

Directory with topic messages at the remote level
Directory with topic messages at the remote level
Directory with topic messages at the remote level
Directory with topic messages at the remote level

The tieredTopic-BVqanvXiTsKvgy5XY371vQ directory created in the repository has the name that includes the topic name (tieredTopic), followed by a dash and the topic identifier. This directory, in turn, contains folders corresponding to partition numbers. The partition folders contain three files for each segment saved at the remote level:

  • 00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.indexes — a file containing Kafka indexes that are associated with each log segment.

  • 00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.log — the file, which is a log segment, contains Kafka records.

  • 00000000000000000009-UzfbrQXzQKOt7IqzpRh1kA.rsm-manifest — a file containing metadata about the log segment and indexes.

Such files have a name consisting of an offset number and a log segment ID.

Found a mistake? Seleсt text and press Ctrl+Enter to report it