Пример использования Kafka API

В статье описано создание простых примеров Kafka API:

ПРИМЕЧАНИЕ

В описанных ниже примерах Kafka API приложения создаются и запускаются на одном из хостов с предустановленным сервисом Kafka версии 2.8.1 кластера ADS версии 2.8.1.1. Для других версий Kafka список актуальных API можно найти по ссылке https://kafka.apache.org/XX/javadoc/index.html, где XX — первые цифры номера требуемой версии Kafka, например API для Kafka 3.6.0.

Подготовка к созданию приложений

Создание примеров приложений Kafka API выполняется в среде OpenJDK с использованием предварительно установленного инструмента Apache Maven после настройки переменных среды.

ПРИМЕЧАНИЕ

Для автоматического создания топиков в Kafka необходимо включить параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

Создание структуры проекта

Файловая структура

Перед созданием приложения Java с использованием Maven необходимо создать файловую структуру проекта. Создать проект с названием api.examples можно при помощи команды:

$ mkdir -p api.examples/src/main/java/myapps

Настройка pom.xml

В корневом каталоге проекта /api.examples/ необходимо создать pom.xml — XML-файл, содержащий информацию о проекте и сведения о конфигурации, используемые Maven для сборки проекта:

$ sudo vim pom.xml

Ниже представлено содержание pom.xml, включающего необходимые зависимости, в том числе:

  • зависимости, подключающие пакет для создания приложений Producer API, Consumer API и Admin API в Kafka версии 2.8.1;

  • зависимости, подключающие модули логирования slf4j и log4j для просмотра результатов отработки кода приложений.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>clients.examples</groupId>
    <artifactId>clients.examples</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Kafka clients :: Java</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>2.8.1</kafka.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
	<plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerId>jdt</compilerId>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>org.eclipse.tycho</groupId>
                            <artifactId>tycho-compiler-jdt</artifactId>
                            <version>0.21.0</version>
                        </dependency>
                    </dependencies>
                </plugin>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                            <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <!-- Apache Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
           <version>1.2.17</version>
        </dependency>
    </dependencies>
</project>

Создание и запуск приложений

Приложения, описанные ниже, создаются в папке /api.examples/src/main/java/myapps.

После создания нового приложения или внесения изменений в существующее приложение в корневом каталоге проекта /api.examples/ (где размещается файл pom.xml) запустите команду, которая удаляет все файлы, созданные во время предыдущей сборки проекта, затем компилирует исходный код, выполняет тесты и упаковывает результат в JAR файл:

$ mvn clean package

Результат:

[INFO] Building jar: /home/olga/api.examples/target/api.examples-0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.685 s
[INFO] Finished at: 2023-10-25T20:40:23Z
[INFO] ------------------------------------------------------------------------

Для запуска каждого приложения в корневом каталоге проекта /api.examples/ (где размещается файл pom.xml) запустите команду:

$ mvn compile exec:java -Dexec.mainClass="myapps.Producer"

где myapps — наименование папки, в которой размещается файл приложения, Producer — наименование файла приложения .java.

Результаты запуска приложений для каждого API описаны ниже.

Producer API

В данном разделе рассматривается пример Kafka Producer API, позволяющий записывать сообщения в топик.

В папке /api.examples/src/main/java/myapps создайте файл приложения:

$ sudo vim Producer.java

Ниже описан код Producer.java.

Producer.java
package myapps;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class Producer {
    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a producer record
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("my_topic", "hello");
        // send data - asynchronous
        producer.send(producerRecord);

        // flush data - synchronous
        producer.flush();
        // flush and close producer
        producer.close();
    }
}

Ниже раскрыты некоторые строки кода Producer.java:

  • properties.setProperty — установка конфигураций Kafka Producer, определяемых API класса ProducerConfig:

    • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG — список пар хост/порт, используемых для исходного подключения к кластеру Kafka.

    • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG — библиотеки сериализации по умолчанию для пар ключ/значение записи.

  • KafkaProducer<String, String> producer — создание объекта класса KafkaProducer.

  • ProducerRecord<String, String> producerRecord — создание значения сообщения hello, которое должно быть записано в топик my_topic c использованием класса ProducerRecord.

  • Методы KafkaProducer:

    • producer.send(producerRecord) — при вызове добавляет запись в буфер ожидающих отправки записей и немедленно возвращает результат;

    • producer.flush() — при вызове делает все буферизованные записи доступными для отправки. Завершение запросов, связанных с этими записями, блокируется. Способ убедиться, что все ранее отправленные сообщения действительно завершились.

    • producer.close() — закрывает производителя. Этот метод блокируется до тех пор, пока не будут завершены все ранее отправленные запросы.

