Write a Kafka Streams application
This article describes how to create a simple streaming application using Kafka Streams.
Kafka Streams is a client library for building Java and Scala applications where inputs and outputs are stored in Kafka clusters.
To create a Kafka Streams application, this article uses Apache Maven, a tool for managing and building software projects.
In the example below, the application is created and run on one of the hosts with the pre-installed Kafka service of the ADS cluster.
Prepare to create an application
NOTE
In this section, all installation commands for OpenJDK and Apache Maven are given as reference information for CentOS 7. To install software packages on other operating systems, refer to the documentation of the corresponding operating system.
Install OpenJDK
OpenJDK is used as a Java development and runtime environment to create a Kafka Streams project using the Apache Maven tool.
Apache Maven version 3.3 and higher requires JDK version 1.7 or higher to be installed.
To install OpenJDK, run the command:
$ sudo yum install java-1.8.0-openjdk
Result:
Complete!
To verify a successful Java installation, run the following command:
$ java -version
With a successful Java installation, the result looks like this:
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
Install the latest version of Apache Maven
It is recommended to use the latest version of Apache Maven to create new applications.
To install the latest version of Apache Maven, do the following:
- Using the wget command, download Apache Maven to the /tmp directory, using the Binary tar.gz archive link provided on the Downloading Apache Maven page:
$ wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz -P /tmp
Wait for the download of the archive to complete.
- Unpack the archive to the /opt directory:
$ sudo tar xf /tmp/apache-maven-3.9.4-bin.tar.gz -C /opt
- For easy management of Maven versions and updates, it is recommended to create a symbolic link pointing to the Maven installation directory:
$ sudo ln -s /opt/apache-maven-3.9.4 /opt/maven
Set environment variables
To make the installed Apache Maven available from the command line, you need to add the path to the bin directory of the created apache-maven-3.9.4 directory to the PATH environment variable. This can be done in the following way:
- Create a script file:
$ sudo vim /etc/profile.d/maven.sh
-
Fill maven.sh with the lines below:
export JAVA_HOME=/usr/lib/jvm/jre-openjdk
export M2_HOME=/opt/maven
export MAVEN_HOME=/opt/maven
export PATH=${M2_HOME}/bin:${PATH}
- Save and close the file.
- Give permission to execute the script:
$ sudo chmod +x /etc/profile.d/maven.sh
- Execute the script using the source command:
$ source /etc/profile.d/maven.sh
Verify Apache Maven installation
To verify the installation of Apache Maven, use the command:
$ mvn -version
Upon successful installation of Apache Maven, the result looks like this:
Apache Maven 3.9.4 (dfbb324ad4a7c8fb0bf182e6d91b0ae20e3d2dd9)
Maven home: /opt/maven
Java version: 1.8.0_382, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1160.95.1.el7.x86_64", arch: "amd64", family: "unix"
Create a Kafka Streams project
Create a project structure
Apache Maven uses archetypes — templates that allow you to create new projects. The Streams Quickstart Java archetype is used to create a Kafka Streams application.
Before creating a project, you need to go to the Apache Maven /bin directory by running the command:
$ cd /opt/maven/bin
To create a Kafka Streams project structure in the /bin directory, you must run the archetype:generate command that creates a project from an archetype:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=2.8.1 \
-DgroupId=streams.examples \
-DartifactId=streams.examples \
-Dversion=0.1 \
-Dpackage=myapps
The generation options specified with -D are described below:
- archetypeGroupId — archetype group ID (the name of the directory where the used archetype is located).
- archetypeArtifactId — name of the archetype to use.
- archetypeVersion — required version of the archetype to use.
CAUTION
The archetype version specified with the archetypeVersion parameter must match the current Kafka version of the ADS cluster, as defined in Supported services.
- groupId — user-defined group name of the software. This is usually set to the name of the root directory.
- artifactId — description of the contents of the artifact.
- version — version from which the project creation starts.
- package — name of the folder in which the project is built.
Upon successful creation of the project, the result looks like this:
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.917s
[INFO] Finished at: Wed Aug 23 14:16:51 UTC 2023
[INFO] Final Memory: 13M/150M
[INFO] ------------------------------------------------------------------------
Project structure
The project is automatically created along the path /opt/maven/bin/streams.examples/.
The tree structure of the created project is shown below:
> tree streams.examples
streams-quickstart
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties
Configure pom.xml
The /opt/maven/bin/streams.examples/ directory contains pom.xml, an XML file with project information and configuration details that Maven uses to build the project.
For simple Kafka Streams applications, the generated pom.xml is ready to use as is. You can also specify other project dependencies, plugins, or goals to run, as well as create profiles, descriptions, developer and mailing lists, and so on.
Below is the content of the pom.xml generated for Kafka Streams and the description of the tags used.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>streams.examples</groupId>
<artifactId>streams.examples</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Kafka Streams Quickstart :: Java</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.8.1</kafka.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<!-- Apache Kafka dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>
Tags used in pom.xml:
- project — base tag containing all information about the project. The header contains information that Maven needs to understand the pom.xml file. The modelVersion tag points to the current version of the POM. These two tags are usually generated automatically and do not need to be changed.
- groupId, artifactId, version — main project parameters, defined when generating the project from the archetype.
- name — the name of the project displayed in the logs.
- properties block — specific settings such as the file encoding and the Java compiler version. This block can be omitted, in which case the default settings are used.
- repositories block — indicates the repositories to use.
- build block — includes the plugins from the archetype.
- dependencies block — describes all dependencies used in the project. Each dependency must be marked with a dependency tag and specified with unique identification data: groupId, artifactId, and version.
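As an example of extending the dependencies block, the fragment below (an optional addition, not part of the generated pom.xml) would add the slf4j-simple logging binding, reusing the slf4j.version property already defined in the properties block:

```xml
<!-- Optional: a simple SLF4J binding so that log output is printed
     to the console instead of being discarded by the NOP logger -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>${slf4j.version}</version>
</dependency>
```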
Ready-made Kafka Streams applications
After creating the project, the /opt/maven/bin/streams.examples/src/main/java/myapps folder contains the generated stub files for running Kafka Streams applications:
- Pipe.java is a simple application that reads from the source topic streams-plaintext-input and writes the messages unchanged to the destination topic streams-pipe-output.
- LineSplit.java is an application that reads from the source streams-plaintext-input topic, splits each text line into words, and then writes them to the streams-linesplit-output topic, where each record represents one word.
- WordCount.java is an application that reads from the source streams-plaintext-input topic, splits each text line into words, computes a histogram of word occurrences, and writes the result to the streams-wordcount-output topic, where each record is an updated count for a word encountered.
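The per-record logic behind LineSplit and WordCount can be sketched in plain Java. This is a minimal illustration only: the real applications express these steps with the Streams DSL operators (flatMapValues, groupBy, count), and the class and method names below are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordLogicDemo {

    // LineSplit's per-record step: split a text line into words
    // on runs of non-word characters.
    static List<String> splitLine(String line) {
        return Arrays.asList(line.toLowerCase().split("\\W+"));
    }

    // WordCount's per-record step: maintain a running count per word.
    static Map<String, Long> countWords(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        for (String word : words) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = splitLine("all streams lead to Kafka");
        System.out.println(words);                  // [all, streams, lead, to, kafka]
        System.out.println(countWords(words).get("kafka")); // 1
    }
}
```

In the generated stubs the same transformations run record by record inside the streaming topology, so the counts are updated continuously rather than computed over a fixed list.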
The next section discusses the code and launch of the Pipe.java application and shows how to test it on hosts with the preinstalled Kafka service of the ADS cluster.
By analogy, you can run and test the LineSplit.java and WordCount.java applications, as well as other Kafka Streams applications that you create yourself.
Run the Kafka Streams application
Pipe.java code
The code below shows the Pipe application, which uses the high-level Streams DSL to read messages from the source streams-plaintext-input topic and write them unchanged to the destination streams-pipe-output topic.
package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        System.out.println("Started");
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            System.out.println("Thread started");
            System.out.println(topology.describe());
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
The Pipe.java code uses:
- props.put — Kafka Streams configurations defined by the StreamsConfig class API:
  - StreamsConfig.BOOTSTRAP_SERVERS_CONFIG — list of host/port pairs used to establish the initial connection to the Kafka cluster.
  - StreamsConfig.APPLICATION_ID_CONFIG — unique stream ID.
  - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG — default serialization and deserialization classes for keys and values.
- final StreamsBuilder builder = new StreamsBuilder() — declaration of a stream builder that defines the application logic using the StreamsBuilder class API:
  - builder.stream("streams-plaintext-input").to("streams-pipe-output") — creation of a topology in which the source stream from the streams-plaintext-input topic is continuously written to the streams-pipe-output topic.
  - final Topology topology = builder.build() — definition of the stream topology declared by the stream builder.
- final KafkaStreams streams = new KafkaStreams(topology, props) — creation of a Streams client using the KafkaStreams class API with the given configurations (props) and topology (topology).
- final CountDownLatch latch = new CountDownLatch(1) — application termination logic defined by the CountDownLatch class. The program can be terminated by pressing Ctrl+C.
- System.out.println — displays the Started and Thread started states and the topology description.
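The shutdown pattern used in Pipe.java can be reduced to a small self-contained sketch. Here a plain helper thread plays the role that the "streams-shutdown-hook" thread plays on Ctrl+C (the class and message strings are hypothetical; in the real application the hook also calls streams.close() before releasing the latch):

```java
import java.util.concurrent.CountDownLatch;

public class LatchDemo {

    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for the shutdown hook: in Pipe.java this code runs
        // on Ctrl+C and closes the Streams client before counting down.
        Thread hook = new Thread(() -> {
            System.out.println("closing streams client");
            latch.countDown();
        });
        hook.start();

        latch.await(); // the main thread blocks here until countDown() is called
        System.out.println("main thread released, exiting");
    }
}
```

Because latch.await() returns only after countDown() runs, the main thread is guaranteed to stay alive for as long as the Streams client should keep processing, which is exactly why Pipe.java calls latch.await() after streams.start().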
Run the application
The application is launched in the directory where the pom.xml file is located, that is, /opt/maven/bin/streams.examples/.
Before running the application, clean the project of files left over from previous builds and build the package by running the command:
$ mvn clean package
Result:
[INFO] Building jar: /opt/apache-maven-3.9.4/bin/streams.examples/target/streams.examples-0.1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.415s
[INFO] Finished at: Thu Aug 24 07:09:19 UTC 2023
[INFO] Final Memory: 14M/126M
[INFO] ------------------------------------------------------------------------
Next, to run the application, compile and execute the source code from the Pipe.java file using the command:
$ mvn compile exec:java -Dexec.mainClass="myapps.Pipe"
After a successful launch, the Thread started message and a description of the topology are displayed. The topology has two processor nodes: a source node KSTREAM-SOURCE-0000000000 and a sink node KSTREAM-SINK-0000000001, with the corresponding topic attached to each node:
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ streams.examples ---
Started
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Thread started
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
      <-- KSTREAM-SOURCE-0000000000
The application is now running. The program is terminated by pressing Ctrl+C.
Check the work of the created application
NOTE
For more information on the fundamentals of working with Kafka, see Quick start with Kafka.
To check the running application, you need to:
- Create the topics used in the application. On a host with a Kafka broker installed, run the following commands:
  - Create the source topic streams-plaintext-input:
    $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
  - Create the destination topic streams-pipe-output:
    $ /usr/lib/kafka/bin/kafka-topics.sh --create --topic streams-pipe-output --bootstrap-server sov-test-2.ru-central1.internal:9092
- Create a producer for the source topic streams-plaintext-input using the command:
  $ /usr/lib/kafka/bin/kafka-console-producer.sh --topic streams-plaintext-input --bootstrap-server sov-test-2.ru-central1.internal:9092
- In another console, create a consumer for the destination topic streams-pipe-output using the command:
  $ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic streams-pipe-output --from-beginning --bootstrap-server sov-test-2.ru-central1.internal:9092
- Write messages to the source topic streams-plaintext-input and read the same messages, unchanged, from the destination topic streams-pipe-output.