ADB to Kafka (kafka-connector)¶
kafka-connector - это коннектор к Arenadata DB, позволяющий производить транзакционную загрузку данных из ADB в кластер брокера сообщений Kafka.
Совместимость¶
Коннектор совместим с Apache Kafka версий 1.0.0 и выше
Установка с помощью ADCM¶
Для установки коннектора с помощью ADCM требуется инсталлировать сервис PXF на все сегментные ноды кластера, а также в списке сервисов выбрать сервис ADB to Kafka. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера.
Установка из rpm-пакетов¶
Установка из rpm-пакетов предполагает, что в кластере ADB установлен сервис PXF.
Для установки коннектора из rpm-пакетов необходимо:
Установить пакет kafka-connector на всех хостах кластера ADB, где установлен сервис PXF;
Добавить в файл /etc/pxf/conf/pxf-profiles-default.xml на каждом хосте кластера следующую секцию:
<profile> <name>kafka</name> <description>A profile for export data into Apache Kafka</description> <plugins> <accessor>org.greenplum.pxf.plugins.kafka.KafkaAccessor</accessor> <resolver>org.greenplum.pxf.plugins.kafka.KafkaResolver</resolver> </plugins> <optionMappings> <mapping option="BOOTSTRAP_SERVERS" property="kafka.bootstrap.servers"/> <mapping option="BATCH_SIZE" property="kafka.batch.size"/> </optionMappings> </profile>
Перезапустить сервис PXF на всех хостах кластера.
PXF плагин¶
PXF плагин позволяет передавать данные из ADB в Kafka.
Установка¶
Для установки плагина, необходимо иметь:
- Рабочий кластер ADB с установленным PXF.
- Рабочий кластер ADS или
При необходимости установки/обновления пользовательского pxf коннектора¶
Плагине имеется папка pxf-kafka/build/libs``с jar-файлами (``pxf-kafka.jar
, avro-1.9.2.jar
, kafka-clients-2.5.0.jar
). Их необходимо скопировать на каждый сегмент хост ADB и удалить старые:
cp pxf-kafka.jar /usr/lib/pxf/lib/
chown pxf:pxf /usr/lib/pxf/lib/pxf-kafka.jar
rm /usr/lib/pxf/lib/shared/avro-1.7.7.jar
cp avro-1.9.2.jar /usr/lib/pxf/lib/shared/
chown pxf:pxf /usr/lib/pxf/lib/shared/avro-1.9.2.jar
cp kafka-clients-2.5.0.jar /usr/lib/pxf/lib/shared/
chown pxf:pxf /usr/lib/pxf/lib/shared/kafka-clients-2.5.0.jar
Необходимо скопировать текст в тэгах <profiles>...</profiles>
из файла pxf-kafka/env/pxf-profiles.xml
и добавить его в файл /var/lib/pxf/conf/pxf-profiles.xml
на каждом сегмент хосте ADB. Пример файла pxf-profiles.xml:
<?xml version="1.0" encoding="UTF-8"?>
<profiles>
<profile>
<name>kafka</name>
<description>A profile for export data into Apache Kafka</description>
<plugins>
<accessor>org.greenplum.pxf.plugins.kafka.KafkaAccessor</accessor>
<resolver>org.greenplum.pxf.plugins.kafka.KafkaResolver</resolver>
</plugins>
<optionMappings>
<mapping option="BOOTSTRAP_SERVERS" property="kafka.bootstrap.servers"/>
<mapping option="BATCH_SIZE" property="kafka.batch.size"/>
<mapping option="TOPIC_AUTO_CREATE_FLAG" property="kafka.topic.auto.create"/>
<mapping option="AVRO_DEFAULT_DECIMAL_PRECISION" property="avro.decimal.default.precision" />
<mapping option="AVRO_DEFAULT_DECIMAL_SCALE" property="avro.decimal.default.scale" />
</optionMappings>
</profile>
</profiles>
После чего необходимо перезапустить сервис pxf на каждом сегмент хосте:
systemctl restart pxf
Запись информации в топик Kafka¶
Коннектор PXF Kafka с профилем kafka
поддерживает запись данных в Kafka. Когда вы создаете внешнюю таблицу для записи при помощи коннектора PXF Kafka, вы указываете имя топика kafka. Когда вы вводите данные в эту таблицу, запись об этих данных появляется в указанном топике.
Используйте следующий синтаксис для создания внешней таблицы с профилем kafka
:
CREATE WRITABLE EXTERNAL TABLE <table_name>
( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<kafka_topic>?PROFILE=kafka[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];
Ключевые параметры используемые в команде CREATE EXTERNAL TABLE описаны в таблице ниже:
Профиль kafka
поддерживает следующие пользовательские настройки:
Настройка | Описание значения | Обязателен |
---|---|---|
BOOTSTRAP_SERVERS | Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. | Да |
BATCH_SIZE | Определяет сколько строк должно складываться в одно сообщение Avro. | Нет |
TOPIC_AUTO_CREATE_FLAG | Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. | Нет |
AVRO_DEFAULT_DECIMAL_PRECISION | Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. | Нет |
AVRO_DEFAULT_DECIMAL_SCALE | Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. | Нет |
Пример: запись данных в топик Kafka¶
DROP EXTERNAL TABLE IF EXISTS kafka_tbl;
CREATE WRITABLE EXTERNAL TABLE kafka_tbl (a TEXT, b TEXT, c TEXT)
LOCATION ('pxf://data_from_gp?PROFILE=kafka&BOOTSTRAP_SERVERS=10.92.8.43:9092,10.92.8.38:9092&BATCH_SIZE=10')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
INSERT INTO kafka_tbl VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z');
Запись данных в топик Kafka при помощи профиль сервера SERVER=<server_name>
¶
Профиль сервера SERVER=<server_name>
позволяет вам использовать конфигурационный файл для параметров, вместо использования их в секции LOCATION
внешней таблицы. Чтобы настроить профиль сервера, необходимо:
Создать папку в разделе
/var/lib/pxf/servers/
с подходящим названием. Например, следующая команда создаст папку для Kafka в среде разработки:mkdir /var/lib/pxf/servers/kafka_dev
Скопировать шаблон конфигурационного файла из
pxf-kafka/env/kafka-site.xml
в папку/var/lib/pxf/servers/kafka_dev
. Пример конфигурационного файлаkafka-site.xml
:<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>kafka.bootstrap.servers</name> <value>10.92.8.38:9092</value> </property> <property> <name>kafka.batch.size</name> <value>1</value> </property> <property> <name>kafka.topic.auto.create</name> <value>true</value> </property> <property> <name>avro.decimal.default.precision</name> <value>38</value> </property> <property> <name>avro.decimal.default.scale</name> <value>18</value> </property> </configuration>
Отредактировать файл
kafka-site.xml
и скопировать его на каждый сегмент хост в ту же папку. Все доступные параметры перечислены в таблице ниже (как можно заметить, параметры повторяют параметры профиля):
Настройка | Описание значения | Обязателен |
---|---|---|
kafka.bootstrap.servers | Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. | Да |
kafka.batch.size | Определяет сколько строк должно складываться в одно сообщение Avro. | Нет |
kafka.topic.auto.create | Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. | Нет |
avro.decimal.default.precision | Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. | Нет |
avro.decimal.default.scale | Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. | Нет |
Пример: Запись данных в топик Kafka при помощи профиля сервера SERVER=<server_name>
¶
DROP EXTERNAL TABLE IF EXISTS kafka_tbl_dev;
CREATE WRITABLE EXTERNAL TABLE kafka_tbl_dev (a TEXT, b TEXT, c TEXT)
LOCATION ('pxf://data_from_gp?ACCESSOR=org.greenplum.pxf.plugins.kafka.KafkaAccessor&RESOLVER=org.greenplum.pxf.plugins.kafka.KafkaResolver&kafka.bootstrap.servers=10.92.8.38:9092')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
INSERT INTO kafka_tbl_dev VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z');
Преобразование типов GPDB → AVRO¶
Тип PXF | Примитивный тип AVRO | Логический тип AVRO |
---|---|---|
BOOLEAN | BOOLEAN | |
TEXT | STRING | |
VARCHAR | STRING | |
TIMESTAMP | LONG | timestamp-micros |
BIGINT | LONG | |
TIME | LONG | time-micros |
NUMERIC | DOUBLE | |
FLOAT8 | DOUBLE | |
REAL | FLOAT | |
SMALLINT | INT | |
INTEGER | INT | |
DATE | INT | date |