Создание приложения Kafka Streams

В статье описано создание простого приложения потоковой обработки с использованием Kafka Streams.

Kafka Streams — клиентская библиотека для создания приложений Java и Scala, где входные и выходные данные хранятся в кластерах Kafka.

Для создания приложения Kafka Streams используется Apache Maven — инструмент управления и анализа программных проектов.

В описанном ниже примере приложение создается и запускается на одном из хостов с предустановленным сервисом Kafka кластера ADS.

Подготовка к созданию проекта

ПРИМЕЧАНИЕ

В данном разделе все команды для установки OpenJDK и Apache Maven приведены как справочная информация для ОС Centos 7. Для установки программных пакетов в других ОС необходимо обратиться к документации соответстующей ОС.

Установка OpenJDK

OpenJDK используется как среда разработки и выполнения Java для создания проекта Kafka Streams при помощи инструмента Apache Maven.

Apache Maven версии 3.3 и выше требует установки JDK версии 1.7 или выше.

Для установки OpenJDK необходимо выполнить команду:

$ sudo yum install java-1.8.0-openjdk

Результат:

Complete!

Для проверки успешной установки Java необходимо выполнить следующую команду:

$ java -version

При успешной установке Java результат выглядит следующим образом:

openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)

Установка последней версии Apache Maven

Для создания новых приложений рекомендуется использовать последнюю версию Apache Maven. Для ее установки необходимо выполнить:

  1. Используя команду wget, загрузить Apache Maven в каталог /tmp, указав приведенную на странице загрузки Apache Maven ссылку на Binary tar.gz archive:

    $ wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz -P /tmp

    Дождаться завершения загрузки архива.

  2. Распаковать архив в каталог /opt:

    $ sudo tar xf /tmp/apache-maven-3.9.4-bin.tar.gz -C /opt
  3. Для удобного управления версиями и обновлениями Maven рекомендуется создать символическую ссылку, которая будет указывать на каталог установки Maven, при помощи команды:

    $ sudo ln -s /opt/apache-maven-3.9.4 /opt/maven

Настройка переменных среды

Для обращения Java к установленному Apache Maven необходимо добавить путь к каталогу bin созданного каталога apache-maven-3.9.4 в переменную среды PATH. Это можно выполнить следующим способом:

  1. Cоздать файл скрипта:

    $ sudo vim /etc/profile.d/maven.sh
  2. Заполнить maven.sh строками, приведенными ниже:

    export JAVA_HOME=/usr/lib/jvm/jre-openjdk
    export M2_HOME=/opt/maven
    export MAVEN_HOME=/opt/maven
    export PATH=${M2_HOME}/bin:${PATH}
  3. Сохранить и закрыть файл.

  4. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /etc/profile.d/maven.sh
  5. Выполнить скрипт при помощи команды source:

    $ source /etc/profile.d/maven.sh

Проверка установки Apache Maven

Для проверки установки Apache Maven необходимо использовать команду:

$ mvn -version

При успешной установке Apache Maven результат выглядит следующим образом:

Apache Maven 3.9.4 (dfbb324ad4a7c8fb0bf182e6d91b0ae20e3d2dd9)
Maven home: /opt/maven
Java version: 1.8.0_382, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1160.95.1.el7.x86_64", arch: "amd64", family: "unix"

Создание проекта Kafka Streams

Создание структуры проекта

В Apache Maven используются архетипы — заготовки, позволяющие создавать новые проекты. Для создания приложения Kafka Streams используется архетип Streams Quickstart Java.

Перед созданием проекта необходимо перейти в каталог /bin Apache Maven, выполнив команду:

$ cd /opt/maven/bin

Для создания структуры проекта Kafka Streams в каталоге /bin необходимо выполнить команду создания проекта из архетипа archetype:generate:

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=2.8.1 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps

Ниже описаны параметры генерации, указываемые при помощи -D:

  • archetypeGroupId — групповой идентификатор архетипа (название каталога, в котором находится используемый архетип).

  • archetypeArtifactId — название используемого архетипа.

  • archetypeVersion — необходимая версия используемого архетипа.

    ВНИМАНИЕ

    Версия архетипа, указанная при помощи параметра archetypeVersion, должна соответствовать текущей версии Kafka кластера ADS, указанной в статье Поддерживаемые сервисы ADS.

  • groupId — групповое название ПО, указываемое пользователем. Обычно здесь устанавливают имя корневого каталога.

  • artifactId — описание содержимого артефакта.

  • version — версия, с которой начнется создание проекта.

  • package —  название папки для сборки проекта.

При успешном создании проекта результат выглядит следующим образом:

[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.917s
[INFO] Finished at: Wed Aug 23 14:16:51 UTC 2023
[INFO] Final Memory: 13M/150M
[INFO] ------------------------------------------------------------------------

Описание структуры проекта

Проект автоматически создается по пути /opt/maven/bin/streams.examples/.

Структура дерева созданного проекта представлена ниже:

> tree streams.examples
    streams-quickstart
    |-- pom.xml
    |-- src
        |-- main
            |-- java
            |   |-- myapps
            |       |-- LineSplit.java
            |       |-- Pipe.java
            |       |-- WordCount.java
            |-- resources
                |-- log4j.properties

Настройка pom.xml

В каталоге /opt/maven/bin/streams.examples/ автоматически генерируется pom.xml —  XML-файл, содержащий информацию о проекте и сведения о конфигурации, используемые Maven для сборки проекта.

Для создания простых приложений Kafka Streams сгенерированный pom.xml готов к использованию, также здесь можно указать и другие зависимости проекта, плагины или цели, которые могут быть выполнены, профили сборки, описание, разработчики, списки рассылки и т.д.

Ниже представлено содержание pom.xml, сгенерированного для Kafka Streams, и описаны использующиеся теги.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>streams.examples</groupId>
    <artifactId>streams.examples</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Kafka Streams Quickstart :: Java</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>2.8.1</kafka.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>


        <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerId>jdt</compilerId>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>org.eclipse.tycho</groupId>
                            <artifactId>tycho-compiler-jdt</artifactId>
                            <version>0.21.0</version>
                        </dependency>
                    </dependencies>
                </plugin>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                            <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <!-- Apache Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>
</project>

Теги, использующиеся в pom.xml:

  • project — базовый тег, содержащий всю информацию о проекте. В заголовке указана информация, необходимая Maven для понимания файла pom.xml. Тег modelVersion указывает на текущую версию POM. Эти два тега обычно генерируются автоматически, менять их не нужно.

  • groupId, artifactId, version — основные параметры проекта, которые определяются при генерации из архетипа.

  • name — содержит имя проекта, отображаемое в логах.

  • блок properties — особые настройки, такие как кодировка файла и используемая версия компилятора Java. В случае отсутствия блока используются настройки по умолчанию.

  • блок repositories — указывает на использование репозиториев.

  • блок build — включает плагины из архетипа.

  • блок dependencies  — описывает все используемые в проекте зависимости. Каждую необходимо выделить тегом dependency и указать уникальные идентификационные данные: groupId, artifactId и version.

Готовые приложения Kafka Streams

После создания проекта в папке /opt/maven/bin/streams.examples/src/main/java/myapps находятся сгенерированные файлы-заготовки для запуска приложений Kafka Streams:

  • Pipe.java — простое приложение, которое читает из исходного топика streams-plaintext-input и записывает сообщения без изменений в топик-приемник streams-pipe-output.

  • LineSplit.java — приложение, которое читает из исходного топика streams-plaintext-input, разбивает каждую текстовую строку на слова, а затем записывает в топик streams-linesplit-output, где каждая запись представляет одно слово.

  • WordCount.java — приложение, которое читает из исходного топика streams-plaintext-input, разбивает каждую текстовую строку на слова, а затем вычисляет гистограмму встречаемости слов и записывает в топик streams-wordcount-output, где каждая запись представляет собой обновленное количество для каждого встречаемого слова.

В следующем разделе рассматривается код и запуск приложения Pipe.java, а также показано, как можно проверить работу приложения на хостах с предустановленным сервисом Kafka кластера ADS.

По аналогии можно выполнить запуск и проверку приложений LineSplit.java и WordCount.java, а также других приложений для Kafka Streams, созданных самостоятельно.

Запуск и работа с приложением Kafka Streams

Код Pipe.java

Ниже рассматривается код приложения Pipe, реализующий простую программу с использованием высокоуровневого Streams DSL, которая читает из исходного топика streams-plaintext-input сообщения и записывает их без изменений в топик-приемник streams-pipe-output.

Pipe
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        System.out.println("Started");
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();


        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
	});

	try {
            System.out.println("Thread started");
            System.out.println(topology.describe());
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
	System.exit(0);
    }
}

В коде Pipe.java используются:

  • props.put — конфигурации Kafka Streams, определяемые API класса StreamsConfig:

    • StreamsConfig.BOOTSTRAP_SERVERS_CONFIG — список пар хост/порт, используемых для исходного подключения к кластеру Kafka.

    • StreamsConfig.APPLICATION_ID_CONFIG — уникальный идентификатор потока.

    • StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()), StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()) — библиотеки сериализации и десериализации по умолчанию для пар ключ/значение записи.

  • final StreamsBuilder builder = new StreamsBuilder() — обьявление "cтроителя" потока, определяющего логику приложения при помощи API класса StreamsBuilder:

    • builder.stream("streams-plaintext-input").to("streams-pipe-output") — создание топологии, где исходный поток из топика streams-plaintext-input непрерывно записывается в топик streams-pipe-output.

    • final Topology topology = builder.build() — определение топологии потока, обьявленной "cтроителем" потока.

  • final KafkaStreams streams = new KafkaStreams(topology, props) — создание клиента Streams при помощи API класса KafkaStreams с заданными конфигурациями (props) и топологией (topology).

  • final CountDownLatch latch = new CountDownLatch(1) — логика завершения приложения, определяемая классом CountDownLatch. Вы можете завершить выполнение программы при помощи нажатия Ctrl+C.

  • Вывод на экран состояний Started, Thread started и описания топологии при помощи оператора System. out. println.

Запуск приложения

Приложение запускается в каталоге, где расположен файл pom.xml, то есть /opt/maven/bin/streams.examples/.

Перед запуском приложения необходимо очистить проект и удалить все файлы, созданные предыдущей сборкой, запустив команду:

$ mvn clean package

Результат:

[INFO] Building jar: /opt/apache-maven-3.9.4/bin/streams.examples/target/streams.examples-0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.415s
[INFO] Finished at: Thu Aug 24 07:09:19 UTC 2023
[INFO] Final Memory: 14M/126M
[INFO] ------------------------------------------------------------------------

Далее для запуска приложения необходимо скомпилировать исходный код из файла Pipe.java при помощи команды:

$ mvn compile exec:java -Dexec.mainClass="myapps.Pipe"

В результате успешного запуска приложения выводится сообщение Thread started и описание топологии, которая имеет два узла процессора, узел-источник KSTREAM-SOURCE-0000000000 и узел-приемник KSTREAM-SINK-0000000001 и соответствующие топики, прикрепленные к каждому узлу:

[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ streams.examples ---
Started
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Thread started
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
      <-- KSTREAM-SOURCE-0000000000

Приложение запущено. Завершение выполнения программы происходит при нажатии Ctrl+C.

Проверка работы созданного приложения

ПРИМЕЧАНИЕ

Для получения дополнительной информации об основных принципах работы в Kafka можно обратиться к статье Начало работы c Kafka.

Для проверки запущенного приложения необходимо:

  1. Для создания топиков, использующихся в приложении, на хосте с установленным брокером Kafka выполнить команды:

    • Создание топика-источника streams-plaintext-input:

      $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
    • Создание топика-приемника streams-pipe-output:

      $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-pipe-output --bootstrap-server sov-test-2.ru-central1.internal:9092
  2. Создать производителя сообщений для топика-источника streams-plaintext-input, используя команду:

    $ /usr/lib/kafka/bin/kafka-console-producer.sh --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
  3. В другой консоли создать потребителя сообщений из топика-приемника streams-pipe-output, используя команду:

    $ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic streams-pipe-output --from-beginning --bootstrap-server sov-test-2.ru-central1.internal:9092
  4. Записывать сообщения в формате ключ/значение в топик-источник streams-plaintext-input и читать сообщения без изменений из топика-приемника streams-pipe-output.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней