SMT для Iceberg Sink Connector
Обзор SMT
Single Message Transform (SMT) — готовый компонент с простой логикой преобразования одного сообщения.
SMT-преобразования являются частью архитектуры Kafka Connect, используются для изменения данных перед записью в Kafka, а также для изменения считанных из Kafka данных, упрощая обработку и маршрутизацию событий.
SMT-преобразования не являются обязательными.
Настройка преобразований выполняется внутри конфигурации экземпляра коннектора при помощи следующих параметров:
{
"transforms": "<alias>",
"transforms.<alias>.type": "<alias.type>",
"transforms.<alias>.<transformConfig>": "<valueTransformConfig>"
}
где:
-
<alias>— имя преобразования, имена могут быть перечислены через запятую в порядке применения преобразований. -
<alias.type>— полное имя класса для преобразования. -
<transformConfig>— конфигурационный параметр для конкретного преобразования. Для некоторых преобразований параметры могут отсутствовать. -
<valueTransformConfig>— значение конфигурационного параметра преобразования
Начиная с ADS 3.9.1.1.b1 в ADS Control по умолчанию доступны для настройки в Iceberg Sink Connector несколько типов (классов) SMT:
CopyValue
CopyValue копирует значение из одного поля в новое поле.
Ниже описаны конфигурационные параметры преобразования CopyValue.
| Параметр | Описание |
|---|---|
source.field |
Наименование поля, из которого копируется значение |
target.field |
Наименование поля, в которое копируется значение |
DmsTransform
DmsTransform преобразует сообщение в формат, применяющийся в AWS Database Migration Service (AWS DMS), для миграции данных в целевое хранилище с использованием CDC-функции. Применение преобразования переносит поля data на верхний уровень и добавляет поля метаданных: _cdc.op, _cdc.ts и _cdc.source.
Конфигурационные параметры для преобразования DmsTransform отсутствуют.
DebeziumTransform
DebeziumTransform преобразует сообщение в формат, применяющийся в платформе Debezium для миграции данных в целевое хранилище с использованием CDC-функции.
Применение преобразования выполняет:
-
перенос полей
beforeилиafter, связанных с коллекциями данных, на верхний уровень (если они были добавлены ранее при помощи параметра partition.payload.field); -
добавление полей метаданных:
_cdc.op,_cdc.ts,_cdc.offset,_cdc.source,_cdc.targetи_cdc.key.
Ниже описаны конфигурационные параметры преобразования DebeziumTransform.
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
cdc.target.pattern |
Шаблон, используемый для установки значения целевого поля CDC |
<db>.<table> |
JsonToMapTransform
JsonToMapTransform преобразовывает данные типа String в JSON-объекты для создания схем.
Iceberg Sink Connector может быть использован для преобразования данных со структурой Map без схем в структуры Iceberg.
Для плохо структурированных JSON-объектов с динамически меняющимися ключами использование Iceberg Sink Connector может привести к неограниченному росту количества столбцов в таблице Iceberg из-за эволюции схемы. В такой ситуации может быть применен JsonToMapTransform, чтобы можно было занести данные в Iceberg и затем обработать их с помощью запросов.
Применение преобразования JsonToMapTransform обеспечивает:
-
Преобразование вложенных объектов в Map и включение в схему типа Map.
-
Создание таблиц Iceberg с колонками Iceberg Map (String) для JSON-объектов в соответствии со схемой.
|
ПРИМЕЧАНИЕ
|
Ниже описаны конфигурационные параметры преобразования JsonToMapTransform.
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
json.root |
Указывает вид создаваемой структуры:
|
false |
KafkaMetadataTransform
KafkaMetadataTransform добавляет поля метаданных сообщения Kafka: topic, partition, offset, timestamp.
Ниже описаны конфигурационные параметры преобразования KafkaMetadataTransform.
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
field_name |
префикс для полей |
_kafka_metadata |
nested |
Определяет уровень для добавления метаданных:
|
false |
external_field |
Добавляет постоянную пару ключ/значение к метаданным (например, название кластера) |
— |
MongoDebeziumTransform
MongoDebeziumTransform преобразует сообщение, полученное при помощи коннектора Mongo Debezium из формата, используемого в базе MongoDB (с BSON-строками и полями before или after), в типизированные структуры before или after, которые могут быть далее изменены при помощи преобразования DebeziumTransform.
Ниже описаны конфигурационные параметры преобразования MongoDebeziumTransform.
| Параметр | Описание |
|---|---|
array_handling_mode |
Массив (array) или BSON-документ для установки режима обработки массива |
Пример использования SMT-преобразований
|
ПРИМЕЧАНИЕ
Приведенный в статье пример основан на пайплайне, описанном в статье Коннектор Iceberg. |
Ниже показана таблица Iceberg my_table, отображающаяся в интерфейсе HUE в результате запуска Iceberg Sink Connector без использования SMT-преобразований. В таблице отображаются только поля, которые содержит топик cdc__demo.public.my_table.
Для реализации SMT-преобразований создайте таблицу Iceberg my_newest_table для того же самого топика cdc__demo.public.my_table.
При создании новой таблицы в конфигурацию коннектора добавьте параметры для реализации преобразований CopyValue и KafkaMetadataTransform.
Ниже показаны новые параметры.
{
"transforms": [
"copyName",
"kafkaMetadata"
],
"transforms.copyName.type": "io.tabular.iceberg.connect.transforms.CopyValue",
"transforms.copyName.source.field": "name",
"transforms.copyName.target.field": "name_copy",
"transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
"transforms.kafkaMetadata.nested": "true",
}
где:
-
преобразование
copyNameсоздает новое полеname_copy, которое полностью копирует полеname; -
преобразование
kafkaMetadataсоздает группу метаданных_kafka_metadata.
Ниже приведен пример JSON-файла конфигурации коннектора с использованием новых параметров.
{
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"iceberg.tables.cdc-field": "__op",
"tasks.max": 1,
"transforms": [
"copyName",
"kafkaMetadata"
],
"iceberg.tables.upsert-mode-enabled": true,
"iceberg.tables.auto-create-enabled": true,
"transforms.copyName.type": "io.tabular.iceberg.connect.transforms.CopyValue",
"iceberg.tables": [
"default.my_newest_table"
],
"transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"transforms.copyName.target.field": "name_copy",
"topics": [
"cdc__demo.public.my_table"
],
"iceberg.control.commit.interval-ms": 30000,
"iceberg.catalog.uri": "thrift://10.92.40.102:9083",
"value.converter.schema.registry.url": "http://10.92.42.153:8081",
"transforms.kafkaMetadata.nested": "true",
"transforms.copyName.source.field": "name",
"name": "iceberg-sink-cdc-newestconnector",
"iceberg.tables.schema-case-insensitive": true,
"iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
"iceberg.tables.default-id-columns": "id",
"iceberg.catalog.type": "hive",
"key.converter.schema.registry.url": "http://10.92.42.153:8081",
"iceberg.write.format.default": "parquet",
"iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/"
}
В результате запуска iceberg-sink-cdc-newestconnector созданная таблица my_newest_table отображает новые поля name_copy и _kafka_metadata в интерфейсе HUE.
На странице Table Browser → Databases → default → my_newest_table→ Overview находится схема таблицы, содержащая новые поля.
После наведения на поле _kafka_metadata открывается структура, содержащая поля метаданных записи.