Коннектор Iceberg

Обзор Iceberg Sink Connector

Apache Iceberg Sink Connector — коннектор-приемник для записи данных из Kafka в таблицы Iceberg.

Использование Iceberg Sink Connector дает следующие преимущества:

  • Координация сообщений для централизованных коммитов Iceberg.

  • Обеспечение семантики доставки exactly once.

  • Возможность разветвления нескольких таблиц.

  • Автоматическое создание таблиц и эволюция схем.

  • Отображение имени поля с помощью функции сопоставления столбцов Iceberg.

Для иллюстрации преимуществ коннектора в статье показана реализация полного конвейера с использованием Kafka Connect — захват изменения данных (Change Data Capture, CDC) в таблицах баз данных PostgreSQL с записью изменений в таблицу Iceberg.

Предварительные требования

Ниже описано окружение, используемое для создания полного конвейера CDC.

ADS и ADS Control

  • Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.7.2.1.b1.

  • Сервисы Kafka, Kafka Connect и Schema-Registry установлены в кластере ADS. IP-адрес сервера Schema-Registry — 10.92.42.130.

  • Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

  • При конфигурировании сервиса Kafka Connect в группе параметров connect-distributed.properties для параметра plugin.path добавлено новое значение /usr/lib/kafka-connect/plugins, как показано ниже.

    Настройка пути к плагину Iceberg Sink Connector
    Настройка пути к плагину Iceberg Sink Connector

    Настройка пути к плагину требуется для версий ADS до 3.9.0.1.

  • Кластер ADS Control развернут согласно руководству Установка Arenadata Streaming Control и интегрирован с использующимся кластером ADS.

ADPG

  • Кластер ADPG развернут согласно руководству Online-установка.

  • IP-адрес PostgreSQL server (хост с сервисом ADPG) — 10.92.43.42. Для входящих подключений по умолчанию используется порт с номером 5432.

  • Для создания коннектора Debezium аналогично описанному в статье Коннектор Debezium для PostgreSQL server установлены настройки кластера ADPG через пользовательский интерфейс ADCM (настройка доступа к базе данных c хоста Kafka Connect в файле pg_hba.conf и параметра wal_level в файле postgresql.conf).

Ниже приведен пример создания и настройки базы данных PostgreSQL (в кластере ADPG).

Настройка базы данных PostgreSQL

Создание базы данных:

CREATE DATABASE my_database;

Создание пользователя с ролью SUPERUSER:

CREATE USER my_user WITH SUPERUSER PASSWORD 'P@ssword';
ВНИМАНИЕ
Роль пользователя SUPERUSER используются только для тестовых целей. Для получения более подробной информации см. Настройка разрешений для коннектора Debezium.

Создание тестовой таблицы:

CREATE TABLE my_table (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    country VARCHAR(100),
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Настройка идентификации реплики для таблицы, чтобы включить возможность использования индексов BIGSERIAL PRIMARY KEY на стороне подписчика для поиска строк:

ALTER TABLE my_table REPLICA IDENTITY FULL;

Заполнение таблицы данными (приведено как пример, для наглядности необходимо несколько строк):

INSERT INTO my_table(name, country) VALUES ('John Jones','USA');

ADH

  • Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.3.6.2.b1.

  • Сервисы HDFS, ADPG, YARN, Hive и HUE установлены в кластере ADH. IP-адрес сервера Hive (компонент Hive Metastore) — 10.92.43.153.

  • В ADH на каждом хосте необходимо создать пользователя kafka, который должен иметь права на создание таблиц в хранилище HDFS (входить в группу hadoop).

Создание коннектора Debezium для PostgreSQL server

Для захвата изменения данных на уровне строк в таблицах баз данных PostgreSQL при помощи ADS Control создайте коннектор Debezium аналогично описанному в статье Коннектор Debezium для PostgreSQL server.

Ниже приведен пример конфигурации Debezium-коннектора с описанием новых параметров, применяющихся в конвейере данных.

Пример содержимого JSON-файла конфигурации Debezium-коннектора для PostgreSQL server
{
  "name": "cdc-my-adpg-new",
  "topic.prefix": "cdc__demo",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "publication.autocreate.mode": "filtered",
  "database.user": "my_user",
  "database.dbname": "my_database",
  "tasks.max": 1,
  "database.port": 5432,
  "plugin.name": "pgoutput",
  "database.hostname": "10.92.43.42",
  "database.password": "P@ssword",
  "table.include.list": ["public.my_table"],

  "value.converter.schema.registry.url": "http://10.92.42.130:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://10.92.42.130:8081",
  "key.converter": "io.confluent.connect.avro.AvroConverter",

  "transforms": ["unwrap"],
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields": "op,source.schema,source.table,source.ts_ms,ts_ms",
  "transforms.unwrap.add.headers": "db",
  "transforms.unwrap.delete.handling.mode": "rewrite",

  "slot.name": "cdc__my_database",
  "schema.evolution": "basic"
}
Атрибут Описание

value.converter.schema.registry.url

Адрес сервера Schema-Registry

key.converter.schema.registry.url

Адрес сервера Schema-Registry

transforms

Список трансформаций, которые будут применяться к записям

transforms.unwrap.type

Класс, используемый для трансформации unwrap

transforms.unwrap.add.fields

Дополнительные поля, добавляемые к данным

transforms.unwrap.add.headers

Добавление метаданных в заголовок сообщения Kafka

transforms.unwrap.delete.handling.mode

Определяет, как обрабатывать события удаления. rewrite гарантирует, что информация об удаленных записях будет сохранена и передана в сообщении Kafka

table.include.list

Необязательный список регулярных выражений, разделенных запятыми, соответствующих идентификаторам таблиц, изменения которых необходимо захватить. Если это свойство задано, коннектор захватывает изменения только из указанных таблиц. Каждый идентификатор должен иметь форму <schemaName>.<tableName>, где:

  • <schemaName> — имя схемы базы данных, в которой произошло событие изменения;

  • <tableName> — имя таблицы базы данных, в которой произошло событие изменения.

slot.name

Имя логического слота декодирования PostgreSQL, созданного для потоковой передачи. Сервер использует этот слот для потоковой передачи событий на коннектор Debezium. Должен быть уникальным для каждого коннектора, подключаемого к базе

schema.evolution

Указывает режим эволюции схемы. При указании значения basic коннектор автоматически обнаруживает поля, которые находятся в полезной нагрузке события (новых изменениях), но не существуют в целевой таблице, и изменяет таблицу

Создание коннектора Iceberg

Для создания коннектора Iceberg через ADS Control используется плагин коннектора IcebergSinkConnector.

Для создания коннекторов при помощи ADS Control:

  1. Перейдите на страницу Kafka Connects в web-интерфейсе ADS Control. Страница Kafka Connects становится доступна после выбора кластера в секции управления кластерами и перехода на нужную вкладку на странице General.

  2. Выберите нужный кластер и перейдите на страницу обзора экземпляра Kafka Connect.

  3. Нажмите кнопку Create Connector на странице обзора экземпляра Kafka Connect. После нажатия кнопки Create Connector открывается окно выбора плагина коннектора Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.

  4. Выберите нужный коннектор для создания.

    Выбор плагина для создания коннектора
    Выбор плагина для создания коннектора
    Выбор плагина для создания коннектора
    Выбор плагина для создания коннектора
  5. Заполните параметры конфигурации коннектора. При необходимости воспользуйтесь информацией о параметрах:

    Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите переключатель JSON view.

    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    Пример содержимого JSON-файла конфигурации коннектора Iceberg
    {
        "name": "iceberg-sink-cdc-connector",
        "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
        "tasks.max": 1,
        "topics": ["cdc__demo.public.my_table"],
        "iceberg.tables": ["default.my_table"],
        "iceberg.write.format.default": "parquet",
        "iceberg.tables.auto-create-enabled": true,
        "iceberg.tables.schema-case-insensitive": true,
        "iceberg.tables.upsert-mode-enabled": true,
        "iceberg.tables.cdc-field": "__op",
        "iceberg.tables.default-id-columns": "id"
        "iceberg.catalog.type": "hive",
        "iceberg.catalog.uri": "thrift://10.92.43.153:9083",
        "iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/",
        "iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://10.92.42.130:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://10.92.42.130:8081",
        "iceberg.control.commit.interval-ms": 30000
    }
    Атрибут Описание

    name

    Название коннектора, которое будет использоваться в сервисе Kafka Connect

    connector.class

    Имя класса для коннектора

    tasks.max

    Максимальное количество создаваемых задач

    topics

    Имя топика, данные которого должны будут переданы в таблицы Iceberg. Для реализации конвейера CDC имя топика имеет форму <topic.prefix>.<table.include.list>, состоящую из параметров Debezium-коннектора, созданного выше

    iceberg.tables

    Список таблиц назначения, разделенных запятыми

    iceberg.write.format.default

    Формат файла по умолчанию для таблицы: parquet, avro или orc

    iceberg.tables.auto-create-enabled

    Установите значение true для автоматического создания целевых таблиц

    iceberg.tables.schema-case-insensitive

    Установите значение true для поиска столбцов таблицы по имени без учета регистра

    iceberg.tables.upsert-mode-enabled

    Флаг для включения/выключения режима UPSERT

    iceberg.tables.cdc-field

    Поле исходной записи, которое идентифицирует тип операции (INSERT, UPDATE или DELETE)

    iceberg.tables.default-id-columns

    Список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в таблицах

    iceberg.catalog.type

    Тип каталога для хранения таблиц Iceberg

    iceberg.catalog.uri

    Адрес для подключения к каталогу с таблицами Iceberg (адрес хоста с установленным компонентом Hive Metastore)

    iceberg.hadoop-conf-dir

    Путь к файлам конфигурации брокера Kafka

    iceberg.catalog.warehouse

    Путь к хранению метаданных

    key.converter

    Тип конвертера для ключа сообщения

    key.converter.schema.registry.url

    Адрес сервера Schema-Registry

    value.converter

    Тип конвертера для значения сообщения

    value.converter.schema.registry.url

    Адрес сервера Schema-Registry

    iceberg.control.commit.interval-ms

    Интервал между коммитами в мс

  6. После заполнения кликните Save и получите сообщение об успешном создании коннектора.

    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
  7. Проверьте, что на открывшейся странице отображается созданный коннектор в рабочем статусе.

    Созданный коннектор
    Созданный коннектор
    Созданный коннектор
    Созданный коннектор

    В случае, если после создания коннектора задача создана с ошибкой, сообщение об ошибке можно увидеть после нажатия иконки restart dark restart light, расположенной в поле Status задачи.

Данные таблицы PostgreSQL в ADS Control

На странице Topics пользовательского интерфейса ADS Control отображается топик со всеми полями и содержимым таблицы PostgreSQL, созданный коннектором Debezium.

Топик, созданный коннектором
Топик, созданный коннектором
Топик, созданный коннектором
Топик, созданный коннектором

На странице Schema-Registry пользовательского интерфейса ADS Control отображаются созданные в Schema-Registry схемы для ключей и значений таблицы.

Созданные схемы
Созданные схемы
Созданные схемы
Созданные схемы

На вкладке схемы отображается описание всех полей таблицы, включая поля, добавленные при помощи параметра transforms.unwrap.add.fields при создании коннектора Debezium.

Схема для таблицы PostgreSQL
Схема для таблицы PostgreSQL
Схема для таблицы PostgreSQL
Схема для таблицы PostgreSQL

В результате создания конвейера данных в таблице коннекторов на странице обзора экземпляра Kafka Connect оба созданных коннектора имеют в столбце Topic имя одного и того же обрабатываемого топика.

Таблица созданных коннекторов в Kafka Connect
Таблица коннекторов с указанием имени топика
Таблица созданных коннекторов в Kafka Connect
Таблица коннекторов с указанием имени топика

Таблицы Iceberg

Для проверки сбора изменений коннектором Debezium добавьте запись в таблицу PostgreSQL:

INSERT INTO my_table(name, country) VALUES ('NAME','COUNTRY');

После подключения к пользовательскому интерфейсу сервиса HUE из состава кластера ADH можно просматривать таблицы Iceberg, созданные коннектором.

Таблицы Iceberg в интерфейсе HUE
Таблицы Iceberg в интерфейсе HUE
Таблицы Iceberg в интерфейсе HUE
Таблицы Iceberg в интерфейсе HUE
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней