Kafka API usage example

This article describes how to create simple Kafka API example applications.

NOTE

In the Kafka API examples described below, the applications are created and run on one of the hosts of an ADS cluster of version 2.8.1.1, where the Kafka service version 2.8.1 is pre-installed. For other Kafka versions, the current API reference is available at https://kafka.apache.org/XX/javadoc/index.html, where XX is the first digits of the required Kafka version number; for example, the API for Kafka 3.6.0 is at https://kafka.apache.org/36/javadoc/index.html.

Prepare to create an application

The Kafka API example applications are created in an OpenJDK environment using the pre-installed Apache Maven tool, after the environment variables have been set.

NOTE

For Kafka to create topics automatically, you must enable the auto.create.topics.enable parameter in the server.properties group when configuring the Kafka service.
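For reference, the corresponding broker setting has the following form in server.properties:

auto.create.topics.enable=true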

Create a project structure

File structure

Before creating a Java application using Maven, you need to create a project file structure. You can create a project called api.examples using the command:

$ mkdir -p api.examples/src/main/java/myapps
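The command creates the following directory layout (pom.xml is added to the project root in the next step):

api.examples
└── src
    └── main
        └── java
            └── myapps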

Set up pom.xml

In the root directory of the /api.examples/ project, create pom.xml — an XML file with project information and configuration settings that Maven uses to build the project:

$ sudo vim pom.xml

Below is the content of pom.xml containing the necessary dependencies, including:

  • dependencies that pull in the packages needed to build the Kafka API examples (the Producer API, Consumer API, and Admin API applications) for Kafka version 2.8.1;

  • dependencies that connect the slf4j and log4j logging modules, used to view the output of the running applications.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>clients.examples</groupId>
    <artifactId>clients.examples</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Kafka clients :: Java</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>2.8.1</kafka.version>
        <slf4j.version>1.7.30</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerId>jdt</compilerId>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>org.eclipse.tycho</groupId>
                            <artifactId>tycho-compiler-jdt</artifactId>
                            <version>0.21.0</version>
                        </dependency>
                    </dependencies>
                </plugin>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                            <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <!-- Apache Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
</project>

Create and run applications

The applications described below are created in the /api.examples/src/main/java/myapps folder.

After creating a new application or making changes to an existing one, run the following command in the /api.examples/ project root directory (where the pom.xml file is located). The command deletes all files created during the previous build, then compiles the source code, runs the tests, and packages the result into a JAR file:

$ mvn clean package

Result:

[INFO] Building jar: /home/olga/api.examples/target/api.examples-0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.685 s
[INFO] Finished at: 2023-10-25T20:40:23Z
[INFO] ------------------------------------------------------------------------

To run each application, execute the following command in the root directory of the /api.examples/ project (where the pom.xml file is located):

$ mvn compile exec:java -Dexec.mainClass="myapps.Producer"

where myapps is the package name (the folder in which the application file is located) and Producer is the name of the application class (the .java file name without the extension).
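The other applications described in this article are started the same way, for example:

$ mvn compile exec:java -Dexec.mainClass="myapps.Consumer"
$ mvn compile exec:java -Dexec.mainClass="myapps.CreateTopic"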

The application run results for each API are described below.

Producer API

This section describes the Kafka Producer API example that allows users to write messages to a topic.

In the /api.examples/src/main/java/myapps folder create an application file:

$ sudo vim Producer.java

The code for Producer.java is described below.

Producer.java
package myapps;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class Producer {
    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a producer record
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("my_topic", "hello");
        // send data - asynchronous
        producer.send(producerRecord);

        // flush data - synchronous
        producer.flush();
        // flush and close producer
        producer.close();
    }
}

Some key lines of the Producer.java code are explained below:

  • properties.setProperty — setting Kafka Producer configurations defined by the ProducerConfig class API:

    • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG — list of host/port pairs used for the initial connection to the Kafka cluster.

    • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG — serializer classes for record key/value pairs.

  • KafkaProducer<String, String> producer — creating an object of the KafkaProducer class.

  • ProducerRecord<String, String> producerRecord — creating a record with the hello message value to be written to the my_topic topic, using the ProducerRecord class.

  • KafkaProducer methods:

    • producer.send(producerRecord) — adds the record to a buffer of pending records to be sent and returns immediately (an optional delivery callback is sketched after this list);

    • producer.flush() — makes all buffered records available for sending and blocks until the requests associated with them complete. This is a way to ensure that all previously sent messages have actually been delivered;

    • producer.close() — closes the producer. This method blocks until all previously sent requests have completed.
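As an optional variation, send() also accepts a callback that is invoked once the broker acknowledges the record. A minimal sketch that replaces the producer.send(producerRecord) call in Producer.java above:

        // optional: log the delivery result reported by the broker
        producer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                log.info("Delivered to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            } else {
                log.error("Delivery failed", exception);
            }
        });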

After creating the code, build and run the application.

As a result of running the application, the main producer parameters (ProducerConfig), both those set in the code and those defined by default, are displayed together with information about the code execution.

Result of running Producer.java
13:55:20,816 INFO  myapps.Producer                                               - I am a Kafka Producer
13:55:20,843 INFO  org.apache.kafka.clients.producer.ProducerConfig              - ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

13:55:21,009 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.8.1
13:55:21,010 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: 839b886f9b732b15
13:55:21,010 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1698674121007
13:55:21,361 INFO  org.apache.kafka.clients.Metadata                             - [Producer clientId=producer-1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA
13:55:21,401 INFO  org.apache.kafka.clients.producer.KafkaProducer               - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
13:55:21,410 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics scheduler closed
13:55:21,410 INFO  org.apache.kafka.common.metrics.Metrics                       - Closing reporter org.apache.kafka.common.metrics.JmxReporter
13:55:21,410 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics reporters closed
13:55:21,411 INFO  org.apache.kafka.common.utils.AppInfoParser                   - App info kafka.producer for producer-1 unregistered
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.097 s
[INFO] Finished at: 2023-10-30T13:55:21Z
[INFO] ------------------------------------------------------------------------

To see the messages being written, you can start a console consumer in the command line interface:

$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092

Each run of the application writes a hello message to the my_topic topic, which is displayed in the console with the running consumer.

Consumer API

This section describes the Kafka Consumer API example that allows users to read messages from a topic.

In the /api.examples/src/main/java/myapps folder create an application file:

$ sudo vim Consumer.java

The code for Consumer.java is described below.

Consumer.java
package myapps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String bootstrapServers = "localhost:9092";
        String groupId = "group1";
        String topic = "my_topic";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // get a reference to the current thread
        final Thread mainThread = Thread.currentThread();

        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                consumer.wakeup();

                // join the main thread to allow the execution of the code in the main thread
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
	});

	try {

            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));
            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }

	} catch (WakeupException e) {
            log.info("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            log.error("Unexpected exception", e);
        } finally {
            consumer.close(); // this will also commit the offsets if need be.
            log.info("The consumer is now gracefully closed.");
        }

    }
}

Some key lines of the Consumer.java code are explained below:

  • properties.setProperty — setting Kafka Consumer configurations defined by the ConsumerConfig class API:

    • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG — list of host/port pairs used for initial connection to the Kafka cluster.

    • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG — deserializer classes for record key/value pairs.

    • ConsumerConfig.GROUP_ID_CONFIG — the consumer group the consumer belongs to.

    • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG — where to start reading messages (for example, earliest) when no committed offset exists for the group.

  • KafkaConsumer<String, String> consumer — creating an object of the KafkaConsumer class.

  • ConsumerRecords<String, String> records — the batch of records returned by a poll of the my_topic topic, represented by the ConsumerRecords class.

  • KafkaConsumer methods:

    • consumer.subscribe — connecting a consumer to a topic. If you use Arrays.asList(), it is possible to subscribe to multiple topics.

    • consumer.poll — returns data from the subscribed partitions that the consumer has not yet received. The poll duration (Duration.ofMillis(100)) is the time the call blocks before returning an empty record set if no data arrives.

  • log.info("Key: " + record.key() + ", Value: " + record.value()); log.info("Partition: " + record.partition() + ", Offset:" + record.offset()) — the format in which the read records are logged.

  • The KafkaConsumer consumer.close method is called when the consumer shuts down and performs:

    • committing offsets, if necessary (a manual-commit variation is sketched after this list);

    • closing the connection to Kafka.
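For finer control over offsets, auto-commit can be disabled and offsets committed explicitly after the records are processed. A minimal sketch of this variation of the Consumer.java poll loop (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG and commitSync() are standard kafka-clients API):

        // variation: disable auto-commit when configuring the consumer...
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // ...and commit offsets manually once the polled records are processed
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                log.info("Key: " + record.key() + ", Value: " + record.value());
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // blocks until the offsets are committed
            }
        }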

After creating the code, build and run the application.

As a result of running the application, the following is displayed:

  • the main consumer parameters (ConsumerConfig), both those set in the code and those defined by default;

  • information about the code execution;

  • messages written to the my_topic topic.

Result of running Consumer.java
13:50:26,590 INFO  myapps.Consumer                                               - I am a Kafka Consumer
13:50:26,619 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-group1-1
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = group1
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

13:50:26,796 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.8.1
13:50:26,796 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: 839b886f9b732b15
13:50:26,796 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1698673826794
13:50:26,798 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=consumer-group1-1, groupId=group1] Subscribed to topic(s): my_topic
13:50:27,153 INFO  org.apache.kafka.clients.Metadata                             - [Consumer clientId=consumer-group1-1, groupId=group1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA
13:50:27,154 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Discovered group coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null)
13:50:27,160 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
13:50:27,172 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
13:50:30,174 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully joined group with generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'}
13:50:30,175 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Finished assignment for group at generation 3: {consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b=Assignment(partitions=[my_topic-0])}
13:50:30,180 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully synced group in generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'}
13:50:30,180 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[my_topic-0])
13:50:30,181 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: my_topic-0
13:50:30,188 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition my_topic-0
13:50:30,198 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition my_topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[sov-test-1.ru-central1.internal:9092 (id: 1001 rack: null)], epoch=0}}.
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:0
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:1
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:2
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:3
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:4

If you continue writing to the topic, the messages will be displayed in this console along with their partitions and offsets. The consumer is closed by pressing Ctrl+C.

Result of closing the Consumer.java application
^C13:51:17,173 INFO  myapps.Consumer                                               - Detected a shutdown, let's exit by calling consumer.wakeup()...
13:51:17,177 INFO  myapps.Consumer                                               - Wake up exception!
13:51:17,181 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions my_topic-0
13:51:17,181 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b sending LeaveGroup request to coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null) due to the consumer is being closed
13:51:17,183 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics scheduler closed
13:51:17,183 INFO  org.apache.kafka.common.metrics.Metrics                       - Closing reporter org.apache.kafka.common.metrics.JmxReporter
13:51:17,183 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics reporters closed
13:51:17,189 INFO  org.apache.kafka.common.utils.AppInfoParser                   - App info kafka.consumer for consumer-group1-1 unregistered
13:51:17,190 INFO  myapps.Consumer                                               - The consumer is now gracefully closed.

Admin API

This section describes the Kafka Admin API example that performs the following:

  • creating a new topic;

  • displaying a list of topics.

In the /api.examples/src/main/java/myapps folder create an application file:

$ sudo vim CreateTopic.java

The code for CreateTopic.java is described below.

CreateTopic.java
package myapps;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopic {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties properties = new Properties();
      properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(properties);

      // create a new topic with 1 partition and a replication factor of 1;
      // all().get() blocks until the broker confirms the creation
      System.out.println("-- creating --");
      NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
      admin.createTopics(Collections.singleton(newTopic)).all().get();

      // list all topics in the cluster
      System.out.println("-- listing --");
      admin.listTopics().names().get().forEach(System.out::println);

      // release the client's resources
      admin.close();
  }
}

Some key lines of the CreateTopic.java code are explained below:

  • properties.setProperty — setting Admin Configs defined by the AdminClientConfig class API:

    • AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG — list of host/port pairs used for the initial connection to the Kafka cluster.

  • AdminClient admin — creating an object of the AdminClient class.

  • NewTopic newTopic — creating an object of the NewTopic class.

  • AdminClient methods used:

    • admin.createTopics — creating a topic;

    • admin.listTopics — listing the topics in the cluster (a further AdminClient operation is sketched after this list).
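The same pattern extends to other AdminClient operations. Below is a minimal sketch (not part of the example above) that describes the topic created earlier and prints its partition count; it requires an additional import of org.apache.kafka.clients.admin.TopicDescription:

      // describe the topic created above and print its partition count
      TopicDescription description = admin.describeTopics(Collections.singleton("my-new-topic"))
              .all().get()
              .get("my-new-topic");
      System.out.println("Partitions: " + description.partitions().size());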

After creating the code, build and run the application.

As a result of running the application, the following is displayed:

  • the main administrator parameters (AdminClientConfig), both those set in the code and those defined by default;

  • information about the code execution;

  • the list of topics.

Result of running CreateTopic.java
14:15:03,656 INFO  org.apache.kafka.clients.admin.AdminClientConfig              - AdminClientConfig values:
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = use_all_dns_ips
	client.id =
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

14:15:03,803 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.8.1
14:15:03,803 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: 839b886f9b732b15
14:15:03,803 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1698675303800
-- creating --
-- listing --
demo_java
my-new-topic
my_topic
my-new-topic4

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  16.900 s
[INFO] Finished at: 2023-10-30T14:15:19Z
[INFO] ------------------------------------------------------------------------