Пример использования Kafka REST Proxy

В статье описан пример использования сервиса Kafka REST Proxy — работа с топиками Kafka по интерфейсу RESTful при помощи инструмента командной строки cURL.

Запись сообщений в топик

Для записи в топик используется форма запроса POST /topics/(string:topic_name).

Запрос для записи пары ключ/значение {"name":"Helen"} в JSON-формате в топик my_topic выглядит следующим образом:

$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{"records":[{"value":{"name":"Helen"}}]}' http://hostname:8082/topics/my_topic

где:

  • X POST — определяет метод HTTP-запроса, который используется для отправки данных на сервер.

  • H "Content-Type: application/vnd.kafka.json.v2+json" — определяет пользовательский заголовок, описывающий тип контента тела запроса.

  • data '{"records":[{"value":{"name":"Helen"}}]}' — данные, описывающие тело запроса. Тело запроса содержит информацию, необходимую для создания или изменения объекта.

  • http://hostname:8082/topics/my_topic — конечная точка.

  • hostname — имя хоста, где установлен сервис Kafka REST Proxy.

ПРИМЕЧАНИЕ

Запросы, приведенные далее, имеют составные части, аналогичные запросу для записи сообщений в топик.

Результат запроса при успешной записи сообщений:

{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
ВНИМАНИЕ

При записи сообщений в топик необходимо учитывать следующее:

  • Для автоматического создания топика необходимо включить параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

  • Наименование топика может включать буквенно-цифровые символы (в том числе в кодировке ASCII), а также символы ., _ и -. При использовании других символов будет сформирована ошибка.

Чтение сообщений из топика

Для чтения сообщения из топика необходимо выполнить:

  1. Создать потребителя (consumer). Для этого используется форма запроса POST /consumers/(string:group_name).

    Запрос на создание потребителя my_consumer в группе потребителей my_group для считывания данных JSON-формата с начала топика выглядит следующим образом:

    $ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
          --data '{"name": "my_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
          http://hostname:8082/consumers/my_group

    Результат запроса при успешном создании потребителя:

    {"instance_id":"my_consumer","base_uri":"http://hostname:8082/consumers/my_group/instances/my_consumer"}
  2. Подписаться на топик. Для этого используется форма запроса POST /consumers/(string:group_name)/instances/(string:instance)/subscription.

    Для подписки на топик необходимо использовать базовый URL-адрес base_uri, полученный в результате запроса на создание потребителя.

    Запрос на подписку на топик my_topic для потребителя my_consumer из группы my_group выглядит следующим образом:

    $ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["my_topic"]}' \
     http://hostname:8082/consumers/my_group/instances/my_consumer/subscription
  3. Считать сообщения из топика. Для этого используется форма запроса GET /consumers/(string:group_name)/instances/(string:instance)/records.

    Запрос на чтение потребителем my_consumer из группы my_group сообщений из топика, на который была подключена подписка, выглядит следующим образом:

    $ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
          http://hostname:8082/consumers/my_group/instances/my_consumer/records

    где H "Accept: application/vnd.kafka.json.v2+json" — определяет пользовательский заголовок, указывающий тип контента, который клиент может принять.

    Результат запроса при успешном чтении сообщений:

    [{"topic":"my_topic2","key":null,"value":{"name":"Helen"},"partition":0,"offset":0}]

    Прочитана пара ключ/значение {"name":"Helen"} в JSON-формате, которая была записана в топик.

  4. Закрыть потребителя можно при помощи формы запроса DELETE /consumers/(string:group_name)/instances/(string:instance).

    Запрос на закрытие потребителя my_consumer из группы my_group выглядит следующим образом:

    $ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
          http://hostname:8082/consumers/my_group/instances/my_consumer

    Потребитель удален и больше не может читать сообщения.

Получение данных о топиках

Список топиков

Для получения списка топиков используется форма запроса GET /topics.

Запрос на получение списка топиков клaстера ADS, в котором сервис Kafka REST Proxy установлен на хосте hostname, выглядит следующим образом:

$ curl -X GET http://hostname:8082/topics/

В результате запроса показаны все топики, существующие в кластере:

["my_topic","_schemas"]

Конфигурационные параметры топиков

Для получения данных о конфигурационных параметрах топика используется форма запроса GET /topics/(string:topic_name).

Запрос на получение параметров топика my_topic выглядит следующим образом:

$ curl -X GET http://hostname:8082/topics/my_topic

В результате запроса отображаются все параметры запрашиваемого топика:

{"name":"my_topic","configs":{"compression.type":"producer","leader.replication.throttled.replicas":"","message.downconversion.enable":"true","min.insync.replicas":"1","segment.jitter.ms":"0","cleanup.policy":"delete","flush.ms":"9223372036854775807","follower.replication.throttled.replicas":"","segment.bytes":"1073741824","retention.ms":"604800000","flush.messages":"9223372036854775807","message.format.version":"2.8-IV1","max.compaction.lag.ms":"9223372036854775807","file.delete.delay.ms":"60000","max.message.bytes":"1048588","min.compaction.lag.ms":"0","message.timestamp.type":"CreateTime","preallocate":"false","min.cleanable.dirty.ratio":"0.5","index.interval.bytes":"4096","unclean.leader.election.enable":"false","retention.bytes":"-1","delete.retention.ms":"86400000","segment.ms":"604800000","message.timestamp.difference.max.ms":"9223372036854775807","segment.index.bytes":"10485760"},"partitions":[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]}

Партиции

Для получения сведений о партициях топика используется форма GET /topics/(string:topic_name)/partitions.

Запрос на получение сведений о партициях топика my_topic выглядит следующим образом:

$ curl -X GET http://hostname:8082/topics/my_topic/partitions

В результате запроса отображаются сведения о партициях топика:

[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней