Коннектор Iceberg
Обзор Iceberg Sink Connector
Apache Iceberg Sink Connector — коннектор-приемник для записи данных из Kafka в таблицы Iceberg.
Использование Iceberg Sink Connector дает следующие преимущества:
-
Координация сообщений для централизованных коммитов Iceberg.
-
Обеспечение семантики доставки exactly once.
-
Возможность разветвления нескольких таблиц.
-
Автоматическое создание таблиц и эволюция схем.
-
Отображение имени поля с помощью функции сопоставления столбцов Iceberg.
Для иллюстрации преимуществ коннектора в статье показана реализация полного конвейера с использованием Kafka Connect — захват изменения данных (Change Data Capture, CDC) в таблицах баз данных PostgreSQL с записью изменений в таблицу Iceberg.
Предварительные требования
Ниже описано окружение, используемое для создания полного конвейера CDC.
ADS и ADS Control
-
Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.7.2.1.b1.
-
Сервисы Kafka, Kafka Connect и Schema-Registry установлены в кластере ADS. IP-адрес сервера Schema-Registry —
10.92.42.130
. -
Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.
-
При конфигурировании сервиса Kafka Connect в группе параметров connect-distributed.properties для параметра plugin.path добавлено новое значение
/usr/lib/kafka-connect/plugins
, как показано ниже.Настройка пути к плагину Iceberg Sink ConnectorНастройка пути к плагину требуется для версий ADS до 3.9.0.1.
-
Кластер ADS Control развернут согласно руководству Установка Arenadata Streaming Control и интегрирован с использующимся кластером ADS.
ADPG
-
Кластер ADPG развернут согласно руководству Online-установка.
-
IP-адрес PostgreSQL server (хост с сервисом ADPG) —
10.92.43.42
. Для входящих подключений по умолчанию используется порт с номером5432
. -
Для создания коннектора Debezium аналогично описанному в статье Коннектор Debezium для PostgreSQL server установлены настройки кластера ADPG через пользовательский интерфейс ADCM (настройка доступа к базе данных c хоста Kafka Connect в файле pg_hba.conf и параметра wal_level в файле postgresql.conf).
Ниже приведен пример создания и настройки базы данных PostgreSQL (в кластере ADPG).
Создание базы данных:
CREATE DATABASE my_database;
Создание пользователя с ролью SUPERUSER
:
CREATE USER my_user WITH SUPERUSER PASSWORD 'P@ssword';
ВНИМАНИЕ
Роль пользователя SUPERUSER используются только для тестовых целей. Для получения более подробной информации см. Настройка разрешений для коннектора Debezium.
|
Создание тестовой таблицы:
CREATE TABLE my_table (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
country VARCHAR(100),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Настройка идентификации реплики для таблицы, чтобы включить возможность использования индексов BIGSERIAL PRIMARY KEY
на стороне подписчика для поиска строк:
ALTER TABLE my_table REPLICA IDENTITY FULL;
Заполнение таблицы данными (приведено как пример, для наглядности необходимо несколько строк):
INSERT INTO my_table(name, country) VALUES ('John Jones','USA');
ADH
-
Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.3.6.2.b1.
-
Сервисы HDFS, ADPG, YARN, Hive и HUE установлены в кластере ADH. IP-адрес сервера Hive (компонент Hive Metastore) —
10.92.43.153
. -
В ADH на каждом хосте необходимо создать пользователя
kafka
, который должен иметь права на создание таблиц в хранилище HDFS (входить в группуhadoop
).
Создание коннектора Debezium для PostgreSQL server
Для захвата изменения данных на уровне строк в таблицах баз данных PostgreSQL при помощи ADS Control создайте коннектор Debezium аналогично описанному в статье Коннектор Debezium для PostgreSQL server.
Ниже приведен пример конфигурации Debezium-коннектора с описанием новых параметров, применяющихся в конвейере данных.
{
"name": "cdc-my-adpg-new",
"topic.prefix": "cdc__demo",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"publication.autocreate.mode": "filtered",
"database.user": "my_user",
"database.dbname": "my_database",
"tasks.max": 1,
"database.port": 5432,
"plugin.name": "pgoutput",
"database.hostname": "10.92.43.42",
"database.password": "P@ssword",
"table.include.list": ["public.my_table"],
"value.converter.schema.registry.url": "http://10.92.42.130:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://10.92.42.130:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": ["unwrap"],
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.schema,source.table,source.ts_ms,ts_ms",
"transforms.unwrap.add.headers": "db",
"transforms.unwrap.delete.handling.mode": "rewrite",
"slot.name": "cdc__my_database",
"schema.evolution": "basic"
}
Атрибут | Описание |
---|---|
value.converter.schema.registry.url |
Адрес сервера Schema-Registry |
key.converter.schema.registry.url |
Адрес сервера Schema-Registry |
transforms |
Список трансформаций, которые будут применяться к записям |
transforms.unwrap.type |
Класс, используемый для трансформации |
transforms.unwrap.add.fields |
Дополнительные поля, добавляемые к данным |
transforms.unwrap.add.headers |
Добавление метаданных в заголовок сообщения Kafka |
transforms.unwrap.delete.handling.mode |
Определяет, как обрабатывать события удаления. |
table.include.list |
Необязательный список регулярных выражений, разделенных запятыми, соответствующих идентификаторам таблиц, изменения которых необходимо захватить. Если это свойство задано, коннектор захватывает изменения только из указанных таблиц. Каждый идентификатор должен иметь форму
|
slot.name |
Имя логического слота декодирования PostgreSQL, созданного для потоковой передачи. Сервер использует этот слот для потоковой передачи событий на коннектор Debezium. Должен быть уникальным для каждого коннектора, подключаемого к базе |
schema.evolution |
Указывает режим эволюции схемы. При указании значения |
Создание коннектора Iceberg
Для создания коннектора Iceberg через ADS Control используется плагин коннектора IcebergSinkConnector.
Для создания коннекторов при помощи ADS Control:
-
Перейдите на страницу Kafka Connects в web-интерфейсе ADS Control. Страница Kafka Connects становится доступна после выбора кластера в секции управления кластерами и перехода на нужную вкладку на странице General.
-
Выберите нужный кластер и перейдите на страницу обзора экземпляра Kafka Connect.
-
Нажмите кнопку Create Connector на странице обзора экземпляра Kafka Connect. После нажатия кнопки Create Connector открывается окно выбора плагина коннектора Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.
-
Выберите нужный коннектор для создания.
Выбор плагина для создания коннектораВыбор плагина для создания коннектора -
Заполните параметры конфигурации коннектора. При необходимости воспользуйтесь информацией о параметрах:
-
конфигурации сервиса Kafka Connect в статье Конфигурационные параметры ADS;
Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите переключатель JSON view.
Конфигурация коннектораКонфигурация коннектораJSON-файл конфигурации коннектораJSON-файл конфигурации коннектораПример содержимого JSON-файла конфигурации коннектора Iceberg{ "name": "iceberg-sink-cdc-connector", "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector", "tasks.max": 1, "topics": ["cdc__demo.public.my_table"], "iceberg.tables": ["default.my_table"], "iceberg.write.format.default": "parquet", "iceberg.tables.auto-create-enabled": true, "iceberg.tables.schema-case-insensitive": true, "iceberg.tables.upsert-mode-enabled": true, "iceberg.tables.cdc-field": "__op", "iceberg.tables.default-id-columns": "id" "iceberg.catalog.type": "hive", "iceberg.catalog.uri": "thrift://10.92.43.153:9083", "iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/", "iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://10.92.42.130:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://10.92.42.130:8081", "iceberg.control.commit.interval-ms": 30000 }
Атрибут Описание name
Название коннектора, которое будет использоваться в сервисе Kafka Connect
connector.class
Имя класса для коннектора
tasks.max
Максимальное количество создаваемых задач
topics
Имя топика, данные которого должны будут переданы в таблицы Iceberg. Для реализации конвейера CDC имя топика имеет форму
<topic.prefix>.<table.include.list>
, состоящую из параметров Debezium-коннектора, созданного вышеiceberg.tables
Список таблиц назначения, разделенных запятыми
iceberg.write.format.default
Формат файла по умолчанию для таблицы:
parquet
,avro
илиorc
iceberg.tables.auto-create-enabled
Установите значение
true
для автоматического создания целевых таблицiceberg.tables.schema-case-insensitive
Установите значение
true
для поиска столбцов таблицы по имени без учета регистраiceberg.tables.upsert-mode-enabled
Флаг для включения/выключения режима UPSERT
iceberg.tables.cdc-field
Поле исходной записи, которое идентифицирует тип операции (
INSERT
,UPDATE
илиDELETE
)iceberg.tables.default-id-columns
Список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в таблицах
iceberg.catalog.type
Тип каталога для хранения таблиц Iceberg
iceberg.catalog.uri
Адрес для подключения к каталогу с таблицами Iceberg (адрес хоста с установленным компонентом Hive Metastore)
iceberg.hadoop-conf-dir
Путь к файлам конфигурации брокера Kafka
iceberg.catalog.warehouse
Путь к хранению метаданных
key.converter
Тип конвертера для ключа сообщения
key.converter.schema.registry.url
Адрес сервера Schema-Registry
value.converter
Тип конвертера для значения сообщения
value.converter.schema.registry.url
Адрес сервера Schema-Registry
iceberg.control.commit.interval-ms
Интервал между коммитами в мс
-
-
После заполнения кликните Save и получите сообщение об успешном создании коннектора.
Сообщение об успешном создании коннектораСообщение об успешном создании коннектора -
Проверьте, что на открывшейся странице отображается созданный коннектор в рабочем статусе.
Созданный коннекторСозданный коннекторВ случае, если после создания коннектора задача создана с ошибкой, сообщение об ошибке можно увидеть после нажатия иконки
, расположенной в поле Status задачи.
Данные таблицы PostgreSQL в ADS Control
На странице Topics пользовательского интерфейса ADS Control отображается топик со всеми полями и содержимым таблицы PostgreSQL, созданный коннектором Debezium.


На странице Schema-Registry пользовательского интерфейса ADS Control отображаются созданные в Schema-Registry схемы для ключей и значений таблицы.


На вкладке схемы отображается описание всех полей таблицы, включая поля, добавленные при помощи параметра transforms.unwrap.add.fields
при создании коннектора Debezium.


В результате создания конвейера данных в таблице коннекторов на странице обзора экземпляра Kafka Connect оба созданных коннектора имеют в столбце Topic имя одного и того же обрабатываемого топика.


Таблицы Iceberg
Для проверки сбора изменений коннектором Debezium добавьте запись в таблицу PostgreSQL:
INSERT INTO my_table(name, country) VALUES ('NAME','COUNTRY');
После подключения к пользовательскому интерфейсу сервиса HUE из состава кластера ADH можно просматривать таблицы Iceberg, созданные коннектором.

