Создание приложения Kafka Streams
В статье описано создание простого приложения потоковой обработки с использованием Kafka Streams.
Kafka Streams — клиентская библиотека для создания приложений Java и Scala, где входные и выходные данные хранятся в кластерах Kafka.
Для создания приложения Kafka Streams используется Apache Maven — инструмент управления и анализа программных проектов.
В описанном ниже примере приложение создается и запускается на одном из хостов с предустановленным сервисом Kafka кластера ADS.
Подготовка к созданию проекта
ПРИМЕЧАНИЕ
В данном разделе все команды для установки OpenJDK и Apache Maven приведены как справочная информация для ОС Centos 7. Для установки программных пакетов в других ОС необходимо обратиться к документации соответстующей ОС. |
Установка OpenJDK
OpenJDK используется как среда разработки и выполнения Java для создания проекта Kafka Streams при помощи инструмента Apache Maven.
Apache Maven версии 3.3 и выше требует установки JDK версии 1.7 или выше.
Для установки OpenJDK необходимо выполнить команду:
$ sudo yum install java-1.8.0-openjdk
Результат:
Complete!
Для проверки успешной установки Java необходимо выполнить следующую команду:
$ java -version
При успешной установке Java результат выглядит следующим образом:
openjdk version "1.8.0_191" OpenJDK Runtime Environment (build 1.8.0_191-b12) OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
Установка последней версии Apache Maven
Для создания новых приложений рекомендуется использовать последнюю версию Apache Maven. Для ее установки необходимо выполнить:
-
Используя команду
wget
, загрузить Apache Maven в каталог /tmp, указав приведенную на странице загрузки Apache Maven ссылку на Binary tar.gz archive:$ wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz -P /tmp
Дождаться завершения загрузки архива.
-
Распаковать архив в каталог /opt:
$ sudo tar xf /tmp/apache-maven-3.9.4-bin.tar.gz -C /opt
-
Для удобного управления версиями и обновлениями Maven рекомендуется создать символическую ссылку, которая будет указывать на каталог установки Maven, при помощи команды:
$ sudo ln -s /opt/apache-maven-3.9.4 /opt/maven
Настройка переменных среды
Для обращения Java к установленному Apache Maven необходимо добавить путь к каталогу bin созданного каталога apache-maven-3.9.4 в переменную среды PATH
. Это можно выполнить следующим способом:
-
Cоздать файл скрипта:
$ sudo vim /etc/profile.d/maven.sh
-
Заполнить maven.sh строками, приведенными ниже:
export JAVA_HOME=/usr/lib/jvm/jre-openjdk export M2_HOME=/opt/maven export MAVEN_HOME=/opt/maven export PATH=${M2_HOME}/bin:${PATH}
-
Сохранить и закрыть файл.
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /etc/profile.d/maven.sh
-
Выполнить скрипт при помощи команды
source
:$ source /etc/profile.d/maven.sh
Проверка установки Apache Maven
Для проверки установки Apache Maven необходимо использовать команду:
$ mvn -version
При успешной установке Apache Maven результат выглядит следующим образом:
Apache Maven 3.9.4 (dfbb324ad4a7c8fb0bf182e6d91b0ae20e3d2dd9) Maven home: /opt/maven Java version: 1.8.0_382, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "3.10.0-1160.95.1.el7.x86_64", arch: "amd64", family: "unix"
Создание проекта Kafka Streams
Создание структуры проекта
В Apache Maven используются архетипы — заготовки, позволяющие создавать новые проекты. Для создания приложения Kafka Streams используется архетип Streams Quickstart Java.
Перед созданием проекта необходимо перейти в каталог /bin Apache Maven, выполнив команду:
$ cd /opt/maven/bin
Для создания структуры проекта Kafka Streams в каталоге /bin необходимо выполнить команду создания проекта из архетипа archetype:generate:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=2.8.1 \
-DgroupId=streams.examples \
-DartifactId=streams.examples \
-Dversion=0.1 \
-Dpackage=myapps
Ниже описаны параметры генерации, указываемые при помощи -D
:
-
archetypeGroupId
— групповой идентификатор архетипа (название каталога, в котором находится используемый архетип). -
archetypeArtifactId
— название используемого архетипа. -
archetypeVersion
— необходимая версия используемого архетипа.ВНИМАНИЕВерсия архетипа, указанная при помощи параметра
archetypeVersion
, должна соответствовать текущей версии Kafka кластера ADS, указанной в статье Поддерживаемые сервисы ADS. -
groupId
— групповое название ПО, указываемое пользователем. Обычно здесь устанавливают имя корневого каталога. -
artifactId
— описание содержимого артефакта. -
version
— версия, с которой начнется создание проекта. -
package
— название папки для сборки проекта.
При успешном создании проекта результат выглядит следующим образом:
[INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 14.917s [INFO] Finished at: Wed Aug 23 14:16:51 UTC 2023 [INFO] Final Memory: 13M/150M [INFO] ------------------------------------------------------------------------
Описание структуры проекта
Проект автоматически создается по пути /opt/maven/bin/streams.examples/.
Структура дерева созданного проекта представлена ниже:
> tree streams.examples streams-quickstart |-- pom.xml |-- src |-- main |-- java | |-- myapps | |-- LineSplit.java | |-- Pipe.java | |-- WordCount.java |-- resources |-- log4j.properties
Настройка pom.xml
В каталоге /opt/maven/bin/streams.examples/ автоматически генерируется pom.xml — XML-файл, содержащий информацию о проекте и сведения о конфигурации, используемые Maven для сборки проекта.
Для создания простых приложений Kafka Streams сгенерированный pom.xml готов к использованию, также здесь можно указать и другие зависимости проекта, плагины или цели, которые могут быть выполнены, профили сборки, описание, разработчики, списки рассылки и т.д.
Ниже представлено содержание pom.xml, сгенерированного для Kafka Streams, и описаны использующиеся теги.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>streams.examples</groupId>
<artifactId>streams.examples</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Kafka Streams Quickstart :: Java</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.8.1</kafka.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>
Теги, использующиеся в pom.xml:
-
project
— базовый тег, содержащий всю информацию о проекте. В заголовке указана информация, необходимая Maven для понимания файла pom.xml. ТегmodelVersion
указывает на текущую версию POM. Эти два тега обычно генерируются автоматически, менять их не нужно. -
groupId
,artifactId
,version
— основные параметры проекта, которые определяются при генерации из архетипа. -
name
— содержит имя проекта, отображаемое в логах. -
блок properties — особые настройки, такие как кодировка файла и используемая версия компилятора Java. В случае отсутствия блока используются настройки по умолчанию.
-
блок repositories — указывает на использование репозиториев.
-
блок build — включает плагины из архетипа.
-
блок dependencies — описывает все используемые в проекте зависимости. Каждую необходимо выделить тегом
dependency
и указать уникальные идентификационные данные:groupId
,artifactId
иversion
.
Готовые приложения Kafka Streams
После создания проекта в папке /opt/maven/bin/streams.examples/src/main/java/myapps находятся сгенерированные файлы-заготовки для запуска приложений Kafka Streams:
-
Pipe.java — простое приложение, которое читает из исходного топика
streams-plaintext-input
и записывает сообщения без изменений в топик-приемникstreams-pipe-output
. -
LineSplit.java — приложение, которое читает из исходного топика
streams-plaintext-input
, разбивает каждую текстовую строку на слова, а затем записывает в топикstreams-linesplit-output
, где каждая запись представляет одно слово. -
WordCount.java — приложение, которое читает из исходного топика
streams-plaintext-input
, разбивает каждую текстовую строку на слова, а затем вычисляет гистограмму встречаемости слов и записывает в топикstreams-wordcount-output
, где каждая запись представляет собой обновленное количество для каждого встречаемого слова.
В следующем разделе рассматривается код и запуск приложения Pipe.java, а также показано, как можно проверить работу приложения на хостах с предустановленным сервисом Kafka кластера ADS.
По аналогии можно выполнить запуск и проверку приложений LineSplit.java и WordCount.java, а также других приложений для Kafka Streams, созданных самостоятельно.
Запуск и работа с приложением Kafka Streams
Код Pipe.java
Ниже рассматривается код приложения Pipe, реализующий простую программу с использованием высокоуровневого Streams DSL, которая читает из исходного топика streams-plaintext-input
сообщения и записывает их без изменений в топик-приемник streams-pipe-output
.
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) throws Exception {
System.out.println("Started");
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
System.out.println("Thread started");
System.out.println(topology.describe());
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
В коде Pipe.java используются:
-
props.put
— конфигурации Kafka Streams, определяемые API класса StreamsConfig:-
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
— список пар хост/порт, используемых для исходного подключения к кластеру Kafka. -
StreamsConfig.APPLICATION_ID_CONFIG
— уникальный идентификатор потока. -
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
,StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass())
— библиотеки сериализации и десериализации по умолчанию для пар ключ/значение записи.
-
-
final StreamsBuilder builder = new StreamsBuilder()
— обьявление "cтроителя" потока, определяющего логику приложения при помощи API класса StreamsBuilder:-
builder.stream("streams-plaintext-input").to("streams-pipe-output")
— создание топологии, где исходный поток из топикаstreams-plaintext-input
непрерывно записывается в топикstreams-pipe-output
. -
final Topology topology = builder.build()
— определение топологии потока, обьявленной "cтроителем" потока.
-
-
final KafkaStreams streams = new KafkaStreams(topology, props)
— создание клиента Streams при помощи API класса KafkaStreams с заданными конфигурациями (props
) и топологией (topology
). -
final CountDownLatch latch = new CountDownLatch(1)
— логика завершения приложения, определяемая классом CountDownLatch. Вы можете завершить выполнение программы при помощи нажатияCtrl+C
. -
Вывод на экран состояний
Started
,Thread started
и описания топологии при помощи оператораSystem. out. println
.
Запуск приложения
Приложение запускается в каталоге, где расположен файл pom.xml, то есть /opt/maven/bin/streams.examples/.
Перед запуском приложения необходимо очистить проект и удалить все файлы, созданные предыдущей сборкой, запустив команду:
$ mvn clean package
Результат:
[INFO] Building jar: /opt/apache-maven-3.9.4/bin/streams.examples/target/streams.examples-0.1.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 10.415s [INFO] Finished at: Thu Aug 24 07:09:19 UTC 2023 [INFO] Final Memory: 14M/126M [INFO] ------------------------------------------------------------------------
Далее для запуска приложения необходимо скомпилировать исходный код из файла Pipe.java при помощи команды:
$ mvn compile exec:java -Dexec.mainClass="myapps.Pipe"
В результате успешного запуска приложения выводится сообщение Thread started
и описание топологии, которая имеет два узла процессора, узел-источник KSTREAM-SOURCE-0000000000
и узел-приемник KSTREAM-SINK-0000000001
и соответствующие топики, прикрепленные к каждому узлу:
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ streams.examples --- Started SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Thread started Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input]) --> KSTREAM-SINK-0000000001 Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
Приложение запущено. Завершение выполнения программы происходит при нажатии Ctrl+C
.
Проверка работы созданного приложения
ПРИМЕЧАНИЕ
Для получения дополнительной информации об основных принципах работы в Kafka можно обратиться к статье Начало работы c Kafka. |
Для проверки запущенного приложения необходимо:
-
Для создания топиков, использующихся в приложении, на хосте с установленным брокером Kafka выполнить команды:
-
Создание топика-источника
streams-plaintext-input
:$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
-
Создание топика-приемника
streams-pipe-output
:$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-pipe-output --bootstrap-server sov-test-2.ru-central1.internal:9092
-
-
Создать производителя сообщений для топика-источника
streams-plaintext-input
, используя команду:$ /usr/lib/kafka/bin/kafka-console-producer.sh --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
-
В другой консоли создать потребителя сообщений из топика-приемника
streams-pipe-output
, используя команду:$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic streams-pipe-output --from-beginning --bootstrap-server sov-test-2.ru-central1.internal:9092
-
Записывать сообщения в формате ключ/значение в топик-источник
streams-plaintext-input
и читать сообщения без изменений из топика-приемникаstreams-pipe-output
.