Настройка ADB to Kafka Connector

Для отправки данных из ADB в Kafka через ADB to Kafka Connector необходимо предварительно создать внешнюю таблицу writable external table на стороне кластера ADB, указав в директиве LOCATION протокол PXF с профилем kafka и опции соединения.

Примеры отправки данных из ADB в ADS с использованием описанных ниже настроек приведены в статье Примеры использования ADB to Kafka Connector.

Создание внешней таблицы

Чтобы создать внешнюю таблицу для записи данных, необходимо использовать команду CREATE WRITABLE EXTERNAL TABLE, базовый синтаксис которой приведен ниже:

CREATE WRITABLE EXTERNAL TABLE <table_name> (
    { <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
    'pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<option>=<value>[...]]'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

где:

  • <table_name> — имя внешней таблицы в ADB.

  • <column_name> — имя столбца.

  • <data_type> — тип данных столбца.

  • <other_table> — исходная таблица, из которой в новую внешнюю таблицу будут скопированы имена столбцов, их типы данных и политика распределения данных. Обратите внимание, что ограничения и значения по умолчанию, указанные в исходной таблице, не копируются, так как они не поддерживаются во внешних таблицах.

  • <kafka_topic> — название топика в Kafka.

  • <server_name> — имя сервера в директории $PXF_BASE/servers/ (по умолчанию /var/lib/pxf/servers/). Более подробную информацию можно получить в разделе Использование опции SERVER.

  • <option> — параметры, определяющие детали подключения к внешнему источнику данных. Список возможных параметров для коннектора ADB to Kafka приведены ниже в таблицах Основные опции, Опции Kafka, Опции JAAS при использовании SASL-механизма GSSAPI (Kerberos), Опции JAAS при использовании SASL-механизма PLAIN.

  • <value> — значения соответствующих параметров <option>.

ПРИМЕЧАНИЕ
  • Для оптимальной производительности политика распределения данных во внешней таблице должна быть указана аналогично таблице-источнику, из которой будут браться данные при передаче в Kafka. Это позволит не перераспределять данные перед отправкой, а отправлять их напрямую с сегментов ADB. Для этого при создании внешней таблицы рекомендуется использовать выражение LIKE или явно указывать аналогичный ключ распределения в выражении DISTRIBUTED BY.

  • Полную версию синтаксиса команды CREATE EXTERNAL TABLE можно посмотреть в документации Greenplum.

  • Для редактирования параметров внешней таблицы предназначена команда ALTER EXTERNAL TABLE, для удаления — DROP EXTERNAL TABLE.

Основные опции
Имя Описание Default Обязательность

KAFKA_ADMIN_CONNECTION_TIMEOUT

Период времени, в течение которого допускается завершение текущих операций в случае закрытия соединения с Kafka (в миллисекундах). По истечении указанного тайм-аута все незавершенные операции будут прерваны с генерацией исключения org.apache.kafka.common.errors.TimeoutException

30000

Нет

BATCH_SIZE

Число строк, которое должно помещаться в одно сообщение Avro

1

Нет

TOPIC_AUTO_CREATE_FLAG

Флаг, указываюший на необходимость автоматического создания топика в Kafka. Топик будет создан с одной партицией и фактором репликации = 1

true

Нет

AVRO_DEFAULT_DECIMAL_PRECISION

Максимальное количество знаков в числах (за исключением десятичного разделителя).

Для ввода допускаются целые положительные числа

38

Нет

AVRO_DEFAULT_DECIMAL_SCALE

Максимальное количество знаков после десятичного разделителя в числах.

Для ввода допускаются целые неотрицательные числа, не превышающие значение параметра AVRO_DEFAULT_DECIMAL_PRECISION

18

Нет

Опции Kafka
Имя Описание Default Обязательность

BOOTSTRAP_SERVERS

Разделенный запятыми список адресов брокеров Kafka, каждый из которых указан в формате <имя хоста>:<порт> или <IP-адрес>:<порт>

 — 

Да

SECURITY_PROTOCOL

Протокол, используемый для соединения с брокерами Kafka. Возможные значения:

  • PLAINTEXT

  • SASL_PLAINTEXT

  • SASL_SSL

  • SSL (односторонняя и двусторонняя аутентификация)

PLAINTEXT

Нет

SASL_MECHANISM

SASL-механизм, используемый для клиентских соединений. Возможные значения:

  • PLAIN

  • GSSAPI

  • SCRAM-SHA-256

  • SCRAM-SHA-512

Если в списке отсутствует требуемый механизм аутентификации, можно использовать кастомный способ настройки параметров Kafka producer и JAAS-файла

 — 

Да, если SECURITY_PROTOCOL=SASL_PLAINTEXT|SASL_SSL

TRUSTSTORE_LOCATION

Путь к truststore-файлу

 — 

Да, если SECURITY_PROTOCOL = SASL_SSL

TRUSTSTORE_PASSWORD

Пароль для доступа к truststore-файлу

 — 

Да, если SECURITY_PROTOCOL = SASL_SSL

KEYSTORE_LOCATION

Путь к keystore-файлу

 — 

Да, если SECURITY_PROTOCOL = SASL_SSL|SSL (двусторонняя аутентификация)

KEYSTORE_PASSWORD

Пароль для доступа к keystore-файлу

 — 

Да, если SECURITY_PROTOCOL = SASL_SSL|SSL (двусторонняя аутентификация)

KERBEROS_SERVICE_NAME

Имя Kerberos-принципала, используемого Kafka. Может быть указано в конфигурационном файле Kafka либо в JAAS-файле

 — 

Да, если SASL_MECHANISM = GSSAPI

Опции JAAS при использовании SASL-механизма GSSAPI (Kerberos)

 

Приведенные ниже параметры используются, если в качестве протокола безопасности (SECURITY_PROTOCOL) выбран SASL_PLAINTEXT или SASL_SSL, а в качестве механизма SASL (SASL_MECHANISM) установлен GSSAPI. Каждая из опций может зависеть от ряда других — поэтому все они отмечены как опциональные.

ПРИМЕЧАНИЕ

Дополнительную информацию об использовании JAAS можно получить в разделе JAAS Configuration документации Kafka.

Имя Описание Default Обязательность

KERBEROS_PRINCIPAL

Имя принципала Kerberos, которое может быть как именем пользователя вида "testuser", так и названием сервиса, например "host/testhost.eng.sun.com"

 — 

Нет

KERBEROS_KEYTAB

Имя keytab-файла для получения секретного ключа принципала

 — 

Нет

KERBEROS_USE_KEYTAB

Флаг, указывающий на необходимость получения секретного ключа принципала из keytab

true

Нет

KERBEROS_STORE_KEY

Флаг, указывающий на необходимость хранения keytab или ключа принципала в Subject private credentials

true

Нет

KERBEROS_USE_TICKET_CACHE

Флаг, указывающий на необходимость получения Ticket-Granting Ticket (TGT) из кеша тикетов

false

Нет

KERBEROS_DEBUG

Флаг, указывающий на необходимость вывода отладочных сообщений

false

Нет

JAAS_CONFIG_FILE

Путь к конфигурационному файлу JAAS. Используется, если опций JAAS, приведенных в текущей таблице, недостаточно. См. детали ниже

 — 

Нет

Опции JAAS при использовании SASL-механизма PLAIN, SCRAM-SHA-256 или SCRAM-SHA-512

 

Приведенные ниже параметры используются, если в качестве протокола безопасности (SECURITY_PROTOCOL) выбран SASL_PLAINTEXT или SASL_SSL, а в качестве механизма SASL (SASL_MECHANISM) установлен PLAIN, SCRAM-SHA-256 или SCRAM-SHA-512.

Имя Описание Default Обязательность

SASL_USER

Имя пользователя для клиентских подключений

 — 

Да, если SASL_MECHANISM=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512

SASL_USER_PASSWORD

Пароль пользователя SASL_USER

 — 

Да, если SASL_MECHANISM=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512

JAAS_CONFIG_FILE

Путь к конфигурационному файлу JAAS. Используется, если опций JAAS, приведенных в текущей таблице, недостаточно. См. детали ниже

 — 

Нет

JAAS_CONFIG_FILE

 

ADB to Kafka Connector использует Java Authentication and Authorization Service (JAAS) при подключении к Kafka с помощью протоколов SASL_PLAINTEXT и SASL_SSL. Для конфигурирования JAAS используется специальный параметр sasl.jaas.config. Для его установки в коннекторе доступны два способа:

JAAS_CONFIG_FILE рекомендуется использовать, если для конфигурирования JAAS требуются дополнительные опции, отсутствующие в приведенных выше таблицах. Файл, путь к которому указывается в JAAS_CONFIG_FILE, должен содержать одну строку. Примеры файлов приведены ниже.

Пример файла для SASL_MECHANISM = PLAIN
org.apache.kafka.common.security.plain.PlainLoginModule required username\="kafka_user" password\="kafka_password";
Пример файла для SASL_MECHANISM = GSSAPI
com.sun.security.auth.module.Krb5LoginModule required principal="kafka_user@COMPANY.INTERNAL" keyTab="/var/lib/pxf/conf/ssl/kafka_user.keytab" useKeyTab="true" storeKey="true" useTicketCache="false" debug="false";
Пример файла для SASL_MECHANISM = SCRAM-SHA-256|SCRAM-SHA-512
org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka_user" password="kafka_password";

Перед запуском запросов к внешней таблице файлы должны быть скопированы на каждый сегмент-хост ADB.

Использование опции SERVER

Использование параметра SERVER=<server_name> в запросе создания внешней таблицы позволяет считывать опции из предварительно созданного конфигурационного файла — без необходимости их явного указания в директиве LOCATION при каждом вызове запроса. Для использования опции выполните шаги:

  1. Добавьте новую директорию в $PXF_BASE/servers/ на хосте Master. Имя директории будет использоваться как имя сервера <server_name> на последующих шагах:

    $ sudo mkdir /var/lib/pxf/servers/<server_name>
    ПРИМЕЧАНИЕ

    Директория, в которой расположены конфигурационные файлы PXF, определяется переменной окружения $PXF_BASE. Значение переменной по умолчанию — /var/lib/pxf. В общем случае путь может быть иным.

  2. Создайте конфигурационный файл kafka-site.xml в директории /var/lib/pxf/servers/<server_name>:

    $ sudo vi /var/lib/pxf/servers/<server_name>/kafka-site.xml

    Пример конфигурационного файла kafka-site.xml приведен ниже.

    Пример kafka-site.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>kafka.bootstrap.servers</name>
            <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value>
        </property>
        <property>
            <name>kafka.batch.size</name>
            <value>10</value>
        </property>
        <property>
            <name>kafka.topic.auto.create</name>
            <value>true</value>
        </property>
        <property>
            <name>avro.decimal.default.precision</name>
            <value>38</value>
        </property>
        <property>
            <name>avro.decimal.default.scale</name>
            <value>18</value>
        </property>
    </configuration>
  3. Отредактируйте файл kafka-site.xml, указав в нем конфигурационные параметры для подключения к Kafka. Доступные параметры приведены ниже. Для каждого из них указана соответствуюшая опция из рассмотренного выше запроса создания внешней таблицы. Требования к обязательности и значения по умолчанию остаются без изменений.

    Параметры kafka-site.xml
    Параметр в kafka-site.xml Опция в секции запроса LOCATION

    kafka.bootstrap.servers

    BOOTSTRAP_SERVERS

    kafka.property.security.protocol

    SECURITY_PROTOCOL

    kafka.property.sasl.mechanism

    SASL_MECHANISM

    kafka.property.ssl.truststore.location

    TRUSTSTORE_LOCATION

    kafka.property.ssl.truststore.password

    TRUSTSTORE_PASSWORD

    kafka.property.ssl.keystore.location

    KEYSTORE_LOCATION

    kafka.property.ssl.keystore.password

    KEYSTORE_PASSWORD

    kafka.property.sasl.kerberos.service.name

    KERBEROS_SERVICE_NAME

    kafka.jaas.property.sasl.kerberos.principal

    KERBEROS_PRINCIPAL

    kafka.jaas.property.sasl.kerberos.keytab

    KERBEROS_KEYTAB

    kafka.jaas.property.sasl.kerberos.useKeyTab

    KERBEROS_USE_KEYTAB

    kafka.jaas.property.sasl.kerberos.storeKey

    KERBEROS_STORE_KEY

    kafka.jaas.property.sasl.kerberos.useTicketCache

    KERBEROS_USE_TICKET_CACHE

    kafka.jaas.property.sasl.kerberos.debug

    KERBEROS_DEBUG

    kafka.jaas.property.sasl.user

    SASL_USER

    kafka.jaas.property.sasl.user.password

    SASL_USER_PASSWORD

    kafka.jaas.property.config.file

    JAAS_CONFIG_FILE

    kafka.batch.size

    BATCH_SIZE

    kafka.admin.close.connection.timeout

    KAFKA_ADMIN_CONNECTION_TIMEOUT

    kafka.topic.auto.create

    TOPIC_AUTO_CREATE_FLAG

    avro.decimal.default.precision

    AVRO_DEFAULT_DECIMAL_PRECISION

    avro.decimal.default.scale

    AVRO_DEFAULT_DECIMAL_SCALE

  4. Подключитесь к Master-серверу ADB под пользователем gpadmin, который создается по умолчанию. Все команды, приведенные на шагах ниже, запускаются исключительно на хосте Master:

    $ sudo su - gpadmin
  5. Для синхронизации конфигурационных файлов PXF на всех хостах кластера ADB выполните команду:

    $ pxf cluster sync

    Результат:

    Syncing PXF configuration files from master host to standby master host and 2 segment hosts...
    PXF configs synced successfully on 3 out of 3 hosts
  6. Перезапустите сервис PXF:

    $ pxf cluster restart

    Результат:

    Restarting PXF on master host, standby master host, and 2 segment hosts...
    PXF restarted successfully on 4 out of 4 hosts
  7. При создании внешней таблицы укажите в выражении LOCATION параметр SERVER=<server_name>, не заполняя иные опции:

    CREATE WRITABLE EXTERNAL TABLE <table_name> (
        { <column_name> <data_type> [, ...] | LIKE <other_table> }
    )
    LOCATION (
        'pxf://<kafka_topic>?PROFILE=kafka&SERVER=<server_name>'
    )
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

Использование кастомных опций

Наряду с опциями, перечислеными выше, при использовании ADB to Kafka Connector можно определить любой иной параметр, доступный для Kafka producer. Для определения кастомных опций существует два способа:

  • Указание опции в выражении LOCATION при создании внешней таблицы. Пример:

    CREATE WRITABLE EXTERNAL TABLE test1 (a INT, b TEXT)
      LOCATION ('pxf://test_topic?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&kafka.property.delivery.timeout.ms=131000')
      FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  • Добавление опции в файл kafka-site.xml для сервера, который впоследствии будет указан в опции SERVER при создании внешней таблицы. Пример:

    <property>
        <name>kafka.property.delivery.timeout.ms</name>
        <value>131000</value>
    </property>
ВАЖНО

При указании кастомных опций необходимо добавлять префикс kafka.property. к именам свойств из документации Kafka. Поэтому в примерах выше для параметра Kafka delivery.timeout.ms использовано имя kafka.property.delivery.timeout.ms.

Кастомная настройка используемого механизма аутентификации SASL

Если Kafka-брокер использует метод аутентификации клиентов, отличный от приведенных в списке Опции Kafka (см. поле SASL_MECHANISM), можно попытаться настроить плагин с кастомными настройками и JAAS-файлом (если требуется). Предположим, требуется использовать метод аутентификации SASL/OAUTHBEARER, отсутствующий в списке. Выполните следующие шаги для его настройки:

  1. Убедитесь, что метод аутентификации поддерживается Kafka-брокером и у вас есть информация обо всех параметрах, необходимых для настройки Kafka-клиентов.

  2. Присвойте параметру SECURITY_PROTOCOL значение SASL_SSL или SASL_PLAINTEXT в зависимости от настроек Kafka-брокера.

  3. Установите значение SASL_MECHANISM равным OAUTHBEARER.

  4. Заполните настройки, необходимые для аутентификации Kafka-клиентов.

  5. Создайте конфигурационный файл JAAS (см. JAAS_CONFIG_FILE) и скопируйте его в одинаковую директорию на всех сегмент-хостах. Например, в директорию с файлом kafka-site.xml.

  6. Присвойте параметру JAAS_CONFIG_FILE значение абсолютного пути к файлу, созданному на предыдущем шаге.

  7. Создайте таблицу при помощи следующего запроса:

    CREATE WRITABLE EXTERNAL TABLE kafka_tbl_oauthbearer (a INT, b TEXT, c TEXT)
    LOCATION ('pxf://pxf-sasl-oauthbearer?PROFILE=kafka&BOOTSTRAP_SERVERS=bds-ads1:9092,bds-ads2:9092,bds-ads3:9092&SECURITY_PROTOCOL=SASL_SSL&SASL_MECHANISM=OAUTHBEARER&JAAS_CONFIG_FILE=/var/lib/pxf/servers/kafka/oauth.config&kafka.property.sasl.oauthbearer.token.endpoint.url=http://localhost:8080/token&kafka.property.sasl.oauthbearer.scope.claim.name=kafka-pxf')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

    В качестве альтернативного пути можно использовать файл kafka-site.xml. Для приведенного выше примера содержимое файла выглядит следующим образом:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>kafka.bootstrap.servers</name>
            <value>bds-ads1:9092,bds-ads2:9092,bds-ads3:9092</value>
        </property>
        <property>
            <name>kafka.property.security.protocol</name>
            <value>SASL_SSL</value>
        </property>
        <property>
            <name>kafka.property.sasl.mechanism</name>
            <value>OAUTHBEARER</value>
        </property>
        <property>
            <name>kafka.jaas.property.config.file</name>
            <value>/var/lib/pxf/servers/kafka/oauth.config</value>
        </property>
        <property>
            <name>kafka.property.sasl.oauthbearer.token.endpoint.url</name>
            <value>http://localhost:8080/token</value>
        </property>
        <property>
            <name>kafka.property.sasl.oauthbearer.scope.claim.name</name>
            <value>kafka-pxf</value>
        </property>
    </configuration>

    В случае сохранения конфигурационного файла в директории /var/lib/pxf/servers/kafka_server определение таблицы будет выглядеть следующим образом:

    CREATE WRITABLE EXTERNAL TABLE kafka_tbl_oauthbearer (a INT, b TEXT, c TEXT)
    LOCATION ('pxf://pxf-sasl-oauthbearer?PROFILE=kafka&SERVER=kafka_server')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

Сопоставление типов данных Greenplum и AVRO

В таблице ниже описано, как преобразуются типы данных Greenplum в типы AVRO при использовании коннектора ADB to Kafka.

Тип данных Greenplum Примитивный тип AVRO Логический тип AVRO

BOOLEAN

BOOLEAN

 — 

TEXT

STRING

 — 

VARCHAR

STRING

 — 

TIMESTAMP

LONG

timestamp-micros

BIGINT

LONG

 — 

TIME

LONG

time-micros

NUMERIC

DOUBLE

 — 

FLOAT8

DOUBLE

 — 

REAL

FLOAT

 — 

SMALLINT

INT

 — 

INTEGER

INT

 — 

DATE

INT

date

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней