Настройка ADB to Kafka Connector
Для отправки данных из ADB в Kafka через ADB to Kafka Connector необходимо предварительно создать внешнюю таблицу writable external table на стороне кластера ADB, указав в директиве LOCATION
протокол PXF с профилем kafka
и опции соединения.
Примеры отправки данных из ADB в ADS с использованием описанных ниже настроек приведены в статье Примеры использования ADB to Kafka Connector.
Создание внешней таблицы
Чтобы создать внешнюю таблицу для записи данных, необходимо использовать команду CREATE WRITABLE EXTERNAL TABLE
, базовый синтаксис которой приведен ниже:
CREATE WRITABLE EXTERNAL TABLE <table_name> (
{ <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
'pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<option>=<value>[...]]'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];
где:
-
<table_name>
— имя внешней таблицы в ADB. -
<column_name>
— имя столбца. -
<data_type>
— тип данных столбца. -
<other_table>
— исходная таблица, из которой в новую внешнюю таблицу будут скопированы имена столбцов, их типы данных и политика распределения данных. Обратите внимание, что ограничения и значения по умолчанию, указанные в исходной таблице, не копируются, так как они не поддерживаются во внешних таблицах. -
<kafka_topic>
— название топика в Kafka. -
<server_name>
— имя сервера в директории $PXF_BASE/servers/ (по умолчанию /var/lib/pxf/servers/). Более подробную информацию можно получить в разделе Использование опции SERVER. -
<option>
— параметры, определяющие детали подключения к внешнему источнику данных. Список возможных параметров для коннектора ADB to Kafka приведены ниже в таблицах Основные опции, Опции Kafka, Опции JAAS при использовании SASL-механизма GSSAPI (Kerberos), Опции JAAS при использовании SASL-механизма PLAIN. -
<value>
— значения соответствующих параметров<option>
.
ПРИМЕЧАНИЕ
|
Имя | Описание | Default | Обязательность |
---|---|---|---|
KAFKA_ADMIN_CONNECTION_TIMEOUT |
Период времени, в течение которого допускается завершение текущих операций в случае закрытия соединения с Kafka (в миллисекундах). По истечении указанного тайм-аута все незавершенные операции будут прерваны с генерацией исключения |
30000 |
Нет |
BATCH_SIZE |
Число строк, которое должно помещаться в одно сообщение Avro |
1 |
Нет |
TOPIC_AUTO_CREATE_FLAG |
Флаг, указываюший на необходимость автоматического создания топика в Kafka. Топик будет создан с одной партицией и фактором репликации = |
true |
Нет |
AVRO_DEFAULT_DECIMAL_PRECISION |
Максимальное количество знаков в числах (за исключением десятичного разделителя). Для ввода допускаются целые положительные числа |
38 |
Нет |
AVRO_DEFAULT_DECIMAL_SCALE |
Максимальное количество знаков после десятичного разделителя в числах. Для ввода допускаются целые неотрицательные числа, не превышающие значение параметра |
18 |
Нет |
Имя | Описание | Default | Обязательность |
---|---|---|---|
BOOTSTRAP_SERVERS |
Разделенный запятыми список адресов брокеров Kafka, каждый из которых указан в формате |
— |
Да |
SECURITY_PROTOCOL |
Протокол, используемый для соединения с брокерами Kafka. Возможные значения:
|
PLAINTEXT |
Нет |
SASL_MECHANISM |
SASL-механизм, используемый для клиентских соединений. Возможные значения:
Если в списке отсутствует требуемый механизм аутентификации, можно использовать кастомный способ настройки параметров Kafka producer и JAAS-файла |
— |
Да, если |
TRUSTSTORE_LOCATION |
Путь к truststore-файлу |
— |
Да, если |
TRUSTSTORE_PASSWORD |
Пароль для доступа к truststore-файлу |
— |
Да, если |
KEYSTORE_LOCATION |
Путь к keystore-файлу |
— |
Да, если |
KEYSTORE_PASSWORD |
Пароль для доступа к keystore-файлу |
— |
Да, если |
KERBEROS_SERVICE_NAME |
Имя Kerberos-принципала, используемого Kafka. Может быть указано в конфигурационном файле Kafka либо в JAAS-файле |
— |
Да, если |
Приведенные ниже параметры используются, если в качестве протокола безопасности (SECURITY_PROTOCOL
) выбран SASL_PLAINTEXT
или SASL_SSL
, а в качестве механизма SASL (SASL_MECHANISM
) установлен GSSAPI
. Каждая из опций может зависеть от ряда других — поэтому все они отмечены как опциональные.
ПРИМЕЧАНИЕ
Дополнительную информацию об использовании JAAS можно получить в разделе JAAS Configuration документации Kafka. |
Имя | Описание | Default | Обязательность |
---|---|---|---|
KERBEROS_PRINCIPAL |
Имя принципала Kerberos, которое может быть как именем пользователя вида |
— |
Нет |
KERBEROS_KEYTAB |
Имя keytab-файла для получения секретного ключа принципала |
— |
Нет |
KERBEROS_USE_KEYTAB |
Флаг, указывающий на необходимость получения секретного ключа принципала из keytab |
true |
Нет |
KERBEROS_STORE_KEY |
Флаг, указывающий на необходимость хранения keytab или ключа принципала в Subject private credentials |
true |
Нет |
KERBEROS_USE_TICKET_CACHE |
Флаг, указывающий на необходимость получения Ticket-Granting Ticket (TGT) из кеша тикетов |
false |
Нет |
KERBEROS_DEBUG |
Флаг, указывающий на необходимость вывода отладочных сообщений |
false |
Нет |
JAAS_CONFIG_FILE |
Путь к конфигурационному файлу JAAS. Используется, если опций JAAS, приведенных в текущей таблице, недостаточно. См. детали ниже |
— |
Нет |
Приведенные ниже параметры используются, если в качестве протокола безопасности (SECURITY_PROTOCOL
) выбран SASL_PLAINTEXT
или SASL_SSL
, а в качестве механизма SASL (SASL_MECHANISM
) установлен PLAIN
, SCRAM-SHA-256
или SCRAM-SHA-512
.
Имя | Описание | Default | Обязательность |
---|---|---|---|
SASL_USER |
Имя пользователя для клиентских подключений |
— |
Да, если |
SASL_USER_PASSWORD |
Пароль пользователя |
— |
Да, если |
JAAS_CONFIG_FILE |
Путь к конфигурационному файлу JAAS. Используется, если опций JAAS, приведенных в текущей таблице, недостаточно. См. детали ниже |
— |
Нет |
Использование опции SERVER
Использование параметра SERVER=<server_name>
в запросе создания внешней таблицы позволяет считывать опции из предварительно созданного конфигурационного файла — без необходимости их явного указания в директиве LOCATION
при каждом вызове запроса. Для использования опции выполните шаги:
-
Добавьте новую директорию в $PXF_BASE/servers/ на хосте Master. Имя директории будет использоваться как имя сервера
<server_name>
на последующих шагах:$ sudo mkdir /var/lib/pxf/servers/<server_name>
ПРИМЕЧАНИЕДиректория, в которой расположены конфигурационные файлы PXF, определяется переменной окружения
$PXF_BASE
. Значение переменной по умолчанию — /var/lib/pxf. В общем случае путь может быть иным. -
Создайте конфигурационный файл kafka-site.xml в директории /var/lib/pxf/servers/<server_name>:
$ sudo vi /var/lib/pxf/servers/<server_name>/kafka-site.xml
Пример конфигурационного файла kafka-site.xml приведен ниже.
Пример kafka-site.xml<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>kafka.bootstrap.servers</name> <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value> </property> <property> <name>kafka.batch.size</name> <value>10</value> </property> <property> <name>kafka.topic.auto.create</name> <value>true</value> </property> <property> <name>avro.decimal.default.precision</name> <value>38</value> </property> <property> <name>avro.decimal.default.scale</name> <value>18</value> </property> </configuration>
-
Отредактируйте файл kafka-site.xml, указав в нем конфигурационные параметры для подключения к Kafka. Доступные параметры приведены ниже. Для каждого из них указана соответствуюшая опция из рассмотренного выше запроса создания внешней таблицы. Требования к обязательности и значения по умолчанию остаются без изменений.
Параметры kafka-site.xmlПараметр в kafka-site.xml Опция в секции запроса LOCATION kafka.bootstrap.servers
BOOTSTRAP_SERVERS
kafka.property.security.protocol
SECURITY_PROTOCOL
kafka.property.sasl.mechanism
SASL_MECHANISM
kafka.property.ssl.truststore.location
TRUSTSTORE_LOCATION
kafka.property.ssl.truststore.password
TRUSTSTORE_PASSWORD
kafka.property.ssl.keystore.location
KEYSTORE_LOCATION
kafka.property.ssl.keystore.password
KEYSTORE_PASSWORD
kafka.property.sasl.kerberos.service.name
KERBEROS_SERVICE_NAME
kafka.jaas.property.sasl.kerberos.principal
KERBEROS_PRINCIPAL
kafka.jaas.property.sasl.kerberos.keytab
KERBEROS_KEYTAB
kafka.jaas.property.sasl.kerberos.useKeyTab
KERBEROS_USE_KEYTAB
kafka.jaas.property.sasl.kerberos.storeKey
KERBEROS_STORE_KEY
kafka.jaas.property.sasl.kerberos.useTicketCache
KERBEROS_USE_TICKET_CACHE
kafka.jaas.property.sasl.kerberos.debug
KERBEROS_DEBUG
kafka.jaas.property.sasl.user
SASL_USER
kafka.jaas.property.sasl.user.password
SASL_USER_PASSWORD
kafka.jaas.property.config.file
JAAS_CONFIG_FILE
kafka.batch.size
BATCH_SIZE
kafka.admin.close.connection.timeout
KAFKA_ADMIN_CONNECTION_TIMEOUT
kafka.topic.auto.create
TOPIC_AUTO_CREATE_FLAG
avro.decimal.default.precision
AVRO_DEFAULT_DECIMAL_PRECISION
avro.decimal.default.scale
AVRO_DEFAULT_DECIMAL_SCALE
-
Подключитесь к Master-серверу ADB под пользователем
gpadmin
, который создается по умолчанию. Все команды, приведенные на шагах ниже, запускаются исключительно на хосте Master:$ sudo su - gpadmin
-
Для синхронизации конфигурационных файлов PXF на всех хостах кластера ADB выполните команду:
$ pxf cluster sync
Результат:
Syncing PXF configuration files from master host to standby master host and 2 segment hosts... PXF configs synced successfully on 3 out of 3 hosts
-
Перезапустите сервис PXF:
$ pxf cluster restart
Результат:
Restarting PXF on master host, standby master host, and 2 segment hosts... PXF restarted successfully on 4 out of 4 hosts
-
При создании внешней таблицы укажите в выражении
LOCATION
параметрSERVER=<server_name>
, не заполняя иные опции:CREATE WRITABLE EXTERNAL TABLE <table_name> ( { <column_name> <data_type> [, ...] | LIKE <other_table> } ) LOCATION ( 'pxf://<kafka_topic>?PROFILE=kafka&SERVER=<server_name>' ) FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
Использование кастомных опций
Наряду с опциями, перечислеными выше, при использовании ADB to Kafka Connector можно определить любой иной параметр, доступный для Kafka producer. Для определения кастомных опций существует два способа:
-
Указание опции в выражении
LOCATION
при создании внешней таблицы. Пример:CREATE WRITABLE EXTERNAL TABLE test1 (a INT, b TEXT) LOCATION ('pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&kafka.property.delivery.timeout.ms=131000') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
-
Добавление опции в файл kafka-site.xml для сервера, который впоследствии будет указан в опции SERVER при создании внешней таблицы. Пример:
<property> <name>kafka.property.delivery.timeout.ms</name> <value>131000</value> </property>
ВАЖНО
При указании кастомных опций необходимо добавлять префикс |
Кастомная настройка используемого механизма аутентификации SASL
Если Kafka-брокер использует метод аутентификации клиентов, отличный от приведенных в списке Опции Kafka (см. поле SASL_MECHANISM
), можно попытаться настроить плагин с кастомными настройками и JAAS-файлом (если требуется). Предположим, требуется использовать метод аутентификации SASL/OAUTHBEARER, отсутствующий в списке. Выполните следующие шаги для его настройки:
-
Убедитесь, что метод аутентификации поддерживается Kafka-брокером и у вас есть информация обо всех параметрах, необходимых для настройки Kafka-клиентов.
-
Присвойте параметру
SECURITY_PROTOCOL
значениеSASL_SSL
илиSASL_PLAINTEXT
в зависимости от настроек Kafka-брокера. -
Установите значение
SASL_MECHANISM
равнымOAUTHBEARER
. -
Заполните настройки, необходимые для аутентификации Kafka-клиентов.
-
Создайте конфигурационный файл JAAS (см. JAAS_CONFIG_FILE) и скопируйте его в одинаковую директорию на всех сегмент-хостах. Например, в директорию с файлом kafka-site.xml.
-
Присвойте параметру
JAAS_CONFIG_FILE
значение абсолютного пути к файлу, созданному на предыдущем шаге. -
Создайте таблицу при помощи следующего запроса:
CREATE WRITABLE EXTERNAL TABLE kafka_tbl_oauthbearer (a INT, b TEXT, c TEXT) LOCATION ('pxf://pxf-sasl-oauthbearer?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=OAUTHBEARER&JAAS_CONFIG_FILE=/var/lib/pxf/servers/kafka/oauth.config&kafka.property.sasl.oauthbearer.token.endpoint.url=http://localhost:8080/token&kafka.property.sasl.oauthbearer.scope.claim.name=kafka-pxf') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
В качестве альтернативного пути можно использовать файл kafka-site.xml. Для приведенного выше примера содержимое файла выглядит следующим образом:
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>kafka.bootstrap.servers</name> <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value> </property> <property> <name>kafka.property.security.protocol</name> <value>SASL_SSL</value> </property> <property> <name>kafka.property.sasl.mechanism</name> <value>OAUTHBEARER</value> </property> <property> <name>kafka.jaas.property.config.file</name> <value>/var/lib/pxf/servers/kafka/oauth.config</value> </property> <property> <name>kafka.property.sasl.oauthbearer.token.endpoint.url</name> <value>http://localhost:8080/token</value> </property> <property> <name>kafka.property.sasl.oauthbearer.scope.claim.name</name> <value>kafka-pxf</value> </property> </configuration>
В случае сохранения конфигурационного файла в директории /var/lib/pxf/servers/kafka_server определение таблицы будет выглядеть следующим образом:
CREATE WRITABLE EXTERNAL TABLE kafka_tbl_oauthbearer (a INT, b TEXT, c TEXT) LOCATION ('pxf://pxf-sasl-oauthbearer?PROFILE=kafka&SERVER=kafka_server') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
Сопоставление типов данных Greenplum и AVRO
В таблице ниже описано, как преобразуются типы данных Greenplum в типы AVRO при использовании коннектора ADB to Kafka.
Тип данных Greenplum | Примитивный тип AVRO | Логический тип AVRO |
---|---|---|
BOOLEAN |
BOOLEAN |
— |
TEXT |
STRING |
— |
VARCHAR |
STRING |
— |
TIMESTAMP |
LONG |
timestamp-micros |
BIGINT |
LONG |
— |
TIME |
LONG |
time-micros |
NUMERIC |
DOUBLE |
— |
FLOAT8 |
DOUBLE |
— |
REAL |
FLOAT |
— |
SMALLINT |
INT |
— |
INTEGER |
INT |
— |
DATE |
INT |
date |