Пример использования Kafka API
В статье описано создание простых примеров Kafka API:
ПРИМЕЧАНИЕ
В описанных ниже примерах Kafka API приложения создаются и запускаются на одном из хостов с предустановленным сервисом Kafka версии 2.8.1 кластера ADS версии 2.8.1.1. Для других версий Kafka список актуальных API можно найти по ссылке https://kafka.apache.org/XX/javadoc/index.html, где |
Подготовка к созданию приложений
Создание примеров приложений Kafka API выполняется в среде OpenJDK с использованием предварительно установленного инструмента Apache Maven после настройки переменных среды.
ПРИМЕЧАНИЕ
Для автоматического создания топиков в Kafka необходимо включить параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka. |
Создание структуры проекта
Файловая структура
Перед созданием приложения Java с использованием Maven необходимо создать файловую структуру проекта. Создать проект с названием api.examples можно при помощи команды:
$ mkdir -p api.examples/src/main/java/myapps
Настройка pom.xml
В корневом каталоге проекта /api.examples/ необходимо создать pom.xml — XML-файл, содержащий информацию о проекте и сведения о конфигурации, используемые Maven для сборки проекта:
$ sudo vim pom.xml
Ниже представлено содержание pom.xml, включающего необходимые зависимости, в том числе:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>clients.examples</groupId>
<artifactId>clients.examples</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Kafka clients :: Java</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.8.1</kafka.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
Создание и запуск приложений
Приложения, описанные ниже, создаются в папке /api.examples/src/main/java/myapps.
После создания нового приложения или внесения изменений в существующее приложение в корневом каталоге проекта /api.examples/ (где размещается файл pom.xml) запустите команду, которая удаляет все файлы, созданные во время предыдущей сборки проекта, затем компилирует исходный код, выполняет тесты и упаковывает результат в JAR файл:
$ mvn clean package
Результат:
[INFO] Building jar: /home/olga/api.examples/target/api.examples-0.1.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 2.685 s [INFO] Finished at: 2023-10-25T20:40:23Z [INFO] ------------------------------------------------------------------------
Для запуска каждого приложения в корневом каталоге проекта /api.examples/ (где размещается файл pom.xml) запустите команду:
$ mvn compile exec:java -Dexec.mainClass="myapps.Producer"
где myapps
— наименование папки, в которой размещается файл приложения, Producer
— наименование файла приложения .java.
Результаты запуска приложений для каждого API описаны ниже.
Producer API
В данном разделе рассматривается пример Kafka Producer API, позволяющий записывать сообщения в топик.
В папке /api.examples/src/main/java/myapps создайте файл приложения:
$ sudo vim Producer.java
Ниже описан код Producer.java.
package myapps;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) {
log.info("I am a Kafka Producer");
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// create a producer record
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("my_topic", "hello");
// send data - asynchronous
producer.send(producerRecord);
// flush data - synchronous
producer.flush();
// flush and close producer
producer.close();
}
}
Ниже раскрыты некоторые строки кода Producer.java:
-
properties.setProperty
— установка конфигураций Kafka Producer, определяемых API класса ProducerConfig:-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
— список пар хост/порт, используемых для исходного подключения к кластеру Kafka. -
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
,ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
— библиотеки сериализации по умолчанию для пар ключ/значение записи.
-
-
KafkaProducer<String, String> producer
— создание объекта класса KafkaProducer. -
ProducerRecord<String, String> producerRecord
— создание значения сообщенияhello
, которое должно быть записано в топикmy_topic
c использованием класса ProducerRecord. -
Методы KafkaProducer:
-
producer.send(producerRecord)
— при вызове добавляет запись в буфер ожидающих отправки записей и немедленно возвращает результат; -
producer.flush()
— при вызове делает все буферизованные записи доступными для отправки. Завершение запросов, связанных с этими записями, блокируется. Способ убедиться, что все ранее отправленные сообщения действительно завершились. -
producer.close()
— закрывает производителя. Этот метод блокируется до тех пор, пока не будут завершены все ранее отправленные запросы.
-
В результате запуска приложения выводятся основные параметры продюсера (ProducerConfig), установленные в коде и определяемые по умолчанию, и информация об отработке кода.
3:55:20,816 INFO myapps.Producer - I am a Kafka Producer 13:55:20,843 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = false key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 13:55:21,009 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1 13:55:21,010 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15 13:55:21,010 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1698674121007 13:55:21,361 INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA 13:55:21,401 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 13:55:21,410 INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed 13:55:21,410 INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter 13:55:21,410 INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed 13:55:21,411 INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 2.097 s [INFO] Finished at: 2023-10-30T13:55:21Z [INFO] ------------------------------------------------------------------------
Для демонстрации записи сообщений в интерфейсе командной строки запустите чтение сообщений:
$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
В результате каждого запуска приложения в топик my_topic
записывается сообщение hello
, что отображается в консоли с запущенным потребителем.
Consumer API
В данном разделе рассматривается пример Kafka Consumer API, позволяющий читать сообщения из топика.
В папке /api.examples/src/main/java/myapps создайте файл приложения:
$ sudo vim Consumer.java
Ниже описан код Consumer.java.
package myapps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) {
log.info("I am a Kafka Consumer");
String bootstrapServers = "localhost:9092";
String groupId = "my-group";
String topic = "my_topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// get a reference to the current thread
final Thread mainThread = Thread.currentThread();
// adding the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// join the main thread to allow the execution of the code in the main thread
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));
// poll for new data
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
}
}
} catch (WakeupException e) {
log.info("Wake up exception!");
// we ignore this as this is an expected exception when closing a consumer
} catch (Exception e) {
log.error("Unexpected exception", e);
} finally {
consumer.close(); // this will also commit the offsets if need be.
log.info("The consumer is now gracefully closed.");
}
}
}
Ниже раскрыты некоторые строки кода Consumer.java:
-
properties.setProperty
— установка конфигураций Kafka Consumer, определяемых API класса ConsumerConfig:-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
— список пар хост/порт, используемых для исходного подключения к кластеру Kafka. -
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
— библиотеки десериализации по умолчанию для пар ключ/значение записи. -
ConsumerConfig.GROUP_ID_CONFIG
— обозначение группы потребителей. -
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
— номер смещения, с которого следует начинать чтение сообщений.
-
-
KafkaConsumer<String, String> consumer
— создание объекта класса KafkaConsumer. -
ConsumerRecords<String, String> records
— создание значения сообщенияhello
, которое должно быть записано в топикmy_topic
c использованием класса ConsumerRecords. -
Методы KafkaConsumer:
-
consumer.subscribe
— подключение потребителя к топику. Если использоватьArrays.asList()
, возможно подписаться на несколько топиков. -
consumer.poll
— возвращает данные, которые еще не были получены потребителем, подписанным на партиции. Продолжительность вызова опроса (Duration.ofMillis(100)
) — время блокировки этого вызова перед возвратом пустого списка в случае, если данные не были возвращены.
-
-
log.info("Key: " + record.key() + ", Value: " + record.value()); log.info("Partition: " + record.partition() + ", Offset:" + record.offset())
— форма считываемых данных. -
метод KafkaConsumer
consumer.close
вызывается при закрытии потребителя и выполняет:-
фиксацию смещений при необходимости;
-
закрытие соединения с Kafka.
-
В результате запуска приложения выводятся:
-
основные параметры потребителя (ConsumerConfig), установленные в коде и определяемые по умолчанию;
-
информация об отработке кода;
-
сообщения, записанные в топик
my_topic
.
13:50:26,590 INFO myapps.Consumer - I am a Kafka Consumer 13:50:26,619 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-group1-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = group1 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 13:50:26,796 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1 13:50:26,796 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15 13:50:26,796 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1698673826794 13:50:26,798 INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-group1-1, groupId=group1] Subscribed to topic(s): my_topic 13:50:27,153 INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-group1-1, groupId=group1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA 13:50:27,154 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Discovered group coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null) 13:50:27,160 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group 13:50:27,172 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group 13:50:30,174 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully joined group with generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'} 13:50:30,175 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Finished assignment for group at generation 3: {consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b=Assignment(partitions=[my_topic-0])} 13:50:30,180 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully synced group in generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'} 13:50:30,180 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[my_topic-0]) 13:50:30,181 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: my_topic-0 13:50:30,188 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition my_topic-0 13:50:30,198 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition my_topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[sov-test-1.ru-central1.internal:9092 (id: 1001 rack: null)], epoch=0}}. 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:0 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:1 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:2 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:3 13:50:30,221 INFO myapps.Consumer - Key: null, Value: hello 13:50:30,221 INFO myapps.Consumer - Partition: 0, Offset:4
Если продолжить записывать в топик сообщения, они будут появляться в консоли с указанием партиции и смещения. Закрытие потребителя осуществляется при помощи нажатия Ctrl+C
.
^C13:51:17,173 INFO myapps.Consumer - Detected a shutdown, let's exit by calling consumer.wakeup()... 13:51:17,177 INFO myapps.Consumer - Wake up exception! 13:51:17,181 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions my_topic-0 13:51:17,181 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b sending LeaveGroup request to coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null) due to the consumer is being closed 13:51:17,183 INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed 13:51:17,183 INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter 13:51:17,183 INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed 13:51:17,189 INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-group1-1 unregistered 13:51:17,190 INFO myapps.Consumer - The consumer is now gracefully closed.
Admin API
В данном разделе рассматривается пример Kafka Admin API, выполняющий:
-
создание нового топика;
-
вывод списка топиков.
В папке /api.examples/src/main/java/myapps создайте файл приложения:
$ sudo vim CreateTopic.java
Ниже описан код CreateTopic.java.
package myapps;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CreateTopic {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(properties);
//creating new topic
System.out.println("-- creating --");
NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
admin.createTopics(Collections.singleton(newTopic));
//listing
System.out.println("-- listing --");
admin.listTopics().names().get().forEach(System.out::println);
}
}
Ниже раскрыты некоторые строки кода CreateTopic.java:
-
properties.setProperty
— установка конфигураций Admin Configs, определяемых API класса AdminClientConfig:-
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
— список пар хост/порт, используемых для исходного подключения к кластеру Kafka.
-
-
AdminClient admin
— создание обьекта класса AdminClient. -
NewTopic newTopic
— создание обьекта класса NewTopic. -
используемые методы AdminClient:
-
admin.createTopics
— создание топика; -
admin.listTopics
— создание списка топиков.
-
В результате запуска приложения выводятся:
-
основные параметры администратора (AdminClientConfig), установленные в коде и определяемые по умолчанию;
-
информация об отработке кода;
-
список топиков.
14:15:03,656 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values: bootstrap.servers = [localhost:9092] client.dns.lookup = use_all_dns_ips client.id = connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS 14:15:03,803 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.8.1 14:15:03,803 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15 14:15:03,803 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1698675303800 -- creating -- -- listing -- demo_java my-new-topic my_topic my-new-topic4 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 16.900 s [INFO] Finished at: 2023-10-30T14:15:19Z [INFO] ------------------------------------------------------------------------ ----