Коннектор Iceberg

Обзор Iceberg Sink Connector

Apache Iceberg Sink Connector — коннектор-приемник для записи данных из Kafka в таблицы Iceberg.

Использование Iceberg Sink Connector дает следующие преимущества:

  • Координация сообщений для централизованных коммитов Iceberg.

  • Обеспечение семантики доставки exactly once.

  • Возможность разветвления нескольких таблиц.

  • Автоматическое создание таблиц и эволюция схем.

  • Отображение имени поля с помощью функции сопоставления столбцов Iceberg.

Для иллюстрации преимуществ коннектора в статье показана реализация полного пайплайна с использованием Kafka Connect — захват изменения данных (Change Data Capture, CDC) в таблицах баз данных PostgreSQL с записью изменений в таблицу Iceberg.

Предварительные требования

Ниже описано окружение, используемое для создания полного пайплайна CDC.

ADS и ADS Control

  • Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.7.2.1.b1.

  • Сервисы Kafka, Kafka Connect и Schema-Registry установлены в кластере ADS. IP-адрес сервера Schema-Registry — 10.92.42.130.

  • Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

  • При конфигурировании сервиса Kafka Connect в группе параметров connect-distributed.properties для параметра plugin.path добавлено новое значение /usr/lib/kafka-connect/plugins, как показано ниже.

    Настройка пути к плагину Iceberg Sink Connector
    Настройка пути к плагину Iceberg Sink Connector

    Настройка пути к плагину требуется только для ADS 3.7.2.1.b1.

  • Конфигурация кластера ADH, используемого в пайплайне, импортирована в кластер ADS.

    Для включения импорта на вкладке Import кластера ADS выберите Cluster configuration напротив названия кластера ADH и нажмите Import.

    Импорт данных ADH
    Импорт данных ADH
  • Кластер ADS Control развернут согласно руководству Установка Arenadata Streaming Control и интегрирован с использующимся кластером ADS.

ADPG

  • Кластер ADPG развернут согласно руководству Online-установка.

  • IP-адрес PostgreSQL server (хост с сервисом ADPG) — 10.92.43.42. Для входящих подключений по умолчанию используется порт с номером 5432.

  • Для создания коннектора Debezium аналогично описанному в статье Коннектор Debezium для PostgreSQL server установлены настройки кластера ADPG через пользовательский интерфейс ADCM (настройка доступа к базе данных c хоста Kafka Connect в файле pg_hba.conf и параметра wal_level в файле postgresql.conf).

Ниже приведен пример создания и настройки базы данных PostgreSQL (в кластере ADPG).

Настройка базы данных PostgreSQL

Создание базы данных:

CREATE DATABASE my_database;

Подключение к базе:

\c my_database

Создание пользователя с ролью SUPERUSER:

CREATE USER my_user WITH SUPERUSER PASSWORD 'P@ssword';
ВНИМАНИЕ
Роль пользователя SUPERUSER используются только для тестовых целей. Для получения более подробной информации см. Настройка разрешений для коннектора Debezium.

Создание тестовой таблицы:

CREATE TABLE my_table (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    country VARCHAR(100),
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Настройка идентификации реплики для таблицы, чтобы включить возможность использования индексов BIGSERIAL PRIMARY KEY на стороне подписчика для поиска строк:

ALTER TABLE my_table REPLICA IDENTITY FULL;

Заполнение таблицы данными (приведено как пример, для наглядности необходимо несколько строк):

INSERT INTO my_table(name, country) VALUES ('John Jones','USA');
ПРИМЕЧАНИЕ

ADH

  • Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.3.6.2.b1.

  • Сервисы HDFS, ADPG, YARN, Hive и HUE установлены в кластере ADH. IP-адрес сервера Hive (компонент Hive Metastore) — 10.92.43.153.

  • В ADH на каждом хосте необходимо создать пользователя kafka, который должен иметь права на создание таблиц в хранилище HDFS (входить в группу hadoop).

Iceberg Sink Connector с Kerberos

 
Начиная с версии 3.9.0.1.b1 коннектор Iceberg может быть подключен к кластеру ADH, в котором включена аутентификация по протоколу Kerberos.

Также при включенной аутентификации по протоколу Kerberos для подключения к сервису ADH Hive требуется активация SSL.

На стороне кластеров ADS и ADS Control также должны быть выполнены соответствующие настройки:

При этом для доверия между кластерами ADS, ADS Control и ADH должны быть выполнены условия:

  • при включении аутентификации по протоколу Kerberos для всех кластеров должна использоваться одна и та же область керберизации (realm);

  • сертификаты SSL хостов всех кластеров должны взаимно доверять сертификатам SSL хостов друг друга, то есть truststore каждого хоста одного кластера должен содержать сертификат *.crt каждого хоста другого кластера.

Также файл /etc/hive/conf/hive-site.xml должен быть скопирован с хоста ADH, на котором установлен компонент Hive, в директорию /usr/lib/kafka/config/ на хосте ADS c компонентом Kafka Connect.

Создание коннектора Debezium для PostgreSQL server

Для захвата изменения данных на уровне строк в таблицах баз данных PostgreSQL при помощи ADS Control создайте коннектор Debezium аналогично описанному в статье Коннектор Debezium для PostgreSQL server.

Ниже приведен пример конфигурации Debezium-коннектора с описанием новых параметров, применяющихся в пайплайне данных.

Пример содержимого JSON-файла конфигурации Debezium-коннектора для PostgreSQL server
{
  "name": "cdc-my-adpg-new",
  "topic.prefix": "cdc__demo",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "publication.autocreate.mode": "filtered",
  "database.user": "my_user",
  "database.dbname": "my_database",
  "tasks.max": 1,
  "database.port": 5432,
  "plugin.name": "pgoutput",
  "database.hostname": "10.92.43.42",
  "database.password": "P@ssword",
  "table.include.list": ["public.my_table"],

  "value.converter.schema.registry.url": "http://10.92.42.130:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://10.92.42.130:8081",
  "key.converter": "io.confluent.connect.avro.AvroConverter",

  "transforms": ["unwrap"],
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields": "op,source.schema,source.table,source.ts_ms,ts_ms",
  "transforms.unwrap.add.headers": "db",
  "transforms.unwrap.delete.handling.mode": "rewrite",

  "slot.name": "cdc__my_database",
  "schema.evolution": "basic"
}
Атрибут Описание

value.converter.schema.registry.url

Адрес сервера Schema-Registry

key.converter.schema.registry.url

Адрес сервера Schema-Registry

transforms

Список трансформаций, которые будут применяться к записям

transforms.unwrap.type

Класс, используемый для трансформации unwrap

transforms.unwrap.add.fields

Дополнительные поля, добавляемые к данным

transforms.unwrap.add.headers

Добавление метаданных в заголовок сообщения Kafka

transforms.unwrap.delete.handling.mode

Определяет, как обрабатывать события удаления. rewrite гарантирует, что информация об удаленных записях будет сохранена и передана в сообщении Kafka

table.include.list

Необязательный список регулярных выражений, разделенных запятыми, соответствующих идентификаторам таблиц, изменения которых необходимо захватить. Если это свойство задано, коннектор захватывает изменения только из указанных таблиц. Каждый идентификатор должен иметь форму <schemaName>.<tableName>, где:

  • <schemaName> — имя схемы базы данных, в которой произошло событие изменения;

  • <tableName> — имя таблицы базы данных, в которой произошло событие изменения.

slot.name

Имя логического слота декодирования PostgreSQL, созданного для потоковой передачи. Сервер использует этот слот для потоковой передачи событий на коннектор Debezium. Должен быть уникальным для каждого коннектора, подключаемого к базе

schema.evolution

Указывает режим эволюции схемы. При указании значения basic коннектор автоматически обнаруживает поля, которые находятся в полезной нагрузке события (новых изменениях), но не существуют в целевой таблице, и изменяет таблицу

Создание коннектора Iceberg

Для создания коннектора Iceberg через ADS Control используется плагин коннектора IcebergSinkConnector.

Для создания коннекторов при помощи ADS Control:

  1. Перейдите на страницу Kafka Connects в web-интерфейсе ADS Control. Страница Kafka Connects становится доступна после выбора кластера в секции управления кластерами и перехода на нужную вкладку на странице General.

  2. Выберите нужный кластер и перейдите на страницу обзора экземпляра Kafka Connect.

  3. Нажмите кнопку Create Connector на странице обзора экземпляра Kafka Connect. После нажатия кнопки Create Connector открывается окно выбора плагина коннектора Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.

  4. Выберите нужный коннектор для создания.

    Выбор плагина для создания коннектора
    Выбор плагина для создания коннектора
    Выбор плагина для создания коннектора
    Выбор плагина для создания коннектора
  5. Заполните параметры конфигурации коннектора. При необходимости воспользуйтесь информацией о параметрах:

    Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите переключатель JSON view.

    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    Пример содержимого JSON-файла конфигурации коннектора Iceberg
    {
      "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
      "iceberg.tables.cdc-field": "__op",
      "tasks.max": 1,
      "topics": [
        "cdc__demo.public.my_table"
      ],
      "iceberg.tables.upsert-mode-enabled": true,
      "iceberg.control.commit.interval-ms": 30000,
      "iceberg.catalog.uri": "thrift://10.92.43.153:9083",
      "iceberg.connect.hdfs.keytab": "/etc/security/keytabs/kafka-connect.service.keytab",
      "iceberg.tables.auto-create-enabled": true,
      "value.converter.schema.registry.url": "http://10.92.42.130:8081",
      "iceberg.connect.hdfs.principal": "kafka-connect/sov-ads-6.ru-central1.internal@ADS-KAFKA.LOCAL",
      "iceberg.tables": [
        "default.my_table"
      ],
      "name": "iceberg-sink-cdc-connector",
      "iceberg.tables.schema-case-insensitive": true,
      "iceberg.catalog.warehouse": "hdfs://adh/apps/hive/warehouse",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "iceberg.tables.default-id-columns": "id",
      "iceberg.hdfs.authentication.kerberos": true,
      "iceberg.catalog.type": "hive",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://10.92.42.130:8081",
      "iceberg.write.format.default": "parquet",
      "iceberg.hadoop-conf-dir": "/usr/lib/kafka/config/"
    }
    Атрибут Описание

    name

    Название коннектора, которое будет использоваться в сервисе Kafka Connect

    connector.class

    Имя класса для коннектора

    tasks.max

    Максимальное количество создаваемых задач

    topics

    Имя топика, данные которого должны будут переданы в таблицы Iceberg. Для реализации пайплайна CDC имя топика имеет форму <topic.prefix>.<table.include.list>, состоящую из параметров Debezium-коннектора, созданного выше

    iceberg.tables

    Список таблиц назначения, разделенных запятыми

    iceberg.write.format.default

    Формат файла по умолчанию для таблицы: parquet, avro или orc

    iceberg.tables.auto-create-enabled

    Установите значение true для автоматического создания целевых таблиц

    iceberg.tables.schema-case-insensitive

    Установите значение true для поиска столбцов таблицы по имени без учета регистра

    iceberg.tables.upsert-mode-enabled

    Установите значение true для включения UPSERT-режима

    iceberg.tables.cdc-field

    Поле исходной записи, которое идентифицирует тип операции (INSERT, UPDATE или DELETE) при включенном UPSERT-режиме

    iceberg.tables.default-id-columns

    Список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в таблицах (primary key). Является обязательным параметром при включенном UPSERT-режиме

    iceberg.catalog.type

    Тип каталога для хранения таблиц Iceberg

    iceberg.catalog.uri

    Адрес для подключения к каталогу с таблицами Iceberg (адрес хоста с установленным компонентом Hive Metastore)

    iceberg.hdfs.authentication.kerberos

    Флаг для включения/выключения аутентификации в HDFS по протоколу Kerberos (по умолчанию выключен). Используется при включенной Kerberos-аутентификации в ADS и ADH. Доступен к использованию начиная с ADS 3.9.0.1.b1

    iceberg.connect.hdfs.principal

    Имя принципала Kerberos для использования в аутентификации. Является обязательным параметром при включенной Kerberos-аутентификации. Доступен к использованию начиная с ADS 3.9.0.1.b1

    iceberg.connect.hdfs.keytab

    Путь к keytab-файлу Kerberos, который содержит ключ для принципала. Является обязательным параметром при включенной Kerberos-аутентификации. Доступен к использованию начиная с ADS 3.9.0.1.b1

    iceberg.hadoop-conf-dir

    Путь к файлам конфигурации брокера Kafka

    iceberg.catalog.warehouse

    Путь к хранению метаданных

    key.converter

    Тип конвертера для ключа сообщения

    key.converter.schema.registry.url

    Адрес сервера Schema-Registry

    value.converter

    Тип конвертера для значения сообщения

    value.converter.schema.registry.url

    Адрес сервера Schema-Registry

    iceberg.control.commit.interval-ms

    Интервал между коммитами в мс

  6. После заполнения кликните Save и получите сообщение об успешном создании коннектора.

    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
  7. Проверьте, что на открывшейся странице отображается созданный коннектор в рабочем статусе.

    Созданный коннектор
    Созданный коннектор
    Созданный коннектор
    Созданный коннектор

    В случае, если после создания коннектора задача создана с ошибкой, сообщение об ошибке можно увидеть после нажатия иконки restart dark restart light, расположенной в поле Status задачи.

Данные таблицы PostgreSQL в ADS Control

На странице Topics пользовательского интерфейса ADS Control отображается топик со всеми полями и содержимым таблицы PostgreSQL, созданный коннектором Debezium.

Топик, созданный коннектором
Топик, созданный коннектором
Топик, созданный коннектором
Топик, созданный коннектором

На странице Schema-Registry пользовательского интерфейса ADS Control отображаются созданные в Schema-Registry схемы для ключей и значений таблицы.

Созданные схемы
Созданные схемы
Созданные схемы
Созданные схемы

На вкладке схемы отображается описание всех полей таблицы, включая поля, добавленные при помощи параметра transforms.unwrap.add.fields при создании коннектора Debezium.

Схема для таблицы PostgreSQL
Схема для таблицы PostgreSQL
Схема для таблицы PostgreSQL
Схема для таблицы PostgreSQL

В результате создания пайплайна данных в таблице коннекторов на странице обзора экземпляра Kafka Connect оба созданных коннектора имеют в столбце Topic имя одного и того же обрабатываемого топика.

Таблица созданных коннекторов в Kafka Connect
Таблица коннекторов с указанием имени топика
Таблица созданных коннекторов в Kafka Connect
Таблица коннекторов с указанием имени топика

Таблицы Iceberg

Для проверки сбора изменений коннектором Debezium добавьте запись в таблицу PostgreSQL:

INSERT INTO my_table(name, country) VALUES ('NAME','COUNTRY');

После подключения к пользовательскому интерфейсу сервиса HUE из состава кластера ADH можно просматривать таблицы Iceberg, созданные коннектором.

Таблицы Iceberg в интерфейсе HUE
Таблицы Iceberg в интерфейсе HUE
Таблицы Iceberg в интерфейсе HUE
Таблицы Iceberg в интерфейсе HUE

Режим UPSERT

Начиная с ADS 3.9.0.1.b1 существует возможность указания в конфигурации Iceberg Sink Connector параметров для включения и настройки UPSERT-режима.

После включения UPSERT-режима (при помощи параметра коннектора iceberg.tables.upsert-mode-enabled) внесение любого изменения в таблицы Iceberg выполняется согласно следующим правилам:

  • Каждая операция является UPSERT — комбинацией операций DELETE и INSERT. Строка обновляется в таблице Iceberg с тем же идентификатором, что и во входящей записи. В случае, если строка не найдена, просто добавляется запись. Это позволяет вставлять, обновлять или удалять записи в одной операции без перезаписи данных.

  • Если в записи Kafka присутствует поле, где указан тип выполняемой операции (INSERT, UPDATE или DELETE) и этот тип операции указан в качестве значения параметра коннектора iceberg.tables.cdc-field, то эти операции выполняются независимо друг от друга (как они выполнялись бы без включения UPSERT-режима). При попытке новой записи коннектор сопоставит значение поля операции записи Kafka со значением параметра iceberg.tables.cdc-field и при совпадении применит данную операцию к поступающей строке. Функция указания типа операции может быть использована при получении данных из CDC-платформ.

    Пользователи также могут настроить значения параметра iceberg.tables.cdc-field, соответствующие различным операциям, применив следующие опции:

    • iceberg.tables.cdc.ops.insert

    • iceberg.tables.cdc.ops.update

    • iceberg.tables.cdc.ops.delete

  • Если в записи Kafka присутствует поле, где указан тип выполняемой операции (INSERT, UPDATE или DELETE) и этот тип операции указан в качестве значения параметра коннектора iceberg.tables.cdc.ops.ignored, то коннектор просто проигнорирует входящую запись.

ПРИМЕЧАНИЕ

В настоящее время операция TRUNCATE TABLE не поддерживается из-за ее воздействия на уровне таблицы и специфики внутренней архитектуры коннектора.

Ниже описаны параметры режима UPSERT, а также указана версия ADS, начиная с которой параметры могут быть применены.

Параметры режима UPSERT
Параметр Описание Значение по умолчанию Версия ADS

iceberg.tables.upsert-mode-enabled

Установите значение true для включения UPSERT-режима

false

3.9.0.1.b1

iceberg.tables.cdc-field

Поле исходной записи, которое идентифицирует тип операции (INSERT, UPDATE или DELETE)

__op

3.9.0.1.b1

iceberg.tables.default-id-columns

Список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в таблицах (primary key). Является обязательным параметром

 — 

3.9.0.1.b1

iceberg.tables.cdc.ops.insert

Разделенные запятыми значения поля операции CDC, соответствующие INSERT

r,c

3.9.1.1.b1

iceberg.tables.cdc.ops.update

Разделенные запятыми значения поля операции CDC, соответствующие UPDATE

u

3.9.1.1.b1

iceberg.tables.cdc.ops.delete

Разделенные запятыми значения поля операции CDC, соответствующие DELETE

d

3.9.1.1.b1

iceberg.tables.cdc.ops.ignored

Разделенные запятыми значения поля операции CDC, которые должны игнорироваться коннектором

t,m

3.9.1.1.b1

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней