Kafka to ADB Connector usage examples
This article shows how Kafka to ADB Connector can read AVRO OCF messages that ADB to Kafka Connector writes to Kafka topics in ADS. Using the two connectors together is optional: the examples only demonstrate reading AVRO OCF messages, which is the format ADB to Kafka Connector uses to serialize messages. The examples assume that the following prerequisites are met:
- The ADS cluster is installed according to the Online installation guide on the following hosts: bds-ads1, bds-ads2, bds-ads3.
- The ADB cluster is installed according to the Online installation guide.
- The adb database exists in ADB.
- All ADB nodes have access to port 9092 on all ADS nodes through which you plan to load data.
- In ADB, the ADB to Kafka Connector is installed.
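Some of these prerequisites can be checked from psql before running the examples. The following is a minimal sketch that relies only on the standard PostgreSQL catalogs; it assumes that kadb_fdw (the wrapper name used in the CREATE SERVER statements of this article) is registered as a foreign data wrapper in the adb database once the connector is installed there.

```sql
-- Run in psql while connected to the adb database.

-- The adb database should be listed.
SELECT datname FROM pg_database WHERE datname = 'adb';

-- Assumption: the connector registers the kadb_fdw wrapper in this database.
-- An empty result suggests it is not available here yet.
SELECT fdwname FROM pg_foreign_data_wrapper WHERE fdwname = 'kadb_fdw';
```

Network access to port 9092 on the ADS hosts is not covered by these queries and has to be verified separately from each ADB node.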
PLAINTEXT security protocol (by default)
- Perform steps 1-5 from the Without SERVER option section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_adb_to_kafka topic should contain 5 messages.
- Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.
  - The kafka_server server:

        DROP SERVER IF EXISTS kafka_server CASCADE;
        CREATE SERVER kafka_server
        FOREIGN DATA WRAPPER kadb_fdw
        OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
        );

  - The kafka_table foreign table (note that the table structure should match the AVRO schema):

        DROP FOREIGN TABLE IF EXISTS kafka_table;
        CREATE FOREIGN TABLE kafka_table(a INT, b TEXT)
        SERVER kafka_server
        OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000'
        );

- In ADB, insert some data into the ext_adb_to_kafka table to send it to the topic_adb_to_kafka topic in ADS:

      INSERT INTO ext_adb_to_kafka VALUES
      (6,'test6'),
      (7,'test7'),
      (8,'test8'),
      (9,'test9'),
      (10,'test10');

- Select data from the kafka_table foreign table:

      SELECT * FROM kafka_table;

  The result contains notices that are described in Kafka to ADB Connector configuration (see the k_initial_offset and k_automatic_offsets options in the Server options table), followed by the messages read from Kafka:

      NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=16387)
      NOTICE: Kafka-ADB: Offset for partition 0 (0) is increased to 5 to match the smallest offset of an existing message in Kafka (seg0 slice1 10.92.40.81:10000 pid=16387)
       a  |   b
      ----+--------
        8 | test8
        9 | test9
       10 | test10
        6 | test6
        7 | test7
      (5 rows)

- Check that the kadb.offsets table contains a new row:

      SELECT * FROM kadb.offsets;

  The result:

       ftoid | prt | off
      -------+-----+-----
       45426 |   0 |  10
      (1 row)

  where 45426 is the OID of the kafka_table foreign table, which can be determined as follows:

      SELECT 'public.kafka_table'::regclass::oid;

        oid
      -------
       45426
      (1 row)
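Instead of looking up each OID by hand, the stored offsets can be shown together with the table names. The query below is a sketch that uses only the kadb.offsets columns visible in the output above (ftoid, prt, off) and the standard regclass cast; the column aliases are illustrative.

```sql
-- Show stored offsets with the foreign table name resolved from its OID.
-- If a row refers to a table that no longer exists, the regclass cast
-- prints the numeric OID instead of a name.
SELECT o.ftoid,
       o.ftoid::regclass AS foreign_table,
       o.prt             AS partition_id,
       o.off             AS stored_offset
FROM kadb.offsets AS o
ORDER BY o.ftoid, o.prt;
```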
SASL_PLAINTEXT security protocol
PLAIN mechanism
- Perform all steps from the PLAIN mechanism section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_adb_to_kafka_sasl topic should contain 1 message.
- Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.
  - The kafka_server server:

        DROP SERVER IF EXISTS kafka_server CASCADE;
        CREATE SERVER kafka_server
        FOREIGN DATA WRAPPER kadb_fdw
        OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
        );

  - The kafka_table_sasl foreign table (note that compared to the previous example, the table definition contains librdkafka options specific to the SASL_PLAINTEXT protocol with the PLAIN mechanism):

        DROP FOREIGN TABLE IF EXISTS kafka_table_sasl;
        CREATE FOREIGN TABLE kafka_table_sasl(a INT, b TEXT)
        SERVER kafka_server
        OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka_sasl',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_plaintext',
          "#sasl.mechanism" 'PLAIN',
          "#sasl.username" 'adb-to-kafka',
          "#sasl.password" '123'
        );

- Select data from the kafka_table_sasl foreign table:

      SELECT * FROM kafka_table_sasl;

  The result contains notices that are described in Kafka to ADB Connector configuration (see the k_initial_offset and k_automatic_offsets options in the Server options table), followed by the messages read from Kafka:

      NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=8036)
       a |   b
      ---+-------
       1 | test1
      (1 row)
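To confirm which options a foreign table actually carries, the standard PostgreSQL catalogs can be queried. The following is a minimal sketch, assuming that the table was created in the public schema (the schema is not stated for this example) and that the catalog behaves as in stock PostgreSQL, where pg_foreign_table.ftoptions holds the options as keyword=value strings. The table_option alias is illustrative.

```sql
-- List the options stored for the kafka_table_sasl foreign table,
-- one option per row.
-- Assumption: the table lives in the public schema.
SELECT unnest(ftoptions) AS table_option
FROM pg_foreign_table
WHERE ftrelid = 'public.kafka_table_sasl'::regclass;
```

Because the options are stored as plain strings, the output is expected to include the "#sasl.password" value, which is worth keeping in mind when deciding who may connect to the database.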
GSSAPI mechanism (Kerberos)
- Perform all steps from the GSSAPI mechanism (Kerberos) section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_adb_to_kafka_kerberos topic should contain 1 message.
- Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.
  - The kafka_server server:

        DROP SERVER IF EXISTS kafka_server CASCADE;
        CREATE SERVER kafka_server
        FOREIGN DATA WRAPPER kadb_fdw
        OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
        );

  - The kafka_table_kerberos foreign table (note that compared to the previous examples, the table definition contains librdkafka options specific to the SASL_PLAINTEXT protocol with the GSSAPI mechanism):

        DROP FOREIGN TABLE IF EXISTS kafka_table_kerberos;
        CREATE FOREIGN TABLE kafka_table_kerberos(a INT, b TEXT)
        SERVER kafka_server
        OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka_kerberos',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_plaintext',
          "#sasl.mechanism" 'GSSAPI',
          "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab',
          "#sasl.kerberos.principal" 'adb-to-kafka',
          "#sasl.kerberos.service.name" 'kafka'
        );

- Select data from the kafka_table_kerberos foreign table:

      SELECT * FROM kafka_table_kerberos;

  The result contains notices that are described in Kafka to ADB Connector configuration (see the k_initial_offset and k_automatic_offsets options in the Server options table), followed by the messages read from Kafka:

      NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=24779)
       a |    b
      ---+----------
       1 | kerberos
      (1 row)
SASL_SSL security protocol with GSSAPI mechanism (Kerberos)
- Perform all steps from the SASL_SSL security protocol with GSSAPI mechanism (Kerberos) section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_ssl topic should contain 1 message.
- Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.
  - The kafka_server_ssl server (note that compared to the previous examples, the server definition should contain the FQDNs of the ADS hosts):

        DROP SERVER IF EXISTS kafka_server_ssl CASCADE;
        CREATE SERVER kafka_server_ssl
        FOREIGN DATA WRAPPER kadb_fdw
        OPTIONS (
          k_brokers 'bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092'
        );

  - The kafka_table_ssl foreign table (note that compared to the previous examples, the table definition contains librdkafka options specific to the SASL_SSL protocol with the GSSAPI mechanism):

        DROP FOREIGN TABLE IF EXISTS kafka_table_ssl;
        CREATE FOREIGN TABLE kafka_table_ssl(a INT, b TEXT)
        SERVER kafka_server_ssl
        OPTIONS (
          format 'avro',
          k_topic 'topic_ssl',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_ssl',
          "#sasl.mechanism" 'GSSAPI',
          "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab',
          "#sasl.kerberos.principal" 'adb-to-kafka',
          "#sasl.kerberos.service.name" 'kafka',
          "#ssl.ca.location" '/etc/pki/tls/certs/ca-bundle.crt',
          "#ssl.certificate.location" '/etc/ssl/certs/host_cert.cert',
          "#ssl.key.location" '/etc/ssl/host_cert.key',
          "#ssl.key.password" 'bigdata'
        );

- Select data from the kafka_table_ssl foreign table:

      SELECT * FROM kafka_table_ssl;

  The result contains notices that are described in Kafka to ADB Connector configuration (see the k_initial_offset and k_automatic_offsets options in the Server options table), followed by the messages read from Kafka:

      NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=9031)
       a |       b
      ---+---------------
       1 | ssl-test-data
      (1 row)
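The examples in this article create several foreign tables on two servers, and each DROP SERVER IF EXISTS ... CASCADE statement also drops the foreign tables that depend on that server. To see which foreign tables currently exist and which server each one uses, the standard PostgreSQL catalogs can be joined as in the sketch below; it assumes nothing about the connector beyond the object names used above.

```sql
-- List foreign tables together with the server they are defined on
-- and the options stored for that server.
SELECT s.srvname,
       c.relname AS foreign_table,
       s.srvoptions
FROM pg_foreign_table AS t
JOIN pg_foreign_server AS s ON s.oid = t.ftserver
JOIN pg_class AS c ON c.oid = t.ftrelid
ORDER BY s.srvname, c.relname;
```

If the examples were run in the order given, tables created on kafka_server by an earlier example no longer appear once a later example has recreated that server, so they need to be created again before they can be queried.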