Коннектор Debezium для PostgreSQL Server
В статье показан пример запуска и использования коннектора Debezium для PostgreSQL Server при помощи ADS Control. Коннектор предназначен для создания в топиках Kafka записей об изменениях на уровне строк в таблицах баз данных PostgreSQL.
Для данного примера в качестве PostgreSQL Server используется Arenadata Postgres (ADPG).
Предварительные требования
Для создания коннектора использовано следующее окружение:
-
Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.6.2.2.b1.
-
Сервисы Kafka и Kafka Connect установлены в кластере ADS.
-
Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.
-
Кластер ADS Control развернут согласно руководству Установка Arenadata Streaming Control и интегрирован с использующимся кластером ADS.
-
Кластер ADPG развернут согласно руководству Online-установка.
-
IP-адрес PostgreSQL Server (кластер ADPG) —
10.92.40.129
. Для входящих подключений по умолчанию используется порт с номером5432
. -
На установленном PostgreSQL Server (кластер ADPG) выполнены подготовительные действия:
-
БД
books_store
существует в кластере ADPG. -
Пользователь с именем
user1
, привилегиямиSUPERUSER
и паролемpassword1
создан в БД. -
В БД созданы таблицы
book
иauthor
и в них добавлены несколько строк с данными. -
Файл pg_hba.conf настроен для обеспечения доступа пользователя с хоста, на котором установлен сервис Kafka Connect кластера ADS. Для этого в поле PG_HBA на странице конфигурационных параметров сервиса ADPG добавлена запись об адресе хоста и пользователе:
host books_store user1 10.92.43.206/32 trust
-
В файле postgresql.conf изменено значение параметра wal_level с
replica
наlogical
— это значение добавляет информацию, требующуюся для поддержки логического декодирования, что необходимо для работы коннектора. Для этого в поле postgresql.conf custom section на странице конфигурационных параметров сервиса ADPG добавлена запись о параметре и новом значении.
-
Создание коннектора Debezium для PostgreSQL Server
Для создания коннектора Debezium для PostgreSQL Server через ADS Control используется плагин коннектора PostgresConnector.
Для создания коннекторов при помощи ADS Control:
-
Перейдите на страницу Kafka Connects в web-интерфейсе ADS Control. Страница Kafka Connects становится доступна после выбора кластера в секции управления кластерами и перехода на нужную вкладку на странице General.
-
Выберите нужный кластер и перейдите на страницу обзора экземпляра Kafka Connect.
-
Нажмите кнопку Create Connector на странице обзора экземпляра Kafka Connect. После нажатия кнопки Create Connector открывается окно выбора плагина коннектора Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.
-
Выберите нужный коннектор для создания.
Выбор коннектора Kafka Connect для созданияВыбор коннектора Kafka Connect для создания -
Заполните параметры конфигурации коннектора. При необходимости воспользуйтесь информацией о параметрах:
-
конфигурации сервиса Kafka Connect в статье Конфигурационные параметры ADS;
Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите переключатель JSON view.
Конфигурация коннектораКонфигурация коннектораJSON-файл конфигурации коннектораJSON-файл конфигурации коннектораПример содержимого JSON-файла простой конфигурации Debezium-коннектора для PostgreSQL Server{ "name": "PostgresConnector", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "10.92.40.129", "database.port": "5432", "database.user": "user1", "database.password": "password1", "database.dbname": "books_store", "topic.prefix": "postgres", "plugin.name": "pgoutput", "publication.autocreate.mode": "filtered", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter" }
Атрибут Описание name
Название коннектора, которое будет использоваться в сервисе Kafka Connect
connector.class
Имя класса для коннектора
tasks.max
Максимальное количество создаваемых задач
database.hostname
Адрес экземпляра PostgreSQL Server
database.port
Номер порта экземпляра PostgreSQL Server
database.user
Имя пользователя PostgreSQL Serverr
database.password
Пароль для пользователя PostgreSQL Server
database.names
Имя базы данных, из которой будут фиксироваться изменения
topic.prefix
Префикс топика для экземпляра/кластера PostgreSQL Server, который образует пространство имен и используется во всех именах топиков Kafka, куда осуществляет запись коннектор, а также именах схем Kafka Connect и пространствах имен соответствующей схемы Avro, когда используется Avro-конвертер
plugin.name
Имя модуля логического декодирования PostgreSQL, установленного на сервере PostgreSQL
publication.autocreate.mode
Указывает, создает ли коннектор публикацию и если да, то каким образом. Этот параметр применяется только в том случае, если потоки коннектора изменяются с помощью подключаемого модуля
pgoutput
key.converter
Тип конвертера для ключа сообщения
value.converter
Тип конвертера для значения сообщения
-
-
После заполнения кликните Save и получите сообщение об успешном создании коннектора.
Сообщение об успешном создании коннектораСообщение об успешном создании коннектора -
Проверьте, что в результате создания на странице <connector name> → Overview отображается созданный коннектор и задачи коннектора в рабочем статусе. Статус определяется в зависимости от индикатора перед названием коннектора/задачи:
-
— коннектор/задача запущены;
-
— коннектор/задача были административно приостановлены;
-
— ошибка в работе коннектора/задачи, или коннектор в статусе статусе
degraded
; -
— коннектор/задача еще не назначены процессу.
Созданный коннекторСозданный коннекторВ случае, если после создания коннектора задача создана с ошибкой, сообщение об ошибке можно увидеть после нажатия иконки
, расположенной в поле Status задачи.
-
Использование коннектора Debezium для PostgreSQL Server
При первом подключении к серверу или кластеру PostgreSQL коннектор создает согласованный снимок всех схем. После завершения создания моментального снимка коннектор непрерывно фиксирует изменения на уровне строк и передает их в топики Kafka.
На странице Topics пользовательского интерфейса ADS Control можно увидеть топики, созданные коннектором.


Для каждой таблицы коннектор записывает события для всех операций INSERT
, UPDATE
и DELETE
в отдельный топик Kafka, в случае приведенного примера — топики postgres.public.book
и postgres.public.author
.
Коннектор использует следующую форму имени топика для записи событий изменения: <topicPrefix>.<schemaName>.<tableName>
, где:
-
<topicPrefix>
— логическое имя сервера, указанное в свойстве конфигурацииtopic.prefix
; -
<schemaName>
— имя схемы базы данных, в которой произошло событие изменения; -
<tableName>
— имя таблицы базы данных, в которой произошло событие изменения.
ПРИМЕЧАНИЕ
Более подробную информацию о наименованиях топиков, создаваемых коннектором Debezium, можно найти в разделе Topic names документации коннектора Debezium для PostgreSQL Server. |
snapshot.mode
При помощи параметра snapshot.mode можно настроить момент, в который коннектор будет создавать снимки.