Kafka REST Proxy usage example

This article shows how to use the Kafka REST Proxy service to work with Kafka topics over a RESTful interface, using the cURL command-line tool.

Write messages to a topic

To write to a topic, use the request form POST /topics/(string:topic_name).

The request that writes a message with the JSON value {"name":"Helen"} (and no key) to the my_topic topic looks like this:

$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{"records":[{"value":{"name":"Helen"}}]}' http://hostname:8082/topics/my_topic

Where:

  • -X POST sets the HTTP request method. POST is used to send data to the server.

  • -H "Content-Type: application/vnd.kafka.json.v2+json" adds a request header that declares the content type of the request body.

  • --data '{"records":[{"value":{"name":"Helen"}}]}' passes the request body. The body contains the list of records to write to the topic; the single record here has a value and no key.

  • http://hostname:8082/topics/my_topic is the endpoint.

  • hostname is the name of the host where the Kafka REST Proxy service is installed.

NOTE

The requests below use the same components as the request for writing messages to a topic.

Query result on a successful write:

{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
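
For readers who call the proxy from a program rather than from the shell, the write request above can be sketched in Python using only the standard library. This is a minimal illustration, not part of the proxy itself: the names BASE_URL, build_produce_request, and failed_records are hypothetical, and hostname is the same placeholder as in the cURL example. The request is only built and printed here; sending it requires a reachable proxy.

```python
import json
import urllib.request

# Assumed base URL; "hostname" is the placeholder used in the cURL examples.
BASE_URL = "http://hostname:8082"


def build_produce_request(topic, values, base_url=BASE_URL):
    """Build (without sending) a POST /topics/<topic> request for JSON values."""
    body = json.dumps({"records": [{"value": v} for v in values]}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/topics/{topic}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    )


def failed_records(response_text):
    """Return the per-record entries of a response whose error_code is not null."""
    offsets = json.loads(response_text)["offsets"]
    return [entry for entry in offsets if entry.get("error_code") is not None]


request = build_produce_request("my_topic", [{"name": "Helen"}])
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To actually send it: urllib.request.urlopen(request).read()
```

Passing the response text shown above to failed_records returns an empty list, because the only record has a null error_code.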
CAUTION

When writing messages to a topic, keep the following in mind:

  • To automatically create a topic, you must enable the auto.create.topics.enable parameter in the server.properties group when configuring the Kafka service.

  • The topic name can include ASCII alphanumeric characters, as well as the characters ., _, and -. Any other character causes an error.
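
The character rule above can be checked on the client side before a request is sent. The sketch below is illustrative only: is_valid_topic_name is a hypothetical helper, and it checks nothing but the allowed characters. Kafka may apply further restrictions, such as a length limit, that are not covered here.

```python
import re

# Characters allowed by the rule above: ASCII letters, digits, ".", "_" and "-".
_TOPIC_NAME = re.compile(r"[A-Za-z0-9._-]+")


def is_valid_topic_name(name):
    """Return True if name is non-empty and uses only the allowed characters."""
    return _TOPIC_NAME.fullmatch(name) is not None


for candidate in ("my_topic", "orders.v2", "bad topic", "päivä"):
    print(candidate, is_valid_topic_name(candidate))
```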

Read messages from a topic

To read a message from a topic, do the following:

  1. Create a consumer. To do this, use the request form POST /consumers/(string:group_name).

    The request to create a my_consumer consumer in the my_group consumer group to read JSON-formatted data, starting from the beginning of the topic, looks like this:

    $ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
          --data '{"name": "my_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
          http://hostname:8082/consumers/my_group

    Query result upon successful consumer creation:

    {"instance_id":"my_consumer","base_uri":"http://hostname:8082/consumers/my_group/instances/my_consumer"}
  2. Subscribe to the topic. To do this, use the request form POST /consumers/(string:group_name)/instances/(string:instance)/subscription.

    The subscription request must be sent to the base URL returned as base_uri in the response to the consumer creation request.

    The request to subscribe to the my_topic topic for the my_consumer consumer in the my_group group looks like this:

    $ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["my_topic"]}' \
     http://hostname:8082/consumers/my_group/instances/my_consumer/subscription
  3. Read messages from a topic. To do this, use the request form GET /consumers/(string:group_name)/instances/(string:instance)/records.

    The request for the my_consumer consumer from the my_group group to read messages from the topic it is subscribed to looks like this:

    $ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
          http://hostname:8082/consumers/my_group/instances/my_consumer/records

    Here, -H "Accept: application/vnd.kafka.json.v2+json" adds a request header that indicates the content type the client can accept.

    Query result on successful reading of messages:

    [{"topic":"my_topic","key":null,"value":{"name":"Helen"},"partition":0,"offset":0}]

    The response contains the message with the JSON value {"name":"Helen"} and a null key that was written to the topic earlier.

  4. Close the consumer when it is no longer needed. To do this, use the request form DELETE /consumers/(string:group_name)/instances/(string:instance).

    The request to close the my_consumer consumer from the my_group group looks like this:

    $ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
          http://hostname:8082/consumers/my_group/instances/my_consumer

    The consumer has been deleted and can no longer read messages.
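
The four steps above (create, subscribe, read, close) can be sketched as a sequence of requests in Python using only the standard library. This is a hedged illustration: consumer_requests, BASE_URL, V2, and JSON_V2 are hypothetical names, and the instance URL is assembled by hand for brevity, whereas a real client would take it from the base_uri field of the creation response. The requests are built and listed, not sent.

```python
import json
import urllib.request

BASE_URL = "http://hostname:8082"  # placeholder host from the cURL examples
V2 = "application/vnd.kafka.v2+json"
JSON_V2 = "application/vnd.kafka.json.v2+json"


def consumer_requests(group, name, topics, base_url=BASE_URL):
    """Build the four requests of the consumer lifecycle, in order."""
    # Assumption: this matches the base_uri the proxy returns on creation.
    instance = f"{base_url}/consumers/{group}/instances/{name}"
    create_body = {"name": name, "format": "json", "auto.offset.reset": "earliest"}
    return [
        # 1. Create the consumer in the group.
        urllib.request.Request(
            f"{base_url}/consumers/{group}",
            data=json.dumps(create_body).encode("utf-8"),
            method="POST",
            headers={"Content-Type": V2},
        ),
        # 2. Subscribe the consumer to the topics.
        urllib.request.Request(
            f"{instance}/subscription",
            data=json.dumps({"topics": topics}).encode("utf-8"),
            method="POST",
            headers={"Content-Type": V2},
        ),
        # 3. Read records as JSON.
        urllib.request.Request(
            f"{instance}/records", method="GET", headers={"Accept": JSON_V2}
        ),
        # 4. Close (delete) the consumer.
        urllib.request.Request(
            instance, method="DELETE", headers={"Content-Type": V2}
        ),
    ]


for req in consumer_requests("my_group", "my_consumer", ["my_topic"]):
    print(req.get_method(), req.full_url)
```

Each request can be sent with urllib.request.urlopen once the proxy is reachable; the order matters, since reading before subscribing returns no data for the topic.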

Get data on topics

List of topics

To get a list of topics, use the request form GET /topics.

The request to list the topics of an ADS cluster whose Kafka REST Proxy service is installed on the host hostname looks like this:

$ curl -X GET http://hostname:8082/topics/

The query result displays all topics that exist in the cluster:

["my_topic","_schemas"]

Topic configuration parameters

To get data on the configuration parameters of a topic, use the request form GET /topics/(string:topic_name).

The request to get the parameters of the my_topic topic looks like this:

$ curl -X GET http://hostname:8082/topics/my_topic

The query result displays all parameters of the requested topic:

{"name":"my_topic","configs":{"compression.type":"producer","leader.replication.throttled.replicas":"","message.downconversion.enable":"true","min.insync.replicas":"1","segment.jitter.ms":"0","cleanup.policy":"delete","flush.ms":"9223372036854775807","follower.replication.throttled.replicas":"","segment.bytes":"1073741824","retention.ms":"604800000","flush.messages":"9223372036854775807","message.format.version":"2.8-IV1","max.compaction.lag.ms":"9223372036854775807","file.delete.delay.ms":"60000","max.message.bytes":"1048588","min.compaction.lag.ms":"0","message.timestamp.type":"CreateTime","preallocate":"false","min.cleanable.dirty.ratio":"0.5","index.interval.bytes":"4096","unclean.leader.election.enable":"false","retention.bytes":"-1","delete.retention.ms":"86400000","segment.ms":"604800000","message.timestamp.difference.max.ms":"9223372036854775807","segment.index.bytes":"10485760"},"partitions":[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]}
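
Because the response is a single long JSON object, it is often easier to pick out individual parameters programmatically. The Python sketch below works on an abridged copy of the response above (only three of the configs are kept, which is a simplification made for this example) and converts the retention period to days.

```python
import json

# Abridged copy of the response shown above; only three configs are kept.
response_text = """
{"name":"my_topic",
 "configs":{"cleanup.policy":"delete","retention.ms":"604800000","max.message.bytes":"1048588"},
 "partitions":[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]}
"""

topic = json.loads(response_text)

# Config values arrive as strings, so numeric ones need explicit conversion.
retention_ms = int(topic["configs"]["retention.ms"])
retention_days = retention_ms / (24 * 60 * 60 * 1000)

print(topic["name"], topic["configs"]["cleanup.policy"], retention_days)
```

With the values from the response above, retention.ms of 604800000 corresponds to 7 days.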

Partitions

To get information about topic partitions, use the request form GET /topics/(string:topic_name)/partitions.

The request to get information about the partitions of the my_topic topic looks like this:

$ curl -X GET http://hostname:8082/topics/my_topic/partitions

The query result displays information about topic partitions:

[{"partition":0,"leader":1001,"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]
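
For topics with many partitions, the response can be condensed into a per-partition summary. The following Python sketch parses the response shown above; summarize_partitions is a hypothetical helper written for this example, and it assumes every replica entry carries the broker and in_sync fields, as in the sample.

```python
import json

# The partitions response shown above, copied verbatim.
response_text = (
    '[{"partition":0,"leader":1001,'
    '"replicas":[{"broker":1001,"leader":true,"in_sync":true}]}]'
)


def summarize_partitions(text):
    """Map each partition number to its leader and in-sync broker IDs."""
    summary = {}
    for part in json.loads(text):
        in_sync = [r["broker"] for r in part["replicas"] if r["in_sync"]]
        summary[part["partition"]] = {"leader": part["leader"], "in_sync": in_sync}
    return summary


print(summarize_partitions(response_text))
```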