Iceberg connector
Iceberg Sink Connector overview
Apache Iceberg Sink Connector is a sink connector for writing data from Kafka topics to Apache Iceberg tables.
Using the Iceberg Sink Connector provides the following benefits:

- Commit coordination for centralized Iceberg commits.
- Exactly-once delivery semantics.
- Ability to fan out data across multiple tables.
- Automatic table creation and schema evolution.
- Field name mapping using the Iceberg column mapping functionality.
To illustrate the benefits of the connector, this article shows the implementation of a full Kafka Connect pipeline that captures change data (CDC) from PostgreSQL database tables and writes the changes to an Iceberg table.
Prerequisites
The environment used to create a complete CDC pipeline is described below.
ADS and ADS Control
- The ADS cluster is installed according to the Online installation guide. The minimum ADS version is 3.7.2.1.b1.
- Kafka, Kafka Connect, and Schema-Registry services are installed in the ADS cluster. The Schema-Registry server IP address is 10.92.42.130.
- To automatically create Kafka topics, the auto.create.topics.enable parameter is enabled in the server.properties group when configuring the Kafka service.
- When configuring the Kafka Connect service, a new value /usr/lib/kafka-connect/plugins has been added to the plugin.path parameter in the connect-distributed.properties group, as shown in the sketch after this list. Setting the plugin path is required for ADS versions earlier than 3.9.0.1.
- The ADS Control cluster is installed according to the Get started with Arenadata Streaming Control guide and is integrated with the ADS cluster being used.
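Below is a minimal sketch of the corresponding connect-distributed.properties fragment; the first path in the list is an assumed pre-existing value and may differ in your installation.

# connect-distributed.properties (fragment)
# Append the directory with the Iceberg Sink Connector to plugin.path;
# /usr/lib/kafka/plugins is an assumed pre-existing value
plugin.path=/usr/lib/kafka/plugins,/usr/lib/kafka-connect/plugins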
ADPG
- The ADPG cluster is installed according to the Online installation guide.
- The PostgreSQL server IP address (the host with the ADPG service) is 10.92.43.42. For incoming connections, the default port number 5432 is used.
- To create a Debezium connector similar to the one described in the Debezium connector for PostgreSQL server article, the ADPG cluster settings are set via the ADCM UI: access to the database from the Kafka Connect host is configured in the pg_hba.conf file, and the wal_level parameter is set in the postgresql.conf file. A sketch of these settings is given after this list.
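A minimal sketch of these two settings is shown below; the Kafka Connect host address is a placeholder, and the authentication method is an assumption that depends on your environment.

# pg_hba.conf (fragment): allow the Kafka Connect host to connect to my_database as my_user
host    my_database    my_user    <kafka_connect_host_IP>/32    md5

# postgresql.conf (fragment): logical decoding is required for the Debezium connector
wal_level = logical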
Below is an example of creating and configuring a PostgreSQL database (in the ADPG cluster).
Create a database:
CREATE DATABASE my_database;
Create a user with the SUPERUSER role:
CREATE USER my_user WITH SUPERUSER PASSWORD 'P@ssword';
CAUTION
The SUPERUSER role is used for testing purposes only. For more information, see Configuring permissions for the Debezium connector.
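As a sketch of a less privileged alternative (the exact privilege set depends on your configuration and is described in the article referenced above), a dedicated replication user could be created as follows; the role name my_cdc_user is hypothetical.

-- Hypothetical replication user instead of SUPERUSER (assumes pgoutput with
-- automatically created, filtered publications; table ownership may also be required)
CREATE ROLE my_cdc_user WITH REPLICATION LOGIN PASSWORD 'P@ssword';
GRANT CONNECT, CREATE ON DATABASE my_database TO my_cdc_user;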
Create a test table:
CREATE TABLE my_table (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
country VARCHAR(100),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Configure the replica identity for the table so that UPDATE and DELETE events contain enough information to identify the changed rows on the subscriber side:
ALTER TABLE my_table REPLICA IDENTITY FULL;
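To verify that the setting has been applied, you can query the pg_class catalog (this check is a sketch and is not part of the original pipeline):

-- relreplident: 'f' = FULL, 'd' = DEFAULT (primary key), 'n' = NOTHING, 'i' = INDEX
SELECT relname, relreplident FROM pg_class WHERE relname = 'my_table';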
Fill the table with data (one row is given as an example; several rows are useful for clarity):
INSERT INTO my_table(name, country) VALUES ('John Jones','USA');
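Since several rows make the results easier to inspect, you can insert a few more; the values below are arbitrary examples.

INSERT INTO my_table(name, country) VALUES ('Maria Garcia','Spain');
INSERT INTO my_table(name, country) VALUES ('Li Wei','China');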
ADH
- The ADH cluster is installed according to the Online installation guide. The minimum ADH version is 3.7.2.1.b1.
- HDFS, ADPG, YARN, Hive, and HUE services are installed in the ADH cluster. The Hive server IP address (the host with the Hive Metastore component) is 10.92.43.153.
- On each ADH host, you need to create a kafka user that has rights to create tables in the HDFS storage (the user must be a member of the hadoop group). A sketch of the corresponding commands is given after this list.
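A minimal sketch of creating such a user is shown below; it assumes the hadoop group already exists on the host and that the commands are run as root on each ADH host.

# Create the kafka user and add it to the hadoop group (run as root on each ADH host)
useradd kafka
usermod -aG hadoop kafka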
Create a Debezium connector for PostgreSQL server
To capture row-level data changes in PostgreSQL database tables using ADS Control, create a Debezium connector similar to the one described in the article Debezium connector for PostgreSQL server.
Below is an example Debezium connector configuration with a description of the new parameters used in the data pipeline.
{
"name": "cdc-my-adpg",
"topic.prefix": "cdc__demo",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"publication.autocreate.mode": "filtered",
"database.user": "my_user",
"database.dbname": "my_database",
"tasks.max": 1,
"database.port": 5432,
"plugin.name": "pgoutput",
"database.hostname": "10.92.43.42",
"database.password": "P@ssword",
"table.include.list": ["public.my_table"],
"value.converter.schema.registry.url": "http://10.92.42.130:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://10.92.42.130:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": ["unwrap"],
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.schema,source.table,source.ts_ms,ts_ms",
"transforms.unwrap.add.headers": "db",
"transforms.unwrap.delete.handling.mode": "rewrite",
"slot.name": "cdc__my_database",
"schema.evolution": "basic"
}
| Attribute | Description |
|---|---|
| value.converter.schema.registry.url | Schema-Registry server address |
| key.converter.schema.registry.url | Schema-Registry server address |
| transforms.unwrap.type | Specifies the class used for the transformation |
| transforms.unwrap.add.fields | Additional fields to add to the data |
| transforms.unwrap.add.headers | Defines whether to add metadata to the Kafka message header |
| transforms.unwrap.delete.handling.mode | Defines how to handle deletion events |
| table.include.list | Optional list of comma-separated regular expressions that match the identifiers of the tables whose changes are to be captured. If this property is set, the connector captures changes only from the specified tables. Each identifier is of the form schemaName.tableName |
| slot.name | Name of the PostgreSQL logical decoding slot created for streaming. The server uses this slot to stream events to the Debezium connector. Must be unique for each connector connecting to the database |
| schema.evolution | Specifies the schema evolution mode. When set to basic, basic schema evolution is enabled |
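After the Debezium connector starts, the CDC topic cdc__demo.public.my_table should exist. As a quick check outside the ADS Control UI, the topic list can be queried with the standard Kafka CLI; the tool path and broker address below are assumptions that depend on your installation.

# List topics and filter by the CDC prefix
/usr/lib/kafka/bin/kafka-topics.sh --bootstrap-server <kafka_broker_host>:9092 --list | grep cdc__demo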
Create an Iceberg connector
To create an Iceberg connector via ADS Control, the connector plugin IcebergSinkConnector is used.
To create a connector using ADS Control, you need to:

- Go to the Kafka Connects page in the ADS Control web interface. The Kafka Connects page becomes available after selecting a cluster in the cluster management section and going to the desired tab on the General page.
- Select the desired cluster and go to the Kafka Connect instance overview page.
- Click Create Connector on the Kafka Connect instance overview page. After that, the connector plugin selection window (Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins) opens.
- Select the desired connector to create.
- Fill in the connector configuration parameters. If necessary, use the parameter information for Kafka Connect service configurations in the ADS configuration parameters article. You can fill in the configuration in the form of a JSON file; to do this, enable the JSON view switch. An example JSON file and a description of its attributes are given below.
Example contents of a JSON file with an Iceberg connector configuration:

{
"name": "iceberg-sink-cdc-connector",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": 1,
"topics": ["cdc__demo.public.my_table"],
"iceberg.tables": ["default.my_table"],
"iceberg.write.format.default": "parquet",
"iceberg.tables.auto-create-enabled": true,
"iceberg.tables.schema-case-insensitive": true,
"iceberg.tables.upsert-mode-enabled": true,
"iceberg.tables.cdc-field": "__op",
"iceberg.tables.default-id-columns": "id",
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://10.92.43.153:9083",
"iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/",
"iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://10.92.42.130:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://10.92.42.130:8081",
"iceberg.control.commit.interval-ms": 30000
}
| Attribute | Description |
|---|---|
| name | Name of the connector that will be used in the Kafka Connect service |
| connector.class | Class name for the connector |
| tasks.max | Maximum number of tasks to create |
| topics | Name of the topic whose data should be passed to the Iceberg tables. For the CDC pipeline implementation, the topic name has the form <topic.prefix>.<table.include.list> and is composed of the parameters of the Debezium connector created above |
| iceberg.tables | Comma-separated list of target tables |
| iceberg.write.format.default | Default file format for a table: parquet, avro, or orc |
| iceberg.tables.auto-create-enabled | Set to true to automatically create target tables |
| iceberg.tables.schema-case-insensitive | Set to true to search for table columns by name in a case-insensitive manner |
| iceberg.tables.upsert-mode-enabled | Flag to enable/disable UPSERT mode |
| iceberg.tables.cdc-field | Field of the source record that identifies the type of operation (INSERT, UPDATE, or DELETE) |
| iceberg.tables.default-id-columns | Comma-separated list of default columns that identify a row ID in the tables |
| iceberg.catalog.type | Type of a catalog for storing Iceberg tables |
| iceberg.catalog.uri | Address for connecting to the catalog with Iceberg tables (address of the host with the Hive Metastore component installed) |
| iceberg.hadoop-conf-dir | Path to Kafka broker configuration files |
| iceberg.catalog.warehouse | Path to metadata storage |
| key.converter | Converter type for the message key |
| key.converter.schema.registry.url | Schema-Registry server address |
| value.converter | Converter type for the message value |
| value.converter.schema.registry.url | Schema-Registry server address |
| iceberg.control.commit.interval-ms | Interval between commits, in ms |
- After filling in the parameters, click Save and get a message about the successful creation of the connector.
- Check that the opened page displays the created connector in the working status. If the connector task is created with an error, you can see the error details by clicking the icon located in the Status field of the task.
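In addition to the ADS Control UI, the connector and task status can be checked through the Kafka Connect REST API; the host name is a placeholder, and the default REST port 8083 may differ in your configuration.

# Query the status of the created connector and its tasks
curl http://<kafka_connect_host>:8083/connectors/iceberg-sink-cdc-connector/status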
PostgreSQL database table data in ADS Control
The Topics page of the ADS Control user interface displays the topic created by the Debezium connector, with all PostgreSQL table fields and their contents.
The Schema-Registry page of the ADS Control user interface displays the schemas created in Schema-Registry for the table keys and values.
The schemas tab displays a description of all table fields, including the fields added using the transforms.unwrap.add.fields parameter when creating the Debezium connector.
As a result of creating the data pipeline, both created connectors have the same topic name in the Topic column of the connectors table on the Kafka Connect instance overview page.
Iceberg tables
To test change collection using the Debezium connector, make changes to the PostgreSQL table:
INSERT INTO my_table(name, country) VALUES ('NAME','COUNTRY');
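Because the Iceberg connector is configured with upsert mode and the __op CDC field, UPDATE and DELETE operations can be checked as well; the statements below use the sample rows inserted earlier.

UPDATE my_table SET country = 'Canada' WHERE name = 'John Jones';
DELETE FROM my_table WHERE name = 'NAME';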
After connecting to the user interface of the HUE service in the ADH cluster, you can view the Iceberg tables created by the connector.
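For example, the target table can be queried from the HUE SQL editor; the query below assumes the table was auto-created as default.my_table, as specified in the iceberg.tables parameter.

-- Query the Iceberg table created by the connector
SELECT * FROM default.my_table;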