Debezium connector for PostgreSQL Server

The article shows an example of running and using the Debezium connector for PostgreSQL Server via ADS Control. The connector is designed to record events about row-level changes in PostgreSQL database tables to Kafka topics.

For this example, the PostgreSQL Server is Arenadata Postgres (ADPG).

Prerequisites

The following environment was used to create the connector:

  • The ADS cluster is installed according to the Online installation guide. The minimum ADS version is 3.6.2.2.b1.

  • The Kafka and Kafka Connect services are installed in the ADS cluster.

  • To automatically create a Kafka topic, the auto.create.topics.enable parameter is enabled in the server.properties group when configuring the Kafka service.

  • The ADS Control cluster is installed according to the guide Get started with Arenadata Streaming Control and integrated with the ADS cluster being used.

  • The ADPG cluster is installed according to the Online installation guide.

  • PostgreSQL Server IP address (ADPG cluster) is 10.92.40.129. For incoming connections, the default port number is 5432.

  • On the installed PostgreSQL Server (ADPG cluster), the following preparatory steps have been completed:

    • The books_store database exists in the ADPG cluster.

    • A user named user1 with the SUPERUSER privilege and the password1 password is created in the books_store database.

    • The book and author tables are created in the database, and several rows of data are added to them (a SQL sketch of these preparatory steps is given after this list).

    • The pg_hba.conf file is configured to provide access for the user from the host on which the Kafka Connect service of the ADS cluster is installed. To do this, an entry with the host address and the user is added to the PG_HBA field on the configuration page of the ADPG service:

      host    books_store  user1       10.92.43.206/32     trust
    • The wal_level parameter value in the postgresql.conf file is changed from replica to logical. This level adds the information required to support logical decoding, which the connector needs to work. To do this, an entry with the parameter and its new value is added to the postgresql.conf custom section field on the configuration parameters page of the ADPG service.
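
Below is a minimal SQL sketch of these preparatory steps, run in psql on the ADPG cluster. The books_store database, the user1 user, and the book and author tables come from this example; the column definitions and the sample rows are assumptions used only for illustration.

    -- Create the database and the user used in this example.
    CREATE DATABASE books_store;
    CREATE USER user1 WITH SUPERUSER PASSWORD 'password1';

    -- Connect to the new database (psql meta-command).
    \c books_store

    -- Assumed table structures; the article only states that the tables exist.
    CREATE TABLE author (
        id   SERIAL PRIMARY KEY,
        name TEXT NOT NULL
    );

    CREATE TABLE book (
        id        SERIAL PRIMARY KEY,
        title     TEXT NOT NULL,
        author_id INTEGER REFERENCES author(id)
    );

    -- Add several rows of sample data.
    INSERT INTO author (name) VALUES ('Leo Tolstoy'), ('Anton Chekhov');
    INSERT INTO book (title, author_id) VALUES ('War and Peace', 1), ('The Seagull', 2);

    -- After the wal_level change is applied and the ADPG service is restarted,
    -- verify that logical decoding is supported.
    SHOW wal_level;   -- expected value: logical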

Set permissions for the Debezium connector

Configuring PostgreSQL Server to run the Debezium connector when using the pgoutput plugin requires a database user who has the following privileges:

  • REPLICATION

  • LOGIN

  • CREATE in the database to add publications.

  • SELECT on tables to copy the original table data.

The SUPERUSER user from this example has all the necessary privileges, but it is recommended to use this privilege only in a test environment. For server security, it is recommended to grant the user the individual privileges listed above instead.
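
A minimal sketch of such a least-privilege setup is shown below. The debezium_user role name and its password are assumptions used for illustration; the books_store database and the book and author tables come from this example.

    -- Create a dedicated role with replication and login rights instead of SUPERUSER.
    CREATE ROLE debezium_user WITH REPLICATION LOGIN PASSWORD 'debezium_password';

    -- Allow the connector to create publications in the target database.
    GRANT CREATE ON DATABASE books_store TO debezium_user;

    -- Allow the connector to read the captured tables for the initial snapshot.
    GRANT SELECT ON public.book, public.author TO debezium_user;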

More information about setting up user privileges for the Debezium connector can be found in the Setting up permissions section of the Debezium connector for PostgreSQL Server documentation.

Create a Debezium connector for PostgreSQL Server

To create a Debezium connector for PostgreSQL Server via ADS Control, use the PostgresConnector connector plugin.

To create a connector using ADS Control, you need to:

  1. Go to the Kafka Connects page in the ADS Control web interface. The Kafka Connects page becomes available after selecting a cluster in the cluster management section and going to the desired tab on the General page.

  2. Select the desired cluster and go to the Kafka Connect instance overview page.

  3. Click Create Connector on the Kafka Connect instance overview page. After clicking Create Connector, the connector plugin selection window (Clusters → <cluster name> → Kafka Connects → <cluster name> connector → Kafka connector plugins) opens.

  4. Select the desired connector to create.

    Selecting the Kafka Connect connector to create
  5. Fill in the connector configuration parameters. If necessary, use the parameter descriptions given below the example:

    You can fill in the configuration as a JSON file. To do this, enable the JSON view switch.

    Connector configuration
    Connector configuration JSON file
    Example contents of a JSON file with a simple Debezium connector configuration for PostgreSQL Server
    {
        "name": "PostgresConnector",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "10.92.40.129",
        "database.port": "5432",
        "database.user": "user1",
        "database.password": "password1",
        "database.dbname": "books_store",
        "topic.prefix": "postgres",
        "plugin.name": "pgoutput",
        "publication.autocreate.mode": "filtered",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
    Connector configuration attributes and their descriptions:

    name

    The name of the connector that will be used in the Kafka Connect service

    connector.class

    Class name for the connector

    tasks.max

    Maximum number of created tasks

    database.hostname

    PostgreSQL Server instance address

    database.port

    PostgreSQL Server instance port number

    database.user

    PostgreSQL Server username

    database.password

    Password for PostgreSQL Server user

    database.dbname

    Name of the database from which changes will be captured

    topic.prefix

    Topic prefix for a PostgreSQL Server instance/cluster that forms a namespace and is used in all Kafka topic names that the connector writes to, Kafka Connect schema names, and the corresponding Avro schema namespaces when using the Avro Converter

    plugin.name

    Name of the PostgreSQL logical decoding module installed on the PostgreSQL server

    publication.autocreate.mode

    Indicates whether and how the connector creates a publication. This option applies only when the connector streams changes using the pgoutput plugin

  6. After filling in the parameters, click Save. A message about the successful creation of the connector appears.

    Message about the successful creation of the connector
  7. Check that, as a result of creation, the <connector name> → Overview page displays the created connector and the connector tasks in the running state. The status is indicated by the marker in front of the connector/task name:

    • green — the connector/task is running;

    • yellow — the connector/task has been administratively paused;

    • red — the connector/task has failed (usually by raising an exception, which is reported in the status output);

    • unassigned — the connector/task has not yet been assigned to a worker.

    Created connector

    If a connector task is created with an error, you can view the error details by clicking the restart icon located in the Status field of the task.
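
    In addition to the statuses in ADS Control, you can check the connector's footprint on the PostgreSQL side. The queries below are a sketch run in psql against the books_store database; the names of the replication slot and publication depend on the connector configuration, so the queries simply list all of them.

      -- A logical replication slot created with the pgoutput plugin should be present and active.
      SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;

      -- With publication.autocreate.mode = filtered, the connector creates a publication for the captured tables.
      SELECT pubname FROM pg_publication;

      -- List the tables included in the publications.
      SELECT * FROM pg_publication_tables;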

Use the Debezium connector for PostgreSQL Server

The first time you connect to a PostgreSQL server or cluster, the connector creates a consistent snapshot of all schemas. Once the snapshot is complete, the connector continuously captures row-level changes and pushes them to Kafka topics.

On the Topics page of the ADS Control user interface, you can see the topics created by the connector.

Topics created by the connector

For each table, the connector writes events for all INSERT, UPDATE, and DELETE operations to a separate Kafka topic; in the above example, these are the postgres.public.book and postgres.public.author topics.
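
For example, the following statements executed in the books_store database each produce a change event in the postgres.public.book topic. The column names are assumptions carried over from the preparatory SQL sketch above.

    -- Each statement below results in a change event in the postgres.public.book topic.
    INSERT INTO book (title, author_id) VALUES ('Anna Karenina', 1);

    UPDATE book SET title = 'Anna Karenina (2nd edition)' WHERE title = 'Anna Karenina';

    DELETE FROM book WHERE title = 'Anna Karenina (2nd edition)';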

The connector uses the following form of the topic name to record change events: <topicPrefix>.<schemaName>.<tableName>, where:

  • <topicPrefix> — the logical name of the server specified in the topic.prefix configuration property;

  • <schemaName> — the name of the database schema in which the change event occurred;

  • <tableName> — the name of the database table in which the change event occurred.

NOTE

More detailed information about the topic names created by the Debezium connector can be found in the section Topic names of the Debezium for PostgreSQL Server connector documentation.

snapshot.mode

Using the snapshot.mode parameter, you can configure the moment at which the connector takes snapshots.
