ADB to Kafka Connector usage examples

This article describes how to transfer data from ADB to Kafka (as part of ADS) via ADB to Kafka Connector. The following prerequisites are met:

  • The ADS cluster is installed according to the Online installation guide on the following hosts: bds-ads1, bds-ads2, bds-ads3.

  • The ADB cluster is installed according to the Online installation guide on the following hosts: bds-mdw, bds-sdw1, bds-sdw2.

  • In ADB, the PXF and ADB to Kafka services are added and installed.

  • The adb database exists in ADB.

  • All ADB nodes have access to port 9092 on all ADS nodes through which you plan to load data.

NOTE

Since ADB to Kafka Connector uses the AVRO OCF format for data serialization, you cannot deserialize data using kafka-avro-console-consumer. In all examples in this article, the kafka-console-consumer.sh script is used, which outputs the schema of the data obtained from ADB and the data itself in HEX representation. Deserialization can be done with the Kafka to ADB Connector (see Kafka to ADB Connector usage examples).
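
If you need to inspect such messages outside ADB, note that each Kafka message produced by the connector is a self-contained AVRO OCF file. Below is a minimal sketch of decoding one message with Avro Tools; the avro-tools.jar name and location are assumptions, and the trailing newline added by the console consumer is usually tolerated by OCF readers (you can strip it with truncate -s -1 if decoding fails):

$ # Consume a single raw message and save it to a file
$ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --max-messages 1 --bootstrap-server bds-ads1:9092 > /tmp/message.avro
$ # Decode the OCF payload to JSON
$ java -jar avro-tools.jar tojson /tmp/message.avro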

PLAINTEXT security protocol (default)

Without SERVER option

  1. Connect to any ADS host where the Kafka service is installed. Go to the Kafka root directory and create a test topic via the following command:

    $ bin/kafka-topics.sh --create --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092

    The result:

    Created topic topic_adb_to_kafka.
    TIP

    • For more information on adding and reading Kafka topics, see Quick start with Kafka in the ADS documentation.

    • Topic creation is optional. If you specify a non-existent topic in step 3, it will be created automatically unless the TOPIC_AUTO_CREATE_FLAG value is changed to false.
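
    If you created the topic manually, you can verify that it exists and is reachable (a quick check using the same hosts as in this example):

    $ bin/kafka-topics.sh --describe --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092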

  2. Connect to the adb database on ADB master under the gpadmin user (for example, via psql):

    $ sudo su - gpadmin
    $ psql adb
  3. Create an external table with the following structure:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
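
    A writable external table cannot be read with SELECT, but you can check its registered location string via Greenplum's pg_exttable catalog (a quick sanity check; assumes an ADB/Greenplum 6 catalog layout):

    $ psql adb -c "SELECT urilocation FROM pg_exttable WHERE reloid = 'ext_adb_to_kafka'::regclass;"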
  4. Insert some data into the external table:

    INSERT INTO ext_adb_to_kafka VALUES
    (1,'test1'),
    (2,'test2'),
    (3,'test3'),
    (4,'test4'),
    (5,'test5');
  5. In ADS, check that data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run bin/kafka-console-consumer.sh:

    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true

    The command output contains information about 5 messages (with offset numbers):

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E
    Offset:1        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4
    Offset:2        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr
    Offset:3        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13
    Offset:4        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809
    ^CProcessed a total of 5 messages

With SERVER option

  1. Perform steps 1-6 that are described in the Use the SERVER option section of the ADB to Kafka Connector configuration article. As <server_name>, use kafka_test.

  2. In ADB, create an external table ext_adb_to_kafka2. Unlike the table from the previous example, specify the SERVER option:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka2 (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka?PROFILE=kafka&SERVER=kafka_test'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
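
    The SERVER value refers to the PXF server configuration directory created in step 1. As a quick check (assuming the default PXF layout, where server configurations live under $PXF_BASE/servers), you can list its contents on ADB master:

    $ ls $PXF_BASE/servers/kafka_test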
  3. Insert some data into the ext_adb_to_kafka2 table:

    INSERT INTO ext_adb_to_kafka2 VALUES
    (1,'test1');
  4. In ADS, check that data from ADB has been successfully added to the topic_adb_to_kafka topic:

    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true

    The command output contains 6 messages:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E
    Offset:1        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4
    Offset:2        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr
    Offset:3        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13
    Offset:4        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809
    Offset:5        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs\x02\x12\x02\x02\x02\x0Atest1%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs
    ^CProcessed a total of 6 messages

SASL_PLAINTEXT security protocol

PLAIN mechanism

  1. Connect to any ADS host where the Kafka service is installed. Configure SASL_PLAINTEXT authentication for the ADS cluster according to the SASL PLAINTEXT article.

  2. Ensure that Kafka authentication is configured correctly by following the steps in the Check the installed SASL PLAINTEXT section.

  3. Create a new user adb-to-kafka with the password 123 in ADS by following the steps in the Create a user section.

  4. According to the User authentication in the Kafka environment section, update the /etc/kafka/conf/client.properties file contents:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    # Uncomment and set necessary username/password
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="adb-to-kafka" \
      password="123";
  5. Connect to the adb database on ADB master under the gpadmin user (for example, via psql):

    $ sudo su - gpadmin
    $ psql adb
  6. Create an external table with the following structure in ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_sasl (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka_sasl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&SASL_USER=adb-to-kafka&SASL_USER_PASSWORD=123'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  7. Insert some data into the ext_adb_to_kafka_sasl external table:

    INSERT INTO ext_adb_to_kafka_sasl VALUES
    (1,'test1');
  8. In ADS, check that data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run bin/kafka-console-consumer.sh.

    NOTE

    Unlike the examples above, when using the SASL_PLAINTEXT protocol, you should set the --consumer.config option. Use it to specify the path to the client.properties file described in step 4 above.

    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_sasl --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /etc/kafka/conf/client.properties

    The command output contains one message:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81\x02\x12\x02\x02\x02\x0Atest1c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81
    ^CProcessed a total of 1 messages

Authentication errors

If user credentials are not specified correctly in the external table definition, the following error occurs when you try to add data to that table:

ERROR:  PXF server error : Cannot acquire list of topics for server: bds-ads1:9092,bds-ads2:9092,bds-ads3:9092!  (seg5 10.92.40.70:10001 pid=15516)
HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw2' or 'set client_min_messages=LOG' for additional details.

To find out the original error, you can view the /var/lib/pxf/logs/pxf-service.log file on the specified segment host under the gpadmin user:

$ sudo su - gpadmin
$ tail /var/lib/pxf/logs/pxf-service.log

The result indicates a failed authentication attempt:

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
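
To quickly locate such records, you can filter the log (a simple sketch; adjust the pattern and the path to your environment):

$ grep -i 'SaslAuthenticationException' /var/lib/pxf/logs/pxf-service.log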

GSSAPI mechanism (Kerberos)

CAUTION

If you configured SASL_PLAINTEXT authentication for your ADS cluster according to the SASL PLAINTEXT article (e.g. when running the previous example), you should disable it before ADS kerberization. To do this, deactivate the SASL_PLAINTEXT Authentication switcher on the ADS cluster configuration page and apply changes by clicking Save.

  1. Configure MIT Kerberos KDC on a separate host (bds-mit-kerberos.ru-central1.internal in the current example) and kerberize your ADS cluster according to the MIT Kerberos article. As a realm name, use ADS-KAFKA.LOCAL.

  2. Ensure that Kafka authentication is configured correctly by following the steps in the Check installed Kerberos SASL section.

  3. On the host where MIT Kerberos KDC is deployed, create a new principal adb-to-kafka, which will be used by ADB to Kafka Connector. Specify 123 as a password:

    $ sudo kadmin.local -q "add_principal -pw 123 adb-to-kafka@ADS-KAFKA.LOCAL"

    The result:

    Principal "adb-to-kafka@ADS-KAFKA.LOCAL" created.
  4. According to the Create a configuration file .properties for the user section, create the /tmp/client.properties file in ADS.

  5. Follow the steps from the Create a JAAS file for a user section.

  6. In ADB, install the following packages on master and segment hosts:

    $ sudo yum install -y krb5-libs krb5-server krb5-workstation openldap-clients cyrus-sasl-gssapi
  7. On ADB master and each segment host, edit the /etc/krb5.conf file contents in the same way as you configured it for MIT Kerberos KDC:

    $ sudo vi /etc/krb5.conf

    The file contents:

    [logging]
     default = FILE:/var/log/krb5libs.log
     kdc = FILE:/var/log/krb5kdc.log
     admin_server = FILE:/var/log/kadmind.log
    
    [libdefaults]
     default_realm = ADS-KAFKA.LOCAL
     kdc_timesync = 1
     ticket_lifetime = 24h
    
    [realms]
     ADS-KAFKA.LOCAL = {
     admin_server = bds-mit-kerberos.ru-central1.internal
     kdc = bds-mit-kerberos.ru-central1.internal
     }
  8. On ADB master and each segment host, edit the /var/kerberos/krb5kdc/kdc.conf file contents in the same way as you configured it for MIT Kerberos KDC:

    $ sudo vi /var/kerberos/krb5kdc/kdc.conf

    The file contents:

    [kdcdefaults]
     kdc_ports = 88
     kdc_tcp_ports = 88
     default_realm = ADS-KAFKA.LOCAL
    
    [realms]
     ADS-KAFKA.LOCAL = {
      #master_key_type = aes256-cts
      acl_file = /var/kerberos/krb5kdc/kadm5.acl
      dict_file = /usr/share/dict/words
      admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
      supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
     }
  9. On ADB master, create a new directory to store keytab files. Assign an owner and necessary permissions to that directory:

    $ sudo mkdir /var/lib/pxf/conf/kerberos
    $ sudo chmod 744 /var/lib/pxf/conf/kerberos
    $ sudo chown gpadmin:gpadmin /var/lib/pxf/conf/kerberos
  10. Connect to ADB master under the gpadmin user:

    $ sudo su - gpadmin
  11. On ADB master, run the interactive utility ktutil:

    $ ktutil
  12. In the console that opens, run the following commands one by one (after the ktutil: prompt). After each command, enter the password for the adb-to-kafka principal (123 in the current example):

    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes128-cts-hmac-sha1-96
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes256-cts-hmac-sha1-96
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e arcfour-hmac
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia256-cts-cmac
    add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia128-cts-cmac
  13. Finally, run the following command to write a keytab file with the entered data. Then type q to quit ktutil:

    write_kt /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab
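
    You can verify the resulting keytab by listing its entries together with encryption types (klist is provided by the krb5-workstation package installed in step 6):

    $ klist -kte /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab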
  14. To copy adb-to-kafka.service.keytab to all ADB segment hosts, run the following commands on master in the given order:

    • PXF synchronization:

      $ pxf cluster sync
    • PXF restart:

      $ pxf cluster restart
  15. Check the adb-to-kafka principal by sequentially running the following commands. When running the kinit command, you need to specify the principal password (123 in the current example):

    $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
    $ klist

    The result indicates that a valid ticket is present:

    Ticket cache: FILE:/tmp/krb5cc_2042
    Default principal: adb-to-kafka@ADS-KAFKA.LOCAL
    
    Valid starting       Expires              Service principal
    12/14/2023 18:13:16  12/15/2023 18:13:16  krbtgt/ADS-KAFKA.LOCAL@ADS-KAFKA.LOCAL
  16. Connect to the adb database on ADB master (for example, via psql):

    $ psql adb
  17. Create an external table with the following structure in ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_kerberos (a INT, b TEXT)
    LOCATION (
        'pxf://topic_adb_to_kafka_kerberos?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  18. Insert some data into the ext_adb_to_kafka_kerberos table:

    INSERT INTO ext_adb_to_kafka_kerberos VALUES
    (1,'kerberos');
  19. In ADS, check that data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run the following commands. When running the kinit command, specify a password for the adb-to-kafka principal (123 in the current example):

    $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
    $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_kerberos --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client.properties

    The command output contains one message:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3\x02\x18\x02\x02\x02\x10kerberos\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3
    ^C[2023-12-14 18:18:35,250] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
    Processed a total of 1 messages

NOTE

In the --consumer.config option, specify a path to the client.properties file that is described in step 4 above.

Authentication errors

If the Kerberos principal or keytab path is not specified correctly in the external table definition, the following error occurs when you try to add data to that table:

ERROR:  PXF server error : Failed to create new KafkaAdminClient  (seg4 10.92.40.70:10000 pid=25863)
HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw2' or 'set client_min_messages=LOG' for additional details.

Details can be found in PXF logs.
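
For example, you can filter the log for Kerberos-related records (a simple sketch; adjust the pattern to your environment):

$ grep -i -E 'gssapi|kerberos|keytab' /var/lib/pxf/logs/pxf-service.log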

SASL_SSL security protocol with GSSAPI mechanism (Kerberos)

NOTE

The certificate-related actions described in steps 2-8 can be automated via a bash script (see generate.sh in SSL channel security). However, the script is not recommended for a production environment.

  1. Perform steps 1-15 described in the GSSAPI mechanism (Kerberos) section (if you have not completed them yet). Ensure that a valid ticket exists for the adb-to-kafka principal on the ADB side.

  2. Generate an RSA key pair and create keystore.jks on each ADB and ADS host:

    $ keytool -genkeypair -noprompt -keyalg RSA -ext san=dns:<hostname> -alias <hostname> -dname "CN=<hostname>, OU=AD, O=AD, L=MSK, S=MO, C=RU" -keystore /tmp/keystore.jks -storepass bigdata -keypass bigdata -validity 360 -keysize 2048

    where <hostname> is the FQDN of the host where the command runs.
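
    You can verify the generated entry with keytool -list:

    $ keytool -list -keystore /tmp/keystore.jks -storepass bigdata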

  3. To export SSL certificates, run the following command on each ADB and ADS host:

    $ keytool -exportcert -file /tmp/<hostname>.crt -keystore /tmp/keystore.jks -storepass bigdata -alias <hostname> -rfc

    where <hostname> is the FQDN of the host where the command runs.

    The result is below (for the host with FQDN bds-mdw.ru-central1.internal):

    Certificate stored in file </tmp/bds-mdw.ru-central1.internal.crt>
  4. Copy certificates from each ADB and ADS host to the local machine. The following is an example for Windows:

    $ pscp <user>@<hostname>:/tmp/<hostname>.crt /<local_folder>/

    where:

    • <user> — the username used to connect to the ADB/ADS host via SSH;

    • <hostname> — the FQDN of the ADB/ADS host;

    • <local_folder> — the directory created on the local machine to store all certificates.

    For example, for FQDN bds-mdw.ru-central1.internal the command can look like this:

    $ pscp dasha@bds-mdw:/tmp/bds-mdw.ru-central1.internal.crt /cert/

    As a result, all certificates are stored in one place:

        Directory: C:\cert
    
    
    Mode                 LastWriteTime         Length Name
    ----                 -------------         ------ ----
    -a----        12/25/2023  10:58 AM           1275 bds-ads1.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1275 bds-ads2.ru-central1.internal.crt
    -a----        12/25/2023  10:59 AM           1275 bds-ads3.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1271 bds-mdw.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1275 bds-sdw1.ru-central1.internal.crt
    -a----        12/25/2023  10:58 AM           1275 bds-sdw2.ru-central1.internal.crt
  5. Copy all certificates from the local machine to the /tmp directory of each ADB and ADS host. To do this, run the following command for each host:

    $ pscp /<local_folder>/*.crt <user>@<hostname>:/tmp/

    For example, for ADB host with FQDN bds-mdw.ru-central1.internal the command can look like this:

    $ pscp /cert/*.crt dasha@bds-mdw:/tmp/
  6. Import all certificates into truststore.jks and ca-bundle.crt on each ADB/ADS host as follows:

    $ keytool -importcert -noprompt -alias <hostname> -file /tmp/<hostname>.crt -keystore /tmp/truststore.jks -storepass bigdata
    $ sudo bash -c "cat /tmp/<hostname>.crt >> /etc/pki/tls/certs/ca-bundle.crt"

    where <hostname> is the FQDN of each ADB/ADS host. In the current example, the two commands listed above should be run six times on each of the six ADB/ADS hosts (once for each certificate); see the loop sketch below.
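
    A loop like the following can reduce the manual effort (a sketch for this example's six hosts; adjust the host list to your cluster):

    $ for host in bds-mdw bds-sdw1 bds-sdw2 bds-ads1 bds-ads2 bds-ads3; do
          fqdn="${host}.ru-central1.internal"
          keytool -importcert -noprompt -alias "$fqdn" -file "/tmp/${fqdn}.crt" -keystore /tmp/truststore.jks -storepass bigdata
          sudo bash -c "cat /tmp/${fqdn}.crt >> /etc/pki/tls/certs/ca-bundle.crt"
      done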

  7. Import truststore.jks into the Java CA store by running the following command on each ADB/ADS host:

    $ sudo keytool -importkeystore -noprompt -srckeystore /tmp/truststore.jks -destkeystore /etc/pki/java/cacerts -deststorepass changeit -srcstorepass bigdata

    The result:

    Importing keystore /tmp/truststore.jks to /etc/pki/java/cacerts...
    Entry for alias bds-sdw1.ru-central1.internal successfully imported.
    Entry for alias bds-mdw.ru-central1.internal successfully imported.
    Entry for alias bds-ads1.ru-central1.internal successfully imported.
    Entry for alias bds-sdw2.ru-central1.internal successfully imported.
    Entry for alias bds-ads2.ru-central1.internal successfully imported.
    Entry for alias bds-ads3.ru-central1.internal successfully imported.
    Import command completed:  6 entries successfully imported, 0 entries failed or cancelled
  8. Create and import OpenSSL certificates by running the following commands on each ADB/ADS host:

    $ sudo openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -subj "/C=RU/ST=Denial/L=MSK/O=AD/CN=<hostname>" -keyout /etc/ssl/host_cert.key  -out /etc/ssl/certs/host_cert.cert
    $ sudo bash -c "cat /etc/ssl/certs/host_cert.cert >> /etc/pki/tls/certs/ca-bundle.crt"

    where <hostname> is the FQDN of the host where the commands run.

    The first command result:

    Generating a 4096 bit RSA private key
    .....................................................++
    ...............................................................++
    writing new private key to '/etc/ssl/host_cert.key'
  9. Enable SSL in ADS according to the Start SSL on an ADS cluster using ADCM section.

  10. Ensure that Kafka authentication is configured correctly by following the steps in the Check the installed SSL section.

  11. In ADS, create a new file /tmp/client_ssl.properties:

    $ sudo vi /tmp/client_ssl.properties

    Add the file contents:

    security.protocol=SASL_SSL
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
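
    As with the SASL_PLAINTEXT example, you can verify this configuration from the Kafka side before creating the external table (assuming a valid Kerberos ticket and the JAAS file from the GSSAPI example):

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
    $ bin/kafka-topics.sh --list --bootstrap-server bds-ads1.ru-central1.internal:9092 --command-config /tmp/client_ssl.properties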
  12. Connect to the adb database on ADB master under the gpadmin user (for example, via psql):

    $ sudo su - gpadmin
    $ psql adb
  13. Create an external table with the following structure in ADB:

    CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_ssl (a INT, b TEXT)
    LOCATION (
        'pxf://topic_ssl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka&TRUSTSTORE_LOCATION=/tmp/truststore.jks&TRUSTSTORE_PASSWORD=bigdata'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

    Unlike the previous examples, when using SASL_SSL, you need to specify the FQDNs of the ADS hosts in the BOOTSTRAP_SERVERS option. Otherwise, you will get the following error when adding data to the table:

    ERROR:  PXF server error : Cannot acquire list of topics for server: bds-ads1:9092,bds-ads2:9092,bds-ads3:9092!  (seg1 10.92.40.81:10001 pid=10510)
    HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw1' or 'set client_min_messages=LOG' for additional details.

    You should also specify FQDNs when reading messages on the Kafka side (see step 15 below).

  14. Insert some data into the ext_adb_to_kafka_ssl table:

    INSERT INTO ext_adb_to_kafka_ssl VALUES
    (1,'ssl-test-data');
  15. In ADS, check that data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run the following commands. When running the kinit command, specify a password for the adb-to-kafka principal (123 in the current example):

    $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
    $ bin/kafka-console-consumer.sh --topic topic_ssl --from-beginning --bootstrap-server bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client_ssl.properties

    The command output contains one message:

    Offset:0        Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?\x02"\x02\x02\x02\x1Assl-test-data 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?
    ^C[2023-12-26 08:30:14,242] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
    Processed a total of 1 messages

NOTE

In the --consumer.config option, specify a path to the client_ssl.properties file that is described in step 11 above.

Authentication errors

If connection parameters are not specified correctly in the external table definition, the following error occurs when you try to add data to that table:

ERROR:  PXF server error : Failed to create new KafkaAdminClient  (seg0 10.92.40.81:10000 pid=10513)
HINT:  Check the PXF logs located in the '/var/lib/pxf/logs' directory on host 'bds-sdw1' or 'set client_min_messages=LOG' for additional details.

Details can be found in PXF logs. For example, the following message indicates an invalid value of the TRUSTSTORE_PASSWORD parameter:

Caused by: java.security.UnrecoverableKeyException: Password verification failed
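
To rule out transport-level problems, you can also check the SSL handshake with a broker directly (a basic sketch using the openssl client available on the hosts):

$ openssl s_client -connect bds-ads1.ru-central1.internal:9092 < /dev/null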