Начало работы c Kafka

В данной статье описаны первые шаги для начала работы с Kafka.

Установка и запуск среды Kafka c использованием ADCM

  1. Для установки и запуска среды Kafka c использованием ADCM в кластере ADS необходимо установить сервисы:

    • Kafka

    • ZooKeeper

  2. После установки в интерфейсе ADCM для всех установленных сервисов необходимо по очереди применить действие Start, нажав на иконку actions default dark actions default light в столбце Actions и дождаться успешного завершения запуска.

    Запуск сервисов в интерфейсе ADCM
    Запуск сервисов в интерфейсе ADCM

    В результате на соответствующих хостах установлена среда Kafka и запускаются серверы Kafka broker и ZooKeeper.

ПРИМЕЧАНИЕ

Установка сервисов описана в статьях Online-установка, Offline-установка.

Подключение к Kafka

При установке Kafka на хостах создается новая директория /usr/lib/kafka/bin. Она содержит файлы c расширением .sh — скрипты для выполнения команд в среде Kafka. Для просмотра директории необходимо выполнить следующую команду:

$ ls -la /usr/lib/kafka/bin

Работа с сервисом Kafka происходит путем запуска соответствующих скриптов в командной строке. Обычно скрипты запускают, находясь в папке /usr/lib/kafka, указывая в команде часть пути bin/. Перейти в нужную директорию можно, используя команду:

$ cd /usr/lib/kafka

Создание топика

Для того чтобы записать сообщения в Kafka, необходимо создать топик. Это осуществляется при помощи скрипта kafka-topics.sh и опции --create c любого хоста кластера, на котором установлен сервис Kafka.

Для создания топика необходимо выполнить следующую команду:

$ bin/kafka-topics.sh --create --topic new-topic --bootstrap-server hostname:9092

В записи команды применены опции для описания топика:

  • new-topic — название топика;

  • hostname — имя хоста, где создается топик;

  • 9092 — HTTP-порт доступа к сервису Kafka.

В результате появляется сообщение:

Created topic new-topic.

Для просмотра данных о существующем топике используется скрипт kafka-topics.sh и опция --describe.

Для просмотра данных о топике необходимо выполнить следующую команду:

$ bin/kafka-topics.sh --describe --topic new-topic --bootstrap-server hostname:9092

В результате выводятся данные о запрошенном топике в виде:

Topic: new-topic	TopicId: STGaaB8QSF64s06MRtp-YA	PartitionCount: 1	ReplicationFactor: 1	Configs: unclean.leader.election.enable=false
Topic: new-topic	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001

Здесь указано, что созданный топик обладает следующими параметрами:

  • количество партиций — 1;

  • коэффициент репликации — 1;

  • обозначение брокера-лидера для данной партиции — 1001;

  • настройка unclean.leader.election.enable выключена.

ПРИМЕЧАНИЕ

Для просмотра других действий, которые выполняет скрипт kafka-topics.sh, необходимо выполнить команду:

$ bin/kafka-topics.sh

Запись сообщения в топик

Запись сообщения в топик производится при помощи скрипта kafka-console-producer.sh c любого хоста кластера, на котором установлен сервис Kafka.

Для записи сообщения в топик new-topic необходимо:

  1. Выполнить следующую команду:

    $ bin/kafka-console-producer.sh --topic new-topic --bootstrap-server hostname:9092

    При этом запускается режим записи сообщений.

  2. На следующей строке после ввода команды ввести необходимые сообщения, каждое на новой строке. Запись сообщения и переход на новую строку производится при помощи Enter. Например, таким образом вводится несколько сообщений, каждое на своей строке:

    >Sunday
    >Monday
    >Tuesday
    >Wednesday
    >Thursday
    >Friday
    >Saturday
  3. Выйти из режима записи сообщений, для этого необходимо:

    • после записи последнего сообщения перейти на следующую строку;

    • нажать Ctrl+C.

ПРИМЕЧАНИЕ

Для просмотра других опций, которые можно применить при выполнении скрипта kafka-console-producer.sh, необходимо выполнить команду:

$ bin/kafka-console-producer.sh

Чтение сообщений из топика

Чтение сообщений из топика производится при помощи скрипта kafka-console-consumer.sh c любого хоста кластера, на котором установлен сервис Kafka.

Чтение сообщений с начала партиции

Для чтения сообщений из топика new-topic с самого первого сообщения, записанного в партицию, необходимо выполнить следующую команду с использованием опции --from-beginning:

$ bin/kafka-console-consumer.sh --topic new-topic --from-beginning --bootstrap-server hostname:9092

При этом запускается режим чтения сообщений. Сообщения считываются в том же порядке, как они были записаны:

Sunday
Monday
Tuesday
Wednesday
Thursday
Friday
Saturday

Выход из режима чтения сообщений осуществляется при помощи Ctrl+C.

РЕКОМЕНДАЦИЯ
Если в этот же топик при помощи kafka-console-producer.sh с другой терминальной сессии записывать сообщения, в режиме чтения можно их сразу считывать.

Чтение сообщений с заданного смещения

Для чтения сообщений из топика new-topic начиная со смещения 4 (Thursday) необходимо выполнить следующую команду с использованием опций указания партиции --partition 0 и смещения --offset 4:

$ bin/kafka-console-consumer.sh --topic new-topic --partition 0 --offset 4 --bootstrap-server hostname:9092

Результатом будет вывод сообщений, записанных начиная со смещения 4:

Thursday
Friday
Saturday
ПРИМЕЧАНИЕ

Для просмотра других опций, которые можно применить при выполнении скрипта kafka-console-consumer.sh, необходимо выполнить команду:

$ bin/kafka-console-consumer.sh

Завершение работы в среде Kafka

Для остановки сервисов в интерфейсе ADCM для всех сервисов по очереди необходимо применить действие Stop, нажав на иконку actions default dark actions default light в столбце Actions и дождаться успешного завершения действия.

Остановка сервисов в интерфейсе ADCM
Остановка сервисов в интерфейсе ADCM

Если необходимо удалить данные локальной среды Kafka, включая любые созданные сообщения, необходимо выполнить команду:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней