Iceberg connector

Iceberg Sink Connector overview

The Apache Iceberg Sink Connector is a sink connector for writing data from Kafka to Apache Iceberg tables.

Using the Iceberg Sink Connector provides the following benefits:

  • Message coordination for centralized Iceberg commits.

  • Exactly-once delivery semantics.

  • Ability to fan out records to multiple tables.

  • Automatic table creation and schema evolution.

  • Field name mapping using Iceberg column mapping functionality.

To illustrate the benefits of the connector, this article walks through a complete pipeline built with Kafka Connect: change data capture (CDC) from PostgreSQL database tables with the changes written to an Iceberg table.

Prerequisites

The environment used to create a complete CDC pipeline is described below.

ADS and ADS Control

  • The ADS cluster is installed according to the Online installation guide. The minimum ADS version is 3.7.2.1.b1.

  • Kafka, Kafka Connect, and Schema-Registry services are installed in the ADS cluster. Schema-Registry Server IP address is 10.92.42.130.

  • To automatically create a Kafka topic, the auto.create.topics.enable parameter is enabled in the server.properties group when configuring the Kafka service.

  • When configuring the Kafka Connect service, a new value /usr/lib/kafka-connect/plugins has been added to the plugin.path parameter in the connect-distributed.properties group, as shown below.

    Configuring the path to the Iceberg Sink Connector plugin

    Setting the plugin path is required only for ADS 3.7.2.1.b1.

  • The configuration of the ADH cluster used in the pipeline is imported into the ADS cluster.

    To enable import, on the Import tab of the ADS cluster, select Cluster configuration next to the name of the ADH cluster and click Import.

    Import of ADH data
  • The ADS Control cluster is installed according to the guide Get started with Arenadata Streaming Control and integrated with the ADS cluster being used.

ADPG

  • The ADPG cluster is installed according to the Online installation guide.

  • PostgreSQL server IP address (a host with the ADPG service) is 10.92.43.42. For incoming connections, the default port number is 5432.

  • To create a Debezium connector similar to the one described in the Debezium Connector for PostgreSQL server article, the ADPG cluster settings are configured via the ADCM UI (access to the database from the Kafka Connect host in the pg_hba.conf file and the wal_level parameter in the postgresql.conf file).
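
To verify the effective setting on the ADPG side, you can check the write-ahead log level from a SQL session; for the Debezium connector with the pgoutput plugin it should be logical. A minimal check:

SHOW wal_level;
-- expected value: logical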

Below is an example of creating and configuring a PostgreSQL database (in the ADPG cluster).

Configure a PostgreSQL database

Create a database:

CREATE DATABASE my_database;

Connect to the database:

\c my_database

Create a user with the SUPERUSER role:

CREATE USER my_user WITH SUPERUSER PASSWORD 'P@ssword';
CAUTION
The SUPERUSER role is used for testing purposes only. For more information, see Configuring permissions for the Debezium connector.

Create a test table:

CREATE TABLE my_table (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    country VARCHAR(100),
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Configure the replica identity for the table so that UPDATE and DELETE events record the previous values of all table columns:

ALTER TABLE my_table REPLICA IDENTITY FULL;
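
To confirm the change, you can query the system catalog; the value f in the relreplident column corresponds to REPLICA IDENTITY FULL:

SELECT relname, relreplident
FROM pg_class
WHERE relname = 'my_table';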

Fill the table with data (one row is shown as an example; several rows are useful for clarity):

INSERT INTO my_table(name, country) VALUES ('John Jones','USA');
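
If needed, a few more rows can be added for clarity; the values below are purely illustrative:

INSERT INTO my_table(name, country) VALUES
    ('Maria Garcia', 'Spain'),
    ('Chen Wei', 'China');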

ADH

  • The ADH cluster is installed according to the Online installation guide. The minimum ADH version is 3.7.2.1.b1.

  • HDFS, ADPG, YARN, Hive, and HUE services are installed in the ADH cluster. Hive server IP (Hive Metastore component) is 10.92.43.153.

  • In ADH, on each host, you need to create a kafka user that has rights to create tables in HDFS storage (that is, belongs to the hadoop group).

Iceberg Sink Connector with Kerberos

 
Starting from ADS 3.9.0.1.b1, the Iceberg connector can be connected to an ADH cluster where Kerberos authentication is enabled.

When Kerberos authentication is enabled, connecting to the Hive service in ADH also requires SSL to be activated.

The corresponding Kerberos and SSL settings should also be made on the ADS and ADS Control clusters.

For trust among the ADS, ADS Control, and ADH clusters, the following conditions should be met:

  • The same Kerberos realm is used for all clusters when enabling Kerberos authentication.

  • Clusters mutually trust each other’s SSL host certificates, that is, the truststore of each host in one cluster contains the *.crt certificate of each host in the other cluster.

The /etc/hive/conf/hive-site.xml file should also be copied from the ADH host, where the Hive component is installed, to the /usr/lib/kafka/config/ directory on the ADS host with the Kafka Connect component.

Create a Debezium connector for PostgreSQL server

To capture row-level data changes in PostgreSQL database tables using ADS Control, create a Debezium connector similar to the one described in the article Debezium connector for PostgreSQL server.

Below is an example Debezium connector configuration with a description of the new parameters used in the data pipeline.

Example of the contents of the Debezium connector configuration JSON file for PostgreSQL server
{
  "name": "cdc-my-adpg",
  "topic.prefix": "cdc__demo",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "publication.autocreate.mode": "filtered",
  "database.user": "my_user",
  "database.dbname": "my_database",
  "tasks.max": 1,
  "database.port": 5432,
  "plugin.name": "pgoutput",
  "database.hostname": "10.92.43.42",
  "database.password": "P@ssword",
  "table.include.list": ["public.my_table"],

  "value.converter.schema.registry.url": "http://10.92.42.130:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://10.92.42.130:8081",
  "key.converter": "io.confluent.connect.avro.AvroConverter",

  "transforms": ["unwrap"],
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields": "op,source.schema,source.table,source.ts_ms,ts_ms",
  "transforms.unwrap.add.headers": "db",
  "transforms.unwrap.delete.handling.mode": "rewrite",

  "slot.name": "cdc__my_database",
  "schema.evolution": "basic"
}
Attribute Description

value.converter.schema.registry.url

Schema-Registry server address

key.converter.schema.registry.url

Schema-Registry server address

transforms.unwrap.type

Specifies the class used for the unwrap transformation

transforms.unwrap.add.fields

Additional fields to add to the data

transforms.unwrap.add.headers

Defines whether to add metadata to the Kafka message header

transforms.unwrap.delete.handling.mode

Defines how to handle deletion events. rewrite ensures that information about deleted records is preserved and transmitted in the Kafka message

table.include.list

Optional list of comma-separated regular expressions that match the table IDs of the tables whose changes are to be captured. If this property is set, the connector captures changes only from the specified tables. Each ID must be of the form <schemaName>.<tableName>, where:

  • <schemaName> — name of the database schema in which the change event occurred;

  • <tableName> — name of the database table in which the change event occurred.

slot.name

Name of the PostgreSQL logical decoding slot created for streaming. The server uses this slot to stream events to the Debezium connector. Must be unique for each connector connecting to the database

schema.evolution

Specifies the schema evolution mode. When set to basic, the connector automatically detects fields that are in the event payload (new changes) but do not exist in the target table, and modifies the table
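
After the Debezium connector has been created and started, one way to verify on the ADPG side that the logical replication slot from slot.name exists is a query like the following (a sketch, assuming a session opened in my_database):

SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots;
-- a row with slot_name = 'cdc__my_database' and plugin = 'pgoutput' is expected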

Create an Iceberg connector

To create an Iceberg connector via ADS Control, the connector plugin IcebergSinkConnector is used.

To create connectors using ADS Control, you need to:

  1. Go to the Kafka Connects page in the ADS Control web interface. The Kafka Connects page becomes available after selecting a cluster in the cluster management section and going to the desired tab on the General page.

  2. Select the desired cluster and go to the Kafka Connect instance overview page.

  3. Click Create Connector on the Kafka Connect instance overview page. After clicking Create Connector, the connector plugin selection window opens (Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins).

  4. Select the desired connector to create.

    Selecting a plugin to create a connector
  5. Fill in the connector configuration parameters. If necessary, refer to the parameter descriptions given below:

    You can fill in the configuration in the form of a JSON file. To do this, enable the JSON view switch.

    Connector configuration
    Connector configuration JSON file
    Example contents of a JSON file with an Iceberg connector configuration
    {
      "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
      "iceberg.tables.cdc-field": "__op",
      "tasks.max": 1,
      "topics": [
        "cdc__demo.public.my_table"
      ],
      "iceberg.tables.upsert-mode-enabled": true,
      "iceberg.control.commit.interval-ms": 30000,
      "iceberg.catalog.uri": "thrift://10.92.43.153:9083",
      "iceberg.connect.hdfs.keytab": "/etc/security/keytabs/kafka-connect.service.keytab",
      "iceberg.tables.auto-create-enabled": true,
      "value.converter.schema.registry.url": "http://10.92.42.130:8081",
      "iceberg.connect.hdfs.principal": "kafka-connect/sov-ads-6.ru-central1.internal@ADS-KAFKA.LOCAL",
      "iceberg.tables": [
        "default.my_table"
      ],
      "name": "iceberg-sink-cdc-connector",
      "iceberg.tables.schema-case-insensitive": true,
      "iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "iceberg.tables.default-id-columns": "id",
      "iceberg.hdfs.authentication.kerberos": true,
      "iceberg.catalog.type": "hive",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://10.92.42.130:8081",
      "iceberg.write.format.default": "parquet",
      "iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/"
    }
    Attribute Description

    name

    Name of the connector that will be used in the Kafka Connect service

    connector.class

    Class name for the connector

    tasks.max

    Maximum number of tasks to create

    topics

    Name of the topic whose data should be passed to the Iceberg tables. For the CDC pipeline implementation, the topic name has the form <topic.prefix>.<table.include.list>, consisting of the parameters of the Debezium connector created above

    iceberg.tables

    Comma-separated list of target tables

    iceberg.write.format.default

    Default file format for a table: parquet, avro, or orc

    iceberg.tables.auto-create-enabled

    Set to true to automatically create target tables

    iceberg.tables.schema-case-insensitive

    Set to true to search for table columns by name in a case-insensitive manner

    iceberg.tables.upsert-mode-enabled

    Set the value to true to enable UPSERT mode

    iceberg.tables.cdc-field

    The field of the source record that identifies the type of operation (INSERT, UPDATE, or DELETE) when UPSERT mode is enabled

    iceberg.tables.default-id-columns

    A comma-separated list of default columns that identify a row in the tables (primary key). A mandatory parameter when UPSERT mode is enabled

    iceberg.catalog.type

    Type of a catalog for storing Iceberg tables

    iceberg.catalog.uri

    Address to connect to the catalog with Iceberg tables (address of the host with the Hive Metastore component installed)

    iceberg.hdfs.authentication.kerberos

    Flag for enabling/disabling HDFS Kerberos authentication (disabled by default). Used when Kerberos authentication is enabled in ADS and ADH. Available starting from ADS 3.9.0.1.b1

    iceberg.connect.hdfs.principal

    The name of the Kerberos principal to be used in authentication. It is a mandatory parameter if Kerberos authentication is enabled. Available starting from ADS 3.9.0.1.b1

    iceberg.connect.hdfs.keytab

    The path to the keytab file for Kerberos, which contains the key for the principal. It is a mandatory parameter if Kerberos authentication is enabled. Available starting from ADS 3.9.0.1.b1

    iceberg.hadoop-conf-dir

    Path to the directory with Hadoop configuration files (in this pipeline, the Kafka configuration directory on the Kafka Connect host to which hive-site.xml was copied)

    iceberg.catalog.warehouse

    Path to the catalog warehouse where table data and metadata are stored

    key.converter

    Converter type for message key

    key.converter.schema.registry.url

    Schema-Registry server address

    value.converter

    Converter type for message value

    value.converter.schema.registry.url

    Schema-Registry server address

    iceberg.control.commit.interval-ms

    Interval between commits in ms

  6. After filling in the parameters, click Save. A message about the successful creation of the connector is displayed.

    Message about the successful creation of the connector
  7. Check that the opened page displays the created connector in the running state.

    Created connector

    If, after the connector is created, the task is created with an error, the contents of the error can be seen by clicking the restart icon located in the Status field of the task.

PostgreSQL database table data in ADS Control

The Topics page of the ADS Control user interface displays the topic created by the Debezium connector, with all PostgreSQL table fields and their contents.

Topic created by the connector

The ADS Control user interface page Schema-Registry displays the schemas created in Schema-Registry for table keys and values.

Created schemas

The schemas tab displays a description of all table fields, including fields added using the transforms.unwrap.add.fields parameter when creating the Debezium connector.

Schema for the PostgreSQL table

As a result of creating the data pipeline, in the connectors table on the Kafka Connect instance overview page, both connectors created have the same topic name in the Topic column.

Table of created connectors in Kafka Connect

Iceberg tables

To test change collection using the Debezium connector, make changes to the PostgreSQL table:

INSERT INTO my_table(name, country) VALUES ('NAME','COUNTRY');
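
To also check that updates and deletions are captured, statements like the following can be run as well (the WHERE conditions assume the rows inserted earlier):

UPDATE my_table SET country = 'Canada' WHERE name = 'John Jones';
DELETE FROM my_table WHERE name = 'NAME';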

After connecting to the user interface of the HUE service from the ADH cluster, you can view Iceberg tables created by the connector.

Iceberg tables in the HUE interface
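
For example, the table can be queried from the HUE query editor; besides the original columns, it is expected to also contain the metadata columns added via the transforms.unwrap.add.fields parameter of the Debezium connector (such as __op):

-- run in HUE against the Hive catalog used by the connector
SELECT * FROM default.my_table;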

UPSERT mode

Starting from ADS 3.9.0.1.b1, there is an option to specify parameters to enable and configure the UPSERT mode in the Iceberg Sink Connector configuration.

After enabling the UPSERT mode (using the connector parameter iceberg.tables.upsert-mode-enabled), any changes to the Iceberg tables are made in accordance with the following rules:

  • Each operation is an UPSERT — a combination of DELETE and INSERT operations. The row in the Iceberg table that has the same identifier as the incoming record is updated; if no such row is found, the record is simply added. This allows records to be inserted, updated, or deleted in a single operation without overwriting data (see the example after this list).

  • If the Kafka record has a field indicating the type of operation being performed (INSERT, UPDATE, or DELETE), and this field is specified as the value of the connector parameter iceberg.tables.cdc-field, then these operations are executed independently of each other (as they would be without UPSERT mode enabled). When a new record arrives, the connector reads the value of the operation field named by iceberg.tables.cdc-field and, if it matches one of the configured operation values, applies that operation to the incoming row. The ability to specify the operation type can be used when retrieving data from CDC platforms.

    Users can also configure which values of the CDC operation field correspond to each operation by using the following options:

    • iceberg.tables.cdc.ops.insert

    • iceberg.tables.cdc.ops.update

    • iceberg.tables.cdc.ops.delete

  • If a Kafka record contains a field specifying the type of operation being performed (INSERT, UPDATE, or DELETE), and this operation type is listed as the value for the connector parameter iceberg.tables.cdc.ops.ignored, then the connector will simply ignore the incoming record.
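
As a minimal illustration of the UPSERT behavior in this pipeline (assuming a row with id = 1 exists in the source table): after an update in PostgreSQL and the next commit interval, the Iceberg table should still contain a single row with this identifier, now holding the new values, rather than an additional copy.

-- in PostgreSQL
UPDATE my_table SET country = 'Germany' WHERE id = 1;

-- in Hive/HUE, after iceberg.control.commit.interval-ms has elapsed
SELECT id, name, country FROM default.my_table WHERE id = 1;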

NOTE

Currently, the TRUNCATE TABLE operation is not supported due to its impact at the table level and the specifics of the internal architecture of the connector.

The parameters of the UPSERT mode are described below, as well as the version of ADS starting from which these parameters can be applied.

UPSERT mode parameters
iceberg.tables.upsert-mode-enabled

Set the value to true to enable UPSERT mode. Default value: false. Available starting from ADS 3.9.0.1.b1

iceberg.tables.cdc-field

The field of the source record that identifies the type of operation (INSERT, UPDATE, or DELETE). Default value: __op. Available starting from ADS 3.9.0.1.b1

iceberg.tables.default-id-columns

A comma-separated list of default columns that identify a row in the tables (primary key). A mandatory parameter. No default value. Available starting from ADS 3.9.0.1.b1

iceberg.tables.cdc.ops.insert

Comma-separated values of the CDC operation field that correspond to INSERT. Default value: r,c. Available starting from ADS 3.9.1.1.b1

iceberg.tables.cdc.ops.update

Comma-separated values of the CDC operation field that correspond to UPDATE. Default value: u. Available starting from ADS 3.9.1.1.b1

iceberg.tables.cdc.ops.delete

Comma-separated values of the CDC operation field that correspond to DELETE. Default value: d. Available starting from ADS 3.9.1.1.b1

iceberg.tables.cdc.ops.ignored

Comma-separated values of the CDC operation field that should be ignored by the connector. Default value: t,m. Available starting from ADS 3.9.1.1.b1
