Debezium connector for PostgreSQL Server
This article shows an example of creating and using the Debezium connector for PostgreSQL Server via ADS Control. The connector records row-level change events from PostgreSQL database tables to Kafka topics.
For this example, the PostgreSQL Server is Arenadata Postgres (ADPG).
Prerequisites
The following environment was used to create the connector:
- The ADS cluster is installed according to the Online installation guide. The minimum ADS version is 3.6.2.2.b1.
- The Kafka and Kafka Connect services are installed in the ADS cluster.
- To allow Kafka topics to be created automatically, the auto.create.topics.enable parameter is enabled in the server.properties group when configuring the Kafka service.
- The ADS Control cluster is installed according to the Get started with Arenadata Streaming Control guide and is integrated with the ADS cluster being used.
- The ADPG cluster is installed according to the Online installation guide.
- The IP address of the PostgreSQL Server (ADPG cluster) is 10.92.40.129. The default port for incoming connections is 5432.
- On the installed PostgreSQL Server (ADPG cluster), the following preparatory steps have been completed:
  - The books_store database exists in the ADPG cluster.
  - A user named user1 with the SUPERUSER privilege and the password password1 is created in the books_store database.
  - The book and author tables are created in the database and populated with several rows of data.
  - The pg_hba.conf file is configured to allow access for the user from the host on which the Kafka Connect service of the ADS cluster is installed. To do this, the following entry with the host address and the user is added to the PG_HBA field on the configuration page of the ADPG service:
    host books_store user1 10.92.43.206/32 trust
  - The value of the wal_level parameter in the postgresql.conf file is changed from replica to logical. This value adds the information required to support logical decoding, which the connector needs in order to work. To do this, an entry with the parameter and its new value is added to the postgresql.conf custom section field on the configuration parameters page of the ADPG service.
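Both ADPG entries above follow fixed formats. A pg_hba.conf record consists of the connection type, database, user, client address, and authentication method. The postgresql.conf entry is a plain `wal_level = logical` assignment.

The Python sketch below renders both entries from parameters. It is an illustration only. The helper names `pg_hba_entry` and `wal_level_entry` are hypothetical and are not part of ADPG or ADS Control. The `trust` method matches the example and permits connections without a password. Outside a test setup, a password-based method such as `scram-sha-256` is usually preferred.

```python
import ipaddress


def pg_hba_entry(database: str, user: str, client_ip: str, method: str = "trust") -> str:
    """Build one pg_hba.conf record: TYPE DATABASE USER ADDRESS METHOD (hypothetical helper)."""
    # ip_network() validates the address and turns a bare IPv4 address into a /32 host network.
    network = ipaddress.ip_network(client_ip)
    return f"host {database} {user} {network.with_prefixlen} {method}"


def wal_level_entry(level: str = "logical") -> str:
    """Build the postgresql.conf line for wal_level (hypothetical helper)."""
    # Logical decoding, which the connector relies on, requires the "logical" level.
    if level not in {"minimal", "replica", "logical"}:
        raise ValueError(f"unsupported wal_level: {level}")
    return f"wal_level = {level}"


if __name__ == "__main__":
    # Values taken from the prerequisites of this example.
    print(pg_hba_entry("books_store", "user1", "10.92.43.206"))
    print(wal_level_entry())
```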
Create a Debezium connector for PostgreSQL Server
To create a Debezium connector for PostgreSQL Server via ADS Control, use the PostgresConnector connector plugin.
To create a connector using ADS Control:
- Go to the Kafka Connects page in the ADS Control web interface. The Kafka Connects page becomes available after you select a cluster in the cluster management section and open the corresponding tab on the General page.
- Select the desired cluster and go to the Kafka Connect instance overview page.
- Click Create Connector on the Kafka Connect instance overview page. This opens the connector plugin selection window: Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins.
- Select the connector to create.
  Selecting the Kafka Connect connector to create
- Fill in the connector configuration parameters. If necessary, refer to the Kafka Connect service configurations in the ADS configuration parameters article.
  You can also fill in the configuration as JSON. To do this, enable the JSON view switch.
  Connector configuration
  Connector configuration JSON file
  Example contents of a JSON file with a simple Debezium connector configuration for PostgreSQL Server:
  {
    "name": "PostgresConnector",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "10.92.40.129",
    "database.port": "5432",
    "database.user": "user1",
    "database.password": "password1",
    "database.dbname": "books_store",
    "topic.prefix": "postgres",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
  Attribute | Description
  name | The name of the connector that is used in the Kafka Connect service
  connector.class | The name of the connector class
  tasks.max | The maximum number of tasks to create
  database.hostname | The address of the PostgreSQL Server instance
  database.port | The port number of the PostgreSQL Server instance
  database.user | The PostgreSQL Server username
  database.password | The password of the PostgreSQL Server user
  database.dbname | The name of the database from which changes are captured
  topic.prefix | The topic prefix for the PostgreSQL Server instance or cluster. It forms a namespace and is used in the names of all Kafka topics that the connector writes to, in Kafka Connect schema names, and in the corresponding Avro schema namespaces when the Avro converter is used
  plugin.name | The name of the PostgreSQL logical decoding plugin installed on the PostgreSQL server
  publication.autocreate.mode | Indicates whether and how the connector creates a publication. Applies only when the connector streams changes by using the pgoutput plugin. With filtered, the value used in the example, the connector creates a publication only for the tables that match its filter settings
- After filling in the parameters, click Save. A message about the successful creation of the connector appears.
  Message about the successful creation of the connector
- Check that the <connector name> → Overview page displays the created connector and its tasks in the running state. The status is shown by the indicator in front of the connector or task name, which means one of the following:
  - the connector/task is running;
  - the connector/task has been administratively paused;
  - the connector/task has failed (usually by raising an exception, which is reported in the status output);
  - the connector/task has not yet been assigned to a worker.
  Created connector
  If a task fails after the connector is created, you can view the error details by clicking the icon in the Status field of the task.
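ADS Control performs the steps above through its web interface. Kafka Connect itself also exposes a REST API for the same operations:

- `POST /connectors` creates a connector from a body of the form `{"name": ..., "config": {...}}`.
- `GET /connectors/<name>/status` returns the connector and task states. These are `RUNNING`, `PAUSED`, `FAILED`, and `UNASSIGNED`, matching the four indicators described above. A failed task also carries its stack trace in a `trace` field.

The following standard-library Python sketch is a hedged illustration, not a description of how ADS Control works internally. It assumes the worker's REST endpoint is reachable at a URL you supply. Port 8083 is the Kafka Connect default, but an ADS cluster may use a different one. The helper names are hypothetical.

```python
import json
import urllib.request

# Flat configuration from the example above, as entered in the ADS Control JSON view.
CONNECTOR_CONFIG = {
    "name": "PostgresConnector",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "10.92.40.129",
    "database.port": "5432",
    "database.user": "user1",
    "database.password": "password1",
    "database.dbname": "books_store",
    "topic.prefix": "postgres",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
}


def create_request_body(config):
    """Wrap a flat configuration into the body expected by POST /connectors."""
    # Kafka Connect expects {"name": ..., "config": {...}}; a "name" inside config must match it.
    return {"name": config["name"], "config": dict(config)}


def post_connector(connect_url, config):
    """Create the connector on a Kafka Connect worker; connect_url is an assumption."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(create_request_body(config)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def get_status(connect_url, name):
    """Read the connector and task states from GET /connectors/<name>/status."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as response:
        return json.load(response)


def failed_tasks(status):
    """Return (task id, stack trace) pairs for every task in the FAILED state."""
    return [
        (task["id"], task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


if __name__ == "__main__":
    # Offline example: a status response shaped like Kafka Connect's (worker port 8083 is hypothetical).
    sample_status = {
        "name": "PostgresConnector",
        "connector": {"state": "RUNNING", "worker_id": "10.92.43.206:8083"},
        "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.92.43.206:8083"}],
        "type": "source",
    }
    print(failed_tasks(sample_status))
```

Against a live worker, `failed_tasks(get_status(url, "PostgresConnector"))` would list the same error details that the Status field shows in ADS Control.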
Use the Debezium connector for PostgreSQL Server
The first time the connector connects to a PostgreSQL server or cluster, it takes a consistent snapshot of all schemas. After the snapshot is complete, the connector continuously captures row-level changes and streams them to Kafka topics.
On the Topics page of the ADS Control user interface, you can see the topics created by the connector.
For each table, the connector writes events for all INSERT, UPDATE, and DELETE operations to a separate Kafka topic. In the example above, these are the postgres.public.book and postgres.public.author topics.
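Each message in these topics is a Debezium change event. Its value contains, among other fields, `before`, `after`, and `op`. The `op` field takes these values:

- `r` for rows read during the initial snapshot;
- `c` for INSERT;
- `u` for UPDATE;
- `d` for DELETE;
- `t` for TRUNCATE, when such events are enabled.

The example configuration uses `StringConverter`. The minimal sketch below instead assumes that values are serialized as JSON by `org.apache.kafka.connect.json.JsonConverter`, either with or without the schema envelope. The function name and the sample `book` rows in the usage are hypothetical.

```python
import json

# Values of the "op" field in a Debezium change event.
OPERATIONS = {
    "r": "READ (snapshot)",
    "c": "INSERT",
    "u": "UPDATE",
    "d": "DELETE",
    "t": "TRUNCATE",
}


def describe_event(raw_value):
    """Return (operation, row) for one change event value serialized as JSON (assumed converter)."""
    if raw_value is None:
        # A null value is a tombstone, which Debezium emits after a DELETE event by default.
        return "TOMBSTONE", None
    event = json.loads(raw_value)
    # With schemas.enable=true, JsonConverter wraps the event in {"schema": ..., "payload": ...}.
    payload = event.get("payload", event)
    op = payload["op"]
    # A DELETE carries the removed row in "before"; other operations carry the row state in "after".
    row = payload.get("before") if op == "d" else payload.get("after")
    return OPERATIONS.get(op, f"unknown ({op})"), row


if __name__ == "__main__":
    # Hypothetical event for a row of the book table.
    print(describe_event('{"before": null, "after": {"id": 1, "title": "Dune"}, "op": "c"}'))
```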
The connector uses the following format of topic names to record change events: <topicPrefix>.<schemaName>.<tableName>, where:
- <topicPrefix> is the logical name of the server specified in the topic.prefix configuration property;
- <schemaName> is the name of the database schema in which the change event occurred;
- <tableName> is the name of the database table in which the change event occurred.
NOTE
More detailed information about the names of topics created by the Debezium connector can be found in the Topic names section of the Debezium connector for PostgreSQL documentation.
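As a minimal sketch, the default naming rule can be written in Python. It reproduces only the <topicPrefix>.<schemaName>.<tableName> pattern described above. It also checks the result against Kafka's topic-name rules: ASCII letters, digits, `.`, `_`, and `-`, at most 249 characters. The sketch does not model how Debezium itself treats names outside those rules, or custom topic naming strategies. The function name `topic_name` is hypothetical.

```python
import re

# Characters Kafka accepts in topic names; Kafka also caps the length at 249 characters.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")


def topic_name(topic_prefix: str, schema_name: str, table_name: str) -> str:
    """Build <topicPrefix>.<schemaName>.<tableName> and reject names Kafka would not accept."""
    if not (topic_prefix and schema_name and table_name):
        raise ValueError("topic name parts must be non-empty")
    name = f"{topic_prefix}.{schema_name}.{table_name}"
    if len(name) > 249 or not _LEGAL_TOPIC.fullmatch(name):
        raise ValueError(f"not a legal Kafka topic name: {name!r}")
    return name


if __name__ == "__main__":
    # topic.prefix "postgres" and the public schema from the example above.
    for table in ("book", "author"):
        print(topic_name("postgres", "public", table))
```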
snapshot.mode
The snapshot.mode parameter specifies when the connector takes snapshots.