Начало работы c Kafka
В данной статье описаны первые шаги для начала работы с Kafka.
Установка и запуск среды Kafka c использованием ADCM
-
Для установки и запуска среды Kafka c использованием ADCM в кластере ADS необходимо установить сервисы:
-
Kafka
-
ZooKeeper
-
-
После установки в интерфейсе ADCM для всех установленных сервисов необходимо по очереди применить действие Start, нажав на иконку
в столбце Actions и дождаться успешного завершения запуска.
Запуск сервисов в интерфейсе ADCMВ результате на соответствующих хостах установлена среда Kafka и запускаются серверы Kafka broker и ZooKeeper.
ПРИМЕЧАНИЕ
Установка сервисов описана в статьях Online-установка, Offline-установка. |
Подключение к Kafka
При установке Kafka на хостах создается новая директория /usr/lib/kafka/bin. Она содержит файлы c расширением .sh — скрипты для выполнения команд в среде Kafka. Для просмотра директории необходимо выполнить следующую команду:
$ ls -la /usr/lib/kafka/bin
Работа с сервисом Kafka происходит путем запуска соответствующих скриптов в командной строке. Обычно скрипты запускают, находясь в папке /usr/lib/kafka, указывая в команде часть пути bin/. Перейти в нужную директорию можно, используя команду:
$ cd /usr/lib/kafka
Создание топика
Для того чтобы записать сообщения в Kafka, необходимо создать топик. Это осуществляется при помощи скрипта kafka-topics.sh и опции --create
c любого хоста кластера, на котором установлен сервис Kafka.
Для создания топика необходимо выполнить следующую команду:
$ bin/kafka-topics.sh --create --topic new-topic --bootstrap-server hostname:9092
В записи команды применены опции для описания топика:
-
new-topic
— название топика; -
hostname
— имя хоста, где создается топик; -
9092
— HTTP-порт доступа к сервису Kafka.
В результате появляется сообщение:
Created topic new-topic.
Для просмотра данных о существующем топике используется скрипт kafka-topics.sh и опция --describe
.
Для просмотра данных о топике необходимо выполнить следующую команду:
$ bin/kafka-topics.sh --describe --topic new-topic --bootstrap-server hostname:9092
В результате выводятся данные о запрошенном топике в виде:
Topic: new-topic TopicId: STGaaB8QSF64s06MRtp-YA PartitionCount: 1 ReplicationFactor: 1 Configs: unclean.leader.election.enable=false Topic: new-topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Здесь указано, что созданный топик обладает следующими параметрами:
-
количество партиций —
1
; -
коэффициент репликации —
1
; -
обозначение брокера-лидера для данной партиции —
1001
; -
настройка
unclean.leader.election.enable
выключена.
ПРИМЕЧАНИЕ
Для просмотра других действий, которые выполняет скрипт kafka-topics.sh, необходимо выполнить команду:
|
Запись сообщения в топик
Запись сообщения в топик производится при помощи скрипта kafka-console-producer.sh c любого хоста кластера, на котором установлен сервис Kafka.
Для записи сообщения в топик new-topic
необходимо:
-
Выполнить следующую команду:
$ bin/kafka-console-producer.sh --topic new-topic --bootstrap-server hostname:9092
При этом запускается режим записи сообщений.
-
На следующей строке после ввода команды ввести необходимые сообщения, каждое на новой строке. Запись сообщения и переход на новую строку производится при помощи
Enter
. Например, таким образом вводится несколько сообщений, каждое на своей строке:>Sunday >Monday >Tuesday >Wednesday >Thursday >Friday >Saturday
-
Выйти из режима записи сообщений, для этого необходимо:
-
после записи последнего сообщения перейти на следующую строку;
-
нажать
Ctrl+C
.
-
ПРИМЕЧАНИЕ
Для просмотра других опций, которые можно применить при выполнении скрипта kafka-console-producer.sh, необходимо выполнить команду:
|
Чтение сообщений из топика
Чтение сообщений из топика производится при помощи скрипта kafka-console-consumer.sh c любого хоста кластера, на котором установлен сервис Kafka.
Чтение сообщений с начала партиции
Для чтения сообщений из топика new-topic
с самого первого сообщения, записанного в партицию, необходимо выполнить следующую команду с использованием опции --from-beginning
:
$ bin/kafka-console-consumer.sh --topic new-topic --from-beginning --bootstrap-server hostname:9092
При этом запускается режим чтения сообщений. Сообщения считываются в том же порядке, как они были записаны:
Sunday Monday Tuesday Wednesday Thursday Friday Saturday
Выход из режима чтения сообщений осуществляется при помощи Ctrl+C
.
РЕКОМЕНДАЦИЯ
Если в этот же топик при помощи kafka-console-producer.sh с другой терминальной сессии записывать сообщения, в режиме чтения можно их сразу считывать.
|
Чтение сообщений с заданного смещения
Для чтения сообщений из топика new-topic
начиная со смещения 4 (Thursday
) необходимо выполнить следующую команду с использованием опций указания партиции --partition 0
и смещения --offset 4
:
$ bin/kafka-console-consumer.sh --topic new-topic --partition 0 --offset 4 --bootstrap-server hostname:9092
Результатом будет вывод сообщений, записанных начиная со смещения 4:
Thursday Friday Saturday
ПРИМЕЧАНИЕ
Для просмотра других опций, которые можно применить при выполнении скрипта kafka-console-consumer.sh, необходимо выполнить команду:
|
Завершение работы в среде Kafka
Для остановки сервисов в интерфейсе ADCM для всех сервисов по очереди необходимо применить действие Stop, нажав на иконку
в столбце Actions и дождаться успешного завершения действия.

Если необходимо удалить данные локальной среды Kafka, включая любые созданные сообщения, необходимо выполнить команду:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper