SMT для Iceberg Sink Connector

Обзор SMT

Single Message Transform (SMT) — готовый компонент с простой логикой преобразования одного сообщения.

SMT-преобразования являются частью архитектуры Kafka Connect, используются для изменения данных перед записью в Kafka, а также для изменения считанных из Kafka данных, упрощая обработку и маршрутизацию событий.

SMT-преобразования не являются обязательными.

Настройка преобразований выполняется внутри конфигурации экземпляра коннектора при помощи следующих параметров:

{
"transforms": "<alias>",
"transforms.<alias>.type": "<alias.type>",
"transforms.<alias>.<transformConfig>": "<valueTransformConfig>"
}

где:

  • <alias> — имя преобразования, имена могут быть перечислены через запятую в порядке применения преобразований.

  • <alias.type> — полное имя класса для преобразования.

  • <transformConfig> — конфигурационный параметр для конкретного преобразования. Для некоторых преобразований параметры могут отсутствовать.

  • <valueTransformConfig> — значение конфигурационного параметра преобразования

Начиная с ADS 3.9.1.1.b1 в ADS Control по умолчанию доступны для настройки в Iceberg Sink Connector несколько типов (классов) SMT:

CopyValue

CopyValue копирует значение из одного поля в новое поле.

Ниже описаны конфигурационные параметры преобразования CopyValue.

Параметр Описание

source.field

Наименование поля, из которого копируется значение

target.field

Наименование поля, в которое копируется значение

DmsTransform

DmsTransform преобразует сообщение в формат, применяющийся в AWS Database Migration Service (AWS DMS), для миграции данных в целевое хранилище с использованием CDC-функции. Применение преобразования переносит поля data на верхний уровень и добавляет поля метаданных: _cdc.op, _cdc.ts и _cdc.source.

Конфигурационные параметры для преобразования DmsTransform отсутствуют.

DebeziumTransform

DebeziumTransform преобразует сообщение в формат, применяющийся в платформе Debezium для миграции данных в целевое хранилище с использованием CDC-функции.

Применение преобразования выполняет:

  • перенос полей before или after, связанных с коллекциями данных, на верхний уровень (если они были добавлены ранее при помощи параметра partition.payload.field);

  • добавление полей метаданных: _cdc.op, _cdc.ts, _cdc.offset, _cdc.source, _cdc.target и _cdc.key.

Ниже описаны конфигурационные параметры преобразования DebeziumTransform.

Параметр Описание Значение по умолчанию

cdc.target.pattern

Шаблон, используемый для установки значения целевого поля CDC

<db>.<table>

JsonToMapTransform

JsonToMapTransform преобразовывает данные типа String в JSON-объекты для создания схем.

Iceberg Sink Connector может быть использован для преобразования данных со структурой Map без схем в структуры Iceberg.

Для плохо структурированных JSON-объектов с динамически меняющимися ключами использование Iceberg Sink Connector может привести к неограниченному росту количества столбцов в таблице Iceberg из-за эволюции схемы. В такой ситуации может быть применен JsonToMapTransform, чтобы можно было занести данные в Iceberg и затем обработать их с помощью запросов.

Применение преобразования JsonToMapTransform обеспечивает:

  • Преобразование вложенных объектов в Map и включение в схему типа Map.

  • Создание таблиц Iceberg с колонками Iceberg Map (String) для JSON-объектов в соответствии со схемой.

ПРИМЕЧАНИЕ
  • При использовании JsonToMapTransform в качестве параметра value.converter для коннектора должен быть указан stringConverter, а не jsonConverter.

  • Ключи сообщений, tombstone и заголовки не преобразуются и передаются SMT в изначальном виде.

  • Ключи с пустыми массивами и пустыми объектами исключаются из итоговой схемы.

  • Eсли массивы JSON содержат элементы разных типов, они преобразуются в массивы строк.

Ниже описаны конфигурационные параметры преобразования JsonToMapTransform.

Параметр Описание Значение по умолчанию

json.root

Указывает вид создаваемой структуры:

  • false — будет создан Struct с выводимыми схемами для примитивных и массивных полей. Вложенные объекты становятся полями типа Map<String, String>.

  • true — предназначен для наиболее непоследовательных данных. Будет создан Struct с одним полем под названием payload со структурой Schema типа Map<String, String>.

false

Ниже в примере показана разница между измененными сообщениями при разных значениях параметра json.root.

Исходное сообщение:

{
  "key": 1,
  "array": [1,"two",3],
  "empty_obj": {},
  "nested_obj": {"some_key": ["one", "two"]}
}

Измененное сообщение при json.root=true:

SinkRecord.schema:
  "payload" : (Optional) Map<String, String>

SinkRecord.value (Struct):
  "payload"  : Map(
    "key" : "1",
    "array" : "[1,"two",3]"
    "empty_obj": "{}"
    "nested_obj": "{"some_key":["one","two"]}"
   )

Измененное сообщение при json.root=false:

SinkRecord.schema:
  "key": (Optional) Int32,
  "array": (Optional) Array<String>,
  "nested_object": (Optional) Map<String, String>

SinkRecord.value (Struct):
 "key" 1,
 "array" ["1", "two", "3"]
 "nested_object" Map ("some_key" : "["one", "two"]")

KafkaMetadataTransform

KafkaMetadataTransform добавляет поля метаданных сообщения Kafka: topic, partition, offset, timestamp.

Ниже описаны конфигурационные параметры преобразования KafkaMetadataTransform.

Параметр Описание Значение по умолчанию

field_name

префикс для полей

_kafka_metadata

nested

Определяет уровень для добавления метаданных:

  • false — метаданные добавляются на верхний уровень с префиксом: _kafka_metadata.topic, _kafka_metadata.partition, _kafka_metadata.offset, _kafka_metadata.timestamp;

  • true — метаданные вложены в структуру: _kafka_metdata_topic, _kafka_metadata_partition, _kafka_metadata_offset, _kafka_metadata_timestamp

false

external_field

Добавляет постоянную пару ключ/значение к метаданным (например, название кластера)

 — 

MongoDebeziumTransform

MongoDebeziumTransform преобразует сообщение, полученное при помощи коннектора Mongo Debezium из формата, используемого в базе MongoDB (с BSON-строками и полями before или after), в типизированные структуры before или after, которые могут быть далее изменены при помощи преобразования DebeziumTransform.

Ниже описаны конфигурационные параметры преобразования MongoDebeziumTransform.

Параметр Описание

array_handling_mode

Массив (array) или BSON-документ для установки режима обработки массива

Пример использования SMT-преобразований

ПРИМЕЧАНИЕ

Приведенный в статье пример основан на пайплайне, описанном в статье Коннектор Iceberg.

Ниже показана таблица Iceberg my_table, отображающаяся в интерфейсе HUE в результате запуска Iceberg Sink Connector без использования SMT-преобразований. В таблице отображаются только поля, которые содержит топик cdc__demo.public.my_table.

Таблица Iceberg my_table
Таблица Iceberg my_table
Таблица Iceberg my_table
Таблица Iceberg my_table

Для реализации SMT-преобразований создайте таблицу Iceberg my_newest_table для того же самого топика cdc__demo.public.my_table.

При создании новой таблицы в конфигурацию коннектора добавьте параметры для реализации преобразований CopyValue и KafkaMetadataTransform.

Ниже показаны новые параметры.

{
"transforms": [
    "copyName",
    "kafkaMetadata"
  ],
"transforms.copyName.type": "io.tabular.iceberg.connect.transforms.CopyValue",
"transforms.copyName.source.field": "name",
"transforms.copyName.target.field": "name_copy",
"transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
"transforms.kafkaMetadata.nested": "true",
}

где:

  • преобразование copyName создает новое поле name_copy, которое полностью копирует поле name;

  • преобразование kafkaMetadata создает группу метаданных _kafka_metadata.

Ниже приведен пример JSON-файла конфигурации коннектора с использованием новых параметров.

Пример содержимого JSON-файла конфигурации коннектора Iceberg с использованием SMT-преобразований
{
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "iceberg.tables.cdc-field": "__op",
  "tasks.max": 1,
  "transforms": [
    "copyName",
    "kafkaMetadata"
  ],
  "iceberg.tables.upsert-mode-enabled": true,
  "iceberg.tables.auto-create-enabled": true,
  "transforms.copyName.type": "io.tabular.iceberg.connect.transforms.CopyValue",
  "iceberg.tables": [
    "default.my_newest_table"
  ],
  "transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms.copyName.target.field": "name_copy",
  "topics": [
    "cdc__demo.public.my_table"
  ],
  "iceberg.control.commit.interval-ms": 30000,
  "iceberg.catalog.uri": "thrift://10.92.40.102:9083",
  "value.converter.schema.registry.url": "http://10.92.42.153:8081",
  "transforms.kafkaMetadata.nested": "true",
  "transforms.copyName.source.field": "name",
  "name": "iceberg-sink-cdc-newestconnector",
  "iceberg.tables.schema-case-insensitive": true,
  "iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
  "iceberg.tables.default-id-columns": "id",
  "iceberg.catalog.type": "hive",
  "key.converter.schema.registry.url": "http://10.92.42.153:8081",
  "iceberg.write.format.default": "parquet",
  "iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/"
}

В результате запуска iceberg-sink-cdc-newestconnector созданная таблица my_newest_table отображает новые поля name_copy и _kafka_metadata в интерфейсе HUE.

Таблица Iceberg с новыми полями
Таблица Iceberg с новыми полями
Таблица Iceberg с новыми полями
Таблица Iceberg с новыми полями

На странице Table Browser → Databases → default → my_newest_table→ Overview находится схема таблицы, содержащая новые поля.

Схема таблицы Iceberg
Схема таблицы Iceberg
Схема таблицы Iceberg
Схема таблицы Iceberg

После наведения на поле _kafka_metadata открывается структура, содержащая поля метаданных записи.

Структура поля _kafka_metadata
Структура поля _kafka_metadata
Структура поля _kafka_metadata
Структура поля _kafka_metadata
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней