Build a streaming ETL pipeline with Flink

Overview

This section shows how to build a streaming ETL pipeline using the Flink service (ADH) and other Arenadata products. At the heart of the demo pipeline is a Flink application that analyzes a data stream (Kafka) in real time, extracts specific messages, and forwards them to another data stream based on a condition.

The pipeline flow is visualized in the following diagram.

Streaming ETL pipeline

The major steps are highlighted below:

  1. A PostgreSQL (ADPG) server stores the transactions table shown below:

      txn_id | acc_id | txn_value |      txn_date       |  txn_type
     --------+--------+-----------+---------------------+------------
           1 |   1001 |    130.00 | 2025-07-01 10:15:00 | spb
           2 |   1002 |    -50.00 | 2025-07-02 09:45:00 | withdrawal
           3 |   1002 |    -50.25 | 2025-07-01 11:00:00 | withdrawal
         ... |    ... |       ... |                 ... |        ...

    The table continuously receives INSERT/UPDATE/DELETE commands from external clients.

  2. An ADS cluster hosts a Kafka server. The ADS cluster also has a CDC plugin (Debezium) configured to track the changes made to the transactions table. The plugin reads new entries from the ADPG write-ahead log (WAL) and submits each change to a Kafka topic as a separate message.

  3. In the ADH cluster, a Flink application subscribes to the Kafka topic using the Flink connector for Kafka. Flink analyzes each message from the topic and, if a specific condition is met (txn_type=withdrawal), forwards the message to a dedicated Kafka topic.

    NOTE
    In the example pipeline described in this article, Flink performs a basic check by looking for a specific substring in the message body. In real-life pipelines, Flink can perform much deeper analysis of data streams and reveal non-obvious patterns using stateful processing, as sketched below.
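
As an illustration of such stateful processing, below is a minimal sketch (not part of the demo pipeline) that keys withdrawal events by account and reports an account as soon as it commits two withdrawals. A count window keeps the sketch short; a real pipeline would typically bound the check to a time window as well. The literal input messages, class name, and threshold are assumptions made for this sketch; the full demo application is built later in this article.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WithdrawalCountSketch {

        // Extracts the acc_id value from a Debezium message body (simplified string parsing)
        static String accountOf(String msg) {
            int start = msg.indexOf("acc_id=") + "acc_id=".length();
            return msg.substring(start, msg.indexOf(',', start));
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // In the real pipeline this stream comes from a Kafka source;
            // a few literal messages stand in for it here
            DataStream<String> transactions = env.fromElements(
                    "Struct{after=Struct{txn_id=2,acc_id=1002,txn_type=withdrawal}}",
                    "Struct{after=Struct{txn_id=3,acc_id=1002,txn_type=withdrawal}}",
                    "Struct{after=Struct{txn_id=4,acc_id=1003,txn_type=deposit}}");

            transactions
                    .filter(msg -> msg.contains("txn_type=withdrawal"))
                    // Map each withdrawal to (account, 1) so occurrences can be counted per key
                    .map(msg -> Tuple2.of(accountOf(msg), 1))
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(t -> t.f0)
                    // Fire as soon as the same account has committed two withdrawals
                    .countWindow(2)
                    .sum(1)
                    .print();

            env.execute("Withdrawal count sketch");
        }
    }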

The pipeline demonstrated in this article is built using the following Arenadata software stack.

Product                                      Version

Arenadata Postgres (ADPG)                    16.3.4
Arenadata Streaming (ADS)                    3.7.2.1
Arenadata Streaming Control (ADS Control)    2.5.0
Arenadata Hyperwave (ADH)                    4.0.0

Prepare Arenadata products

The sections below show how to install and configure the Arenadata products involved in the pipeline flow from scratch:

ADPG

  1. Install an ADPG cluster according to the Online installation guide.

  2. Ensure the ADPG service uses the default port number (5432) for incoming connections.

  3. Create the demo_user role. For this, run psql on the host with the ADPG service:

    $ sudo su - postgres
    $ psql

    And execute the command:

    CREATE ROLE demo_user WITH LOGIN SUPERUSER PASSWORD 'password';
  4. In the installed ADPG cluster, create the demo_db database. For this, run in psql:

    CREATE DATABASE demo_db OWNER demo_user;
  5. In the demo_db database, create the transactions table in the public schema:

    \c demo_db
    CREATE TABLE transactions (
        txn_id     BIGINT,
        acc_id     INTEGER,
        txn_value  DOUBLE PRECISION,
        txn_date   TIMESTAMP,
        txn_type   TEXT
    );
  6. Modify the pg_hba.conf configuration to allow access to ADPG. For this, add the following line to the PG_HBA field on the configuration page of the ADPG service:

    host demo_db demo_user all trust
    NOTE
    This line allows TCP/IP connections to ADPG from any IP address, which is insecure for a production environment. For better security, specify explicit IP addresses of hosts that will connect to the ADPG server.
  7. Set the wal_level=logical configuration parameter. This adds the information required for logical decoding to the WAL, which Debezium relies on. To set the parameter, add an entry in the postgresql.conf custom section field on the configuration parameters page of the ADPG service.
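
To double-check the ADPG setup before configuring Debezium, you can connect with any PostgreSQL client and confirm the wal_level value and table access. Below is a minimal sketch that does this from Java via JDBC; it assumes the PostgreSQL JDBC driver (org.postgresql:postgresql) is on the classpath, and the server address, role, and password must be replaced with your own values.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class AdpgSetupCheck {
        public static void main(String[] args) throws Exception {
            // Replace with the address of your ADPG server
            String url = "jdbc:postgresql://10.92.43.174:5432/demo_db";

            try (Connection conn = DriverManager.getConnection(url, "demo_user", "password");
                 Statement st = conn.createStatement()) {

                // Must return "logical", otherwise Debezium cannot decode the WAL
                try (ResultSet rs = st.executeQuery("SHOW wal_level")) {
                    rs.next();
                    System.out.println("wal_level = " + rs.getString(1));
                }

                // Confirms that the transactions table is reachable under the demo_user role
                try (ResultSet rs = st.executeQuery("SELECT count(*) FROM public.transactions")) {
                    rs.next();
                    System.out.println("transactions rows = " + rs.getLong(1));
                }
            }
        }
    }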

Debezium permissions

 

To allow Debezium to interact with ADPG, ensure that the PostgreSQL role used by Debezium has the following privileges:

  • REPLICATION

  • LOGIN

  • CREATE

  • SELECT

In this article, the role with the SUPERUSER attribute covers all the required privileges, but it is recommended to use this role only for testing. For better security, grant only the necessary privileges.

More information on setting up user privileges for Debezium can be found in the Debezium documentation.
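
For reference, a dedicated role with only those privileges could be set up as in the sketch below. It is written in Java via JDBC only to keep the examples in this article in one language (the same statements can be run directly in psql); the role name debezium_user and its password are assumptions, and the exact set of grants should be verified against the Debezium documentation for your PostgreSQL version.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateDebeziumRole {
        public static void main(String[] args) throws Exception {
            // Replace with the address of your ADPG server; connect as a superuser
            String url = "jdbc:postgresql://10.92.43.174:5432/demo_db";

            try (Connection conn = DriverManager.getConnection(url, "demo_user", "password");
                 Statement st = conn.createStatement()) {
                // REPLICATION and LOGIN are role attributes; CREATE and SELECT are granted separately
                st.executeUpdate("CREATE ROLE debezium_user WITH LOGIN REPLICATION PASSWORD 'password'");
                st.executeUpdate("GRANT CREATE ON DATABASE demo_db TO debezium_user");
                st.executeUpdate("GRANT SELECT ON public.transactions TO debezium_user");
            }
        }
    }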

ADS and ADS Control

  1. Install an ADS cluster according to the Online installation guide. The cluster must include the ZooKeeper, Kafka, and Kafka Connect services.

  2. Enable the auto.create.topics.enable Kafka configuration parameter in the server.properties group. This is required to allow Debezium to create Kafka topics automatically.

  3. Install an ADS Control cluster according to the instructions.

  4. Integrate ADS Control with the ADS cluster according to the instructions.

  5. In the ADS Control web UI, create a Debezium connector for the ADPG server according to the instructions. The Debezium connector used in this scenario has the following JSON configuration:

    {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "publication.autocreate.mode": "filtered",
      "database.user": "demo_user", (1)
      "database.dbname": "demo_db", (2)
      "tasks.max": 1,
      "database.port": 5432,
      "plugin.name": "pgoutput",
      "topic.prefix": "postgres",
      "database.hostname": "10.92.43.174", (3)
      "database.password": "password", (4)
      "name": "PostgresConnector",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
    1 The PostgreSQL role for Debezium to connect to ADPG.
    2 The ADPG database to monitor for changes.
    3 The address of the ADPG server. Replace with your ADPG server IP address.
    4 The password to access the ADPG server.

As a result, Debezium reads the ADPG WAL file and produces messages to the postgres.public.transactions Kafka topic. An example message is below:

Struct{after=Struct{txn_id=1,acc_id=1001,txn_value=250.0,txn_date=1751364900000000,txn_type=deposit},source=Struct{version=2.7.0.Final,connector=postgresql,name=postgres,ts_ms=1752485024943,snapshot=first,db=demo_db,sequence=[null,"29241720"],ts_us=1752485024943896,ts_ns=1752485024943896000,schema=public,table=transactions,txId=758,lsn=29241720},op=r,ts_ms=1752485025133,ts_us=1752485025133518,ts_ns=1752485025133518357}
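
To verify that the connector produces change events, you can read a few messages from this topic with any Kafka client. Below is a minimal sketch using the plain Java consumer from the kafka-clients library; the broker addresses match those used in the Flink application later in this article and must be replaced with the values from your ADS cluster, and the consumer group name is arbitrary.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TopicCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Replace with the Kafka bootstrap servers of your ADS cluster
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "ka-ads-1.ru-central1.internal:9092,ka-ads-2.ru-central1.internal:9092,ka-ads-3.ru-central1.internal:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-check");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("postgres.public.transactions"));
                // Print whatever change events arrive within 10 seconds
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }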

ADH

Install an ADH cluster according to the Online installation guide. The ADH cluster must include the following services:

  • Core configuration

  • HDFS

  • YARN

  • ZooKeeper

  • Flink

The following steps show how to build a Flink application in Java using Maven.

NOTE
For more information on running Flink applications in Python, refer to the PyFlink usage examples article.

Install Maven (if not yet installed):

  1. Download Maven:

    $ wget https://dlcdn.apache.org/maven/maven-3/3.9.10/binaries/apache-maven-3.9.10-bin.tar.gz -P /tmp
  2. Unpack the archive:

    $ sudo tar -zxf /tmp/apache-maven-3.9.10-bin.tar.gz -C /opt
  3. Check the Maven version:

    $ /opt/apache-maven-3.9.10/bin/mvn -version

    The output:

    Apache Maven 3.9.10 (5f519b97e944483d878815739f519b2eade0a91d)
    Maven home: /opt/apache-maven-3.9.10
    Java version: 1.8.0_412, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64/jre
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "3.10.0-1160.59.1.el7.x86_64", arch: "amd64", family: "unix"

Once Maven is installed, create the project:

  1. Create a new Maven project with the following structure:

    flink-app-demo
    │   pom.xml
    └───src
        └───main
            └───java
                └───io
                    └───arenadata
                        └───KafkaFlinkApp.java

    The example pom.xml is presented below.

    pom.xml

     

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>io.arenadata</groupId>
      <artifactId>flink-app-demo</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>jar</packaging>
      <name>Flink-app-demo</name>
    
      <properties>
        <flink.version>1.20.1</flink.version>
        <kafka.connector.version>3.3.0-1.20</kafka.connector.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
          <version>${kafka.connector.version}</version>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.17.1</version>
        </dependency>
      </dependencies>
    </project>
    NOTE

    This scenario shows how to build and run a Flink JAR in the ADH cluster, which provides the core libraries required for Flink operation. Depending on the Flink version, running the project in an IDE (outside of the ADH cluster) may result in java.lang.NoClassDefFoundError exceptions due to missing dependencies. To test your Flink application in an IDE outside the ADH cluster, be sure to include the core Flink dependencies in your pom.xml. An example of such a "fat" pom.xml is below; a minimal local smoke test that uses this setup is sketched after the build steps.

    pom.xml for running in IDE

     

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>io.arenadata</groupId>
      <artifactId>flink-app-demo</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>jar</packaging>
      <name>Flink-app-demo</name>
    
      <properties>
        <flink.version>1.20.1</flink.version>
        <kafka.connector.version>3.3.0-1.20</kafka.connector.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
      </properties>
    
    
      <dependencies>
        <!-- Flink Streaming API -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>${flink.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka Connector -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
          <version>${kafka.connector.version}</version>
        </dependency>
    
        <!-- Kafka client (optional, provided by connector) -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.7.2</version>
        </dependency>
    
        <!-- Flink Core (optional) -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
          <version>${flink.version}</version>
        </dependency>
    
        <!-- Logging (required for output visibility) -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.36</version>
        </dependency>
    
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.17.1</version>
        </dependency>
    
        <!-- To make it work in IntelliJ IDEA: -->
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime</artifactId>
          <version>${flink.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
          <version>${flink.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web</artifactId>
          <version>${flink.version}</version>
          <optional>true</optional>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-test-utils</artifactId>
          <version>${flink.version}</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    </project>
  2. Paste the following code to KafkaFlinkApp.java:

    package io.arenadata;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class KafkaFlinkApp {
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            KafkaSource<String> src_kafka_transactions = KafkaSource.<String>builder() (1)
                    .setBootstrapServers("ka-ads-1.ru-central1.internal:9092,ka-ads-2.ru-central1.internal:9092,ka-ads-3.ru-central1.internal:9092") (2)
                    .setTopics("postgres.public.transactions") (3)
                    .setGroupId("flink-consumer-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
    
            DataStream<String> stream = env.fromSource(src_kafka_transactions, (4)
                    WatermarkStrategy.noWatermarks(),
                    "Kafka Source");
    
    
            DataStream<String> filtered = stream.filter(msg -> { (5)
                boolean isWithdrawal = msg.contains("txn_type=withdrawal");
                if (isWithdrawal) {
                    System.out.println("Withdrawal detected in message: " + msg);
                }
                return isWithdrawal;
            });
    
            KafkaSink<String> sink_kafka_withdrawals = KafkaSink.<String>builder() (6)
                    .setBootstrapServers("ka-ads-1.ru-central1.internal:9092,ka-ads-2.ru-central1.internal:9092,ka-ads-3.ru-central1.internal:9092")
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setTopic("postgres.public.transactions.withdrawals")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .build();
    
            filtered.sinkTo(sink_kafka_withdrawals); (7)
    
            env.execute("My Flink demo app"); (8)
        }
    }
    1 Specifies a Kafka server as a source of input data.
    2 A list of Kafka broker addresses to connect to the ADS cluster. The up-to-date connection string can be found in ADCM (Clusters → <ADS_cluster> → Services → Kafka → Info).
    3 A Kafka topic to subscribe to. Debezium automatically creates this topic and pushes messages with information about the changes in the ADPG table.
    4 Creates a Flink DataStream object based on the Kafka source.
    5 Creates another DataStream that includes only the messages containing txn_type=withdrawal. In real-life pipelines, Flink can perform more complicated analysis of a DataStream using advanced Flink API tools (for example, detecting X subsequent transactions committed by the same account within a specific time window).
    6 Specifies a Kafka sink to send the filtered stream.
    7 Directs the filtered stream to ADS.
    8 Runs the application.
  3. Compile the application:

    $ cd flink-app-demo
    $ /opt/apache-maven-3.9.10/bin/mvn clean package
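
Before submitting the JAR to the cluster, you can optionally smoke-test the filter condition locally, for example in an IDE with the "fat" pom.xml shown above (local execution needs flink-clients on the classpath). The sketch below runs the same condition against two hypothetical messages that imitate the Debezium output format and prints the matches; it does not connect to Kafka.

    package io.arenadata;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    public class FilterSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical messages that imitate the Debezium output format
            DataStream<String> input = env.fromElements(
                    "Struct{after=Struct{txn_id=1,acc_id=1002,txn_type=withdrawal},op=c}",
                    "Struct{after=Struct{txn_id=2,acc_id=1001,txn_type=deposit},op=c}");

            // The same condition as in KafkaFlinkApp
            DataStream<String> withdrawals = input.filter(msg -> msg.contains("txn_type=withdrawal"));

            // Execute the job locally and print the filtered messages
            try (CloseableIterator<String> it = withdrawals.executeAndCollect()) {
                while (it.hasNext()) {
                    System.out.println(it.next());
                }
            }
        }
    }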

Run the pipeline

  1. Restart the ADPG, ADS, ADS Control, and ADH clusters.

  2. On an ADH host with the Flink client component, run the Flink application:

    $ flink run flink-app-demo/target/flink-app-demo-1.0-SNAPSHOT.jar

    This submits the JAR to the Flink JobManager; further execution is performed by the TaskManagers. Sample output:

    Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    Job has been submitted with JobID c917cc83500f4866e3aeb67d1d17dbf7
  3. Verify that the application has started successfully by running flink list or by using the Flink web UI.

    Sample output:

    [konstantin@ka-adh-2 ~]$ flink list
    ------------------ Running/Restarting Jobs -------------------
    14.07.2025 12:36:13 : 57ff8e66a5a76cf80da5b760b44ac0ad : My Flink demo app (RUNNING)
    --------------------------------------------------------------
    No scheduled jobs.
  4. Write some data to the test ADPG table. For this, run psql on the ADPG server host:

    $ sudo su - postgres
    $ psql

    Connect to the test database and insert some records:

    \c demo_db
    INSERT INTO public.transactions VALUES
    (1, 1002,  130.00, '2025-07-14 14:21:00', 'withdrawal'),
    (2, 1001,  50.00, '2025-07-14 14:22:00', 'deposit'),
    (3, 1002,  50.25, '2025-07-14 14:23:00', 'spb'),
    (4, 1003,  75.15, '2025-07-14 14:24:00', 'withdrawal');
  5. In the ADS Control web UI, go to Clusters → <ADS_cluster> → Topics → postgres.public.transactions.withdrawals → Messages as shown below.

    ADS Control web UI

    After a short delay, two new messages filtered by Flink (txn_type=withdrawal) appear in the postgres.public.transactions.withdrawals topic. Verify the messages' content.

  6. Stop the Flink application manually. For this, run the following commands on any ADH host with the Flink client component.

    Get the Flink application ID:

    $ flink list

    Cancel the application:

    $ flink cancel <application_id>

    Sample output:

    Cancelling job c917cc83500f4866e3aeb67d1d17dbf7.
    Cancelled job c917cc83500f4866e3aeb67d1d17dbf7.