Kafka to ADB Connector usage examples
This article describes how Kafka to ADB Connector can be used to read AVRO OCF messages written to Kafka topics (as a part of ADS) by ADB to Kafka Connector. The use of both connectors simultaneously is optional. The purpose of examples is only to show the ability to read AVRO OCF messages (this is the format in which messages are serialized by ADB to Kafka Connector). The following prerequisites are met:
-
The ADS cluster is installed according to the Online installation guide on the following hosts:
bds-ads1,bds-ads2,bds-ads3. -
The ADB cluster is installed according to the Online installation guide.
-
The
adbdatabase exists in ADB. -
All ADB nodes have access to the
9092port on all ADS nodes through which you plan to load data. -
In ADB, the ADB to Kafka Connector is installed.
PLAINTEXT security protocol (by default)
-
Perform steps 1—5 from the Without SERVER option section of the ADB to Kafka Connector usage examples article (if you have not passed them yet). As a result, the
topic_adb_to_kafkatopic should contain 5 messages. -
Connect to the
adbdatabase of the ADB cluster under thegpadminuser (for example, viapsql). Create the following objects in the given order.-
The
kafka_serverserver:DROP SERVER IF EXISTS kafka_server CASCADE; CREATE SERVER kafka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' ); -
The
kafka_tableforeign table (note that the table structure should match the AVRO schema):DROP FOREIGN TABLE IF EXISTS kafka_table; CREATE FOREIGN TABLE kafka_table(a INT, b TEXT) SERVER kafka_server OPTIONS ( format 'avro', k_topic 'topic_adb_to_kafka', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000' );
-
-
In ADB, insert some data into the ext_adb_to_kafka table to send it to the
topic_adb_to_kafkatopic in ADS:INSERT INTO ext_adb_to_kafka VALUES (6,'test6'), (7,'test7'), (8,'test8'), (9,'test9'), (10,'test10'); -
Select data from the
kafka_tableforeign table:SELECT * FROM kafka_table;The result contains some notifications that are described in Kafka to ADB Connector configuration (see the
k_initial_offsetandk_automatic_offsetsoptions in the Server options table) and a list of messages read from Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=16387) NOTICE: Kafka-ADB: Offset for partition 0 (0) is increased to 5 to match the smallest offset of an existing message in Kafka (seg0 slice1 10.92.40.81:10000 pid=16387) a | b ---+------- 8 | test8 9 | test9 10 | test10 6 | test6 7 | test7 (5 rows)
-
Check that the kadb.offsets table contains a new row:
SELECT * FROM kadb.offsets;Result:
ftoid | prt | off -------+-----+----- 45426 | 0 | 10 (1 row)
where
45426is OID of thekafka_tableforeign table, which can be defined as follows:SELECT 'public.kafka_table'::regclass::oid;oid ------- 45426 (1 row)
SASL_PLAINTEXT security protocol
PLAIN mechanism
-
Perform all steps from the PLAIN mechanism section of the ADB to Kafka Connector usage examples article (if you have not passed them yet). As a result, the
topic_adb_to_kafka_sasltopic should contain 1 message. -
Connect to the
adbdatabase of the ADB cluster under thegpadminuser (for example, viapsql). Create the following objects in the given order.-
The
kafka_serverserver:DROP SERVER IF EXISTS kafka_server CASCADE; CREATE SERVER kafka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' ); -
The
kafka_table_saslforeign table (note that compared to the previous example, the table definition contains librdkafka options specific for theSASL_PLAINTEXTprotocol with thePLAINmechanism):DROP FOREIGN TABLE IF EXISTS kafka_table_sasl; CREATE FOREIGN TABLE kafka_table_sasl(a INT, b TEXT) SERVER kafka_server OPTIONS ( format 'avro', k_topic 'topic_adb_to_kafka_sasl', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000', "#security.protocol" 'sasl_plaintext', "#sasl.mechanism" 'PLAIN', "#sasl.username" 'adb-to-kafka', "#sasl.password" '123' );
-
-
Select data from the
kafka_table_saslforeign table:SELECT * FROM kafka_table_sasl;The result contains some notifications that are described in Kafka to ADB Connector configuration (see the
k_initial_offsetandk_automatic_offsetsoptions in the Server options table) and a list of messages read from Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=8036) a | b ---+------- 1 | test1 (1 row)
GSSAPI mechanism (Kerberos)
-
Perform all steps from the GSSAPI mechanism (Kerberos) section of the ADB to Kafka Connector usage examples article (if you have not passed them yet). As a result, the
topic_adb_to_kafka_kerberostopic should contain 1 message. -
Connect to the
adbdatabase of the ADB cluster under thegpadminuser (for example, viapsql). Create the following objects in the given order.-
The
kafka_serverserver:DROP SERVER IF EXISTS kafka_server CASCADE; CREATE SERVER kafka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' ); -
The
kafka_table_kerberosforeign table (note that compared to the previous examples, the table definition contains librdkafka options specific for theSASL_PLAINTEXTprotocol with theGSSAPImechanism):DROP FOREIGN TABLE IF EXISTS kafka_table_kerberos; CREATE FOREIGN TABLE kafka_table_kerberos(a INT, b TEXT) SERVER kafka_server OPTIONS ( format 'avro', k_topic 'topic_adb_to_kafka_kerberos', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000', "#security.protocol" 'sasl_plaintext', "#sasl.mechanism" 'GSSAPI', "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab', "#sasl.kerberos.principal" 'adb-to-kafka', "#sasl.kerberos.service.name" 'kafka' );
-
-
Select data from the
kafka_table_kerberosforeign table:SELECT * FROM kafka_table_kerberos;The result contains some notifications that are described in Kafka to ADB Connector configuration (see the
k_initial_offsetandk_automatic_offsetsoptions in the Server options table) and a list of messages read from Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=24779) a | b ---+---------- 1 | kerberos (1 row)
SASL_SSL security protocol with GSSAPI mechanism (Kerberos)
-
Perform all steps from the SASL_SSL security protocol with GSSAPI mechanism (Kerberos) section of the ADB to Kafka Connector usage examples article (if you have not passed them yet). As a result, the
topic_ssltopic should contain 1 message. -
Connect to the
adbdatabase of the ADB cluster under thegpadminuser (for example, viapsql). Create the following objects in the given order.-
The
kafka_server_sslserver (note that compared to the previous examples, the server definition should contain FQDN of ADS hosts):DROP SERVER IF EXISTS kafka_server_ssl CASCADE; CREATE SERVER kafka_server_ssl FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092' ); -
The
kafka_table_sslforeign table (note that compared to the previous examples, the table definition contains librdkafka options specific for theSASL_SSLprotocol with theGSSAPImechanism):DROP FOREIGN TABLE IF EXISTS kafka_table_ssl; CREATE FOREIGN TABLE kafka_table_ssl(a INT, b TEXT) SERVER kafka_server_ssl OPTIONS ( format 'avro', k_topic 'topic_ssl', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000', "#security.protocol" 'sasl_ssl', "#sasl.mechanism" 'GSSAPI', "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab', "#sasl.kerberos.principal" 'adb-to-kafka', "#sasl.kerberos.service.name" 'kafka', "#ssl.ca.location" '/etc/pki/tls/certs/ca-bundle.crt', "#ssl.certificate.location" '/etc/ssl/certs/host_cert.cert', "#ssl.key.location" '/etc/ssl/host_cert.key', "#ssl.key.password" 'bigdata' );
-
-
Select data from the
kafka_table_sslforeign table:SELECT * FROM kafka_table_ssl;The result contains some notifications that are described in Kafka to ADB Connector configuration (see the
k_initial_offsetandk_automatic_offsetsoptions in the Server options table) and a list of messages read from Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=9031) a | b ---+--------------- 1 | ssl-test-data (1 row)