Write a Kafka Streams application

This article describes how to create a simple streaming application using Kafka Streams.

Kafka Streams is a client library for building Java and Scala applications whose input and output data are stored in Kafka clusters.

Apache Maven, a tool for managing and building software projects, is used to create the Kafka Streams application.

In the example below, the application is created and run on a host of an ADS cluster with the pre-installed Kafka service.

Prepare to create an application

NOTE

In this section, all installation commands for OpenJDK and Apache Maven are given as reference information for CentOS 7. To install these software packages on other operating systems, refer to the documentation of the corresponding OS.

Install OpenJDK

OpenJDK is used as a Java development and runtime environment to create a Kafka Streams project using the Apache Maven tool.

Apache Maven 3.3 and later requires JDK 1.7 or higher to be installed.

To install OpenJDK, run the command:

$ sudo yum install java-1.8.0-openjdk

Result:

Complete!

To verify a successful Java installation, run the following command:

$ java -version

With a successful Java installation, the result looks like this:

openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)

Install the latest version of Apache Maven

It is recommended to use the latest version of Apache Maven to create new applications.

To install the latest version of Apache Maven, do the following:

  1. Using the wget command, download the Apache Maven archive to the /tmp directory, using the Binary tar.gz archive link from the Download Apache Maven page:

    $ wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz -P /tmp

    Wait for the download of the archive to complete.

  2. Unpack the archive to the /opt directory:

    $ sudo tar xf /tmp/apache-maven-3.9.4-bin.tar.gz -C /opt
  3. To simplify managing Maven versions and updates, create a symbolic link that points to the Maven installation directory:

    $ sudo ln -s /opt/apache-maven-3.9.4 /opt/maven

Set environment variables

To make the mvn command available from the command line, add the path to the bin directory of the installed apache-maven-3.9.4 directory to the PATH environment variable. This can be done as follows:

  1. Create a script file:

    $ sudo vim /etc/profile.d/maven.sh
  2. Fill maven.sh with the lines below:

    export JAVA_HOME=/usr/lib/jvm/jre-openjdk
    export M2_HOME=/opt/maven
    export MAVEN_HOME=/opt/maven
    export PATH=${M2_HOME}/bin:${PATH}
  3. Save and close the file.

  4. Give permission to execute the script:

    $ sudo chmod +x /etc/profile.d/maven.sh
  5. Execute the script using the source command:

    $ source /etc/profile.d/maven.sh

Verify Apache Maven installation

To verify the installation of Apache Maven, use the command:

$ mvn -version

Upon successful installation of Apache Maven, the result looks like this:

Apache Maven 3.9.4 (dfbb324ad4a7c8fb0bf182e6d91b0ae20e3d2dd9)
Maven home: /opt/maven
Java version: 1.8.0_382, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1160.95.1.el7.x86_64", arch: "amd64", family: "unix"

Create a Kafka Streams project

Create a project structure

Apache Maven uses archetypes — templates that allow you to create new projects. The Streams Quickstart Java archetype is used to create a Kafka Streams application.

Before creating a project, go to the Apache Maven bin directory by running the command:

$ cd /opt/maven/bin

To create a Kafka Streams project structure in the /bin directory, you must run the archetype:generate command that creates a project from an archetype:

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=2.8.1 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps

The generation options specified with -D are described below:

  • archetypeGroupId — group ID of the archetype (the Maven group under which the archetype is published).

  • archetypeArtifactId — name of the archetype to use.

  • archetypeVersion — required version of the archetype to use.

    CAUTION

    The archetype version specified with the archetypeVersion parameter must match the current Kafka version of the ADS cluster, as defined in Supported services.

  • groupId — user-defined group ID for the new project; it usually identifies the organization or project family.

  • artifactId — name of the artifact (the new project); it is also used as the name of the project directory.

  • version — initial version of the created project.

  • package — name of the Java package (and the corresponding source folder) for the generated application classes.

Upon successful creation of the project, the result looks like this:

[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.917s
[INFO] Finished at: Wed Aug 23 14:16:51 UTC 2023
[INFO] Final Memory: 13M/150M
[INFO] ------------------------------------------------------------------------

Project structure

The project is automatically created in the /opt/maven/bin/streams.examples/ directory.

The tree structure of the created project is shown below:

> tree streams.examples
    streams.examples
    |-- pom.xml
    |-- src
        |-- main
            |-- java
            |   |-- myapps
            |       |-- LineSplit.java
            |       |-- Pipe.java
            |       |-- WordCount.java
            |-- resources
                |-- log4j.properties

Configure pom.xml

In the /opt/maven/bin/streams.examples/ directory there is pom.xml — an XML file with the project information and configuration that Maven uses to build the project.

For simple Kafka Streams applications, the generated pom.xml is ready to use as is. If necessary, you can also add other dependencies, plugins, runnable targets, build profiles, and project metadata such as descriptions, developers, and mailing lists.

Below is the content of the pom.xml generated for Kafka Streams and the description of the tags used.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>streams.examples</groupId>
    <artifactId>streams.examples</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Kafka Streams Quickstart :: Java</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>2.8.1</kafka.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerId>jdt</compilerId>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>org.eclipse.tycho</groupId>
                            <artifactId>tycho-compiler-jdt</artifactId>
                            <version>0.21.0</version>
                        </dependency>
                    </dependencies>
                </plugin>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                            <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <!-- Apache Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>
</project>

Tags used in pom.xml:

  • project — base tag containing all information about the project. The header contains information that Maven needs to understand the pom.xml file. The modelVersion tag points to the current version of the POM. These two tags are usually generated automatically and do not need to be changed.

  • groupId, artifactId, version — main project parameters, which are determined when generating from the archetype.

  • name — contains the name of the project displayed in the logs.

  • properties block — specific settings such as the file encoding and the Java compiler version. This block can be omitted, in which case the default settings are used.

  • repositories block — additional artifact repositories for Maven to search when resolving dependencies.

  • build block — build configuration, including the plugins provided by the archetype.

  • dependencies block — describes all dependencies used in the project. Each one is declared with a dependency tag and identified by its groupId, artifactId, and version.
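As an illustration of extending the dependencies block, the hypothetical fragment below (it is not part of the generated file) adds the org.slf4j:slf4j-log4j12 binding, reusing the slf4j.version property already declared in the properties block:

```xml
<!-- Hypothetical example of an extra entry inside the dependencies block.
     slf4j-log4j12 binds the SLF4J API to log4j, using the version declared
     in the properties block of the generated pom.xml. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
</dependency>
```

Fragments like this are placed inside the existing dependencies tag of pom.xml; Maven resolves and downloads the artifact on the next build.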

Ready-made Kafka Streams applications

After creating the project, the /opt/maven/bin/streams.examples/src/main/java/myapps folder contains the generated stub files for running Kafka Streams applications:

  • Pipe.java is a simple application that reads messages from the source topic streams-plaintext-input and writes them unchanged to the destination topic streams-pipe-output.

  • LineSplit.java is an application that reads from the source topic streams-plaintext-input, splits each text line into words, and writes them to the streams-linesplit-output topic, where each entry represents one word.

  • WordCount.java is an application that reads from the source topic streams-plaintext-input, splits each text line into words, computes a histogram of word occurrences, and writes it to the streams-wordcount-output topic, where each entry is the updated count of a word.

The next section walks through the code of the Pipe.java application and shows how to run and test it on hosts where the Kafka service of the ADS cluster is pre-installed.

By analogy, you can run and test the LineSplit.java and WordCount.java applications, as well as Kafka Streams applications of your own.
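Before turning to the Kafka-specific code, the transformation performed by LineSplit and WordCount can be sketched in plain Java, without a cluster. The class below is a Kafka-free illustration; the class name, regular expression, and lower-casing are assumptions for this sketch, not code from the generated stubs:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {

    // Split a text line into words, as LineSplit does for each record.
    static String[] splitLine(String line) {
        return line.toLowerCase().split("\\W+");
    }

    // Accumulate per-word counts across lines, as WordCount does across records.
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : splitLine(line)) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                countWords(Arrays.asList("all streams lead to kafka", "hello kafka streams"));
        System.out.println(counts);
        // prints {all=1, streams=2, lead=1, to=1, kafka=2, hello=1}
    }
}
```

In the real applications, the same logic is expressed with Streams DSL operators such as flatMapValues and groupBy, and the counts are maintained in Kafka Streams state stores rather than an in-memory map.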

Run the Kafka Streams application

Pipe.java code

The code below shows the Pipe application, which implements a simple program using the high-level Streams DSL: it reads messages from the source topic streams-plaintext-input and writes them unchanged to the destination topic streams-pipe-output.

Pipe
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        System.out.println("Started");
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            System.out.println("Thread started");
            System.out.println(topology.describe());
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

The Pipe.java code uses:

  • props.put — Kafka Streams configuration parameters defined in the StreamsConfig class:

    • StreamsConfig.BOOTSTRAP_SERVERS_CONFIG — list of host/port pairs used to establish the initial connection to the Kafka cluster.

    • StreamsConfig.APPLICATION_ID_CONFIG — identifier of the Streams application; it must be unique within the Kafka cluster.

    • StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG — default serialization and deserialization (serde) classes for record keys and values; here Serdes.String().getClass() sets both to the string serde.

  • final StreamsBuilder builder = new StreamsBuilder() — declaration of a stream builder that defines the application logic using the StreamsBuilder class:

    • builder.stream("streams-plaintext-input").to("streams-pipe-output") — creation of a topology in which the source stream from the streams-plaintext-input topic is continuously written to the streams-pipe-output topic.

    • final Topology topology = builder.build() — construction of the stream topology declared by the stream builder.

  • final KafkaStreams streams = new KafkaStreams(topology, props) — creation of a Streams client from the KafkaStreams class with the given configuration (props) and topology (topology).

  • final CountDownLatch latch = new CountDownLatch(1) — application termination logic based on the CountDownLatch class: the main thread blocks on the latch until the shutdown hook releases it when the program is terminated by pressing Ctrl+C.

  • Printing of the Started and Thread started status messages and of the topology description using System.out.println.
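The shutdown pattern used in Pipe.java (the main thread blocks on a CountDownLatch until a shutdown hook releases it) can be demonstrated without Kafka. In the Kafka-free sketch below, the class and variable names are illustrative, and a plain worker thread stands in for the shutdown hook:

```java
import java.util.concurrent.CountDownLatch;

public class LatchSketch {

    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for the shutdown hook in Pipe.java: counting the latch down
        // releases the blocked main thread so it can proceed to exit.
        Thread release = new Thread(() -> {
            // ... close resources here (streams.close() in Pipe.java) ...
            latch.countDown();
        });

        release.start();   // in Pipe.java this runs when Ctrl+C is pressed
        latch.await();     // the main thread blocks until countDown() is called
        System.out.println("Latch released, shutting down");
    }
}
```

This is why Pipe.java keeps running after streams.start(): latch.await() parks the main thread, and only the shutdown hook triggered by Ctrl+C counts the latch down.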

Run the application

The application is launched in the directory where the pom.xml file is located, that is, /opt/maven/bin/streams.examples/.

Before running the application, clean the project of files created by previous builds and package it by running the command:

$ mvn clean package

Result:

[INFO] Building jar: /opt/apache-maven-3.9.4/bin/streams.examples/target/streams.examples-0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.415s
[INFO] Finished at: Thu Aug 24 07:09:19 UTC 2023
[INFO] Final Memory: 14M/126M
[INFO] ------------------------------------------------------------------------

Next, compile and run the application from the Pipe.java file using the command:

$ mvn compile exec:java -Dexec.mainClass="myapps.Pipe"

As a result of a successful launch, the Thread started message and a description of the topology are displayed. The topology has two processor nodes: a source node KSTREAM-SOURCE-0000000000 and a sink node KSTREAM-SINK-0000000001, with the corresponding topic attached to each node:

[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ streams.examples ---
Started
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Thread started
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
      <-- KSTREAM-SOURCE-0000000000

The application is now running. To terminate it, press Ctrl+C.

Check the work of the created application

NOTE

For more information on the fundamentals of working with Kafka, see Quick start with Kafka.

To check the running application, you need to:

  1. On a host with a Kafka broker installed, create the topics used in the application by running the following commands:

    • Create a source topic streams-plaintext-input:

      $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
    • Create a destination topic streams-pipe-output:

      $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-pipe-output --bootstrap-server sov-test-2.ru-central1.internal:9092
  2. Create a producer for the source topic streams-plaintext-input using the command:

    $ /usr/lib/kafka/bin/kafka-console-producer.sh --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
  3. In another console, create a consumer that reads messages from the destination topic streams-pipe-output using the command:

    $ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic streams-pipe-output --from-beginning --bootstrap-server sov-test-2.ru-central1.internal:9092
  4. Write messages to the source topic streams-plaintext-input and read them unchanged from the destination topic streams-pipe-output. To write messages in the key/value format, start the console producer with the additional options --property parse.key=true and --property key.separator=: so that it parses keys from the input.
