Kafka to ADB Connector usage examples

This article describes how Kafka to ADB Connector can be used to read AVRO OCF messages that ADB to Kafka Connector writes to Kafka topics (as a part of ADS). Using both connectors together is not required; the examples merely demonstrate reading AVRO OCF messages, which is the format in which ADB to Kafka Connector serializes them. The following prerequisites are met:

  • The ADS cluster is installed according to the Online installation guide on the following hosts: bds-ads1, bds-ads2, bds-ads3.

  • The ADB cluster is installed according to the Online installation guide.

  • In ADB, the Kafka to ADB service is added and installed.

  • The adb database exists in ADB.

  • All ADB hosts have access to port 9092 on all ADS hosts through which you plan to load data.

  • In ADB, the ADB to Kafka Connector is installed.

PLAINTEXT security protocol (by default)

  1. Perform steps 1-5 from the Without SERVER option section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_adb_to_kafka topic should contain 5 messages.

  2. Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.

    • The kafka_server server:

      DROP SERVER IF EXISTS kafka_server CASCADE;
      CREATE SERVER kafka_server
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
      );
    • The kafka_table foreign table (note that the table structure should match the AVRO schema):

      DROP FOREIGN TABLE IF EXISTS kafka_table;
      CREATE FOREIGN TABLE kafka_table(a INT, b TEXT)
      SERVER kafka_server
      OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000'
      );
  3. In ADB, insert some data into the ext_adb_to_kafka table to send it to the topic_adb_to_kafka topic in ADS:

    INSERT INTO ext_adb_to_kafka VALUES
    (6,'test6'),
    (7,'test7'),
    (8,'test8'),
    (9,'test9'),
    (10,'test10');
  4. Select data from the kafka_table foreign table:

    SELECT * FROM kafka_table;

    The result contains notices that are described in the Kafka to ADB Connector configuration article (see the k_initial_offset and k_automatic_offsets options in the Server options table) and the messages read from Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=16387)
    NOTICE:  Kafka-ADB: Offset for partition 0 (0) is increased to 5 to match the smallest offset of an existing message in Kafka  (seg0 slice1 10.92.40.81:10000 pid=16387)
     a  |   b
    ----+---------
      8 | test8
      9 | test9
     10 | test10
      6 | test6
      7 | test7
    (5 rows)
  5. Check that the kadb.offsets table contains a new row:

    SELECT * FROM kadb.offsets;

    The result:

     ftoid | prt | off
    -------+-----+-----
     45426 |   0 |  10
    (1 row)

    where 45426 is the OID of the kafka_table foreign table, which can be determined as follows:

    SELECT 'public.kafka_table'::regclass::oid;
      oid
    -------
     45426
    (1 row)
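
Messages are consumed only once: after a successful SELECT, the connector stores the new offsets in kadb.offsets, so repeating the query returns no rows until new messages arrive in the topic. Since kadb.offsets is a regular table, a stored offset can be rewound with ordinary DML to re-read messages. A minimal sketch, assuming the five messages inserted above occupy offsets 5-9:

    UPDATE kadb.offsets
    SET "off" = 5
    WHERE ftoid = 'public.kafka_table'::regclass::oid
      AND prt = 0;

    SELECT * FROM kafka_table;

After the rewind, the repeated SELECT should return the same 5 rows again.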

SASL_PLAINTEXT security protocol

PLAIN mechanism

  1. Perform all steps from the PLAIN mechanism section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_adb_to_kafka_sasl topic should contain 1 message.

  2. Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.

    • The kafka_server server:

      DROP SERVER IF EXISTS kafka_server CASCADE;
      CREATE SERVER kafka_server
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
      );
    • The kafka_table_sasl foreign table (note that compared to the previous example, the table definition contains librdkafka options specific for the SASL_PLAINTEXT protocol with the PLAIN mechanism):

      DROP FOREIGN TABLE IF EXISTS kafka_table_sasl;
      CREATE FOREIGN TABLE kafka_table_sasl(a INT, b TEXT)
      SERVER kafka_server
      OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka_sasl',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_plaintext',
          "#sasl.mechanism" 'PLAIN',
          "#sasl.username" 'adb-to-kafka',
          "#sasl.password" '123'
      );
  3. Select data from the kafka_table_sasl foreign table:

    SELECT * FROM kafka_table_sasl;

    The result contains notices that are described in the Kafka to ADB Connector configuration article (see the k_initial_offset and k_automatic_offsets options in the Server options table) and the messages read from Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=8036)
     a |   b
    ---+-------
     1 | test1
    (1 row)
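
The NOTICE about an unknown offset appears because no offset for the partition is stored yet, so the connector falls back to the default. If you prefer to start reading from an explicit position, the k_initial_offset server option mentioned above can be added to the existing server definition with standard ALTER SERVER syntax; a sketch (the value 0 is an example):

    ALTER SERVER kafka_server OPTIONS (ADD k_initial_offset '0');

See the Kafka to ADB Connector configuration article for the exact semantics of this option.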
Authentication errors

 

If user credentials are specified incorrectly in the foreign table definition, the following error occurs when you try to read data from that table:

ERROR:  Kafka-ADB: [kafka] Failed to retrieve metadata for topic 'topic_adb_to_kafka_sasl': Local: Broker transport failure [-195] [-1]

GSSAPI mechanism (Kerberos)

  1. Perform all steps from the GSSAPI mechanism (Kerberos) section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_adb_to_kafka_kerberos topic should contain 1 message.

  2. Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.

    • The kafka_server server:

      DROP SERVER IF EXISTS kafka_server CASCADE;
      CREATE SERVER kafka_server
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1:9092,bds-ads2:9092,bds-ads3:9092'
      );
    • The kafka_table_kerberos foreign table (note that compared to the previous examples, the table definition contains librdkafka options specific for the SASL_PLAINTEXT protocol with the GSSAPI mechanism):

      DROP FOREIGN TABLE IF EXISTS kafka_table_kerberos;
      CREATE FOREIGN TABLE kafka_table_kerberos(a INT, b TEXT)
      SERVER kafka_server
      OPTIONS (
          format 'avro',
          k_topic 'topic_adb_to_kafka_kerberos',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_plaintext',
          "#sasl.mechanism" 'GSSAPI',
          "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab',
          "#sasl.kerberos.principal" 'adb-to-kafka',
          "#sasl.kerberos.service.name" 'kafka'
      );
  3. Select data from the kafka_table_kerberos foreign table:

    SELECT * FROM kafka_table_kerberos;

    The result contains notices that are described in the Kafka to ADB Connector configuration article (see the k_initial_offset and k_automatic_offsets options in the Server options table) and the messages read from Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=24779)
     a |    b
    ---+----------
     1 | kerberos
    (1 row)
Authentication errors

 

If connection parameters (keytab, principal, or service name) are specified incorrectly in the foreign table definition and there is no valid ticket for the selected service in the ticket cache, the following error occurs when you try to read data from that table:

ERROR:  Kafka-ADB: [kafka] Failed to retrieve metadata for topic 'topic_adb_to_kafka_kerberos': Local: Broker transport failure [-195] [-1]

SASL_SSL security protocol with GSSAPI mechanism (Kerberos)

  1. Perform all steps from the SASL_SSL security protocol with GSSAPI mechanism (Kerberos) section of the ADB to Kafka Connector usage examples article (if you have not completed them yet). As a result, the topic_ssl topic should contain 1 message.

  2. Connect to the adb database of the ADB cluster as the gpadmin user (for example, via psql). Create the following objects in the given order.

    • The kafka_server_ssl server (note that compared to the previous examples, the server definition should contain FQDN of ADS hosts):

      DROP SERVER IF EXISTS kafka_server_ssl CASCADE;
      CREATE SERVER kafka_server_ssl
      FOREIGN DATA WRAPPER kadb_fdw
      OPTIONS (
          k_brokers 'bds-ads1.ru-central1.internal:9092,bds-ads2.ru-central1.internal:9092,bds-ads3.ru-central1.internal:9092'
      );
    • The kafka_table_ssl foreign table (note that compared to the previous examples, the table definition contains librdkafka options specific for the SASL_SSL protocol with the GSSAPI mechanism):

      DROP FOREIGN TABLE IF EXISTS kafka_table_ssl;
      CREATE FOREIGN TABLE kafka_table_ssl(a INT, b TEXT)
      SERVER kafka_server_ssl
      OPTIONS (
          format 'avro',
          k_topic 'topic_ssl',
          k_consumer_group 'group',
          k_seg_batch '100',
          k_timeout_ms '1000',
          "#security.protocol" 'sasl_ssl',
          "#sasl.mechanism" 'GSSAPI',
          "#sasl.kerberos.keytab" '/var/lib/pxf/conf/kerberos/adb-to-kafka.service.keytab',
          "#sasl.kerberos.principal" 'adb-to-kafka',
          "#sasl.kerberos.service.name" 'kafka',
          "#ssl.ca.location" '/etc/pki/tls/certs/ca-bundle.crt',
          "#ssl.certificate.location" '/etc/ssl/certs/host_cert.cert',
          "#ssl.key.location" '/etc/ssl/host_cert.key',
          "#ssl.key.password" 'bigdata'
      );
  3. Select data from the kafka_table_ssl foreign table:

    SELECT * FROM kafka_table_ssl;

    The result contains notices that are described in the Kafka to ADB Connector configuration article (see the k_initial_offset and k_automatic_offsets options in the Server options table) and the messages read from Kafka:

    NOTICE:  Kafka-ADB: Offset for partition 0 is not known, and is set to default value 0  (seg0 slice1 10.92.40.81:10000 pid=9031)
     a |       b
    ---+---------------
     1 | ssl-test-data
    (1 row)
Authentication errors

 

If certificate paths are specified incorrectly in the foreign table definition, the following error occurs when you try to read data from that table:

ERROR:  Kafka-ADB: [kafka] Failed to create a Kafka consumer: ssl.key.location failed: ssl_rsa.c:633: error:140B0002:SSL routines:SSL_CTX_use_PrivateKey_file:system lib:  [5]

If you specify an incorrect keytab, principal, or service name, the error message is identical to the one described above for the SASL_PLAINTEXT protocol with the GSSAPI mechanism.
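
Throughout these examples, each foreign table tracks its reading position in kadb.offsets. A convenience query to inspect all stored offsets at once, resolving the ftoid column to table names (it uses only the kadb.offsets columns shown earlier):

    SELECT ftoid::regclass AS foreign_table, prt, "off"
    FROM kadb.offsets
    ORDER BY foreign_table, prt;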
