ADB to Kafka Connector configuration

To send data from ADB to Kafka via ADB to Kafka Connector, first create a writable external table on the ADB cluster side. In the LOCATION clause, specify the pxf protocol with the kafka profile and the connection options.

To see how to send data from ADB to ADS using the settings listed below, refer to ADB to Kafka Connector usage examples.

Create a writable external table

To create a writable external table, use the CREATE WRITABLE EXTERNAL TABLE command. The basic command syntax is listed below:

CREATE WRITABLE EXTERNAL TABLE <table_name> (
    { <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
    'pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<option>=<value>[...]]'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

where:

  • <table_name> — an external table name in ADB.

  • <column_name> — a column name.

  • <data_type> — a column data type.

  • <other_table> — a source table from which column names, column data types, and a data distribution policy will be copied to the new external table. Note that the column constraints and default values that are specified in the source table are not copied because they are not supported in external tables.

  • <kafka_topic> — a topic name in Kafka.

  • <server_name> — a server name in the $PXF_BASE/servers/ directory (by default, /var/lib/pxf/servers/). For more information, see Use the SERVER option.

  • <option> — options that define the connection details. The options available for ADB to Kafka Connector are listed in the following sections: General options, Kafka options, JAAS options for the SASL GSSAPI mechanism (Kerberos), JAAS options for the SASL PLAIN mechanism.

  • <value> — option values.

NOTE
  • For optimal performance, the data distribution policy of the external table should match the policy of the source table from which data is selected before being sent to Kafka. This allows data to be sent directly from ADB segments instead of being redistributed before sending. For this purpose, it is recommended to use the LIKE clause when creating an external table, or to explicitly specify the same distribution key in the DISTRIBUTED BY clause.

  • The full description of the CREATE EXTERNAL TABLE command syntax is available in the Greenplum documentation.

  • To alter an external table definition, use the ALTER EXTERNAL TABLE command. To delete an external table, use DROP EXTERNAL TABLE.
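For illustration, below is a minimal sketch of a complete definition; the topic name test_topic, the broker address kafka1:9092, and the source table public.orders are hypothetical:

CREATE WRITABLE EXTERNAL TABLE orders_to_kafka (LIKE public.orders)
LOCATION (
    'pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

-- Each INSERT sends the selected rows to the test_topic Kafka topic
INSERT INTO orders_to_kafka SELECT * FROM public.orders;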

General options
  • KAFKA_ADMIN_CONNECTION_TIMEOUT — a grace period (in milliseconds) during which current operations are allowed to complete after the Kafka connection is closed. Once that time period is over, all operations that have not yet completed are aborted with the org.apache.kafka.common.errors.TimeoutException exception. Default: 30000. Required: no.

  • BATCH_SIZE — a number of rows that producers batch into one Avro message. Default: 1. Required: no.

  • TOPIC_AUTO_CREATE_FLAG — a flag that indicates whether a Kafka topic should be created automatically when producers try to write data into it. The topic is created with one partition and a replication factor of 1. Default: true. Required: no.

  • AVRO_DEFAULT_DECIMAL_PRECISION — the maximum number of digits in a number, not counting the decimal point. Positive integers are allowed. Default: 38. Required: no.

  • AVRO_DEFAULT_DECIMAL_SCALE — the maximum number of digits to the right of the decimal point in a number. Must be zero or a positive integer less than or equal to the AVRO_DEFAULT_DECIMAL_PRECISION value. Default: 18. Required: no.
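For example, to batch 100 rows into one Avro message and override the default decimal precision and scale, the general options might be combined as follows (the table, topic, and broker names are placeholders):

CREATE WRITABLE EXTERNAL TABLE metrics_to_kafka (id INT, val NUMERIC)
LOCATION (
    'pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092&BATCH_SIZE=100&AVRO_DEFAULT_DECIMAL_PRECISION=20&AVRO_DEFAULT_DECIMAL_SCALE=4'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');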

Kafka options
  • BOOTSTRAP_SERVERS — a comma-separated list of Kafka brokers, each of which is specified in the following format: <hostname>:<port> or <IP>:<port>. No default value. Required: yes.

  • SECURITY_PROTOCOL — a security protocol that is used to communicate with Kafka brokers. Possible values: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT. Required: no.

  • SASL_MECHANISM — a SASL mechanism that is used for client connections. Possible values: PLAIN, GSSAPI. No default value. Required if SECURITY_PROTOCOL is SASL_PLAINTEXT or SASL_SSL.

  • TRUSTSTORE_LOCATION — a path to the truststore file. No default value. Required if SECURITY_PROTOCOL = SASL_SSL.

  • TRUSTSTORE_PASSWORD — a password for the truststore file. No default value. Required if SECURITY_PROTOCOL = SASL_SSL.

  • KERBEROS_SERVICE_NAME — a name of the Kerberos principal used by Kafka. Can be defined in the Kafka configuration file or in the JAAS configuration file. No default value. Required if SASL_MECHANISM = GSSAPI.
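As a sketch, a secured connection via SASL_SSL with the GSSAPI mechanism might combine these options as follows; the broker address, the truststore path and password, and the service name are assumptions:

CREATE WRITABLE EXTERNAL TABLE secure_to_kafka (a INT, b TEXT)
LOCATION (
    'pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9093&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=GSSAPI&TRUSTSTORE_LOCATION=/var/lib/pxf/conf/ssl/truststore.jks&TRUSTSTORE_PASSWORD=changeit&KERBEROS_SERVICE_NAME=kafka'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

With GSSAPI, the JAAS options from the next section are usually added as well.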

JAAS options for the SASL GSSAPI mechanism (Kerberos)

The following parameters are used if SECURITY_PROTOCOL is set to SASL_PLAINTEXT or SASL_SSL, and SASL_MECHANISM is set to GSSAPI. Since each parameter can depend on the others, all of them are marked as optional.

NOTE

For more information on JAAS configuration, see JAAS Configuration in the Kafka documentation.

  • KERBEROS_PRINCIPAL — a name of the Kerberos principal that should be used. The principal can be a simple user name such as "testuser" or a service name such as "host/testhost.eng.sun.com". No default value. Required: no.

  • KERBEROS_KEYTAB — a file name of the keytab to get a principal secret key from. No default value. Required: no.

  • KERBEROS_USE_KEYTAB — a flag that indicates whether to get a principal secret key from a keytab. Default: true. Required: no.

  • KERBEROS_STORE_KEY — a flag that indicates whether to store a keytab or a principal key in the Subject private credentials. Default: true. Required: no.

  • KERBEROS_USE_TICKET_CACHE — a flag that indicates whether to obtain a Ticket-Granting Ticket (TGT) from the ticket cache. Default: false. Required: no.

  • KERBEROS_DEBUG — a flag that indicates whether to output debug messages. Default: false. Required: no.

  • JAAS_CONFIG_FILE — a path to the JAAS configuration file. Use it if the options listed in this section are not enough for a specific JAAS configuration (see JAAS_CONFIG_FILE below). No default value. Required: no.

JAAS options for the SASL PLAIN mechanism

The following parameters are used if SECURITY_PROTOCOL is set to SASL_PLAINTEXT or SASL_SSL, and SASL_MECHANISM is set to PLAIN.

  • SASL_USER — a user name for client connections. No default value. Required if SASL_MECHANISM = PLAIN.

  • SASL_USER_PASSWORD — a password of the SASL_USER user. No default value. Required if SASL_MECHANISM = PLAIN.

  • JAAS_CONFIG_FILE — a path to the JAAS configuration file. Use it if the options listed in this section are not enough for a specific JAAS configuration (see JAAS_CONFIG_FILE below). No default value. Required: no.
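For instance, a connection via SASL_PLAINTEXT with the PLAIN mechanism might look as follows (the credentials, topic, and broker address are placeholders):

CREATE WRITABLE EXTERNAL TABLE plain_to_kafka (a INT, b TEXT)
LOCATION (
    'pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&SASL_USER=kafka_user&SASL_USER_PASSWORD=kafka_password'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');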

JAAS_CONFIG_FILE

ADB to Kafka Connector uses Java Authentication and Authorization Service (JAAS) when connecting to Kafka via the SASL_PLAINTEXT and SASL_SSL protocols. For JAAS configuration, the sasl.jaas.config parameter is used. There are two ways to set up that parameter in the connector:

  • via the individual JAAS options listed in the two sections above;

  • via the JAAS_CONFIG_FILE option, which points to a ready-made JAAS configuration file.

It is recommended to use JAAS_CONFIG_FILE if a specific JAAS configuration requires additional options that are not present in the two sections above. The file whose path is set in the JAAS_CONFIG_FILE option should contain only one line. File examples are listed below.

File example for SASL_MECHANISM = PLAIN
org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka_user" password="kafka_password";
File example for SASL_MECHANISM = GSSAPI
com.sun.security.auth.module.Krb5LoginModule required principal="kafka_user@COMPANY.INTERNAL" keyTab="/var/lib/pxf/conf/ssl/kafka_user.keytab" useKeyTab="true" storeKey="true" useTicketCache="false" debug="false";

Before running queries against the external table, copy this file to each ADB segment host.
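A sketch of passing such a file via the JAAS_CONFIG_FILE option (the file path and the remaining values are assumptions):

CREATE WRITABLE EXTERNAL TABLE jaas_to_kafka (a INT, b TEXT)
LOCATION (
    'pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=PLAIN&JAAS_CONFIG_FILE=/var/lib/pxf/conf/ssl/kafka_jaas.conf'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');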

Use the SERVER option

By using SERVER=<server_name> in the external table creation query, you can have PXF read options from a configuration file instead of specifying them explicitly in the LOCATION clause. To use the SERVER option, follow these steps:

  1. Add a new directory to $PXF_BASE/servers/ on Master. The new directory name will be used as a server name (<server_name>) below:

    $ sudo mkdir /var/lib/pxf/servers/<server_name>
    NOTE

    The directory, where PXF configuration files are located, is defined by the $PXF_BASE environment variable. The default value is /var/lib/pxf. The path may differ in your environment.

  2. Create the kafka-site.xml configuration file in the /var/lib/pxf/servers/<server_name> directory:

    $ sudo vi /var/lib/pxf/servers/<server_name>/kafka-site.xml

    The example of kafka-site.xml is listed below.

    kafka-site.xml example
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>kafka.bootstrap.servers</name>
            <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value>
        </property>
        <property>
            <name>kafka.batch.size</name>
            <value>10</value>
        </property>
        <property>
            <name>kafka.topic.auto.create</name>
            <value>true</value>
        </property>
        <property>
            <name>avro.decimal.default.precision</name>
            <value>38</value>
        </property>
        <property>
            <name>avro.decimal.default.scale</name>
            <value>18</value>
        </property>
    </configuration>
  3. In the kafka-site.xml file, specify the configuration parameters that will be used to connect to Kafka. The available parameters are listed below; for each of them, the corresponding option from the LOCATION clause described above is given. Mandatory requirements and default values remain the same.

    kafka-site.xml parameters

    kafka-site.xml parameter                          Option in the LOCATION query clause
    kafka.bootstrap.servers                           BOOTSTRAP_SERVERS
    kafka.property.security.protocol                  SECURITY_PROTOCOL
    kafka.property.sasl.mechanism                     SASL_MECHANISM
    kafka.property.ssl.truststore.location            TRUSTSTORE_LOCATION
    kafka.property.ssl.truststore.password            TRUSTSTORE_PASSWORD
    kafka.property.sasl.kerberos.service.name         KERBEROS_SERVICE_NAME
    kafka.jaas.property.sasl.kerberos.principal       KERBEROS_PRINCIPAL
    kafka.jaas.property.sasl.kerberos.keytab          KERBEROS_KEYTAB
    kafka.jaas.property.sasl.kerberos.useKeyTab       KERBEROS_USE_KEYTAB
    kafka.jaas.property.sasl.kerberos.storeKey        KERBEROS_STORE_KEY
    kafka.jaas.property.sasl.kerberos.useTicketCache  KERBEROS_USE_TICKET_CACHE
    kafka.jaas.property.sasl.kerberos.debug           KERBEROS_DEBUG
    kafka.jaas.property.sasl.user                     SASL_USER
    kafka.jaas.property.sasl.user.password            SASL_USER_PASSWORD
    kafka.jaas.property.config.file                   JAAS_CONFIG_FILE
    kafka.batch.size                                  BATCH_SIZE
    kafka.admin.close.connection.timeout              KAFKA_ADMIN_CONNECTION_TIMEOUT
    kafka.topic.auto.create                           TOPIC_AUTO_CREATE_FLAG
    avro.decimal.default.precision                    AVRO_DEFAULT_DECIMAL_PRECISION
    avro.decimal.default.scale                        AVRO_DEFAULT_DECIMAL_SCALE

  4. Log in as the default user gpadmin. All the commands listed in the subsequent steps should be run on the Master host:

    $ sudo su - gpadmin
  5. Synchronize the PXF configuration on all hosts in the ADB cluster:

    $ pxf cluster sync

    The result:

    Syncing PXF configuration files from master host to standby master host and 2 segment hosts...
    PXF configs synced successfully on 3 out of 3 hosts
  6. Restart PXF:

    $ pxf cluster restart

    The result:

    Restarting PXF on master host, standby master host, and 2 segment hosts...
    PXF restarted successfully on 4 out of 4 hosts
  7. When creating an external table, specify SERVER=<server_name> in the LOCATION clause. Other options do not need to be specified:

    CREATE WRITABLE EXTERNAL TABLE <table_name> (
        { <column_name> <data_type> [, ...] | LIKE <other_table> }
    )
    LOCATION (
        'pxf://<kafka_topic>?PROFILE=kafka&SERVER=<server_name>'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
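    For instance, with a server directory named my_kafka (a hypothetical name), the definition reduces to the following, and the connection details are taken from kafka-site.xml:

    CREATE WRITABLE EXTERNAL TABLE orders_to_kafka (LIKE public.orders)
    LOCATION ('pxf://test_topic?PROFILE=kafka&SERVER=my_kafka')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

    -- Data is sent to Kafka in the usual way
    INSERT INTO orders_to_kafka SELECT * FROM public.orders;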

Use custom options

In addition to ADB to Kafka Connector options mentioned above, you can define any other property available for a Kafka producer. There are two ways to set custom options:

  • Define Kafka properties in the LOCATION clause when creating an external table. For example:

    CREATE WRITABLE EXTERNAL TABLE test1 (a INT, b TEXT)
      LOCATION ('pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&kafka.property.delivery.timeout.ms=131000')
      FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  • Add Kafka properties to the kafka-site.xml file for a server that will be specified in the SERVER option. For example:

    <property>
        <name>kafka.property.delivery.timeout.ms</name>
        <value>131000</value>
    </property>
IMPORTANT

When setting custom options, you should add the kafka.property. prefix to the property names from the Kafka documentation. That is why the delivery.timeout.ms Kafka property is called kafka.property.delivery.timeout.ms in the examples above.

Greenplum and AVRO data types

The following table describes how Greenplum data types are converted into AVRO types when the ADB to Kafka Connector is used.

Greenplum type    AVRO primitive type    AVRO logical type
BOOLEAN           BOOLEAN                 —
TEXT              STRING                  —
VARCHAR           STRING                  —
TIMESTAMP         LONG                   timestamp-micros
BIGINT            LONG                    —
TIME              LONG                   time-micros
NUMERIC           DOUBLE                  —
FLOAT8            DOUBLE                  —
REAL              FLOAT                   —
SMALLINT          INT                     —
INTEGER           INT                     —
DATE              INT                    date
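As an illustration, the following hypothetical definition produces Avro fields of the types listed above (noted in the comments):

CREATE WRITABLE EXTERNAL TABLE typed_to_kafka (
    id   BIGINT,     -- AVRO LONG
    name TEXT,       -- AVRO STRING
    ts   TIMESTAMP,  -- AVRO LONG with the timestamp-micros logical type
    day  DATE        -- AVRO INT with the date logical type
)
LOCATION ('pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');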
