Коннектор Debezium для PostgreSQL Server

В статье показан пример запуска и использования коннектора Debezium для PostgreSQL Server при помощи ADS Control. Коннектор предназначен для создания в топиках Kafka записей об изменениях на уровне строк в таблицах баз данных PostgreSQL.

Для данного примера в качестве PostgreSQL Server используется Arenadata Postgres (ADPG).

Предварительные требования

Для создания коннектора использовано следующее окружение:

  • Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.6.2.2.b1.

  • Сервисы Kafka и Kafka Connect установлены в кластере ADS.

  • Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

  • Кластер ADS Control развернут согласно руководству Установка Arenadata Streaming Control и интегрирован с использующимся кластером ADS.

  • Кластер ADPG развернут согласно руководству Online-установка.

  • IP-адрес PostgreSQL Server (кластер ADPG) — 10.92.40.129. Для входящих подключений по умолчанию используется порт с номером 5432.

  • На установленном PostgreSQL Server (кластер ADPG) выполнены подготовительные действия:

    • БД books_store существует в кластере ADPG.

    • Пользователь с именем user1, привилегиями SUPERUSER и паролем password1 создан в БД.

    • В БД созданы таблицы book и author и в них добавлены несколько строк с данными.

    • Файл pg_hba.conf настроен для обеспечения доступа пользователя с хоста, на котором установлен сервис Kafka Connect кластера ADS. Для этого в поле PG_HBA на странице конфигурационных параметров сервиса ADPG добавлена запись об адресе хоста и пользователе:

      host    books_store  user1       10.92.43.206/32     trust
    • В файле postgresql.conf изменено значение параметра wal_level с replica на logical — это значение добавляет информацию, требующуюся для поддержки логического декодирования, что необходимо для работы коннектора. Для этого в поле postgresql.conf custom section на странице конфигурационных параметров сервиса ADPG добавлена запись о параметре и новом значении.

Настройка разрешений для коннектора Debezium

 

Для настройки PostgreSQL Server для запуска коннектора Debezium при использовании плагина pgoutput требуется пользователь базы данных, который имеет следующие привилегии:

  • REPLICATION

  • LOGIN

  • CREATE в базе данных для добавления публикаций.

  • SELECT в таблицах для копирования исходных данных таблицы.

Пользователь с привилегией SUPERUSER, использующийся в данном примере, обладает всеми необходимыми привилегиями, но такую привилегию рекомендуется использовать только в тестовом режиме. Для безопасности сервера рекомендуется настроить для пользователя отдельные привилегии, описанные выше.

Подробнее о настройке привилегий пользователя для коннектора Debezium можно найти в разделе Setting up permissions документации коннектора Debezium для PostgreSQL Server.

Создание коннектора Debezium для PostgreSQL Server

Для создания коннектора Debezium для PostgreSQL Server через ADS Control используется плагин коннектора PostgresConnector.

Для создания коннекторов при помощи ADS Control:

  1. Перейдите на страницу Kafka Connects в web-интерфейсе ADS Control. Страница Kafka Connects становится доступна после выбора кластера в секции управления кластерами и перехода на нужную вкладку на странице General.

  2. Выберите нужный кластер и перейдите на страницу обзора экземпляра Kafka Connect.

  3. Нажмите кнопку Create Connector на странице обзора экземпляра Kafka Connect. После нажатия кнопки Create Connector открывается окно выбора плагина коннектора Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.

  4. Выберите нужный коннектор для создания.

    Выбор коннектора Kafka Connect для создания
    Выбор коннектора Kafka Connect для создания
    Выбор коннектора Kafka Connect для создания
    Выбор коннектора Kafka Connect для создания
  5. Заполните параметры конфигурации коннектора. При необходимости воспользуйтесь информацией о параметрах:

    Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите переключатель JSON view.

    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    Конфигурация коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    JSON-файл конфигурации коннектора
    Пример содержимого JSON-файла простой конфигурации Debezium-коннектора для PostgreSQL Server
    {
        "name": "PostgresConnector",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "10.92.40.129",
        "database.port": "5432",
        "database.user": "user1",
        "database.password": "password1",
        "database.dbname": "books_store",
        "topic.prefix": "postgres",
        "plugin.name": "pgoutput",
        "publication.autocreate.mode": "filtered",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
    Атрибут Описание

    name

    Название коннектора, которое будет использоваться в сервисе Kafka Connect

    connector.class

    Имя класса для коннектора

    tasks.max

    Максимальное количество создаваемых задач

    database.hostname

    Адрес экземпляра PostgreSQL Server

    database.port

    Номер порта экземпляра PostgreSQL Server

    database.user

    Имя пользователя PostgreSQL Serverr

    database.password

    Пароль для пользователя PostgreSQL Server

    database.names

    Имя базы данных, из которой будут фиксироваться изменения

    topic.prefix

    Префикс топика для экземпляра/кластера PostgreSQL Server, который образует пространство имен и используется во всех именах топиков Kafka, куда осуществляет запись коннектор, а также именах схем Kafka Connect и пространствах имен соответствующей схемы Avro, когда используется Avro-конвертер

    plugin.name

    Имя модуля логического декодирования PostgreSQL, установленного на сервере PostgreSQL

    publication.autocreate.mode

    Указывает, создает ли коннектор публикацию и если да, то каким образом. Этот параметр применяется только в том случае, если потоки коннектора изменяются с помощью подключаемого модуля pgoutput

    key.converter

    Тип конвертера для ключа сообщения

    value.converter

    Тип конвертера для значения сообщения

  6. После заполнения кликните Save и получите сообщение об успешном создании коннектора.

    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
    Сообщение об успешном создании коннектора
  7. Проверьте, что в результате создания на странице <connector name> → Overview отображается созданный коннектор и задачи коннектора в рабочем статусе. Статус определяется в зависимости от индикатора перед названием коннектора/задачи:

    • green — коннектор/задача запущены;

    • yellow — коннектор/задача были административно приостановлены;

    • red — ошибка в работе коннектора/задачи, или коннектор в статусе статусе degraded;

    • unassigned — коннектор/задача еще не назначены процессу.

    Созданный коннектор
    Созданный коннектор
    Созданный коннектор
    Созданный коннектор

    В случае, если после создания коннектора задача создана с ошибкой, сообщение об ошибке можно увидеть после нажатия иконки restart dark restart light, расположенной в поле Status задачи.

Использование коннектора Debezium для PostgreSQL Server

При первом подключении к серверу или кластеру PostgreSQL коннектор создает согласованный снимок всех схем. После завершения создания моментального снимка коннектор непрерывно фиксирует изменения на уровне строк и передает их в топики Kafka.

На странице Topics пользовательского интерфейса ADS Control можно увидеть топики, созданные коннектором.

Топики, созданные коннектором
Топики, созданные коннектором
Топики, созданные коннектором
Топики, созданные коннектором

Для каждой таблицы коннектор записывает события для всех операций INSERT, UPDATE и DELETE в отдельный топик Kafka, в случае приведенного примера — топики postgres.public.book и postgres.public.author.

Коннектор использует следующую форму имени топика для записи событий изменения: <topicPrefix>.<schemaName>.<tableName>, где:

  • <topicPrefix> — логическое имя сервера, указанное в свойстве конфигурации topic.prefix;

  • <schemaName> — имя схемы базы данных, в которой произошло событие изменения;

  • <tableName> — имя таблицы базы данных, в которой произошло событие изменения.

ПРИМЕЧАНИЕ

Более подробную информацию о наименованиях топиков, создаваемых коннектором Debezium, можно найти в разделе Topic names документации коннектора Debezium для PostgreSQL Server.

snapshot.mode

При помощи параметра snapshot.mode можно настроить момент, в который коннектор будет создавать снимки.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней