SMT for Iceberg Sink Connector

Overview of SMT

Single Message Transform (SMT) — a ready-made component with simple logic for a single message transformation.

SMT transformations are part of the Kafka Connect architecture, used to modify data before writing it to Kafka and to modify data read from Kafka, simplifying event processing and routing.

SMT transformations are not mandatory.

The configuration of transformations is performed as part of the instance configuration of the connector via the following parameters:

{
"transforms": "<alias>",
"transforms.<alias>.type": "<alias.type>",
"transforms.<alias>.<transformConfig>": "<valueTransformConfig>"
}

where:

  • <alias> — the name of the transformation; names can be listed separated by commas in the order the transformations are applied.

  • <alias.type> — the full class name for the transformation.

  • <transformConfig> — the configuration parameter for a specific transformation. Some transformations may not have parameters.

  • <valueTransformConfig> — the value of the transformation’s configuration parameter.

Starting with ADS 3.9.1.1.b1, in ADS Control, several types (classes) of SMT are available for configuration by default for the Iceberg Sink Connector:

CopyValue

CopyValue copies a value from one field to a new field.

The configuration parameters for the CopyValue transformation are described below.

Parameter Description

source.field

Name of the field from which the value is copied

target.field

Name of the field into which the value is copied

DmsTransform

DmsTransform converts a message into the format used in AWS Database Migration Service (AWS DMS) for migrating data to a target store using the CDC feature. Applying the transformation promotes the data fields to the top level and adds metadata fields: _cdc.op, _cdc.ts, and _cdc.source.

There are no configuration parameters for the DmsTransform transformation.

DebeziumTransform

DebeziumTransform converts a message into the format used in the Debezium platform for migrating data to the target storage using the CDC function.

The application of the transformation performs:

  • moving before or after fields associated with data collections to the top level (if they were previously added using the partition.payload.field parameter);

  • adding metadata fields: _cdc.op, _cdc.ts, _cdc.offset, _cdc.source, _cdc.target, and _cdc.key.

The configuration parameters for `DebeziumTransform `are described below.

Parameter Description Default value

cdc.target.pattern

Template used to set the value of the CDC target field

<db>.<table>

JsonToMapTransform

JsonToMapTransform converts data of type String into JSON objects for schema creation.

Iceberg Sink Connector can be used to transform data with a Map structure without a schema into Iceberg structures.

For poorly structured JSON objects with dynamically changing keys, using the Iceberg Sink Connector can lead to an unlimited increase in the number of columns in an Iceberg table due to schema evolution. In such a situation, JsonToMapTransform can be used so that data can be loaded into Iceberg and then processed with queries.

Using the JsonToMapTransform transformation provides:

  • Conversion of nested objects into a Map and inclusion in a Map-type schema.

  • Creation of Iceberg tables with Iceberg Map (String) columns for JSON objects according to the schema.

NOTE
  • When using JsonToMapTransform, value.converter should be set to stringConverter, not jsonConverter.

  • Message keys, tombstones, and headers are not transformed and are passed to the SMT in their original form.

  • Keys with empty arrays and empty objects are excluded from the resulting schema.

  • If JSON arrays contain elements of different types, they are converted into arrays of strings.

The configuration parameters for JsonToMapTransform are described below.

Parameter Description Default value

json.root

Specifies the type of structure being created:

  • false — a Struct will be created with inferred schemas for primitive and array fields. Nested objects will become fields of type Map<String, String>.

  • true — intended for the most inconsistent data. It will create a Struct with a single field named payload with a Schema structure of type Map<String, String>.

false

Below is the difference between the modified messages with different values of the json.root parameter.

Original message:

{
  "key": 1,
  "array": [1,"two",3],
  "empty_obj": {},
  "nested_obj": {"some_key": ["one", "two"]}
}

Modified message with json.root=true:

SinkRecord.schema:
  "payload" : (Optional) Map<String, String>

SinkRecord.value (Struct):
  "payload"  : Map(
    "key" : "1",
    "array" : "[1,"two",3]"
    "empty_obj": "{}"
    "nested_obj": "{"some_key":["one","two"]}"
   )

Modified message with json.root=false:

SinkRecord.schema:
  "key": (Optional) Int32,
  "array": (Optional) Array<String>,
  "nested_object": (Optional) Map<String, String>

SinkRecord.value (Struct):
 "key" 1,
 "array" ["1", "two", "3"]
 "nested_object" Map ("some_key" : "["one", "two"]")

KafkaMetadataTransform

KafkaMetadataTransform adds Kafka message metadata fields: topic, partition, offset, timestamp.

The configuration parameters for the KafkaMetadataTransform transformation are described below.

Parameter Description Default value

field_name

Prefix for fields

_kafka_metadata

nested

Determines the level for adding metadata:

  • false — metadata is added at the top level with the prefix: _kafka_metadata.topic, _kafka_metadata.partition, _kafka_metadata.offset, _kafka_metadata.timestamp;

  • true — metadata is nested within a structure: _kafka_metadata_topic, _kafka_metadata_partition, _kafka_metadata_offset, _kafka_metadata_timestamp

false

external_field

Adds a permanent key/value pair to the metadata (for example, the cluster name)

 — 

MongoDebeziumTransform

MongoDebeziumTransform transforms a message received via the connector Mongo Debezium from the format used in the MongoDB database (with BSON strings and before or after fields) into typed before or after structures, which can then be modified using the DebeziumTransform transformation.

The configuration parameters for MongoDebeziumTransform are described below.

Parameter Description

array_handling_mode

An array or a BSON document for setting the array processing mode

Example of SMT

NOTE

The example provided in the article is based on the pipeline described in the Iceberg connector article.

Below is the my_table Iceberg table, displayed in the HUE interface as a result of running the Iceberg Sink Connector without using SMT transformations. The table shows only the fields contained in the cdc__demo.public.my_table topic .

Iceberg table
Iceberg table
Iceberg table
Iceberg table

To implement SMT transformations, create the my_newest_table Iceberg table for the cdc__demo.public.my_table topic .

When creating a new table in the connector configuration, add parameters for implementing the CopyValue and KafkaMetadataTransform transformations.

The new parameters are shown below.

{
"transforms": [
    "copyName",
    "kafkaMetadata"
  ],
"transforms.copyName.type": "io.tabular.iceberg.connect.transforms.CopyValue",
"transforms.copyName.source.field": "name",
"transforms.copyName.target.field": "name_copy",
"transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
"transforms.kafkaMetadata.nested": "true"
}

where:

  • the copyName transformation creates a name_copy field, which fully copies the name field;

  • the kafkaMetadata transformation creates a _kafka_metadata metadata group.

Below is an example of a connector configuration JSON file using the new parameters.

Example of a JSON file of an Iceberg connector configuration with SMT
{
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "iceberg.tables.cdc-field": "__op",
  "tasks.max": 1,
  "transforms": [
    "copyName",
    "kafkaMetadata"
  ],
  "iceberg.tables.upsert-mode-enabled": true,
  "iceberg.tables.auto-create-enabled": true,
  "transforms.copyName.type": "io.tabular.iceberg.connect.transforms.CopyValue",
  "iceberg.tables": [
    "default.my_newest_table"
  ],
  "transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms.copyName.target.field": "name_copy",
  "topics": [
    "cdc__demo.public.my_table"
  ],
  "iceberg.control.commit.interval-ms": 30000,
  "iceberg.catalog.uri": "thrift://10.92.40.102:9083",
  "value.converter.schema.registry.url": "http://10.92.42.153:8081",
  "transforms.kafkaMetadata.nested": "true",
  "transforms.copyName.source.field": "name",
  "name": "iceberg-sink-cdc-newestconnector",
  "iceberg.tables.schema-case-insensitive": true,
  "iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
  "iceberg.tables.default-id-columns": "id",
  "iceberg.catalog.type": "hive",
  "key.converter.schema.registry.url": "http://10.92.42.153:8081",
  "iceberg.write.format.default": "parquet",
  "iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/"
}

As a result of running iceberg-sink-cdc-newestconnector, the created my_newest_table table displays the name_copy and _kafka_metadata fields in the HUE interface.

Iceberg table with new fields
Iceberg table with new fields
Iceberg table with new fields
Iceberg table with new fields

On the Table Browser → Databases → default → my_newest_table → Overview HUE page, there is a table schema containing the new fields.

Iceberg table schema
Iceberg table schema
Iceberg table schema
Iceberg table schema

Hovering over the _kafka_metadata field opens a structure that contains the record’s metadata fields.

_kafka_metadata field structure
_kafka_metadata field structure
_kafka_metadata field structure
_kafka_metadata field structure
Found a mistake? Seleсt text and press Ctrl+Enter to report it