ADB to Kafka (kafka-connector)¶
kafka-connector - это коннектор к Arenadata DB, позволяющий производить транзакционную загрузку данных из ADB в кластер брокера сообщений Kafka.
Совместимость¶
Коннектор совместим с Apache Kafka версий 1.0.0 и выше
Установка с помощью ADCM¶
Для установки коннектора с помощью ADCM требуется инсталлировать сервис PXF на все сегментные ноды кластера, а также в списке сервисов выбрать сервис ADB to Kafka. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера.
Установка из rpm-пакетов¶
Установка из rpm-пакетов предполагает, что в кластере ADB установлен сервис PXF.
Для установки коннектора из rpm-пакетов необходимо:
- Установить пакет kafka-connector на всех хостах кластера ADB, где установлен сервис PXF; 
- Добавить в файл /etc/pxf/conf/pxf-profiles-default.xml на каждом хосте кластера следующую секцию: - <profile> <name>kafka</name> <description>A profile for export data into Apache Kafka</description> <plugins> <accessor>org.greenplum.pxf.plugins.kafka.KafkaAccessor</accessor> <resolver>org.greenplum.pxf.plugins.kafka.KafkaResolver</resolver> </plugins> <optionMappings> <mapping option="BOOTSTRAP_SERVERS" property="kafka.bootstrap.servers"/> <mapping option="BATCH_SIZE" property="kafka.batch.size"/> </optionMappings> </profile> 
- Перезапустить сервис PXF на всех хостах кластера. 
PXF плагин¶
PXF плагин позволяет передавать данные из ADB в Kafka.
Установка¶
Для установки плагина, необходимо иметь:
- Рабочий кластер ADB с установленным PXF.
- Рабочий кластер ADS или
При необходимости установки/обновления пользовательского pxf коннектора¶
Плагине имеется папка pxf-kafka/build/libs``с jar-файлами (``pxf-kafka.jar, avro-1.9.2.jar, kafka-clients-2.5.0.jar). Их необходимо скопировать на каждый сегмент хост ADB и удалить старые:
cp pxf-kafka.jar /usr/lib/pxf/lib/
chown pxf:pxf /usr/lib/pxf/lib/pxf-kafka.jar
rm /usr/lib/pxf/lib/shared/avro-1.7.7.jar
cp avro-1.9.2.jar /usr/lib/pxf/lib/shared/
chown pxf:pxf /usr/lib/pxf/lib/shared/avro-1.9.2.jar
cp kafka-clients-2.5.0.jar /usr/lib/pxf/lib/shared/
chown pxf:pxf /usr/lib/pxf/lib/shared/kafka-clients-2.5.0.jar
Необходимо скопировать текст в тэгах <profiles>...</profiles> из файла pxf-kafka/env/pxf-profiles.xml и добавить его в файл /var/lib/pxf/conf/pxf-profiles.xml на каждом сегмент хосте ADB. Пример файла pxf-profiles.xml:
<?xml version="1.0" encoding="UTF-8"?>
<profiles>
    <profile>
        <name>kafka</name>
        <description>A profile for export data into Apache Kafka</description>
        <plugins>
            <accessor>org.greenplum.pxf.plugins.kafka.KafkaAccessor</accessor>
            <resolver>org.greenplum.pxf.plugins.kafka.KafkaResolver</resolver>
        </plugins>
        <optionMappings>
            <mapping option="BOOTSTRAP_SERVERS" property="kafka.bootstrap.servers"/>
            <mapping option="BATCH_SIZE" property="kafka.batch.size"/>
            <mapping option="TOPIC_AUTO_CREATE_FLAG" property="kafka.topic.auto.create"/>
            <mapping option="AVRO_DEFAULT_DECIMAL_PRECISION" property="avro.decimal.default.precision" />
            <mapping option="AVRO_DEFAULT_DECIMAL_SCALE" property="avro.decimal.default.scale" />
        </optionMappings>
    </profile>
</profiles>
После чего необходимо перезапустить сервис pxf на каждом сегмент хосте:
systemctl restart pxf
Запись информации в топик Kafka¶
Коннектор PXF Kafka с профилем kafka поддерживает запись данных в Kafka. Когда вы создаете внешнюю таблицу для записи при помощи коннектора PXF Kafka, вы указываете имя топика kafka. Когда вы вводите данные в эту таблицу, запись об этих данных появляется в указанном топике.
Используйте следующий синтаксис для создания внешней таблицы с профилем kafka:
CREATE WRITABLE EXTERNAL TABLE <table_name>
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];
Ключевые параметры используемые в команде CREATE EXTERNAL TABLE описаны в таблице ниже:
Профиль kafka поддерживает следующие пользовательские настройки:
| Настройка | Описание значения | Обязателен | 
|---|---|---|
| BOOTSTRAP_SERVERS | Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. | Да | 
| BATCH_SIZE | Определяет сколько строк должно складываться в одно сообщение Avro. | Нет | 
| TOPIC_AUTO_CREATE_FLAG | Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. | Нет | 
| AVRO_DEFAULT_DECIMAL_PRECISION | Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. | Нет | 
| AVRO_DEFAULT_DECIMAL_SCALE | Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. | Нет | 
Пример: запись данных в топик Kafka¶
DROP EXTERNAL TABLE IF EXISTS kafka_tbl;
CREATE WRITABLE EXTERNAL TABLE kafka_tbl (a TEXT, b TEXT, c TEXT)
  LOCATION ('pxf://data_from_gp?PROFILE=kafka&BOOTSTRAP_SERVERS=10.92.8.43:9092,10.92.8.38:9092&BATCH_SIZE=10')
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
INSERT INTO kafka_tbl VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z');
Запись данных в топик Kafka при помощи профиль сервера SERVER=<server_name>¶
Профиль сервера SERVER=<server_name> позволяет вам использовать конфигурационный файл для параметров, вместо использования их в секции LOCATION внешней таблицы. Чтобы настроить профиль сервера, необходимо:
- Создать папку в разделе - /var/lib/pxf/servers/с подходящим названием. Например, следующая команда создаст папку для Kafka в среде разработки:- mkdir /var/lib/pxf/servers/kafka_dev 
- Скопировать шаблон конфигурационного файла из - pxf-kafka/env/kafka-site.xmlв папку- /var/lib/pxf/servers/kafka_dev. Пример конфигурационного файла- kafka-site.xml:- <?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>kafka.bootstrap.servers</name> <value>10.92.8.38:9092</value> </property> <property> <name>kafka.batch.size</name> <value>1</value> </property> <property> <name>kafka.topic.auto.create</name> <value>true</value> </property> <property> <name>avro.decimal.default.precision</name> <value>38</value> </property> <property> <name>avro.decimal.default.scale</name> <value>18</value> </property> </configuration>
- Отредактировать файл - kafka-site.xmlи скопировать его на каждый сегмент хост в ту же папку. Все доступные параметры перечислены в таблице ниже (как можно заметить, параметры повторяют параметры профиля):
| Настройка | Описание значения | Обязателен | 
|---|---|---|
| kafka.bootstrap.servers | Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. | Да | 
| kafka.batch.size | Определяет сколько строк должно складываться в одно сообщение Avro. | Нет | 
| kafka.topic.auto.create | Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. | Нет | 
| avro.decimal.default.precision | Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. | Нет | 
| avro.decimal.default.scale | Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. | Нет | 
Пример: Запись данных в топик Kafka при помощи профиля сервера SERVER=<server_name>¶
DROP EXTERNAL TABLE IF EXISTS kafka_tbl_dev;
CREATE WRITABLE EXTERNAL TABLE kafka_tbl_dev (a TEXT, b TEXT, c TEXT)
  LOCATION ('pxf://data_from_gp?ACCESSOR=org.greenplum.pxf.plugins.kafka.KafkaAccessor&RESOLVER=org.greenplum.pxf.plugins.kafka.KafkaResolver&kafka.bootstrap.servers=10.92.8.38:9092')
  FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
INSERT INTO kafka_tbl_dev VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z');
Преобразование типов GPDB → AVRO¶
| Тип PXF | Примитивный тип AVRO | Логический тип AVRO | 
|---|---|---|
| BOOLEAN | BOOLEAN | |
| TEXT | STRING | |
| VARCHAR | STRING | |
| TIMESTAMP | LONG | timestamp-micros | 
| BIGINT | LONG | |
| TIME | LONG | time-micros | 
| NUMERIC | DOUBLE | |
| FLOAT8 | DOUBLE | |
| REAL | FLOAT | |
| SMALLINT | INT | |
| INTEGER | INT | |
| DATE | INT | date |