Коннектор Debezium для MS SQL Server
В статье показан пример запуска и использования коннектора Debezium для MS SQL Server при помощи ADS Control. Коннектор предназначен для создания в топиках Kafka записей о событиях изменения таблиц баз данных Microsoft SQL Server.
Предварительные требования
Для создания коннектора использовано следующее окружение:
-
Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.6.2.2.b1.
-
Сервисы Kafka и Kafka Connect установлены в кластере ADS.
-
Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.
-
Кластер ADS Control развернут согласно руководству Установка Arenadata Streaming Control и интегрирован с использующимся кластером ADS.
-
На хосте с операционной системой Ubuntu 20.04 установлен MS SQL Server, где для пользователя
sa
задан парольp@ssword123!
и под этим пользователем созданы база данных c именемtestDB1
и таблица с именемdbo.customers
. -
IP-адрес MS SQL Server —
10.92.40.239
. Для входящих подключений по умолчанию используется порт с номером5432
. -
Чтобы разрешить коннектору Debezium фиксировать записи событий изменений для операций базы данных, включено отслеживание данных об изменениях (Change Data Capture, CDC):
-
для таблицы, изменения в которой должны отслеживаться коннектором.
Создание коннектора Debezium для MS SQL Server
Для создания коннектора Debezium для MS SQL Server через ADS Control используется плагин коннектора SqlServerConnector.
Для создания коннекторов при помощи ADS Control:
-
Перейдите на страницу Kafka Connects в web-интерфейсе ADS Control. Страница Kafka Connects становится доступна после выбора кластера в секции управления кластерами и перехода на нужную вкладку на странице General.
-
Выберите нужный кластер и перейдите на страницу обзора экземпляра Kafka Connect.
-
Нажмите кнопку Create Connector на странице обзора экземпляра Kafka Connect. После нажатия кнопки Create Connector открывается окно выбора плагина коннектора Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.
-
Выберите нужный коннектор для создания.
Выбор коннектора Kafka Connect для созданияВыбор коннектора Kafka Connect для создания -
Заполните параметры конфигурации коннектора. При необходимости воспользуйтесь информацией о параметрах:
-
конфигурации сервиса Kafka Connect в статье Конфигурационные параметры ADS;
Вы можете использовать заполнение конфигурации в виде файла JSON. Для этого включите переключатель JSON view.
Конфигурация коннектораКонфигурация коннектораJSON-файл конфигурации коннектораJSON-файл конфигурации коннектораПример содержимого JSON-файла простой конфигурации Debezium-коннектора для MS SQL Server{ "name": "SQLServerConnector", "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "10.92.40.239", "tasks.max": "1", "database.port": "1433", "database.user": "sa", "database.password": "p@ssword123!", "database.names": "testDB1", "topic.prefix": "sql", "table.include.list": "dbo.customers", "schema.history.internal.kafka.bootstrap.servers": "10.92.42.28:9092", "schema.history.internal.kafka.topic": "schemahistory.sql", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "database.encrypt":"false" }
Атрибут Описание name
Название коннектора, которое будет использоваться в сервисе Kafka Connect
connector.class
Имя класса для коннектора
tasks.max
Максимальное количество создаваемых задач
database.hostname
Адрес экземпляра SQL Server
database.port
Номер порта экземпляра SQL Server
database.user
Имя пользователя SQL Server
database.password
Пароль для пользователя SQL Server
database.names
Имя базы данных, из которой будут фиксироваться изменения
topic.prefix
Префикс топика для экземпляра/кластера SQL Server, который образует пространство имен и используется во всех именах топиков Kafka, куда осуществляет запись коннектор, а также именах схем Kafka Connect и пространствах имен соответствующей схемы Avro, когда используется Avro-конвертер
table.include.list
Список всех таблиц, изменения которых должен фиксировать Debezium
schema.history.internal.kafka.bootstrap.servers
Список брокеров Kafka, которые этот коннектор будет использовать для записи и восстановления операторов DDL в топик истории схемы базы данных
schema.history.internal.kafka.topic
Имя партиции топика истории схемы базы данных, в которой коннектор будет записывать и восстанавливать операторы DDL. Этот топик предназначен только для внутреннего использования и не должен использоваться потребителями
key.converter
Тип конвертера для ключа сообщения
value.converter
Тип конвертера для значения сообщения
database.encrypt
Отключает шифрование соединения при указании значения
false
(шифрование включено по умолчанию в 18 версии MS SQL Server и новее).Если шифрование не отключено, обязательно указание параметров:
database.ssl.truststore
иdatabase.ssl.truststore.password
-
-
После заполнения кликните Save и получите сообщение об успешном создании коннектора.
Сообщение об успешном создании коннектораСообщение об успешном создании коннектора -
Проверьте, что в результате создания на странице <connector name> → Overview отображается созданный коннектор и задачи коннектора в рабочем статусе. Статус определяется в зависимости от индикатора перед названием коннектора/задачи:
-
— коннектор/задача запущены;
-
— коннектор/задача были административно приостановлены;
-
— ошибка в работе коннектора/задачи, или коннектор в статусе статусе
degraded
; -
— коннектор/задача еще не назначены процессу.
Созданный коннекторСозданный коннекторВ случае, если после создания коннектора задача создана с ошибкой, сообщение об ошибке можно увидеть после нажатия иконки
, расположенной в поле Status задачи.
-
Использование коннектора Debezium для MS SQL Server
При первом подключении коннектора Debezium к базе данных SQL Server он создает согласованный снимок (снепшот) схем в базе данных.
После создания исходного моментального снимка коннектор непрерывно фиксирует изменения на уровне строк для операций INSERT
, UPDATE
или DELETE
. Коннектор создает события для каждой операции изменения данных и передает их в топики Kafka.
На странице Topics пользовательского интерфейса ADS Control можно увидеть топики, созданные коннектором.


Топик для записи событий изменения
Для каждой таблицы коннектор записывает события для всех операций INSERT
, UPDATE
и DELETE
в отдельный топик Kafka, в случае приведенного примера — топик sql.testDB1.dbo.customers
.
Коннектор использует следующую форму имени топика для записи событий изменения: <topicPrefix>.<schemaName>.<tableName>
, где:
-
<topicPrefix>
— логическое имя сервера, указанное в свойстве конфигурацииtopic.prefix
; -
<schemaName>
— имя схемы базы данных, в которой произошло событие изменения; -
<tableName>
— имя таблицы базы данных, в которой произошло событие изменения.
Топик для записи изменений схемы
Для каждой таблицы, для которой включен CDC, коннектор Debezium сохраняет историю событий изменения схемы, которые применяются к таблицам в базе данных. Коннектор записывает события изменения схемы в отдельный топик Kafka, в случае приведенного примера — sql
. Это имя совпадает с параметром topicPrefix
, заданным при создании коннектора.
ПРИМЕЧАНИЕ
Более подробную информацию о наименованиях топиков, создаваемых коннектором Debezium, можно найти в разделе Topic names документации коннектора Debezium для MS SQL Server. |
snapshot.mode
При помощи параметра snapshot.mode можно настроить момент, в который коннектор будет создавать снимки.