Kafka API usage example
This article describes how to create simple Kafka API examples:
NOTE
In the Kafka API examples described below, applications are created and run on one of the hosts with the Kafka service version 2.8.1 pre-installed in the ADS cluster version 2.8.1.1. For other Kafka versions, a list of current APIs can be found at the link https://kafka.apache.org/XX/javadoc/index.html, where |
Prepare to create an application
Kafka API example application creation is done in the OpenJDK environment using pre-installed tool Apache Maven after setting environment variables.
NOTE
To automatically create topics in Kafka, you must enable the auto.create.topics.enable parameter in the server.properties group when configuring the Kafka service. |
Create a project structure
File structure
Before creating a Java application using Maven, you need to create a project file structure. You can create a project called api.examples using the command:
$ mkdir -p api.examples/src/main/java/myapps
Set up pom.xml
In the root directory of the /api.examples/ project you need to create pom.xml — XML file containing project information and configuration information used by Maven to build the project:
$ sudo vim pom.xml
Below is the content of pom.xml containing the necessary dependencies, including:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>clients.examples</groupId>
<artifactId>clients.examples</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Kafka clients :: Java</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.8.1</kafka.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
Create and run applications
The applications described below are created in the /api.examples/src/main/java/myapps folder.
After creating a new application or making changes to an existing application, you need to run a command in the /api.examples/ project root directory (where the pom.xml file is located) that deletes all files created during the previous project build, then compiles the source code, runs the tests, and packages the result into a JAR file:
$ mvn clean package
Result:
[INFO] Building jar: /home/olga/api.examples/target/api.examples-0.1.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 2.685 s [INFO] Finished at: 2023-10-25T20:40:23Z [INFO] ------------------------------------------------------------------------
To run every application, you need to run the command in the root directory of the /api.examples/ project (where the pom.xml file is located):
$ mvn compile exec:java -Dexec.mainClass="myapps.Producer"
where myapps
is the name of the folder in which the application file is located, Producer
is the name of the .java application file.
The application run results for each API are described below.
Producer API
This section describes the Kafka Producer API example that allows users to write messages to a topic.
In the /api.examples/src/main/java/myapps folder create an application file:
$ sudo vim Producer.java
The code for Producer.java is described below.
package myapps;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) {
log.info("I am a Kafka Producer");
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// create a producer record
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("my_topic", "hello");
// send data - asynchronous
producer.send(producerRecord);
// flush data - synchronous
producer.flush();
// flush and close producer
producer.close();
}
}
Below are some lines of Producer.java code:
-
properties.setProperty
— setting Kafka Producer configurations defined by the ProducerConfig class API:-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
— list of host/port pairs used for the initial connection to the Kafka cluster. -
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
,ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
— default serialization libraries for record key/value pairs.
-
-
KafkaProducer<String, String> producer
— creating an object of the KafkaProducer class. -
ProducerRecord<String, String> producerRecord
— creating ahello
message value that should be written to themy_topic
topic using the ProducerRecord class. -
KafkaProducer methods:
-
producer.send(producerRecord)
— when called, adds a record to the buffer of pending records to be sent and returns the result immediately; -
producer.flush()
— when called, makes all buffered records available for sending. Completion of requests associated with these records is blocked. A way to ensure that all previously sent messages actually completed. -
producer.close()
— closes the producer. This method blocks until all previously submitted requests have completed.
-
As a result of running the application, the main parameters of the producer (ProducerConfig), installed in the code and defined by default, and information about the execution of the code are displayed.
3:55:20,816 INFO myapps.Producer - I am a Kafka Producer 13:55:20,843 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 13:55:21,009 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1 13:55:21,010 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15 13:55:21,010 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1698674121007 13:55:21,361 INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA 13:55:21,401 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 13:55:21,410 INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed 13:55:21,410 INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter 13:55:21,410 INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed 13:55:21,411 INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 2.097 s [INFO] Finished at: 2023-10-30T13:55:21Z [INFO] ------------------------------------------------------------------------
To demonstrate message recording, you can start reading messages on the console in the command line interface:
$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
As a result of every application running, a hello
message is written to the my_topic
topic, which is displayed in the console with the running consumer.
Consumer API
This section describes the Kafka Consumer API example that allows users to write messages from a topic.
In the /api.examples/src/main/java/myapps folder create an application file:
$ sudo vim Consumer.java
The code for Consumer.java is described below.
package myapps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) {
log.info("I am a Kafka Consumer");
String bootstrapServers = "localhost:9092";
String groupId = "my-group";
String topic = "my_topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// get a reference to the current thread
final Thread mainThread = Thread.currentThread();
// adding the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// join the main thread to allow the execution of the code in the main thread
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));
// poll for new data
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
}
}
} catch (WakeupException e) {
log.info("Wake up exception!");
// we ignore this as this is an expected exception when closing a consumer
} catch (Exception e) {
log.error("Unexpected exception", e);
} finally {
consumer.close(); // this will also commit the offsets if need be.
log.info("The consumer is now gracefully closed.");
}
}
}
Below are some lines of Consumer.java code:
-
properties.setProperty
— setting Kafka Consumer configurations defined by the ConsumerConfig class API:-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
— list of host/port pairs used for initial connection to the Kafka cluster. -
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
— default deserialization libraries for entry key/value pairs. -
ConsumerConfig.GROUP_ID_CONFIG
— designation of the consumer group. -
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
— offset number at which to start reading messages.
-
-
KafkaConsumer<String, String> consumer
— creating an object of the KafkaConsumer class. -
ConsumerRecords<String, String> records
— creating ahello
message value that should be recorded in themy_topic
topic using the ConsumerRecords class. -
KafkaConsumer methods:
-
consumer.subscribe
— connecting a consumer to a topic. If you useArrays.asList()
, it is possible to subscribe to multiple topics. -
consumer.poll
— returns data that has not yet been received by the consumer subscribed to the partition. Polling call duration (Duration.ofMillis(100)
) — time this call will block before returning an empty list if no data was returned.
-
-
log.info("Key: " + record.key() + ", Value: " + record.value()); log.info("Partition: " + record.partition() + ", Offset:" + record.offset())
— form of read data. -
KafkaConsumer
consumer.close
method is called when the consumer is closed and performs:-
fixation of displacements if necessary;
-
closing the connection to Kafka.
-
As a result of running the application, the following is displayed:
-
basic consumer parameters (ConsumerConfig), set in the code and defined by default;
-
information about code development;
-
messages written to the
my_topic
topic.
13:50:26,590 INFO myapps.Consumer - I am a Kafka Consumer 13:50:26,619 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group1-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group1 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 13:50:26,796 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1 13:50:26,796 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15 13:50:26,796 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1698673826794 13:50:26,798 INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-group1-1, groupId=group1] Subscribed to topic(s): my_topic 13:50:27,153 INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-group1-1, groupId=group1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA 13:50:27,154 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Discovered group coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null) 13:50:27,160 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group 13:50:27,172 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group 13:50:30,174 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully joined group with generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'} 13:50:30,175 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Finished assignment for group at generation 3: {consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b=Assignment(partitions=[my_topic-0])} 13:50:30,180 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully synced group in generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'} 13:50:30,180 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[my_topic-0]) 13:50:30,181 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: my_topic-0 13:50:30,188 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition my_topic-0 13:50:30,198 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition my_topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[sov-test-1.ru-central1.internal:9092 (id: 1001 rack: null)], epoch=0}}. 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:0 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:1 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:2 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:3 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:4
If you continue recording to the topic, messages will be displayed in this console indicating the partitions and offset. Closing a user is done by pressing Ctrl+C
.
^C13:51:17,173 INFO myapps.Consumer - Detected a shutdown, let's exit by calling consumer.wakeup()... 13:51:17,177 INFO myapps.Consumer - Wake up exception! 13:51:17,181 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions my_topic-0 13:51:17,181 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b sending LeaveGroup request to coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null) due to the consumer is being closed 13:51:17,183 INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed 13:51:17,183 INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter 13:51:17,183 INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed 13:51:17,189 INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-group1-1 unregistered 13:51:17,190 INFO myapps.Consumer - The consumer is now gracefully closed.
Admin API
This section discusses the Kafka Admin API example that does:
-
creating a new topic;
-
displaying a list of topics.
In the /api.examples/src/main/java/myapps folder create an application file:
$ sudo vim CreateTopic.java
The code for CreateTopic.java is described below.
package myapps;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CreateTopic {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(properties);
//creating new topic
System.out.println("-- creating --");
NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
admin.createTopics(Collections.singleton(newTopic));
//listing
System.out.println("-- listing --");
admin.listTopics().names().get().forEach(System.out::println);
}
}
Below are some lines of CreateTopic.java code:
-
properties.setProperty
— setting Admin Configs defined by the AdminClientConfig class API:-
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
is a list of host/port pairs used for initial connection to the Kafka cluster.
-
-
AdminClient admin
— creating an object of the AdminClient class. -
NewTopic newTopic
— creating an object of the NewTopic class. -
AdminClient methods used:
-
admin.createTopics
— creating a topic; -
admin.listTopics
— creating a list of topics.
-
As a result of running the application, the following is displayed:
-
basic administrator parameters (AdminClientConfig), set in the code and defined by default;
-
information about code development;
-
list of topics.
14:15:03,656 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values: bootstrap.servers = [localhost:9092] client.dns.lookup = use_all_dns_ips client.id = connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS 14:15:03,803 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1 14:15:03,803 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15 14:15:03,803 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1698675303800 -- creating -- -- listing -- demo_java my-new-topic my_topic my-new-topic4 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 16.900 s [INFO] Finished at: 2023-10-30T14:15:19Z [INFO] ------------------------------------------------------------------------ ----