ADB to Kafka Connector configuration
To send data from ADB to Kafka via ADB to Kafka Connector, you should first create a writable external table on the ADB cluster side. In the LOCATION clause, set the PXF protocol with the kafka profile and connection options.
To see how to send data from ADB to ADS using the settings listed below, refer to ADB to Kafka Connector usage examples.
Create a writable external table
To create a writable external table, use the CREATE WRITABLE EXTERNAL TABLE
command. The basic command syntax is listed below:
CREATE WRITABLE EXTERNAL TABLE <table_name> (
{ <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
'pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<option>=<value>[...]]'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];
where:
- <table_name> — an external table name in ADB.
- <column_name> — a column name.
- <data_type> — a column data type.
- <other_table> — a source table from which column names, column data types, and a data distribution policy will be copied to the new external table. Note that the column constraints and default values specified in the source table are not copied, because they are not supported in external tables.
- <kafka_topic> — a topic name in Kafka.
- <server_name> — a server name in the $PXF_BASE/servers/ directory (by default, /var/lib/pxf/servers/). For more information, see Use the SERVER option.
- <option> — options that define the connection details. The options that are available for ADB to Kafka Connector are listed in the following tables: General options, Kafka options, JAAS options for the SASL GSSAPI mechanism (Kerberos), JAAS options for the SASL PLAIN mechanism.
- <value> — option values.
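For illustration, a minimal sketch of such a table follows; the topic name orders_topic, the broker address kafka1:9092, and the column set are hypothetical:

CREATE WRITABLE EXTERNAL TABLE kafka_orders_ext (id INT, note TEXT)
LOCATION ('pxf://orders_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
DISTRIBUTED RANDOMLY;

With the default BATCH_SIZE of 1 (see the tables below), each row inserted into this table is sent to the orders_topic topic as a separate Avro message.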
General options

Name | Description | Default | Required
---|---|---|---
KAFKA_ADMIN_CONNECTION_TIMEOUT | A grace period during which current operations are allowed to complete after the Kafka connection is closed (in milliseconds). Once that time period is over, all operations that have not yet been completed are aborted with an exception | 30000 | No
BATCH_SIZE | A number of rows that producers should batch in one Avro message | 1 | No
TOPIC_AUTO_CREATE_FLAG | A flag that indicates whether a Kafka topic should be automatically created when producers try to write data into it. The topic will be created with one partition and replication factor = 1 | true | No
AVRO_DEFAULT_DECIMAL_PRECISION | A maximum number of digits in a number (except a decimal point). Positive integers are allowed | 38 | No
AVRO_DEFAULT_DECIMAL_SCALE | A maximum number of digits to the right of the decimal point in a number. Must be zero or a positive integer less than or equal to the AVRO_DEFAULT_DECIMAL_PRECISION value | 18 | No
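As a sketch of how these options combine (the topic, broker address, and values are illustrative), the following definition batches 100 rows per Avro message and narrows the default decimal precision and scale:

CREATE WRITABLE EXTERNAL TABLE kafka_payments_ext (id INT, amount NUMERIC)
LOCATION ('pxf://payments_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092&BATCH_SIZE=100&AVRO_DEFAULT_DECIMAL_PRECISION=20&AVRO_DEFAULT_DECIMAL_SCALE=4')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

Note that AVRO_DEFAULT_DECIMAL_SCALE (4) stays below AVRO_DEFAULT_DECIMAL_PRECISION (20), as the table above requires.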
Kafka options

Name | Description | Default | Required
---|---|---|---
BOOTSTRAP_SERVERS | A comma-separated list of Kafka brokers, each of which is specified in the following format: <host>:<port> | — | Yes
SECURITY_PROTOCOL | A security protocol that is used to communicate with Kafka brokers. Possible values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | PLAINTEXT | No
SASL_MECHANISM | A SASL mechanism that is used for client connections. Possible values: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If you need another authentication mechanism, you can use custom settings to set up required Kafka producer parameters and a JAAS configuration file | — | Yes if SECURITY_PROTOCOL is SASL_PLAINTEXT or SASL_SSL
TRUSTSTORE_LOCATION | A path to the truststore file | — | Yes if SECURITY_PROTOCOL is SSL or SASL_SSL
TRUSTSTORE_PASSWORD | A password for the truststore file | — | Yes if SECURITY_PROTOCOL is SSL or SASL_SSL
KEYSTORE_LOCATION | A path to the keystore file | — | Yes if SECURITY_PROTOCOL is SSL or SASL_SSL and the broker requires client authentication
KEYSTORE_PASSWORD | A password for the keystore file | — | Yes if KEYSTORE_LOCATION is set
KERBEROS_SERVICE_NAME | A name of the Kerberos principal used by Kafka. Can be defined in the Kafka configuration file or the JAAS configuration file | — | Yes if SASL_MECHANISM is GSSAPI
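For instance, a connection over SSL with a truststore might be declared as follows; the topic, broker address, truststore path, and password are hypothetical:

CREATE WRITABLE EXTERNAL TABLE kafka_ssl_ext (a INT, b TEXT)
LOCATION ('pxf://ssl_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9093&SECURITY_PROTOCOL=SSL&TRUSTSTORE_LOCATION=/etc/security/kafka.client.truststore.jks&TRUSTSTORE_PASSWORD=changeit')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');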
JAAS options for the SASL GSSAPI mechanism (Kerberos)

The following parameters are used if SECURITY_PROTOCOL is set to SASL_PLAINTEXT or SASL_SSL, and SASL_MECHANISM is set to GSSAPI. Since each parameter can depend on others, all of them are marked as optional.

NOTE: For more information on JAAS configuration, see JAAS Configuration in the Kafka documentation.
Name | Description | Default | Required
---|---|---|---
KERBEROS_PRINCIPAL | A name of the Kerberos principal that should be used. The principal can be a simple user name or a service principal name | — | No
KERBEROS_KEYTAB | A file name of the keytab to get a principal secret key from | — | No
KERBEROS_USE_KEYTAB | A flag that indicates whether to get a principal secret key from a keytab | true | No
KERBEROS_STORE_KEY | A flag that indicates whether to store a keytab or a principal key in the Subject private credentials | true | No
KERBEROS_USE_TICKET_CACHE | A flag that indicates whether to obtain a Ticket-Granting Ticket (TGT) from the ticket cache | false | No
KERBEROS_DEBUG | A flag that indicates whether to output debug messages | false | No
JAAS_CONFIG_FILE | A path to the JAAS configuration file. Use it if the options present in the current table are not enough for the required JAAS configuration. See details below | — | No
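As an illustration, a Kerberos-authenticated definition could combine these options as follows; the topic, broker address, principal, keytab path, and service name are hypothetical:

CREATE WRITABLE EXTERNAL TABLE kafka_krb_ext (a INT, b TEXT)
LOCATION ('pxf://krb_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092&SECURITY_PROTOCOL=SASL_PLAINTEXT&SASL_MECHANISM=GSSAPI&KERBEROS_SERVICE_NAME=kafka&KERBEROS_PRINCIPAL=gpadmin@EXAMPLE.COM&KERBEROS_KEYTAB=/var/lib/pxf/conf/gpadmin.keytab')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');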
JAAS options for the SASL PLAIN mechanism

The following parameters are used if SECURITY_PROTOCOL is set to SASL_PLAINTEXT or SASL_SSL, and SASL_MECHANISM is set to PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
Name | Description | Default | Required
---|---|---|---
SASL_USER | A user name for client connections | — | Yes if SASL_MECHANISM is PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
SASL_USER_PASSWORD | A password of the SASL_USER user | — | Yes if SASL_USER is set
JAAS_CONFIG_FILE | A path to the JAAS configuration file. Use it if the options present in the current table are not enough for the required JAAS configuration. See details below | — | No
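A hypothetical definition that authenticates with SCRAM-SHA-256 over SASL_SSL might look like this (the topic, broker, user, password, and truststore values are placeholders):

CREATE WRITABLE EXTERNAL TABLE kafka_scram_ext (a INT, b TEXT)
LOCATION ('pxf://scram_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9093&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=SCRAM-SHA-256&SASL_USER=pxf_user&SASL_USER_PASSWORD=secret&TRUSTSTORE_LOCATION=/etc/security/kafka.client.truststore.jks&TRUSTSTORE_PASSWORD=changeit')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');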
Use the SERVER option
By using SERVER=<server_name> in the external table creation query, you can read options from the configuration file without explicitly specifying them in the LOCATION clause. To use the SERVER option, follow the steps:
- Add a new directory to $PXF_BASE/servers/ on Master. The new directory name will be used as a server name (<server_name>) below:

  $ sudo mkdir /var/lib/pxf/servers/<server_name>

  NOTE: The directory where PXF configuration files are located is defined by the $PXF_BASE environment variable. The default value is /var/lib/pxf. The path may differ in your environment.

- Create the kafka-site.xml configuration file in the /var/lib/pxf/servers/<server_name> directory:

  $ sudo vi /var/lib/pxf/servers/<server_name>/kafka-site.xml

  The example of kafka-site.xml is listed below.

  kafka-site.xml example:

  <?xml version="1.0" encoding="UTF-8"?>
  <configuration>
      <property>
          <name>kafka.bootstrap.servers</name>
          <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value>
      </property>
      <property>
          <name>kafka.batch.size</name>
          <value>10</value>
      </property>
      <property>
          <name>kafka.topic.auto.create</name>
          <value>true</value>
      </property>
      <property>
          <name>avro.decimal.default.precision</name>
          <value>38</value>
      </property>
      <property>
          <name>avro.decimal.default.scale</name>
          <value>18</value>
      </property>
  </configuration>
- In the kafka-site.xml file, specify the configuration parameters that will be used to connect to Kafka. The available parameters are listed below. For each of them, the corresponding option from the abovementioned query is specified. Mandatory requirements and default values remain the same.

  kafka-site.xml parameters:

  kafka-site.xml parameter | Option in the LOCATION query clause
  ---|---
  kafka.bootstrap.servers | BOOTSTRAP_SERVERS
  kafka.property.security.protocol | SECURITY_PROTOCOL
  kafka.property.sasl.mechanism | SASL_MECHANISM
  kafka.property.ssl.truststore.location | TRUSTSTORE_LOCATION
  kafka.property.ssl.truststore.password | TRUSTSTORE_PASSWORD
  kafka.property.ssl.keystore.location | KEYSTORE_LOCATION
  kafka.property.ssl.keystore.password | KEYSTORE_PASSWORD
  kafka.property.sasl.kerberos.service.name | KERBEROS_SERVICE_NAME
  kafka.jaas.property.sasl.kerberos.principal | KERBEROS_PRINCIPAL
  kafka.jaas.property.sasl.kerberos.keytab | KERBEROS_KEYTAB
  kafka.jaas.property.sasl.kerberos.useKeyTab | KERBEROS_USE_KEYTAB
  kafka.jaas.property.sasl.kerberos.storeKey | KERBEROS_STORE_KEY
  kafka.jaas.property.sasl.kerberos.useTicketCache | KERBEROS_USE_TICKET_CACHE
  kafka.jaas.property.sasl.kerberos.debug | KERBEROS_DEBUG
  kafka.jaas.property.sasl.user | SASL_USER
  kafka.jaas.property.sasl.user.password | SASL_USER_PASSWORD
  kafka.jaas.property.config.file | JAAS_CONFIG_FILE
  kafka.batch.size | BATCH_SIZE
  kafka.admin.close.connection.timeout | KAFKA_ADMIN_CONNECTION_TIMEOUT
  kafka.topic.auto.create | TOPIC_AUTO_CREATE_FLAG
  avro.decimal.default.precision | AVRO_DEFAULT_DECIMAL_PRECISION
  avro.decimal.default.scale | AVRO_DEFAULT_DECIMAL_SCALE
- Log in under the default user name gpadmin. All the commands listed in the subsequent steps should be performed on Master:

  $ sudo su - gpadmin
- Synchronize the PXF configuration on all hosts in the ADB cluster:

  $ pxf cluster sync

  The result:

  Syncing PXF configuration files from master host to standby master host and 2 segment hosts...
  PXF configs synced successfully on 3 out of 3 hosts
- Restart PXF:

  $ pxf cluster restart

  The result:

  Restarting PXF on master host, standby master host, and 2 segment hosts...
  PXF restarted successfully on 4 out of 4 hosts
- When creating an external table, specify SERVER=<server_name> in the LOCATION clause. Do not fill in other options:

  CREATE WRITABLE EXTERNAL TABLE <table_name> (
  { <column_name> <data_type> [, ...] | LIKE <other_table> }
  )
  LOCATION (
  'pxf://<kafka_topic>?PROFILE=kafka&SERVER=<server_name>'
  )
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
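For instance, with a hypothetical server directory named kafka_srv and a topic named test_topic, the definition reduces to:

CREATE WRITABLE EXTERNAL TABLE kafka_srv_ext (a INT, b TEXT)
LOCATION ('pxf://test_topic?PROFILE=kafka&SERVER=kafka_srv')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');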
Use custom options
In addition to ADB to Kafka Connector options mentioned above, you can define any other property available for a Kafka producer. There are two ways to set custom options:
- Define Kafka properties in the LOCATION clause when creating an external table. For example:

  CREATE WRITABLE EXTERNAL TABLE test1 (a INT, b TEXT)
  LOCATION ('pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&kafka.property.delivery.timeout.ms=131000')
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
- Add Kafka properties to the kafka-site.xml file for a server that will be specified in the SERVER option. For example:

  <property>
      <name>kafka.property.delivery.timeout.ms</name>
      <value>131000</value>
  </property>
IMPORTANT: When setting custom options, you should add the kafka.property. prefix to the original Kafka producer property names, as in the examples above.
Custom configuration of the SASL authentication mechanism
If your Kafka broker is configured to authenticate clients with an authentication mechanism that is absent from the Kafka options list (see SASL_MECHANISM), you can try to configure the plugin with custom properties and a JAAS configuration file (if required). For example, suppose you would like to use SASL/OAUTHBEARER authentication, which is not in the list. Do the following:
- Set SECURITY_PROTOCOL to SASL_SSL or SASL_PLAINTEXT depending on the Kafka broker configuration.

- Set SASL_MECHANISM to OAUTHBEARER.

- Set the Kafka clients properties that are required for this authentication. See Use custom options for information.

- Create a JAAS configuration file (see JAAS_CONFIG_FILE) and copy it to the same directory on each segment host. For example, you can copy it to the folder with the kafka-site.xml file.

- Set JAAS_CONFIG_FILE to the absolute path of the file from the previous step.

- Create a table using the following query:

  CREATE WRITABLE EXTERNAL TABLE kafka_tbl_oauthbearer (a INT, b TEXT, c TEXT)
  LOCATION ('pxf://pxf-sasl-oauthbearer?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=OAUTHBEARER&JAAS_CONFIG_FILE=/var/lib/pxf/servers/kafka/oauth.config&kafka.property.sasl.oauthbearer.token.endpoint.url=http://localhost:8080/token&kafka.property.sasl.oauthbearer.scope.claim.name=kafka-pxf')
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
Alternatively, you can use the kafka-site.xml file. For the example above, the file would look like this:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>kafka.bootstrap.servers</name>
        <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value>
    </property>
    <property>
        <name>kafka.property.security.protocol</name>
        <value>SASL_SSL</value>
    </property>
    <property>
        <name>kafka.property.sasl.mechanism</name>
        <value>OAUTHBEARER</value>
    </property>
    <property>
        <name>kafka.jaas.property.config.file</name>
        <value>/var/lib/pxf/servers/kafka/oauth.config</value>
    </property>
    <property>
        <name>kafka.property.sasl.oauthbearer.token.endpoint.url</name>
        <value>http://localhost:8080/token</value>
    </property>
    <property>
        <name>kafka.property.sasl.oauthbearer.scope.claim.name</name>
        <value>kafka-pxf</value>
    </property>
</configuration>
If the file is saved in the /var/lib/pxf/servers/kafka_server directory, the table definition would look like this:

CREATE WRITABLE EXTERNAL TABLE kafka_tbl_oauthbearer (a INT, b TEXT, c TEXT)
LOCATION ('pxf://pxf-sasl-oauthbearer?PROFILE=kafka&SERVER=kafka_server')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
Greenplum and AVRO data types
The following table describes how Greenplum data types are converted into AVRO types when the ADB to Kafka Connector is used.
Greenplum type | AVRO primitive type | AVRO logical type
---|---|---
BOOLEAN | BOOLEAN | —
TEXT | STRING | —
VARCHAR | STRING | —
TIMESTAMP | LONG | timestamp-micros
BIGINT | LONG | —
TIME | LONG | time-micros
NUMERIC | DOUBLE | —
FLOAT8 | DOUBLE | —
REAL | FLOAT | —
SMALLINT | INT | —
INTEGER | INT | —
DATE | INT | date
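To make the mapping concrete, the hypothetical table below (the table name, topic, and broker are illustrative) would produce Avro records whose field types are shown in the comments:

CREATE WRITABLE EXTERNAL TABLE kafka_types_ext (
    id BIGINT,         -- AVRO: LONG
    title TEXT,        -- AVRO: STRING
    price NUMERIC,     -- AVRO: DOUBLE
    sold_on DATE,      -- AVRO: INT (logical type date)
    sold_at TIMESTAMP  -- AVRO: LONG (logical type timestamp-micros)
)
LOCATION ('pxf://types_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=kafka1:9092')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');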