После создания кода выполните сборку и запуск приложения.

В результате запуска приложения выводятся основные параметры продюсера (ProducerConfig), установленные в коде и определяемые по умолчанию, и информация об отработке кода.

Результат запуска Producer.java
3:55:20,816 INFO  myapps.Producer                                               - I am a Kafka Producer
13:55:20,843 INFO  org.apache.kafka.clients.producer.ProducerConfig              - ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

13:55:21,009 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.8.1
13:55:21,010 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: 839b886f9b732b15
13:55:21,010 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1698674121007
13:55:21,361 INFO  org.apache.kafka.clients.Metadata                             - [Producer clientId=producer-1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA
13:55:21,401 INFO  org.apache.kafka.clients.producer.KafkaProducer               - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
13:55:21,410 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics scheduler closed
13:55:21,410 INFO  org.apache.kafka.common.metrics.Metrics                       - Closing reporter org.apache.kafka.common.metrics.JmxReporter
13:55:21,410 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics reporters closed
13:55:21,411 INFO  org.apache.kafka.common.utils.AppInfoParser                   - App info kafka.producer for producer-1 unregistered
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.097 s
[INFO] Finished at: 2023-10-30T13:55:21Z
[INFO] ------------------------------------------------------------------------

Для демонстрации записи сообщений в интерфейсе командной строки запустите чтение сообщений:

$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092

В результате каждого запуска приложения в топик my_topic записывается сообщение hello, что отображается в консоли с запущенным потребителем.

Consumer API

В данном разделе рассматривается пример Kafka Consumer API, позволяющий читать сообщения из топика.

В папке /api.examples/src/main/java/myapps создайте файл приложения:

$ sudo vim Consumer.java

Ниже описан код Consumer.java.

Consumer
package myapps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String bootstrapServers = "localhost:9092";
        String groupId = "my-group";
        String topic = "my_topic";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // get a reference to the current thread
        final Thread mainThread = Thread.currentThread();

        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                consumer.wakeup();

                // join the main thread to allow the execution of the code in the main thread
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
	});

	try {

            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));
            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }

	} catch (WakeupException e) {
            log.info("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            log.error("Unexpected exception", e);
        } finally {
            consumer.close(); // this will also commit the offsets if need be.
            log.info("The consumer is now gracefully closed.");
        }

    }
}

Ниже раскрыты некоторые строки кода Consumer.java:

  • properties.setProperty — установка конфигураций Kafka Consumer, определяемых API класса ConsumerConfig:

    • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG — список пар хост/порт, используемых для исходного подключения к кластеру Kafka.

    • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG — библиотеки десериализации по умолчанию для пар ключ/значение записи.

    • ConsumerConfig.GROUP_ID_CONFIG — обозначение группы потребителей.

    • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG — номер смещения, с которого следует начинать чтение сообщений.

  • KafkaConsumer<String, String> consumer — создание объекта класса KafkaConsumer.

  • ConsumerRecords<String, String> records — создание значения сообщения hello, которое должно быть записано в топик my_topic c использованием класса ConsumerRecords.

  • Методы KafkaConsumer:

    • consumer.subscribe — подключение потребителя к топику. Если использовать Arrays.asList(), возможно подписаться на несколько топиков.

    • consumer.poll — возвращает данные, которые еще не были получены потребителем, подписанным на партиции. Продолжительность вызова опроса (Duration.ofMillis(100)) — время блокировки этого вызова перед возвратом пустого списка в случае, если данные не были возвращены.

  • log.info("Key: " + record.key() + ", Value: " + record.value()); log.info("Partition: " + record.partition() + ", Offset:" + record.offset()) — форма считываемых данных.

  • метод KafkaConsumer consumer.close вызывается при закрытии потребителя и выполняет:

    • фиксацию смещений при необходимости;

    • закрытие соединения с Kafka.

После создания кода выполните сборку и запуск приложения.

В результате запуска приложения выводятся:

  • основные параметры потребителя (ConsumerConfig), установленные в коде и определяемые по умолчанию;

  • информация об отработке кода;

  • сообщения, записанные в топик my_topic.

Результат запуска Consumer.java
13:50:26,590 INFO  myapps.Consumer                                               - I am a Kafka Consumer
13:50:26,619 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-group1-1
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = group1
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

13:50:26,796 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.8.1
13:50:26,796 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: 839b886f9b732b15
13:50:26,796 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1698673826794
13:50:26,798 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=consumer-group1-1, groupId=group1] Subscribed to topic(s): my_topic
13:50:27,153 INFO  org.apache.kafka.clients.Metadata                             - [Consumer clientId=consumer-group1-1, groupId=group1] Cluster ID: HJEh6i7sSOynNkdYiIEOVA
13:50:27,154 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Discovered group coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null)
13:50:27,160 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
13:50:27,172 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] (Re-)joining group
13:50:30,174 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully joined group with generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'}
13:50:30,175 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Finished assignment for group at generation 3: {consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b=Assignment(partitions=[my_topic-0])}
13:50:30,180 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Successfully synced group in generation Generation{generationId=3, memberId='consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b', protocol='range'}
13:50:30,180 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[my_topic-0])
13:50:30,181 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: my_topic-0
13:50:30,188 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition my_topic-0
13:50:30,198 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition my_topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[sov-test-1.ru-central1.internal:9092 (id: 1001 rack: null)], epoch=0}}.
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:0
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:1
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:2
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:3
13:50:30,221 INFO  myapps.Consumer                                               - Key: null, Value: hello
13:50:30,221 INFO  myapps.Consumer                                               - Partition: 0, Offset:4

Если продолжить записывать в топик сообщения, они будут появляться в консоли с указанием партиции и смещения. Закрытие потребителя осуществляется при помощи нажатия Ctrl+C.

Результат закрытия приложения Consumer.java
^C13:51:17,173 INFO  myapps.Consumer                                               - Detected a shutdown, let's exit by calling consumer.wakeup()...
13:51:17,177 INFO  myapps.Consumer                                               - Wake up exception!
13:51:17,181 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions my_topic-0
13:51:17,181 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-c3608fee-d8cc-404f-97f1-5efbe2a49f4b sending LeaveGroup request to coordinator sov-test-1.ru-central1.internal:9092 (id: 2147482646 rack: null) due to the consumer is being closed
13:51:17,183 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics scheduler closed
13:51:17,183 INFO  org.apache.kafka.common.metrics.Metrics                       - Closing reporter org.apache.kafka.common.metrics.JmxReporter
13:51:17,183 INFO  org.apache.kafka.common.metrics.Metrics                       - Metrics reporters closed
13:51:17,189 INFO  org.apache.kafka.common.utils.AppInfoParser                   - App info kafka.consumer for consumer-group1-1 unregistered
13:51:17,190 INFO  myapps.Consumer                                               - The consumer is now gracefully closed.

Admin API

В данном разделе рассматривается пример Kafka Admin API, выполняющий:

  • создание нового топика;

  • вывод списка топиков.

В папке /api.examples/src/main/java/myapps создайте файл приложения:

$ sudo vim CreateTopic.java

Ниже описан код CreateTopic.java.

CreateTopic
package myapps;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopic {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
      Properties properties = new Properties();
      properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(properties);
      //creating new topic
      System.out.println("-- creating --");
      NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
      admin.createTopics(Collections.singleton(newTopic));

      //listing
      System.out.println("-- listing --");
      admin.listTopics().names().get().forEach(System.out::println);
  }
}

Ниже раскрыты некоторые строки кода CreateTopic.java:

  • properties.setProperty — установка конфигураций Admin Configs, определяемых API класса AdminClientConfig:

    • AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG — список пар хост/порт, используемых для исходного подключения к кластеру Kafka.

  • AdminClient admin — создание обьекта класса AdminClient.

  • NewTopic newTopic —  создание обьекта класса NewTopic.

  • используемые методы AdminClient:

    • admin.createTopics — создание топика;

    • admin.listTopics — создание списка топиков.

После создания кода выполните сборку и запуск приложения.

В результате запуска приложения выводятся:

  • основные параметры администратора (AdminClientConfig), установленные в коде и определяемые по умолчанию;

  • информация об отработке кода;

  • список топиков.

Результат запуска CreateTopic.java
14:15:03,656 INFO  org.apache.kafka.clients.admin.AdminClientConfig              - AdminClientConfig values:
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = use_all_dns_ips
	client.id =
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

14:15:03,803 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.8.1
14:15:03,803 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: 839b886f9b732b15
14:15:03,803 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1698675303800
-- creating --
-- listing --
demo_java
my-new-topic
my_topic
my-new-topic4

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  16.900 s
[INFO] Finished at: 2023-10-30T14:15:19Z
[INFO] ------------------------------------------------------------------------
----
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней