ADB to Kafka (kafka-connector)

kafka-connector - это коннектор к Arenadata DB, позволяющий производить транзакционную загрузку данных из ADB в кластер брокера сообщений Kafka.

Совместимость

Коннектор совместим с Apache Kafka версий 1.0.0 и выше

Установка с помощью ADCM

Для установки коннектора с помощью ADCM требуется инсталлировать сервис PXF на все сегментные ноды кластера, а также в списке сервисов выбрать сервис ADB to Kafka. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера.

Установка из rpm-пакетов

Установка из rpm-пакетов предполагает, что в кластере ADB установлен сервис PXF.

Для установки коннектора из rpm-пакетов необходимо:

  1. Установить пакет kafka-connector на всех хостах кластера ADB, где установлен сервис PXF;

  2. Добавить в файл /etc/pxf/conf/pxf-profiles-default.xml на каждом хосте кластера следующую секцию:

    <profile>
         <name>kafka</name>
         <description>A profile for export data into Apache Kafka</description>
         <plugins>
             <accessor>org.greenplum.pxf.plugins.kafka.KafkaAccessor</accessor>
             <resolver>org.greenplum.pxf.plugins.kafka.KafkaResolver</resolver>
         </plugins>
         <optionMappings>
             <mapping option="BOOTSTRAP_SERVERS" property="kafka.bootstrap.servers"/>
             <mapping option="BATCH_SIZE" property="kafka.batch.size"/>
         </optionMappings>
     </profile>
    
  3. Перезапустить сервис PXF на всех хостах кластера.

PXF плагин

PXF плагин позволяет передавать данные из ADB в Kafka.

Установка

Для установки плагина, необходимо иметь:

  • Рабочий кластер ADB с установленным PXF.
  • Рабочий кластер ADS или

При необходимости установки/обновления пользовательского pxf коннектора

Плагине имеется папка pxf-kafka/build/libs``с jar-файлами (``pxf-kafka.jar, avro-1.9.2.jar, kafka-clients-2.5.0.jar). Их необходимо скопировать на каждый сегмент хост ADB и удалить старые:

cp pxf-kafka.jar /usr/lib/pxf/lib/
chown pxf:pxf /usr/lib/pxf/lib/pxf-kafka.jar

rm /usr/lib/pxf/lib/shared/avro-1.7.7.jar
cp avro-1.9.2.jar /usr/lib/pxf/lib/shared/
chown pxf:pxf /usr/lib/pxf/lib/shared/avro-1.9.2.jar

cp kafka-clients-2.5.0.jar /usr/lib/pxf/lib/shared/
chown pxf:pxf /usr/lib/pxf/lib/shared/kafka-clients-2.5.0.jar

Необходимо скопировать текст в тэгах <profiles>...</profiles> из файла pxf-kafka/env/pxf-profiles.xml и добавить его в файл /var/lib/pxf/conf/pxf-profiles.xml на каждом сегмент хосте ADB. Пример файла pxf-profiles.xml:

<?xml version="1.0" encoding="UTF-8"?>

<profiles>
    <profile>
        <name>kafka</name>
        <description>A profile for export data into Apache Kafka</description>
        <plugins>
            <accessor>org.greenplum.pxf.plugins.kafka.KafkaAccessor</accessor>
            <resolver>org.greenplum.pxf.plugins.kafka.KafkaResolver</resolver>
        </plugins>
        <optionMappings>
            <mapping option="BOOTSTRAP_SERVERS" property="kafka.bootstrap.servers"/>
            <mapping option="BATCH_SIZE" property="kafka.batch.size"/>
            <mapping option="TOPIC_AUTO_CREATE_FLAG" property="kafka.topic.auto.create"/>
            <mapping option="AVRO_DEFAULT_DECIMAL_PRECISION" property="avro.decimal.default.precision" />
            <mapping option="AVRO_DEFAULT_DECIMAL_SCALE" property="avro.decimal.default.scale" />
        </optionMappings>
    </profile>
</profiles>

После чего необходимо перезапустить сервис pxf на каждом сегмент хосте:

systemctl restart pxf

Запись информации в топик Kafka

Коннектор PXF Kafka с профилем kafka поддерживает запись данных в Kafka. Когда вы создаете внешнюю таблицу для записи при помощи коннектора PXF Kafka, вы указываете имя топика kafka. Когда вы вводите данные в эту таблицу, запись об этих данных появляется в указанном топике.

Используйте следующий синтаксис для создания внешней таблицы с профилем kafka:

CREATE WRITABLE EXTERNAL TABLE <table_name>
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

Ключевые параметры используемые в команде CREATE EXTERNAL TABLE описаны в таблице ниже:

Профиль kafka поддерживает следующие пользовательские настройки:

Настройка Описание значения Обязателен
BOOTSTRAP_SERVERS Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. Да
BATCH_SIZE Определяет сколько строк должно складываться в одно сообщение Avro. Нет
TOPIC_AUTO_CREATE_FLAG Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. Нет
AVRO_DEFAULT_DECIMAL_PRECISION Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. Нет
AVRO_DEFAULT_DECIMAL_SCALE Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. Нет

Пример: запись данных в топик Kafka

DROP EXTERNAL TABLE IF EXISTS kafka_tbl;

CREATE WRITABLE EXTERNAL TABLE kafka_tbl (a TEXT, b TEXT, c TEXT)
  LOCATION ('pxf://data_from_gp?PROFILE=kafka&BOOTSTRAP_SERVERS=10.92.8.43:9092,10.92.8.38:9092&BATCH_SIZE=10')
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

INSERT INTO kafka_tbl VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z');

Запись данных в топик Kafka при помощи профиль сервера SERVER=<server_name>

Профиль сервера SERVER=<server_name> позволяет вам использовать конфигурационный файл для параметров, вместо использования их в секции LOCATION внешней таблицы. Чтобы настроить профиль сервера, необходимо:

  1. Создать папку в разделе /var/lib/pxf/servers/ с подходящим названием. Например, следующая команда создаст папку для Kafka в среде разработки:

    mkdir /var/lib/pxf/servers/kafka_dev
    
  2. Скопировать шаблон конфигурационного файла из pxf-kafka/env/kafka-site.xml в папку /var/lib/pxf/servers/kafka_dev. Пример конфигурационного файла kafka-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>kafka.bootstrap.servers</name>
            <value>10.92.8.38:9092</value>
        </property>
        <property>
            <name>kafka.batch.size</name>
            <value>1</value>
        </property>
        <property>
            <name>kafka.topic.auto.create</name>
            <value>true</value>
        </property>
        <property>
            <name>avro.decimal.default.precision</name>
            <value>38</value>
        </property>
        <property>
            <name>avro.decimal.default.scale</name>
            <value>18</value>
        </property>
    </configuration>
    
  3. Отредактировать файл kafka-site.xml и скопировать его на каждый сегмент хост в ту же папку. Все доступные параметры перечислены в таблице ниже (как можно заметить, параметры повторяют параметры профиля):

Настройка Описание значения Обязателен
kafka.bootstrap.servers Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. Да
kafka.batch.size Определяет сколько строк должно складываться в одно сообщение Avro. Нет
kafka.topic.auto.create Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. Нет
avro.decimal.default.precision Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. Нет
avro.decimal.default.scale Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. Нет

Пример: Запись данных в топик Kafka при помощи профиля сервера SERVER=<server_name>

DROP EXTERNAL TABLE IF EXISTS kafka_tbl_dev;

CREATE WRITABLE EXTERNAL TABLE kafka_tbl_dev (a TEXT, b TEXT, c TEXT)
  LOCATION ('pxf://data_from_gp?ACCESSOR=org.greenplum.pxf.plugins.kafka.KafkaAccessor&RESOLVER=org.greenplum.pxf.plugins.kafka.KafkaResolver&kafka.bootstrap.servers=10.92.8.38:9092')
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

INSERT INTO kafka_tbl_dev VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z');

Преобразование типов GPDB → AVRO

Тип PXF Примитивный тип AVRO Логический тип AVRO
BOOLEAN BOOLEAN  
TEXT STRING  
VARCHAR STRING  
TIMESTAMP LONG timestamp-micros
BIGINT LONG  
TIME LONG time-micros
NUMERIC DOUBLE  
FLOAT8 DOUBLE  
REAL FLOAT  
SMALLINT INT  
INTEGER INT  
DATE INT date