Пример использования Kafka REST Proxy
В статье описан пример использования сервиса Kafka REST Proxy — работа с топиками Kafka по интерфейсу RESTful при помощи инструмента командной строки cURL.
Запись сообщений в топик
Для записи в топик используется форма запроса POST /topics/(string:topic_name).
Запрос для записи пары ключ/значение {"name":"Helen"}
в JSON-формате в топик my_topic
выглядит следующим образом:
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"name":"Helen"}}]}' http://hostname:8082/topics/my_topic
где:
-
X POST
— определяет метод HTTP-запроса, который используется для отправки данных на сервер. -
H "Content-Type: application/vnd.kafka.json.v2+json"
— определяет пользовательский заголовок, описывающий тип контента тела запроса. -
data '{"records":[{"value":{"name":"Helen"}}]}'
— данные, описывающие тело запроса. Тело запроса содержит информацию, необходимую для создания или изменения объекта. -
http://hostname:8082/topics/my_topic
— конечная точка. -
hostname
— имя хоста, где установлен сервис Kafka REST Proxy.
ПРИМЕЧАНИЕ
Запросы, приведенные далее, имеют составные части, аналогичные запросу для записи сообщений в топик. |
Результат запроса при успешной записи сообщений:
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
ВНИМАНИЕ
При записи сообщений в топик необходимо учитывать следующее:
|
Чтение сообщений из топика
Для чтения сообщения из топика необходимо выполнить:
-
Создать потребителя (consumer). Для этого используется форма запроса POST /consumers/(string:group_name).
Запрос на создание потребителя
my_consumer
в группе потребителейmy_group
для считывания данных JSON-формата с начала топика выглядит следующим образом:$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer", "format": "json", "auto.offset.reset": "earliest"}' \ http://hostname:8082/consumers/my_group
Результат запроса при успешном создании потребителя:
{"instance_id":"my_consumer","base_uri":"http://hostname:8082/consumers/my_group/instances/my_consumer"}
-
Подписаться на топик. Для этого используется форма запроса POST /consumers/(string:group_name)/instances/(string:instance)/subscription.
Для подписки на топик необходимо использовать базовый URL-адрес
base_uri
, полученный в результате запроса на создание потребителя.Запрос на подписку на топик
my_topic
для потребителяmy_consumer
из группыmy_group
выглядит следующим образом:$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["my_topic"]}' \ http://hostname:8082/consumers/my_group/instances/my_consumer/subscription
-
Считать сообщения из топика. Для этого используется форма запроса GET /consumers/(string:group_name)/instances/(string:instance)/records.
Запрос на чтение потребителем
my_consumer
из группыmy_group
сообщений из топика, на который была подключена подписка, выглядит следующим образом:$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ http://hostname:8082/consumers/my_group/instances/my_consumer/records
где
H "Accept: application/vnd.kafka.json.v2+json"
— определяет пользовательский заголовок, указывающий тип контента, который клиент может принять.Результат запроса при успешном чтении сообщений:
[{"topic":"my_topic2","key":null,"value":{"name":"Helen"},"partition":0,"offset":0}]
Прочитана пара ключ/значение
{"name":"Helen"}
в JSON-формате, которая была записана в топик. -
Закрыть потребителя можно при помощи формы запроса DELETE /consumers/(string:group_name)/instances/(string:instance).
Запрос на закрытие потребителя
my_consumer
из группыmy_group
выглядит следующим образом:$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \ http://hostname:8082/consumers/my_group/instances/my_consumer
Потребитель удален и больше не может читать сообщения.
Получение данных о топиках
Список топиков
Для получения списка топиков используется форма запроса GET /topics.
Запрос на получение списка топиков клaстера ADS, в котором сервис Kafka REST Proxy установлен на хосте hostname
, выглядит следующим образом:
$ curl -X GET http://hostname:8082/topics/
В результате запроса показаны все топики, существующие в кластере:
["my_topic","_schemas"]
Конфигурационные параметры топиков
Для получения данных о конфигурационных параметрах топика используется форма запроса GET /topics/(string:topic_name).
Запрос на получение параметров топика my_topic
выглядит следующим образом:
$ curl -X GET http://hostname:8082/topics/my_topic
В результате запроса отображаются все параметры запрашиваемого топика:
{"name":"my_topic","configs":{"compression.type":"producer","leader.replication.throttled.replicas":"","message.downconversion.enable":"true","min.insync.replicas":"1","segment.jitter.ms":"0","cleanup.policy":"delete","flush.ms":"9223372036854775807","follower.replication.throttled.replicas":"","segment.bytes":"1073741824","retention.ms":"604800000","flush.messages":"9223372036854775807","message.format.version":"2.8-IV1","max.compaction.lag.ms":"9223372036854775807","file.delete.delay.ms":"60000","max.message.bytes":"1048588","min.compaction.lag.ms":"0","message.timestamp.type":"CreateTime","preallocate":"false","min.cleanable.dirty.ratio":"0.5","index.interval.bytes":"4096","unclean.leader.election.enable":"false","retention.bytes":"-1","delete.retention.ms":"86400000","segment.ms":"604800000","message.timestamp.difference.max.ms":"9223372036854775807","segment.index.bytes":"10485760"},"partitions":[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]}
Партиции
Для получения сведений о партициях топика используется форма GET /topics/(string:topic_name)/partitions.
Запрос на получение сведений о партициях топика my_topic
выглядит следующим образом:
$ curl -X GET http://hostname:8082/topics/my_topic/partitions
В результате запроса отображаются сведения о партициях топика:
[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]