Пример использования NiFi ADB Connector
Обзор
Для иллюстрации работы NiFi ADB Connector в статье показана реализация загрузки данных из таблиц базы данных PostgreSQL в таблицы ADB (на основе Greenplum).
Создание NiFi ADB Connector выполняется в пользовательском интерфейсе NiFi и доступно начиная с ADS 3.9.0.1.b1.
Предварительные требования
Ниже описано окружение, используемое для создания NiFi ADB Connector.
ADS
-
Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.9.0.1.b1.
-
Сервисы NiFi и ZooKeeper установлены в кластере ADS.
-
На хостах, где установлен NiFi, сохранен двоичный JAR-файл драйвера JDBC PostgreSQL, который позволяет программам подключаться к базе данных PostgreSQL, используя стандартный код Java. Путь к файлу используется при настройке сервисов DBCPConnectionPool для подключения к серверам PostgreSQL (ADPG) и Greenplum (ADB).
Команда, приведенная ниже, позволяет загрузить необходимую версию JAR-файла (в данном примере используется версия 42.7.5) в нужную директорию:
$ wget https://jdbc.postgresql.org/download/postgresql-42.7.5.jar
-
При настройке сервиса DBCPConnectionPool также используется ссылка в формате
jdbc:postgresql://<hostname>:5432/<database>
, где:-
<hostname>
— хост, на котором установлен компонент ADPG (для подключения к серверу PostgreSQL) или компонент ADB Master (для подключения к серверу Greenplum); -
<database>
— наименование базы данных, в которой создана используемая таблица.
-
ADPG
-
Кластер ADPG развернут согласно руководству Online-установка.
-
На установленном PostgreSQL server (кластер ADPG) выполнены подготовительные действия:
-
Пользователь с именем
my_user
, привилегиямиSUPERUSER
и паролем создан в БДpostgres
. -
В БД
postgres
создана таблицаmy_table
и в нее добавлены несколько строк с данными. -
Файл pg_hba.conf настроен для обеспечения доступа пользователя с хоста, на котором установлен сервис NiFi кластера ADS. Для этого в поле PG_HBA на странице конфигурационных параметров сервиса ADPG добавлена запись об адресе хоста и пользователе:
host postgres my_user 10.92.43.206/32 trust
-
ПРИМЕЧАНИЕ
|
ADB
-
Кластер ADB развернут согласно руководству Online-установка.
-
Сервис ADB установлен в кластере ADB.
-
Пользователь с именем
new_user
, привилегиямиSUPERUSER
и паролем создан в БДadb
. -
В БД
adb
создана таблицаmy_table
для записи данных. -
Файл pg_hba.conf настроен для обеспечения доступа пользователя с хоста, на котором установлен сервис NiFi кластера ADS. Для этого в поле PG_HBA на странице конфигурационных параметров сервиса ADB добавлена запись об адресе хоста и пользователе:
host adb new_user 10.92.40.128/24 trust
-
В Interconnect-сети, к которой подключены хосты кластера, должен быть установлен
MTU = 9000*
(jumbo frame), чтобы пакеты, формируемые ADB (gp_max_packet_size
+ overhead), помещались в эти фреймы целиком. Для получения более подробной информации о требованиях к сети кластера ADB обратитесь к статье Требования к сети.
ПРИМЕЧАНИЕ
Для получения информации о работе в таблицах ADB обратитесь к статье Таблицы. |
Подключение к PostgreSQL server (ADPG)
-
Создайте процессор QueryDatabaseTable и откройте его конфигурацию. Этот процессор выполняет SQL-запросы для получения данных из таблицы PostgreSQL. Заполните параметры, связанные с используемой таблицей PostgreSQL.
Конфигурация процессора QueryDatabaseTableКонфигурация процессора QueryDatabaseTable -
Перейдите в поле значения параметра Database Connection Pooling Service, во всплывающем списке выберите Create new service… и в открывшемся окне создайте экземпляр сервиса DBCPConnectionPool для создания подключения к PostgreSQL server.
Создание экземпляра сервиса DBCPConnectionPoolСоздание экземпляра сервиса DBCPConnectionPool -
После сохранения созданного экземпляра нажмите
, в открывшемся окне NiFi Flow Configuration → Controller Services откройте конфигурацию сервиса и введите параметры, связанные с базой данных PostgreSQL.
Конфигурация сервиса PostgressDBCPConnectionPoolКонфигурация сервиса PostgressDBCPConnectionPool
Подключение к Greenplum (ADB)
-
Для выполнения подключения к таблице Greenplum создайте процессор PutGreenplumRecord, откройте его конфигурацию и заполните параметры, связанные с таблицей Greenplum.
Конфигурация процессора PutGreenplumRecordКонфигурация процессора PutGreenplumRecord -
Перейдите в поле значения параметра Gpfdist Service, во всплывающем списке выберите Create new service… и в открывшемся окне создайте экземпляр сервиса StandartGpfdistService.
Создание экземпляра сервиса StandartGpfdistServiceСоздание экземпляра сервиса StandartGpfdistService -
После сохранения созданного экземпляра нажмите
, в открывшемся окне NiFi Flow Configuration → Controller Services откройте конфигурацию сервиса и заполните необходимые параметры.
Конфигурация сервиса StandartGpfdistServiceКонфигурация сервиса StandartGpfdistService -
Перейдите в поле значения параметра Database Connection Pooling Service, во всплывающем списке выберите Create new service… и в открывшемся окне создайте экземпляр сервиса DBCPConnectionPool для создания подключения к Greenplum.
-
После сохранения созданного экземпляра нажмите
, в открывшемся окне NiFi Flow Configuration → Controller Services откройте конфигурацию сервиса и введите параметры, связанные с базой данных ADB.
Конфигурация сервиса GreenplumDBCPConnectionPoolКонфигурация сервиса GreenplumDBCPConnectionPool -
Закройте окно NiFi Flow Configuration → Controller Services и снова перейдите к конфигурации процессора PutGreenplumRecord. В поле значения параметра Record Reader создайте экземпляр сервиса AvroReader для чтения записей из PostgreSQL в формате Avro со встроенной схемой.
Если используется несколько процессоров PutGreenplumRecord для переноса данных из разных источников в одну базу ADB, для значения Gpfdist Service выберите один и тот же созданный StandartGpfdistService.
После создания все сервисы отображаются на странице NiFi Flow Configuration → Controller Services.


Сервис StandartGpfdistService отображается с ошибкой до запуска связанного с ним сервиса GreenplumDBCPConnectionPool.
Запуск потока данных


Процессоры отображаются с ошибками из-за того, что сервисы, связанные с ними, не запущены.
Для запуска потока:
-
Поочередно запустите сервисы на странице NiFi Flow Configuration → Controller Services, кликнув на иконку
.
-
Запустите созданный поток данных.
Используя запросы к базе данных ADB, можно прочитать полученные данные.