****************************** ADB to Kafka (kafka-connector) ****************************** **kafka-connector** - это коннектор к Arenadata DB, позволяющий производить транзакционную загрузку данных из ADB в кластер брокера сообщений Kafka. Совместимость ======================== Коннектор совместим с Apache Kafka версий 1.0.0 и выше Установка с помощью ADCM ======================== Для установки коннектора с помощью **ADCM** требуется инсталлировать сервис *PXF* на все сегментные ноды кластера, а также в списке сервисов выбрать сервис *ADB to Kafka*. При этом необходимые пакеты и файлы автоматически устанавливаются на машины кластера. Установка из rpm-пакетов ======================== Установка из rpm-пакетов предполагает, что в кластере *ADB* установлен сервис *PXF*. Для установки коннектора из rpm-пакетов необходимо: #. Установить пакет *kafka-connector* на всех хостах кластера *ADB*, где установлен сервис *PXF*; #. Добавить в файл */etc/pxf/conf/pxf-profiles-default.xml* на каждом хосте кластера следующую секцию: .. code-block:: xml kafka A profile for export data into Apache Kafka org.greenplum.pxf.plugins.kafka.KafkaAccessor org.greenplum.pxf.plugins.kafka.KafkaResolver #. Перезапустить сервис *PXF* на всех хостах кластера. ********** PXF плагин ********** `PXF `_ плагин позволяет передавать данные из *ADB* в *Kafka*. Установка ========= Для установки плагина, необходимо иметь: * Рабочий кластер ADB с установленным PXF. * Рабочий кластер ADS или При необходимости установки/обновления пользовательского pxf коннектора ====================================================================== Плагине имеется папка ``pxf-kafka/build/libs``с jar-файлами (``pxf-kafka.jar``, ``avro-1.9.2.jar``, ``kafka-clients-2.5.0.jar``). Их необходимо скопировать на каждый сегмент хост *ADB* и удалить старые:: cp pxf-kafka.jar /usr/lib/pxf/lib/ chown pxf:pxf /usr/lib/pxf/lib/pxf-kafka.jar rm /usr/lib/pxf/lib/shared/avro-1.7.7.jar cp avro-1.9.2.jar /usr/lib/pxf/lib/shared/ chown pxf:pxf /usr/lib/pxf/lib/shared/avro-1.9.2.jar cp kafka-clients-2.5.0.jar /usr/lib/pxf/lib/shared/ chown pxf:pxf /usr/lib/pxf/lib/shared/kafka-clients-2.5.0.jar Необходимо скопировать текст в тэгах ``...`` из файла ``pxf-kafka/env/pxf-profiles.xml`` и добавить его в файл ``/var/lib/pxf/conf/pxf-profiles.xml`` на каждом сегмент хосте *ADB*. Пример файла *pxf-profiles.xml*:: kafka A profile for export data into Apache Kafka org.greenplum.pxf.plugins.kafka.KafkaAccessor org.greenplum.pxf.plugins.kafka.KafkaResolver После чего необходимо перезапустить сервис pxf на каждом сегмент хосте:: systemctl restart pxf Запись информации в топик Kafka =============================== Коннектор *PXF Kafka* с профилем ``kafka`` поддерживает запись данных в Kafka. Когда вы создаете внешнюю таблицу для записи при помощи коннектора *PXF Kafka*, вы указываете имя топика kafka. Когда вы вводите данные в эту таблицу, запись об этих данных появляется в указанном топике. Используйте следующий синтаксис для создания внешней таблицы с профилем ``kafka``:: CREATE WRITABLE EXTERNAL TABLE ( [, ...] | LIKE ) LOCATION ('pxf://?PROFILE=kafka[&SERVER=][&=[...]]') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); [DISTRIBUTED BY ( [, ... ] ) | DISTRIBUTED RANDOMLY]; Ключевые параметры используемые в команде `CREATE EXTERNAL TABLE `_ описаны в таблице ниже: +------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | Ключевое слово | Значение | Обязателен | | | | | +========================+=============================================================================================================================================================================================================================================================+=============+ | kafka_topic | Название топика ``kafka``. | Да | +------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | PROFILE | Должен быть указан ``kafka``. | Да | +------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | SERVER= | Имя конфигурации сервера, который использует PXF для доступа к данным. Если не указано, PXF использует сервер по-умолчанию. | Нет | +------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | | Пользовательские настройки. Описнаы в таблице ниже. | см. таблицу | +------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | DISTRIBUTED BY | Если вы хотите загрузить данные из существующей таблицы ADB во внешнюю таблицу, лучше будет указать ту же политику распределения или в обеих таблицах. Это позволит избежать лишних перемещений данных между сегменами при операции загрузки. | Нет | +------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ Профиль ``kafka`` поддерживает следующие пользовательские настройки: +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | Настройка | Описание значения | Обязателен | | | | | +================================+=======================================================================================================================================================================================================================+=============+ | BOOTSTRAP_SERVERS | Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. | Да | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | BATCH_SIZE | Определяет сколько строк должно складываться в одно сообщение Avro. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | TOPIC_AUTO_CREATE_FLAG | Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | AVRO_DEFAULT_DECIMAL_PRECISION | Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | AVRO_DEFAULT_DECIMAL_SCALE | Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ Пример: запись данных в топик Kafka ----------------------------------- :: DROP EXTERNAL TABLE IF EXISTS kafka_tbl; CREATE WRITABLE EXTERNAL TABLE kafka_tbl (a TEXT, b TEXT, c TEXT) LOCATION ('pxf://data_from_gp?PROFILE=kafka&BOOTSTRAP_SERVERS=10.92.8.43:9092,10.92.8.38:9092&BATCH_SIZE=10') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); INSERT INTO kafka_tbl VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z'); Запись данных в топик Kafka при помощи профиль сервера ``SERVER=`` =============================================================================== Профиль сервера ``SERVER=`` позволяет вам использовать конфигурационный файл для параметров, вместо использования их в секции ``LOCATION`` внешней таблицы. Чтобы настроить профиль сервера, необходимо: 1. Создать папку в разделе ``/var/lib/pxf/servers/`` с подходящим названием. Например, следующая команда создаст папку для Kafka в среде разработки:: mkdir /var/lib/pxf/servers/kafka_dev 2. Скопировать шаблон конфигурационного файла из ``pxf-kafka/env/kafka-site.xml`` в папку ``/var/lib/pxf/servers/kafka_dev``. Пример конфигурационного файла ``kafka-site.xml``:: kafka.bootstrap.servers 10.92.8.38:9092 kafka.batch.size 1 kafka.topic.auto.create true avro.decimal.default.precision 38 avro.decimal.default.scale 18 3. Отредактировать файл ``kafka-site.xml`` и скопировать его на каждый сегмент хост в ту же папку. Все доступные параметры перечислены в таблице ниже (как можно заметить, параметры повторяют параметры профиля): +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | Настройка | Описание значения | Обязателен | | | | | +================================+=======================================================================================================================================================================================================================+=============+ | kafka.bootstrap.servers | Список брокеров Kafka через запятую, каждый из которых является хостом или host:port строкой. | Да | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | kafka.batch.size | Определяет сколько строк должно складываться в одно сообщение Avro. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | kafka.topic.auto.create | Позволяет топикам создаваться автоматически, когда в них записывают данные. Топик будет создан с 1 партицией с фактором репликации =1. Значение по умолчанию: true. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | avro.decimal.default.precision | Максимальное количество знаков в числе. Должно быть положительным числом выше нуля. Значение по-умолчанию: 38. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ | avro.decimal.default.scale | Количество знаков после запятой для чисел. Например, число 123.45 имеет DECIMAL_PRECISION 5 и DECIMAL_SCALE 2. Должно быть неотрицательным числом меньше или равным предыдущему параметру. Значение по-умолчанию: 18. | Нет | +--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+ Пример: Запись данных в топик Kafka при помощи профиля сервера ``SERVER=`` --------------------------------------------------------------------------------------- :: DROP EXTERNAL TABLE IF EXISTS kafka_tbl_dev; CREATE WRITABLE EXTERNAL TABLE kafka_tbl_dev (a TEXT, b TEXT, c TEXT) LOCATION ('pxf://data_from_gp?ACCESSOR=org.greenplum.pxf.plugins.kafka.KafkaAccessor&RESOLVER=org.greenplum.pxf.plugins.kafka.KafkaResolver&kafka.bootstrap.servers=10.92.8.38:9092') FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export'); INSERT INTO kafka_tbl_dev VALUES ('a', 'b,c', 'd'), ('x', 'y', 'z'); Преобразование типов GPDB → AVRO ================================ +-----------+----------------------+---------------------+ | Тип PXF | Примитивный тип AVRO | Логический тип AVRO | | | | | +===========+======================+=====================+ | BOOLEAN | BOOLEAN | | +-----------+----------------------+---------------------+ | TEXT | STRING | | +-----------+----------------------+---------------------+ | VARCHAR | STRING | | +-----------+----------------------+---------------------+ | TIMESTAMP | LONG | timestamp-micros | +-----------+----------------------+---------------------+ | BIGINT | LONG | | +-----------+----------------------+---------------------+ | TIME | LONG | time-micros | +-----------+----------------------+---------------------+ | NUMERIC | DOUBLE | | +-----------+----------------------+---------------------+ | FLOAT8 | DOUBLE | | +-----------+----------------------+---------------------+ | REAL | FLOAT | | +-----------+----------------------+---------------------+ | SMALLINT | INT | | +-----------+----------------------+---------------------+ | INTEGER | INT | | +-----------+----------------------+---------------------+ | DATE | INT | date | +-----------+----------------------+---------------------+