Примеры использования Kafka to ADB Connector

В статье показаны примеры чтения собщений коннектором Kafka to ADB, записанных в формате AVRO OCF в топик Kafka (в составе ADS) при помощи коннектора ADB to Kafka. Использование обоих коннекторов одновременно необязательно, цель примеров — показать возможность чтения сообщений в формате AVRO OCF (именно в этом формате сообщения сериализуются коннектором ADB to Kafka). При выполнении примеров соблюдены следующие условия:

  • Кластер ADS установлен согласно руководству Online-установка на следующих хостах: bds-ads1, bds-ads2, bds-ads3.

  • Кластер ADB установлен согласно руководству Online-установка.

  • В кластере ADB добавлен и установлен сервис Kafka to ADB.

  • База данных с именем adb существует в ADB.

  • Все ноды кластера ADB имеют доступ к порту 9092 на всех нодах кластера ADS, через которые планируется производить загрузку данных.

  • На стороне ADB установлен ADB to Kafka Connector.

Протокол PLAINTEXT (по умолчанию)

  1. Выполните шаги 1-5 из подраздела Без использования опции SERVER статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик topic_adb_to_kafka будет содержать 5 сообщений.

  2. Подключитесь к базе данных adb кластера ADB под пользователем gpadmin (например, через psql). Создайте последовательно следующие объекты.

    • Сервер kafka_server:

      DROP SERVER IF EXISTS kafka_server CASCADE;
      CREATE SERVER kafka_server
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
      );
    • Foreign-таблица kafka_table (обратите внимание, что структура таблицы должна соответствовать используемой AVRO-схеме):

      DROP FOREIGN TABLE IF EXISTS kafka_table;
      CREATE FOREIGN TABLE kafka_table(a INT, b TEXT)
      SERVER kafka_server
      OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000'
      );
  3. Добавьте данные в таблицу ext_adb_to_kafka для их отправки в топик ADS topic_adb_to_kafka:

    INSERT INTO ext_adb_to_kafka VALUES
    (6,'test6'),
    (7,'test7'),
    (8,'test8'),
    (9,'test9'),
    (10,'test10');
  4. Прочитайте данные из foreign-таблицы kafka_table:

    SELECT * FROM kafka_table;

    Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции k_initial_offset и k_automatic_offsets в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=16387)
    NOTICE:  Kafka-ADB: Offset for partition 0 (0) is increased to 5 to match the smallest offset of an existing message in Kafka  (seg0 slice1 10.92.40.81:10000 pid=16387)
     a |   b
    ---+-------
     8 | test8
     9 | test9
     10 | test10
     6 | test6
     7 | test7
    (5 rows)
  5. Убедитесь в появлении записей в таблице kadb.offsets:

    SELECT * FROM kadb.offsets;

    Результат:

     ftoid | prt | off
    -------+-----+-----
     45426 |   0 |  10
    (1 row)

    где 45426 — это OID foreign-таблицы kafka_table, который может быть получен следующим образом:

    SELECT 'public.kafka_table'::regclass::oid;
      oid
    -------
     45426
    (1 row)

Протокол SASL_PLAINTEXT

С использованием механизма PLAIN

  1. Выполните все шаги из подраздела С использованием механизма PLAIN статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик topic_adb_to_kafka_sasl будет содержать 1 сообщение.

  2. Подключитесь к базе данных adb кластера ADB под пользователем gpadmin (например, через psql). Создайте последовательно следующие объекты.

    • Сервер kafka_server:

      DROP SERVER IF EXISTS kafka_server CASCADE;
      CREATE SERVER kafka_server
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
      );
    • Foreign-таблица kafka_table_sasl (обратите внимание, что по сравнению с предыдущим примером в определении таблицы присутствуют опции librdkafka, специфичные для протокола SASL_PLAINTEXT и механизма PLAIN):

      DROP FOREIGN TABLE IF EXISTS kafka_table_sasl;
      CREATE FOREIGN TABLE kafka_table_sasl(a INT, b TEXT)
      SERVER kafka_server
      OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka_sasl',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_plaintext',
          "#sasl.mechanism" 'PLAIN',
          "#sasl.username" 'adb-to-kafka',
          "#sasl.password" '123'
      );
  3. Прочитайте данные из foreign-таблицы kafka_table_sasl:

    SELECT * FROM kafka_table_sasl;

    Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции k_initial_offset и k_automatic_offsets в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=8036)
     a |   b
    ---+-------
     1 | test1
    (1 row)
Ошибки аутентификации

 

В тех случаях, когда логин или пароль пользователя не указаны при объявлении foreign-таблицы ADB либо указаны неверно, при попытке добавления данных в таблицу возвращается ошибка следующего вида:

ERROR:  Kafka-ADB: [kafka] Failed to retrieve metadata for topic 'topic_adb_to_kafka_sasl': Local: Broker transport failure [-195] [-1]

С использованием механизма GSSAPI (Kerberos)

  1. Выполните все шаги из подраздела С использованием механизма GSSAPI (Kerberos) статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик topic_adb_to_kafka_kerberos будет содержать 1 сообщение.

  2. Подключитесь к базе данных adb кластера ADB под пользователем gpadmin (например, через psql). Создайте последовательно следующие объекты.

    • Сервер kafka_server:

      DROP SERVER IF EXISTS kafka_server CASCADE;
      CREATE SERVER kafka_server
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
      );
    • Foreign-таблица kafka_table_kerberos (обратите внимание, что по сравнению с предыдущими примерами в определении таблицы присутствуют опции librdkafka, специфичные для протокола SASL_PLAINTEXT и механизма GSSAPI):

      DROP FOREIGN TABLE IF EXISTS kafka_table_kerberos;
      CREATE FOREIGN TABLE kafka_table_kerberos(a INT, b TEXT)
      SERVER kafka_server
      OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka_kerberos',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_plaintext',
          "#sasl.mechanism" 'GSSAPI',
          "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab',
          "#sasl.kerberos.principal" 'adb-to-kafka',
          "#sasl.kerberos.service.name" 'kafka'
      );
  3. Прочитайте данные из foreign-таблицы kafka_table_kerberos:

    SELECT * FROM kafka_table_kerberos;

    Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции k_initial_offset и k_automatic_offsets в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=24779)
     a |    b
    ---+----------
     1 | kerberos
    (1 row)
Ошибки аутентификации

 

В тех случаях, когда keytab, принципал или имя сервиса заполнены неверно при объявлении foreign-таблицы ADB (и при этом в кеше нет валидного тикета для выбранного сервиса), при попытке добавления данных в таблицу возвращается ошибка следующего вида:

ERROR:  Kafka-ADB: [kafka] Failed to retrieve metadata for topic 'topic_adb_to_kafka_kerberos': Local: Broker transport failure [-195] [-1]

Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos)

  1. Выполните все шаги из подраздела Протокол SASL_SSL с использованием механизма GSSAPI (Kerberos) статьи Примеры использования ADB to Kafka Connector (если они не были выполнены ранее). В результате тестовый топик topic_ssl будет содержать 1 сообщение.

  2. Подключитесь к базе данных adb кластера ADB под пользователем gpadmin (например, через psql). Создайте последовательно следующие объекты.

    • Сервер kafka_server_ssl (обратите внимание, что по сравнению с предыдущими примерами в определении сервера необходимо обязательно указать FQDN хостов ADS):

      DROP SERVER IF EXISTS kafka_server_ssl CASCADE;
      CREATE SERVER kafka_server_ssl
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092'
      );
    • Foreign-таблица kafka_table_ssl (обратите внимание, что по сравнению с предыдущими примерами в определении таблицы присутствуют опции librdkafka, специфичные для протокола SASL_SSL и механизма GSSAPI):

      DROP FOREIGN TABLE IF EXISTS kafka_table_ssl;
      CREATE FOREIGN TABLE kafka_table_ssl(a INT, b TEXT)
      SERVER kafka_server_ssl
      OPTIONS (
          format 'avro',
          k_topic 'topic_ssl',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_ssl',
          "#sasl.mechanism" 'GSSAPI',
          "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab',
          "#sasl.kerberos.principal" 'adb-to-kafka',
          "#sasl.kerberos.service.name" 'kafka',
          "#ssl.ca.location" '/etc/pki/tls/certs/ca-bundle.crt',
          "#ssl.certificate.location" '/etc/ssl/certs/host_cert.cert',
          "#ssl.key.location" '/etc/ssl/host_cert.key',
          "#ssl.key.password" 'bigdata'
      );
  3. Прочитайте данные из foreign-таблицы kafka_table_ssl:

    SELECT * FROM kafka_table_ssl;

    Результат содержит служебные уведомления, подробно описанные в статье Настройка Kafka to ADB Connector (см. опции k_initial_offset и k_automatic_offsets в таблице Опции сервера), а также список сообщений, прочитанных из Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=9031)
     a |       b
    ---+---------------
     1 | ssl-test-data
    (1 row)
Ошибки аутентификации

 

В тех случаях, когда пути к сертификатам и ключам не указаны при объявлении foreign-таблицы ADB либо указаны неверно, при попытке добавления данных в таблицу возвращается ошибка следующего вида:

ERROR:  Kafka-ADB: [kafka] Failed to create a Kafka consumer: ssl.key.location failed: ssl_rsa.c:633: error:140B0002:SSL routines:SSL_CTX_use_PrivateKey_file:system lib:  [5]

При указании неверных keytab, принципала или имени сервиса сообщение об ошибке идентично описанному выше для протокола SASL_PLAINTEXT с механизмом GSSAPI.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней