ADB to Kafka Connector usage examples
This article describes how to transfer data from ADB to Kafka (as part of ADS) via ADB to Kafka Connector. The following prerequisites are met:
- The ADS cluster is installed according to the Online installation guide on the following hosts: bds-ads1, bds-ads2, bds-ads3.
- The ADB cluster is installed according to the Online installation guide on the following hosts: bds-mdw, bds-sdw1, bds-sdw2.
- In ADB, the PXF and ADB to Kafka services are added and installed.
- The adb database exists in ADB.
- All ADB nodes have access to port 9092 on all ADS nodes through which you plan to load data.
NOTE
Since ADB to Kafka Connector uses the AVRO OCF format for data serialization, you cannot deserialize data using kafka-avro-console-consumer. In all examples below, the kafka-console-consumer.sh script is used, which returns the schema of the data obtained from ADB and the data itself in the HEX representation. Deserialization can be done with the Kafka to ADB Connector (see Kafka to ADB Connector usage examples).
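If you need to inspect the transferred data outside of ADB, note that each message value produced by the connector is a self-contained AVRO OCF file, so any OCF-aware tool can decode it. Below is a minimal sketch that assumes the kcat utility and the Apache Avro tools JAR (avro-tools.jar) are available on the host; neither ships with ADS, so treat it as an illustration rather than a supported workflow:

# Dump the raw value of the first message to a file (one message, then exit)
$ kcat -b bds-ads1:9092 -t topic_adb_to_kafka -o beginning -c 1 -e > /tmp/message.avro
# kcat appends a newline delimiter after the message; strip it before decoding
$ truncate -s -1 /tmp/message.avro
# Decode the OCF payload to JSON; for the examples below the output would be
# similar to: {"a":{"int":1},"b":{"string":"test1"}}
$ java -jar avro-tools.jar tojson /tmp/message.avro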
PLAINTEXT security protocol (by default)
Without SERVER option
1. Connect to any ADS host where the Kafka service is installed. Go to the Kafka root directory and create a test topic via the following command:

   $ bin/kafka-topics.sh --create --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092

   The result:

   Created topic topic_adb_to_kafka.
   TIP
   - For more information on adding and reading Kafka topics, see Quick start with Kafka in the ADS documentation.
   - Topic creation is optional. If you specify a non-existent topic in step 3, it will be created automatically unless the TOPIC_AUTO_CREATE_FLAG value is changed to false.
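   To confirm that the topic exists and to see its partition layout, you can additionally describe it with the standard Kafka CLI:

   $ bin/kafka-topics.sh --describe --topic topic_adb_to_kafka --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092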
2. Connect to the adb database on the ADB master under the gpadmin user (for example, via psql):

   $ sudo su - gpadmin
   $ psql adb
3. Create an external table with the following structure:

   CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka (a INT, b TEXT)
   LOCATION ('pxf://topic_adb_to_kafka?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092')
   FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
4. Insert some data into the external table:

   INSERT INTO ext_adb_to_kafka VALUES (1,'test1'), (2,'test2'), (3,'test3'), (4,'test4'), (5,'test5');
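   Note that a WRITABLE external table is write-only on the ADB side, so the inserted rows cannot be verified with a SELECT against it. A query like the one below fails, which is why the check in the next step is performed on the Kafka side:

   SELECT * FROM ext_adb_to_kafka;
   -- fails with an error similar to: "it is not possible to read from a WRITABLE external table"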
5. In ADS, check that the data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run bin/kafka-console-consumer.sh:

   $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true
   The command output contains information about 5 messages (with offset numbers):

   Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E
   Offset:1 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4
   Offset:2 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr
   Offset:3 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13
   Offset:4 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809
   ^CProcessed a total of 5 messages
With SERVER option
1. Perform steps 1-6 that are described in the Use the SERVER option section of the ADB to Kafka Connector configuration article. As <server_name>, use kafka_test.
2. In ADB, create an external table ext_adb_to_kafka2. Unlike the table from the previous example, specify the SERVER option:

   CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka2 (a INT, b TEXT)
   LOCATION ('pxf://topic_adb_to_kafka?PROFILE=kafka&SERVER=kafka_test')
   FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
3. Insert some data into the ext_adb_to_kafka2 table:

   INSERT INTO ext_adb_to_kafka2 VALUES (1,'test1');
4. In ADS, check that the data from ADB has been successfully added to the topic_adb_to_kafka topic:

   $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true
   The command output contains 6 messages:

   Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E\x02\x12\x02\x02\x02\x0Atest1\xA3\xFB\xC7\xF5n\x81\x8E\x96\x16\xEA\xF3\xE4s\x06P\x8E
   Offset:1 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4\x02\x12\x02\x04\x02\x0Atest2\x94-;\xFE\x15C}\x93\xEC\xACwjs[J\xF4
   Offset:2 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr\x02\x12\x02\x0A\x02\x0Atest5H\xD4N\xFAU_\x05\x01\xBAXv\xC7\xE8\xED\xFCr
   Offset:3 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13\x02\x12\x02\x06\x02\x0Atest3\xDD\xF6\x82r&G\xD4l\x1A\x00\x81\xE3^\x06\xE2\x13
   Offset:4 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809\x02\x12\x02\x08\x02\x0Atest4\xCD\xCCY\xC7\xA9#\xED\xB9K\xE8\xB8\x97i)\x809
   Offset:5 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs\x02\x12\x02\x02\x02\x0Atest1%\xA7\xB4\x18 \xE3\x8C\xCDx\xB4\xEE\xE4'Cjs
   ^CProcessed a total of 6 messages
SASL_PLAINTEXT security protocol
PLAIN mechanism
1. Connect to any ADS host where the Kafka service is installed. Configure SASL_PLAINTEXT authentication for the ADS cluster according to the SASL PLAINTEXT article.

2. Ensure that Kafka authentication is configured correctly by following the steps from the Check the installed SASL PLAINTEXT section.
3. Create a new user adb-to-kafka with the password 123 in ADS by following the steps from the Create a user section.
4. According to the User authentication in the Kafka environment section, update the /etc/kafka/conf/client.properties file contents:

   security.protocol=SASL_PLAINTEXT
   sasl.mechanism=PLAIN
   # Uncomment and set necessary username/password
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="adb-to-kafka" \
   password="123";
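   To make sure that the credentials from client.properties are accepted by the brokers, you can optionally run any read-only admin command with the --command-config option, for example:

   $ bin/kafka-topics.sh --list --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --command-config /etc/kafka/conf/client.properties

   If authentication fails, the command ends with an authentication error instead of a topic list.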
5. Connect to the adb database on the ADB master under the gpadmin user (for example, via psql):

   $ sudo su - gpadmin
   $ psql adb
6. Create an external table with the following structure in ADB:

   CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_sasl (a INT, b TEXT)
   LOCATION ('pxf://topic_adb_to_kafka_sasl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&SASL_USER=adb-to-kafka&SASL_USER_PASSWORD=123')
   FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
7. Insert some data into the ext_adb_to_kafka_sasl external table:

   INSERT INTO ext_adb_to_kafka_sasl VALUES (1,'test1');
8. In ADS, check that the data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run bin/kafka-console-consumer.sh.

   NOTE
   Unlike the examples mentioned above, when using the SASL_PLAINTEXT protocol, you should set the --consumer.config option. Use it to specify a path to the client.properties file described in step 4 above.

   $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_sasl --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /etc/kafka/conf/client.properties
   The command output contains one message:

   Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81\x02\x12\x02\x02\x02\x0Atest1c\xA9\x9A\xC6ZoI\xA2br\xCD\xFBi=@\x81
   ^CProcessed a total of 1 messages
GSSAPI mechanism (Kerberos)
CAUTION
If you configured SASL_PLAINTEXT authentication for your ADS cluster according to the SASL PLAINTEXT article (e.g. when running the previous example), you should disable it before ADS kerberization. To do this, deactivate the SASL_PLAINTEXT Authentication switcher on the ADS cluster configuration page and apply the changes by clicking Save.
1. Configure MIT Kerberos KDC on a separate host (bds-mit-kerberos.ru-central1.internal in the current example) and kerberize your ADS cluster according to the MIT Kerberos article. As a realm name, use ADS-KAFKA.LOCAL.

2. Ensure that Kafka authentication is configured correctly by following the steps from the Check installed Kerberos SASL section.
3. On the host where MIT Kerberos KDC is deployed, create a new principal adb-to-kafka, which will be used by ADB to Kafka Connector. Specify 123 as a password:

   $ sudo kadmin.local -q "add_principal -pw 123 adb-to-kafka@ADS-KAFKA.LOCAL"

   The result:

   Principal "adb-to-kafka@ADS-KAFKA.LOCAL" created.
4. According to the Create a configuration file .properties for the user section, create the /tmp/client.properties file in ADS.

5. Follow the steps from the Create a JAAS file for a user section.
6. In ADB, install the following packages on the master and segment hosts:

   $ sudo yum install -y krb5-libs krb5-server krb5-workstation openldap-clients cyrus-sasl-gssapi
7. On the ADB master and each segment host, edit the /etc/krb5.conf file contents in the same way as you configured it for MIT Kerberos KDC:

   $ sudo vi /etc/krb5.conf

   The file contents:

   [logging]
   default = FILE:/var/log/krb5libs.log
   kdc = FILE:/var/log/krb5kdc.log
   admin_server = FILE:/var/log/kadmind.log

   [libdefaults]
   default_realm = ADS-KAFKA.LOCAL
   kdc_timesync = 1
   ticket_lifetime = 24h

   [realms]
   ADS-KAFKA.LOCAL = {
     admin_server = bds-mit-kerberos.ru-central1.internal
     kdc = bds-mit-kerberos.ru-central1.internal
   }
8. On the ADB master and each segment host, edit the /var/kerberos/krb5kdc/kdc.conf file contents in the same way as you configured it for MIT Kerberos KDC:

   $ sudo vi /var/kerberos/krb5kdc/kdc.conf

   The file contents:

   [kdcdefaults]
   kdc_ports = 88
   kdc_tcp_ports = 88
   default_realm = ADS-KAFKA.LOCAL

   [realms]
   ADS-KAFKA.LOCAL = {
     #master_key_type = aes256-cts
     acl_file = /var/kerberos/krb5kdc/kadm5.acl
     dict_file = /usr/share/dict/words
     admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
     supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
   }
9. On the ADB master, create a new directory to store keytab files. Assign an owner and the necessary permissions to that directory:

   $ sudo mkdir /var/lib/pxf/conf/kerberos
   $ sudo chmod 744 /var/lib/pxf/conf/kerberos
   $ sudo chown gpadmin:gpadmin /var/lib/pxf/conf/kerberos
10. Connect to the ADB master under the gpadmin user:

   $ sudo su - gpadmin
11. On the ADB master, run the interactive ktutil utility:

   $ ktutil
12. In the console that opens, run the following commands one by one (after the ktutil: prompt). After running each command, you should specify a password for the adb-to-kafka principal (123 in the current example):

   add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes128-cts-hmac-sha1-96
   add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e aes256-cts-hmac-sha1-96
   add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e arcfour-hmac
   add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia256-cts-cmac
   add_entry -password -p adb-to-kafka@ADS-KAFKA.LOCAL -k 1 -e camellia128-cts-cmac
13. As the last command, send the following request to store a keytab file with the entered data. Then, type q to quit ktutil:

   write_kt /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab
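   To verify the written keytab, you can list its entries; the output should contain one entry per encryption type added above:

   $ klist -kt /var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab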
14. To copy adb-to-kafka.service.keytab to all ADB segment hosts, run the following commands on the master in the given order:

   - PXF synchronization:

     $ pxf cluster sync

   - PXF restart:

     $ pxf cluster restart
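   To confirm that the keytab has reached the segment hosts, you can optionally list the target directory over gpssh. This is a sketch that assumes a host file with the segment host names exists (here /home/gpadmin/hostfile_segments):

   $ gpssh -f /home/gpadmin/hostfile_segments 'ls -l /var/lib/pxf/conf/kerberos'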
15. Check the adb-to-kafka principal by sequentially running the following commands. When running the kinit command, you need to specify the principal password (123 in the current example):

   $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
   $ klist

   The result indicates that a valid ticket is present:

   Ticket cache: FILE:/tmp/krb5cc_2042
   Default principal: adb-to-kafka@ADS-KAFKA.LOCAL

   Valid starting       Expires              Service principal
   12/14/2023 18:13:16  12/15/2023 18:13:16  krbtgt/ADS-KAFKA.LOCAL@ADS-KAFKA.LOCAL
16. Connect to the adb database on the ADB master (for example, via psql):

   $ psql adb
17. Create an external table with the following structure in ADB:

   CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_kerberos (a INT, b TEXT)
   LOCATION ('pxf://topic_adb_to_kafka_kerberos?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka')
   FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
18. Insert some data into the ext_adb_to_kafka_kerberos table:

   INSERT INTO ext_adb_to_kafka_kerberos VALUES (1,'kerberos');
19. In ADS, check that the data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run the following commands. When running the kinit command, specify a password for the adb-to-kafka principal (123 in the current example):

   $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
   $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
   $ bin/kafka-console-consumer.sh --topic topic_adb_to_kafka_kerberos --from-beginning --bootstrap-server bds-ads1:9092,bds-ads2:9092,bds-ads3:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client.properties
   The command output contains one message:

   Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3\x02\x18\x02\x02\x02\x10kerberos\xA6qBp\xF5;Q8\x035<\xFDB\xBF\xC6\xB3
   ^C[2023-12-14 18:18:35,250] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
   Processed a total of 1 messages
SASL_SSL security protocol with GSSAPI mechanism (Kerberos)
NOTE
Actions with certificates that are described in steps 2-8 can be automated via a bash script (see generate.sh in SSL channel security). However, the script is not recommended for a production environment.
1. Perform steps 1-15 that are described in the GSSAPI mechanism (Kerberos) section (if you have not completed them yet). Ensure that a valid ticket exists for the adb-to-kafka principal on the ADB side.

2. Generate an RSA key pair and create keystore.jks on each ADB and ADS host:

   $ keytool -genkeypair -noprompt -keyalg RSA -ext san=dns:<hostname> -alias <hostname> -dname "CN=<hostname>, OU=AD, O=AD, L=MSK, S=MO, C=RU" -keystore /tmp/keystore.jks -storepass bigdata -keypass bigdata -validity 360 -keysize 2048

   where <hostname> is the FQDN of the host where the command runs.
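   Since the command must be run on every host with that host's own FQDN, you can avoid manual substitution by taking the name from hostname -f (a sketch assuming hostname -f returns the FQDN used within the cluster):

   $ HOST=$(hostname -f)
   $ keytool -genkeypair -noprompt -keyalg RSA -ext san=dns:$HOST -alias $HOST -dname "CN=$HOST, OU=AD, O=AD, L=MSK, S=MO, C=RU" -keystore /tmp/keystore.jks -storepass bigdata -keypass bigdata -validity 360 -keysize 2048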
3. To export SSL certificates, run the following command on each ADB and ADS host:

   $ keytool -exportcert -file /tmp/<hostname>.crt -keystore /tmp/keystore.jks -storepass bigdata -alias <hostname> -rfc

   where <hostname> is the FQDN of the host where the command runs.

   The result is below (for the host with the FQDN bds-mdw.ru-central1.internal):

   Certificate stored in file </tmp/bds-mdw.ru-central1.internal.crt>
4. Copy the certificates from each ADB and ADS host to the local machine. The following is an example for Windows:

   $ pscp <user>@<hostname>:/tmp/<hostname>.crt /<local_folder>/

   where:

   - <user> — a username that is used to connect to the ADB/ADS host via SSH;
   - <hostname> — the FQDN of the ADB/ADS host;
   - <local_folder> — a directory that is created on the local machine to store all certificates.

   For example, for the FQDN bds-mdw.ru-central1.internal, the command can look like this:

   $ pscp dasha@bds-mdw:/tmp/bds-mdw.ru-central1.internal.crt /cert/

   As a result, all certificates are stored in one place:

   Directory: C:\cert

   Mode                 LastWriteTime         Length Name
   ----                 -------------         ------ ----
   -a----        12/25/2023  10:58 AM           1275 bds-ads1.ru-central1.internal.crt
   -a----        12/25/2023  10:58 AM           1275 bds-ads2.ru-central1.internal.crt
   -a----        12/25/2023  10:59 AM           1275 bds-ads3.ru-central1.internal.crt
   -a----        12/25/2023  10:58 AM           1271 bds-mdw.ru-central1.internal.crt
   -a----        12/25/2023  10:58 AM           1275 bds-sdw1.ru-central1.internal.crt
   -a----        12/25/2023  10:58 AM           1275 bds-sdw2.ru-central1.internal.crt
5. Copy all certificates from the local machine to the /tmp directory of each ADB and ADS host. To do this, run the following command for each host:

   $ pscp /<local_folder>/*.crt <user>@<hostname>:/tmp/

   For example, for the ADB host with the FQDN bds-mdw.ru-central1.internal, the command can look like this:

   $ pscp /cert/*.crt dasha@bds-mdw:/tmp/
6. Import all certificates into truststore.jks and ca-bundle.crt on each ADB/ADS host as follows:

   $ keytool -importcert -noprompt -alias <hostname> -file /tmp/<hostname>.crt -keystore /tmp/truststore.jks -storepass bigdata
   $ sudo bash -c "cat /tmp/<hostname>.crt >> /etc/pki/tls/certs/ca-bundle.crt"

   where <hostname> is the FQDN of each ADB/ADS host. In the current example, the two commands listed above should be run 6 times (once for each certificate) on each of the 6 ADB/ADS hosts.
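   Instead of repeating the two commands for every certificate manually, you can loop over the copied files. This is a minimal sketch assuming all 6 certificates are in /tmp and are named <hostname>.crt:

   $ for crt in /tmp/*.crt; do
       alias=$(basename "$crt" .crt)    # the alias is the host FQDN
       keytool -importcert -noprompt -alias "$alias" -file "$crt" -keystore /tmp/truststore.jks -storepass bigdata
       sudo bash -c "cat '$crt' >> /etc/pki/tls/certs/ca-bundle.crt"
     done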
7. Import truststore.jks into the Java CA store by running the following command on each ADB/ADS host:

   $ sudo keytool -importkeystore -noprompt -srckeystore /tmp/truststore.jks -destkeystore /etc/pki/java/cacerts -deststorepass changeit -srcstorepass bigdata

   The result:

   Importing keystore /tmp/truststore.jks to /etc/pki/java/cacerts...
   Entry for alias bds-sdw1.ru-central1.internal successfully imported.
   Entry for alias bds-mdw.ru-central1.internal successfully imported.
   Entry for alias bds-ads1.ru-central1.internal successfully imported.
   Entry for alias bds-sdw2.ru-central1.internal successfully imported.
   Entry for alias bds-ads2.ru-central1.internal successfully imported.
   Entry for alias bds-ads3.ru-central1.internal successfully imported.
   Import command completed: 6 entries successfully imported, 0 entries failed or cancelled
8. Create and import OpenSSL certificates by running the following commands on each ADB/ADS host:

   $ sudo openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -subj "/C=RU/ST=Denial/L=MSK/O=AD/CN=<hostname>" -keyout /etc/ssl/host_cert.key -out /etc/ssl/certs/host_cert.cert
   $ sudo bash -c "cat /etc/ssl/certs/host_cert.cert >> /etc/pki/tls/certs/ca-bundle.crt"

   where <hostname> is the FQDN of the host where the commands run.

   The first command result:

   Generating a 4096 bit RSA private key
   .....................................................++
   ...............................................................++
   writing new private key to '/etc/ssl/host_cert.key'
9. Enable SSL in ADS according to the Start SSL on an ADS cluster using ADCM section.

10. Ensure that Kafka authentication is configured correctly by following the steps from the Check the installed SSL section.
11. In ADS, create a new file /tmp/client_ssl.properties:

   $ sudo vi /tmp/client_ssl.properties

   Add the file contents:

   security.protocol=SASL_SSL
   sasl.mechanism=GSSAPI
   sasl.kerberos.service.name=kafka
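   Before moving on to ADB, you can optionally check that the brokers now serve TLS with certificates trusted by the updated CA bundle (openssl is already present on the hosts after the previous steps):

   $ openssl s_client -connect bds-ads1.ru-central1.internal:9092 -CAfile /etc/pki/tls/certs/ca-bundle.crt </dev/null

   A successful handshake ends with a line similar to "Verify return code: 0 (ok)".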
12. Connect to the adb database on the ADB master under the gpadmin user (for example, via psql):

   $ sudo su - gpadmin
   $ psql adb
13. Create an external table with the following structure in ADB:

   CREATE WRITABLE EXTERNAL TABLE ext_adb_to_kafka_ssl (a INT, b TEXT)
   LOCATION ('pxf://topic_ssl?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=GSSAPI&KERBEROS_KEYTAB=/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab&KERBEROS_PRINCIPAL=adb-to-kafka&KERBEROS_SERVICE_NAME=kafka&TRUSTSTORE_LOCATION=/tmp/truststore.jks&TRUSTSTORE_PASSWORD=bigdata')
   FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
14. Insert some data into the ext_adb_to_kafka_ssl table:

   INSERT INTO ext_adb_to_kafka_ssl VALUES (1,'ssl-test-data');
15. In ADS, check that the data from ADB has been successfully added. To do this, on the host where Kafka is installed, go to the Kafka root directory and run the following commands. When running the kinit command, specify a password for the adb-to-kafka principal (123 in the current example):

   $ kinit adb-to-kafka@ADS-KAFKA.LOCAL
   $ export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/client.jaas"
   $ bin/kafka-console-consumer.sh --topic topic_ssl --from-beginning --bootstrap-server bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092 --value-deserializer org.apache.kafka.common.serialization.BytesDeserializer --property print.key=false --property print.value=true --property print.offset=true --consumer.config /tmp/client_ssl.properties
   The command output contains one message:

   Offset:0 Obj\x01\x02\x16avro.schema\x88\x02{"type":"record","name":"row","fields":[{"name":"a","type":["null","int"],"doc":""},{"name":"b","type":["null","string"],"doc":""}]}\x00 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?\x02"\x02\x02\x02\x1Assl-test-data 1I\xCDJ6|U\xBEw\x14\xD5\xA8\x5C>?
   ^C[2023-12-26 08:30:14,242] WARN [Principal=null]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
   Processed a total of 1 messages