Примеры использования Kafka to ADB Connector
В статье показаны примеры чтения собщений коннектором Kafka to ADB, записанных в формате AVRO OCF в топик Kafka (в составе ADS) при помощи коннектора ADB to Kafka. Использование обоих коннекторов одновременно необязательно, цель примеров — показать возможность чтения сообщений в формате AVRO OCF (именно в этом формате сообщения сериализуются коннектором ADB to Kafka). При выполнении примеров соблюдены следующие условия:
-
Кластер ADS установлен согласно руководству Online-установка на следующих хостах: bds-ads1, bds-ads2, bds-ads3.
-
Кластер ADB установлен согласно руководству Online-установка.
-
В кластере ADB добавлен и установлен сервис Kafka to ADB.
-
База данных с именем
adb
существует в ADB. -
Все ноды кластера ADB имеют доступ к порту
9092
на всех нодах кластера ADS, через которые планируется производить загрузку данных. -
На стороне ADB установлен ADB to Kafka Connector.
Протокол PLAINTEXT (по умолчанию)
-
Выполните шаги 1-5 из подраздела Без использования опции SERVER статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик
topic_adb_to_kafka
будет содержать 5 сообщений. -
Подключитесь к базе данных
adb
кластера ADB под пользователемgpadmin
(например, через psql). Создайте последовательно следующие объекты.-
Сервер
kafka_server
:DROP SERVER IF EXISTS kafka_server CASCADE; CREATE SERVER kafka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' );
-
Foreign-таблица
kafka_table
(обратите внимание, что структура таблицы должна соответствовать используемой AVRO-схеме):DROP FOREIGN TABLE IF EXISTS kafka_table; CREATE FOREIGN TABLE kafka_table(a INT, b TEXT) SERVER kafka_server OPTIONS ( format 'avro', k_topic 'topic_adb_to_kafka', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000' );
-
-
Добавьте данные в таблицу ext_adb_to_kafka для их отправки в топик ADS
topic_adb_to_kafka
:INSERT INTO ext_adb_to_kafka VALUES (6,'test6'), (7,'test7'), (8,'test8'), (9,'test9'), (10,'test10');
-
Прочитайте данные из foreign-таблицы
kafka_table
:SELECT * FROM kafka_table;
Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции
k_initial_offset
иk_automatic_offsets
в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=16387) NOTICE: Kafka-ADB: Offset for partition 0 (0) is increased to 5 to match the smallest offset of an existing message in Kafka (seg0 slice1 10.92.40.81:10000 pid=16387) a | b ---+------- 8 | test8 9 | test9 10 | test10 6 | test6 7 | test7 (5 rows)
-
Убедитесь в появлении записей в таблице kadb.offsets:
SELECT * FROM kadb.offsets;
Результат:
ftoid | prt | off -------+-----+----- 45426 | 0 | 10 (1 row)
где
45426
— это OID foreign-таблицыkafka_table
, который может быть получен следующим образом:SELECT 'public.kafka_table'::regclass::oid;
oid ------- 45426 (1 row)
Протокол SASL_PLAINTEXT
С использованием механизма PLAIN
-
Выполните все шаги из подраздела С использованием механизма PLAIN статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик
topic_adb_to_kafka_sasl
будет содержать 1 сообщение. -
Подключитесь к базе данных
adb
кластера ADB под пользователемgpadmin
(например, через psql). Создайте последовательно следующие объекты.-
Сервер
kafka_server
:DROP SERVER IF EXISTS kafka_server CASCADE; CREATE SERVER kafka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' );
-
Foreign-таблица
kafka_table_sasl
(обратите внимание, что по сравнению с предыдущим примером в определении таблицы присутствуют опции librdkafka, специфичные для протоколаSASL_PLAINTEXT
и механизмаPLAIN
):DROP FOREIGN TABLE IF EXISTS kafka_table_sasl; CREATE FOREIGN TABLE kafka_table_sasl(a INT, b TEXT) SERVER kafka_server OPTIONS ( format 'avro', k_topic 'topic_adb_to_kafka_sasl', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000', "#security.protocol" 'sasl_plaintext', "#sasl.mechanism" 'PLAIN', "#sasl.username" 'adb-to-kafka', "#sasl.password" '123' );
-
-
Прочитайте данные из foreign-таблицы
kafka_table_sasl
:SELECT * FROM kafka_table_sasl;
Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции
k_initial_offset
иk_automatic_offsets
в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=8036) a | b ---+------- 1 | test1 (1 row)
С использованием механизма GSSAPI (Kerberos)
-
Выполните все шаги из подраздела С использованием механизма GSSAPI (Kerberos) статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик
topic_adb_to_kafka_kerberos
будет содержать 1 сообщение. -
Подключитесь к базе данных
adb
кластера ADB под пользователемgpadmin
(например, через psql). Создайте последовательно следующие объекты.-
Сервер
kafka_server
:DROP SERVER IF EXISTS kafka_server CASCADE; CREATE SERVER kafka_server FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092' );
-
Foreign-таблица
kafka_table_kerberos
(обратите внимание, что по сравнению с предыдущими примерами в определении таблицы присутствуют опции librdkafka, специфичные для протоколаSASL_PLAINTEXT
и механизмаGSSAPI
):DROP FOREIGN TABLE IF EXISTS kafka_table_kerberos; CREATE FOREIGN TABLE kafka_table_kerberos(a INT, b TEXT) SERVER kafka_server OPTIONS ( format 'avro', k_topic 'topic_adb_to_kafka_kerberos', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000', "#security.protocol" 'sasl_plaintext', "#sasl.mechanism" 'GSSAPI', "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab', "#sasl.kerberos.principal" 'adb-to-kafka', "#sasl.kerberos.service.name" 'kafka' );
-
-
Прочитайте данные из foreign-таблицы
kafka_table_kerberos
:SELECT * FROM kafka_table_kerberos;
Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции
k_initial_offset
иk_automatic_offsets
в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=24779) a | b ---+---------- 1 | kerberos (1 row)
Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos)
-
Выполните все шаги из подраздела Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos) статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик
topic_ssl
будет содержать 1 сообщение. -
Подключитесь к базе данных
adb
кластера ADB под пользователемgpadmin
(например, через psql). Создайте последовательно следующие объекты.-
Сервер
kafka_server_ssl
(обратите внимание, что по сравнению с предыдущими примерами в определении сервера необходимо обязательно указать FQDN хостов ADS):DROP SERVER IF EXISTS kafka_server_ssl CASCADE; CREATE SERVER kafka_server_ssl FOREIGN DATA WRAPPER kadb_fdw OPTIONS ( k_brokers 'bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092' );
-
Foreign-таблица
kafka_table_ssl
(обратите внимание, что по сравнению с предыдущими примерами в определении таблицы присутствуют опции librdkafka, специфичные для протоколаSASL_SSL
и механизмаGSSAPI
):DROP FOREIGN TABLE IF EXISTS kafka_table_ssl; CREATE FOREIGN TABLE kafka_table_ssl(a INT, b TEXT) SERVER kafka_server_ssl OPTIONS ( format 'avro', k_topic 'topic_ssl', k_consumer_group 'group', k_seg_batch '100', k_timeout_ms '1000', "#security.protocol" 'sasl_ssl', "#sasl.mechanism" 'GSSAPI', "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab', "#sasl.kerberos.principal" 'adb-to-kafka', "#sasl.kerberos.service.name" 'kafka', "#ssl.ca.location" '/etc/pki/tls/certs/ca-bundle.crt', "#ssl.certificate.location" '/etc/ssl/certs/host_cert.cert', "#ssl.key.location" '/etc/ssl/host_cert.key', "#ssl.key.password" 'bigdata' );
-
-
Прочитайте данные из foreign-таблицы
kafka_table_ssl
:SELECT * FROM kafka_table_ssl;
Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции
k_initial_offset
иk_automatic_offsets
в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:NOTICE: Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0 (seg0 slice1 10.92.40.81:10000 pid=9031) a | b ---+--------------- 1 | ssl-test-data (1 row)