Пример использования NiFi ADB Connector

Обзор

Для иллюстрации работы NiFi ADB Connector в статье показана реализация загрузки данных из таблиц базы данных PostgreSQL в таблицы ADB (на основе Greenplum).

Создание NiFi ADB Connector выполняется в пользовательском интерфейсе NiFi и доступно начиная с ADS 3.9.0.1.b1.

Предварительные требования

Ниже описано окружение, используемое для создания NiFi ADB Connector.

ADS

  • Кластер ADS развернут согласно руководству Online-установка. Минимальная версия ADS — 3.9.0.1.b1.

  • Сервисы NiFi и ZooKeeper установлены в кластере ADS.

  • На хостах, где установлен NiFi, сохранен двоичный JAR-файл драйвера JDBC PostgreSQL, который позволяет программам подключаться к базе данных PostgreSQL, используя стандартный код Java. Путь к файлу используется при настройке сервисов DBCPConnectionPool для подключения к серверам PostgreSQL (ADPG) и Greenplum (ADB).

    Команда, приведенная ниже, позволяет загрузить необходимую версию JAR-файла (в данном примере используется версия 42.7.5) в нужную директорию:

$ wget https://jdbc.postgresql.org/download/postgresql-42.7.5.jar
  • При настройке сервиса DBCPConnectionPool также используется ссылка в формате jdbc:postgresql://<hostname>:5432/<database>, где:

    • <hostname> — хост, на котором установлен компонент ADPG (для подключения к серверу PostgreSQL) или компонент ADB Master (для подключения к серверу Greenplum);

    • <database> — наименование базы данных, в которой создана используемая таблица.

ADPG

  • Кластер ADPG развернут согласно руководству Online-установка.

  • На установленном PostgreSQL server (кластер ADPG) выполнены подготовительные действия:

    • Пользователь с именем my_user, привилегиями SUPERUSER и паролем создан в БД postgres.

    • В БД postgres создана таблица my_table и в нее добавлены несколько строк с данными.

    • Файл pg_hba.conf настроен для обеспечения доступа пользователя с хоста, на котором установлен сервис NiFi кластера ADS. Для этого в поле PG_HBA на странице конфигурационных параметров сервиса ADPG добавлена запись об адресе хоста и пользователе:

      host    postgres  my_user      10.92.43.206/32     trust
ПРИМЕЧАНИЕ

ADB

  • Кластер ADB развернут согласно руководству Online-установка.

  • Сервис ADB установлен в кластере ADB.

  • Пользователь с именем new_user, привилегиями SUPERUSER и паролем создан в БД adb.

  • В БД adb создана таблица my_table для записи данных.

  • Файл pg_hba.conf настроен для обеспечения доступа пользователя с хоста, на котором установлен сервис NiFi кластера ADS. Для этого в поле PG_HBA на странице конфигурационных параметров сервиса ADB добавлена запись об адресе хоста и пользователе:

    host    adb  new_user       10.92.40.128/24     trust
  • В Interconnect-сети, к которой подключены хосты кластера, должен быть установлен MTU = 9000*(jumbo frame), чтобы пакеты, формируемые ADB (gp_max_packet_size + overhead), помещались в эти фреймы целиком. Для получения более подробной информации о требованиях к сети кластера ADB обратитесь к статье Требования к сети.

ПРИМЕЧАНИЕ

Для получения информации о работе в таблицах ADB обратитесь к статье Таблицы.

Подключение к PostgreSQL server (ADPG)

  1. Создайте процессор QueryDatabaseTable и откройте его конфигурацию. Этот процессор выполняет SQL-запросы для получения данных из таблицы PostgreSQL. Заполните параметры, связанные с используемой таблицей PostgreSQL.

    Конфигурация процессора QueryDatabaseTable
    Конфигурация процессора QueryDatabaseTable
    Конфигурация процессора QueryDatabaseTable
    Конфигурация процессора QueryDatabaseTable
  2. Перейдите в поле значения параметра Database Connection Pooling Service, во всплывающем списке выберите Create new service…​ и в открывшемся окне создайте экземпляр сервиса DBCPConnectionPool для создания подключения к PostgreSQL server.

    Создание экземпляра сервиса DBCPConnectionPool
    Создание экземпляра сервиса DBCPConnectionPool
    Создание экземпляра сервиса DBCPConnectionPool
    Создание экземпляра сервиса DBCPConnectionPool
  3. После сохранения созданного экземпляра нажмите arrow2 light, в открывшемся окне NiFi Flow Configuration → Controller Services откройте конфигурацию сервиса и введите параметры, связанные с базой данных PostgreSQL.

    Конфигурация сервиса PostgressDBCPConnectionPool
    Конфигурация сервиса PostgressDBCPConnectionPool
    Конфигурация сервиса PostgressDBCPConnectionPool
    Конфигурация сервиса PostgressDBCPConnectionPool

Создание экземпляров сервиса может быть выполнено до создания процессора. Это может быть использовано для подключения нескольких процессоров к одному сервису. Для создания сервиса выполните:

  1. Правой кнопкой мыши кликните в пустом поле потока и выберите Configure в открывшемся контекстном меню.

  2. В окне NiFi Flow Configuration перейдите на вкладку Controller Services и кликните на +.

  3. Выберите нужный сервис из списка и создайте экземпляр сервиса в открывшемся окне.

Подключение к Greenplum (ADB)

  1. Для выполнения подключения к таблице Greenplum создайте процессор PutGreenplumRecord, откройте его конфигурацию и заполните параметры, связанные с таблицей Greenplum.

    Конфигурация процессора PutGreenplumRecord
    Конфигурация процессора PutGreenplumRecord
    Конфигурация процессора PutGreenplumRecord
    Конфигурация процессора PutGreenplumRecord
  2. Перейдите в поле значения параметра Gpfdist Service, во всплывающем списке выберите Create new service…​ и в открывшемся окне создайте экземпляр сервиса StandartGpfdistService.

    Создание экземпляра сервиса StandartGpfdistService
    Создание экземпляра сервиса StandartGpfdistService
    Создание экземпляра сервиса StandartGpfdistService
    Создание экземпляра сервиса StandartGpfdistService
  3. После сохранения созданного экземпляра нажмите arrow2 light, в открывшемся окне NiFi Flow Configuration → Controller Services откройте конфигурацию сервиса и заполните необходимые параметры.

    Конфигурация сервиса StandartGpfdistService
    Конфигурация сервиса StandartGpfdistService
    Конфигурация сервиса StandartGpfdistService
    Конфигурация сервиса StandartGpfdistService
  4. Перейдите в поле значения параметра Database Connection Pooling Service, во всплывающем списке выберите Create new service…​ и в открывшемся окне создайте экземпляр сервиса DBCPConnectionPool для создания подключения к Greenplum.

  5. После сохранения созданного экземпляра нажмите arrow2 light, в открывшемся окне NiFi Flow Configuration → Controller Services откройте конфигурацию сервиса и введите параметры, связанные с базой данных ADB.

    Конфигурация сервиса GreenplumDBCPConnectionPool
    Конфигурация сервиса GreenplumDBCPConnectionPool
    Конфигурация сервиса GreenplumDBCPConnectionPool
    Конфигурация сервиса GreenplumDBCPConnectionPool
  6. Закройте окно NiFi Flow Configuration → Controller Services и снова перейдите к конфигурации процессора PutGreenplumRecord. В поле значения параметра Record Reader создайте экземпляр сервиса AvroReader для чтения записей из PostgreSQL в формате Avro со встроенной схемой.

Если используется несколько процессоров PutGreenplumRecord для переноса данных из разных источников в одну базу ADB, для значения Gpfdist Service выберите один и тот же созданный StandartGpfdistService.

После создания все сервисы отображаются на странице NiFi Flow Configuration → Controller Services.

Созданные сервисы
Созданные сервисы
Созданные сервисы
Созданные сервисы

Сервис StandartGpfdistService отображается с ошибкой до запуска связанного с ним сервиса GreenplumDBCPConnectionPool.

Запуск потока данных

Создайте и настройте подключение между процессорами.

Созданные и соединенные процессоры
Созданные и соединенные процессоры
Созданные и соединенные процессоры
Созданные и соединенные процессоры

Процессоры отображаются с ошибками из-за того, что сервисы, связанные с ними, не запущены.

Для запуска потока:

  1. Поочередно запустите сервисы на странице NiFi Flow Configuration → Controller Services, кликнув на иконку nifi ui oper 02.

  2. Запустите созданный поток данных.

Используя запросы к базе данных ADB, можно прочитать полученные данные.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней