Типы процессоров NiFi

В данной статье описаны существующие типы процессоров NiFi.

ПРИМЕЧАНИЕ

Для получения информации об архитектуре основных обьектов NiFi и их взаимодействии вы можете обратиться к статье Объекты NiFi.

Data transformation

Процессоры, принадлежащие типу Data Transformation, меняют содержимое FlowFile.

Процессор копирует содержимое из Content Repository, преобразует данные в соответствии с собственной логикой, записывает новые данные в Content Repository и сохраняет ссылку на измененные данные в FlowFile.

FlowFiles, содержимое которых удалось преобразовать, отправляются в очередь соединения, у которого параметр For Relationships имеет значение success.

Преобразование данных
Преобразование данных
Преобразование данных
Преобразование данных

Примеры процессоров и их использования:

  • замена всего или части текста при помощи регулярного выражения — ReplaceText;

  • сжатие или распаковка содержимого — CompressContent;

  • преобразование символов из одного набора символов в другой — ConvertCharacterSet.

Routing and mediation

Процессоры, принадлежащие типу Routing and Mediation, используются для маршрутизации FlowFiles к различным процессорам в соответствии с информацией в атрибутах или содержимом этих FlowFiles.

Процессор использует заданное пользователем и собственной логикой условие (на основе атрибутов или контента, скорости передачи данных, отслеживания дубликатов и т.д.) для управления перемещением FlowFiles.

FlowFiles, соответствующие заданному условию, отправляются в очередь соединения, у которого параметр For Relationships имеет значение success (matched, duplicate — в зависимости от процессора).

Маршрутизация и посредничество
Маршрутизация и посредничество
Маршрутизация и посредничество
Маршрутизация и посредничество

Примеры процессоров и их использования:

  • регулирование пропускной способности FlowFiles на основе настроенной скорости — ControlRate;

  • отслеживание дубликатов FlowFiles на основе критериев, определяемых пользователем — DetectDuplicate;

  • маршрутизация FlowFiles на основе их атрибутов с использованием языка выражений NiFi — RouteOnAttribute.

Database access

Процессоры, принадлежащие типу Database Access, предназначены для работы с базами данных.

Процессор выполняет SQL-запрос, при этом в зависимости от собственной логики может использовать содержимое входящего FlowFile в качестве содержания запроса или записывать результат запроса в качестве содержимого исходящего FlowFile.

Также некоторые процессоры могут подготавливать запросы, преобразуя их из других форматов (ConvertJSONToSQL).

FlowFiles при успешном выполнении запроса отправляются в очередь соединения, у которого параметр For Relationships имеет значение success.

Доступ к базе данных
Доступ к базе данных
Доступ к базе данных
Доступ к базе данных

Примеры процессоров и их использования:

  • преобразование документа JSON в команду SQL INSERT или UPDATE, которую затем можно передать процессору PutSQL — ConvertJSONToSQL;

  • обновление базы данных с использованием в команде содержимого FlowFile — PutSQL;

  • выполнение команды SQL SELECT с записью результатов в содержимое FlowFile в формате Avro — ExecuteSQL.

Attribute extraction

Процессоры, принадлежащие типу Attribute Extraction, создают вновь или меняют атрибуты FlowFile.

Процессор использует заданное пользователем и собственной логикой условие (регулярное выражение, вычисление хеш-функции и т.д.), чтобы изменить уже имеющиеся атрибуты или извлечь атрибуты из содержимого FlowFile.

FlowFiles, аттрибуты которых удалось преобразовать, отправляются в очередь соединения, у которого параметр For Relationships имеет значение success (или matched — в зависимости от процессора).

Извлечение атрибутов
Извлечение атрибутов
Извлечение атрибутов
Извлечение атрибутов

Примеры процессоров и их использования:

  • извлечение атрибутов из текста содержимого FlowFile в соответствии с регулярными выражениями, заданными в пользовательских свойствах процессора — ExtractText;

  • обновление атрибутов FlowFile с помощью языка выражений NiFi или на основе регулярных выражений, заданных пользователем — UpdateAttribute;

  • вычисление криптографического хеш-значения для содержимого FlowFile с использованием заданного алгоритма и запись значения в выходной атрибут — CryptographicHashContent.

System interaction

Процессоры, принадлежащие типу System Interaction, запускают процессы в операционной системе или скрипты в различных средах разработки.

Процессор запускает команду операционной системы или запускает скрипт с использованием содержимого FlowFile или без него. При этом процессор создает или изменяет FlowFile, в содержимое которого записывается результат отработки команды или скрипта.

При успешной обработке команд созданные FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success.

Взаимодействие с системой
Взаимодействие с системой
Взаимодействие с системой
Взаимодействие с системой

Примеры процессоров и их использования:

  • запуск заданной пользователем команды операционной системы и создание FlowFile в результате отработки команды — ExecuteProcess;

  • запуск заданной пользователем команды операционной системы с использованием содержимого входящего FlowFile как StdIn и записью StdOut в исходящий FlowFile — ExecuteStreamCommand.

Data ingestion

Процессоры, принадлежащие типу Data Ingestion, принимают данные в поток NiFi.

Процессор создает новый FlowFile, при этом записывает содержимое исходного файла (или другого источника) в Content Repository и создает атрибуты FlowFile. Как правило, такие процессоры являются отправной точкой потока данных в Apache NiFi.

При успешной записи содержимого и создании атрибутов FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success.

Прием данных
Прием данных
Прием данных
Прием данных

Примеры процессоров и их использования:

  • передача содержимого файла с локального диска (или сетевого диска) в качестве содержимого создаваемого FlowFile и последующее удаление исходного файла — GetFile;

  • подписка на один или несколько топиков Apache Kafka с созданием FlowFile, в качестве содержимого которого сохраняется сообщение Apache Kafka — ConsumeKafka.

Data egress/Sending data

Процессоры, принадлежащие типу Data Egress/Sending Data, отправляют данные на сервер назначения.

Процессор передает содержимое FlowFile в точку назначения, а также отправляет атрибуты в качестве дополнительных заголовков, если это указано пользователем в настройках процессора. Как правило, такие процессоры являются конечной точкой потока данных в Apache NiFi.

При успешной отправке данных и создании атрибутов FlowFiles удаляются.

Выход данных
Выход данных
Выход данных
Выход данных

Примеры процессоров и их использования:

  • отправление содержимого входящего FlowFile электронным письмом настроенным получателям — PutEmail;

  • отправление содержимого FlowFile в виде сообщения в Apache Kafka — PublishKafka;

  • запись содержимого FlowFile в файл локальной системы — PutFile.

Splitting and aggregation

Процессоры, принадлежащие типу Splitting and Aggregation, разделяют или объединяют содержимое FlowFile.

При разделении процессор создает новые FlowFiles, в содержимое которых записываются данные, полученные в результате разделения. Разделение происходит в соответствии с заданным пользователем условием и собственной логикой процессора. Атрибуты для каждого нового FlowFile обычно являются данными, описывающими полученный в результате деления сегмент содержимого — размер, идентификатор, количество строк и т.д.

При агрегировании процессор создает новый FlowFile, содержимое которого обьединяет содержимое всех входящих файлов. Атрибуты для нового FlowFile являются данными, описывающими полученный в результате обьединения файл: имя файла, количество обьединенных FlowFiles, время существования пакета до отправления и т.д.

Процессор записывает содержимое новых FlowFiles в Content Repository и сохраняет ссылку на них в каждом FlowFile.

При разделении или агрегировании содержимого новые FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success (splits, segments, merged — в зависимости от процессора).

Разделение
Разделение
Разделение
Разделение

Примеры процессоров и их использования:

  • Распаковка архивов различных типов, например ZIP и TAR, при этом каждый файл после распаковки передается как отдельный FlowFile — UnpackContent.

  • Объединение множества FlowFile в один FlowFile в соотвествии с заданными пользователем условиями, например, на основе общего атрибута. При этом может быть задан минимальный и максимальный размер каждого пакета, а также тайм-аут для ожидания заполнения пакета — MergeContent.

  • Разделение FlowFile на потенциально множество FlowFile на основе определенной последовательности байтов, по которой следует разделить содержимое — SplitContent.

HTTP

Процессоры, принадлежащие типу HTTP, обрабатывают вызовы HTTP и HTTPS.

При входящих запросах процессор создает FlowFile и записывает содержимое запроса в содержимое FlowFile. Если процессор совершает исходящий запрос (PUT, POST или PATCH), содержимое FlowFile отправляется в качестве тела сообщения.

Успешно созданные при входящих запросах FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success. Для универсального процессора, отвечающего за прием и отправление запросов (InvokeHTTP), для соединения может быть настроены несколько значений For Relationships (original, failure, response и т.д.), куда FlowFiles могут быть распределены в зависимости от кода состояния ответа HTTP.

HTTP-запросы
HTTP-запросы
HTTP-запросы
HTTP-запросы

Примеры процессоров и их использования:

  • запуск HTTP- (или HTTPS-) сервера и прослушивание входящих соединений — ListenHTTP;

  • взаимодействие с настраиваемой конечной точкой HTTP с использованием метода GET, при котором тело ответа становится содержимым создаваемого FlowFile — InvokeHTTP.

Amazon Web Services

Процессоры, принадлежащие типу Amazon Web Services, отвечают за взаимодействие с системой веб-сервисов Amazon.

Процессор совершает действия в соответствии с собственной логикой — извлекает содержимое обьектов или сообщений AWS, отправляет содержимое FlowFile в качестве содержимого обьекта AWS или уведомлений. При сохранении данных из AWS в содержимое FlowFile в качестве атрибутов сохраняются параметры, описывающие данные: идентификатор, хеш-сумма и т.д.

При успешном выполнении запроса FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success.

AWS
AWS
AWS
AWS

Примеры процессоров и их использования:

  • извлечение содержимого объекта, хранящегося в Amazon Simple Storage — FetchS3Object;

  • запись содержимого FlowFile в объект Amazon S3 — PutS3Object.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней