Типы процессоров NiFi
В данной статье описаны существующие типы процессоров NiFi.
ПРИМЕЧАНИЕ
Для получения информации об архитектуре основных обьектов NiFi и их взаимодействии вы можете обратиться к статье Объекты NiFi. |
Data transformation
Процессоры, принадлежащие типу Data Transformation, меняют содержимое FlowFile.
Процессор копирует содержимое из Content Repository, преобразует данные в соответствии с собственной логикой, записывает новые данные в Content Repository и сохраняет ссылку на измененные данные в FlowFile.
FlowFiles, содержимое которых удалось преобразовать, отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
.
Примеры процессоров и их использования:
-
замена всего или части текста при помощи регулярного выражения — ReplaceText;
-
сжатие или распаковка содержимого — CompressContent;
-
преобразование символов из одного набора символов в другой — ConvertCharacterSet.
Routing and mediation
Процессоры, принадлежащие типу Routing and Mediation, используются для маршрутизации FlowFiles к различным процессорам в соответствии с информацией в атрибутах или содержимом этих FlowFiles.
Процессор использует заданное пользователем и собственной логикой условие (на основе атрибутов или контента, скорости передачи данных, отслеживания дубликатов и т.д.) для управления перемещением FlowFiles.
FlowFiles, соответствующие заданному условию, отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
(matched
, duplicate
— в зависимости от процессора).
Примеры процессоров и их использования:
-
регулирование пропускной способности FlowFiles на основе настроенной скорости — ControlRate;
-
отслеживание дубликатов FlowFiles на основе критериев, определяемых пользователем — DetectDuplicate;
-
маршрутизация FlowFiles на основе их атрибутов с использованием языка выражений NiFi — RouteOnAttribute.
Database access
Процессоры, принадлежащие типу Database Access, предназначены для работы с базами данных.
Процессор выполняет SQL-запрос, при этом в зависимости от собственной логики может использовать содержимое входящего FlowFile в качестве содержания запроса или записывать результат запроса в качестве содержимого исходящего FlowFile.
Также некоторые процессоры могут подготавливать запросы, преобразуя их из других форматов (ConvertJSONToSQL).
FlowFiles при успешном выполнении запроса отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
.
Примеры процессоров и их использования:
-
преобразование документа JSON в команду SQL
INSERT
илиUPDATE
, которую затем можно передать процессору PutSQL — ConvertJSONToSQL; -
обновление базы данных с использованием в команде содержимого FlowFile — PutSQL;
-
выполнение команды SQL
SELECT
с записью результатов в содержимое FlowFile в формате Avro — ExecuteSQL.
Attribute extraction
Процессоры, принадлежащие типу Attribute Extraction, создают вновь или меняют атрибуты FlowFile.
Процессор использует заданное пользователем и собственной логикой условие (регулярное выражение, вычисление хеш-функции и т.д.), чтобы изменить уже имеющиеся атрибуты или извлечь атрибуты из содержимого FlowFile.
FlowFiles, аттрибуты которых удалось преобразовать, отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
(или matched
— в зависимости от процессора).
Примеры процессоров и их использования:
-
извлечение атрибутов из текста содержимого FlowFile в соответствии с регулярными выражениями, заданными в пользовательских свойствах процессора — ExtractText;
-
обновление атрибутов FlowFile с помощью языка выражений NiFi или на основе регулярных выражений, заданных пользователем — UpdateAttribute;
-
вычисление криптографического хеш-значения для содержимого FlowFile с использованием заданного алгоритма и запись значения в выходной атрибут — CryptographicHashContent.
System interaction
Процессоры, принадлежащие типу System Interaction, запускают процессы в операционной системе или скрипты в различных средах разработки.
Процессор запускает команду операционной системы или запускает скрипт с использованием содержимого FlowFile или без него. При этом процессор создает или изменяет FlowFile, в содержимое которого записывается результат отработки команды или скрипта.
При успешной обработке команд созданные FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
.
Примеры процессоров и их использования:
-
запуск заданной пользователем команды операционной системы и создание FlowFile в результате отработки команды — ExecuteProcess;
-
запуск заданной пользователем команды операционной системы с использованием содержимого входящего FlowFile как StdIn и записью StdOut в исходящий FlowFile — ExecuteStreamCommand.
Data ingestion
Процессоры, принадлежащие типу Data Ingestion, принимают данные в поток NiFi.
Процессор создает новый FlowFile, при этом записывает содержимое исходного файла (или другого источника) в Content Repository и создает атрибуты FlowFile. Как правило, такие процессоры являются отправной точкой потока данных в Apache NiFi.
При успешной записи содержимого и создании атрибутов FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
.
Примеры процессоров и их использования:
-
передача содержимого файла с локального диска (или сетевого диска) в качестве содержимого создаваемого FlowFile и последующее удаление исходного файла — GetFile;
-
подписка на один или несколько топиков Apache Kafka с созданием FlowFile, в качестве содержимого которого сохраняется сообщение Apache Kafka — ConsumeKafka.
Data egress/Sending data
Процессоры, принадлежащие типу Data Egress/Sending Data, отправляют данные на сервер назначения.
Процессор передает содержимое FlowFile в точку назначения, а также отправляет атрибуты в качестве дополнительных заголовков, если это указано пользователем в настройках процессора. Как правило, такие процессоры являются конечной точкой потока данных в Apache NiFi.
При успешной отправке данных и создании атрибутов FlowFiles удаляются.
Примеры процессоров и их использования:
-
отправление содержимого входящего FlowFile электронным письмом настроенным получателям — PutEmail;
-
отправление содержимого FlowFile в виде сообщения в Apache Kafka — PublishKafka;
-
запись содержимого FlowFile в файл локальной системы — PutFile.
Splitting and aggregation
Процессоры, принадлежащие типу Splitting and Aggregation, разделяют или объединяют содержимое FlowFile.
При разделении процессор создает новые FlowFiles, в содержимое которых записываются данные, полученные в результате разделения. Разделение происходит в соответствии с заданным пользователем условием и собственной логикой процессора. Атрибуты для каждого нового FlowFile обычно являются данными, описывающими полученный в результате деления сегмент содержимого — размер, идентификатор, количество строк и т.д.
При агрегировании процессор создает новый FlowFile, содержимое которого обьединяет содержимое всех входящих файлов. Атрибуты для нового FlowFile являются данными, описывающими полученный в результате обьединения файл: имя файла, количество обьединенных FlowFiles, время существования пакета до отправления и т.д.
Процессор записывает содержимое новых FlowFiles в Content Repository и сохраняет ссылку на них в каждом FlowFile.
При разделении или агрегировании содержимого новые FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
(splits
, segments
, merged
— в зависимости от процессора).
Примеры процессоров и их использования:
-
Распаковка архивов различных типов, например ZIP и TAR, при этом каждый файл после распаковки передается как отдельный FlowFile — UnpackContent.
-
Объединение множества FlowFile в один FlowFile в соотвествии с заданными пользователем условиями, например, на основе общего атрибута. При этом может быть задан минимальный и максимальный размер каждого пакета, а также тайм-аут для ожидания заполнения пакета — MergeContent.
-
Разделение FlowFile на потенциально множество FlowFile на основе определенной последовательности байтов, по которой следует разделить содержимое — SplitContent.
HTTP
Процессоры, принадлежащие типу HTTP, обрабатывают вызовы HTTP и HTTPS.
При входящих запросах процессор создает FlowFile и записывает содержимое запроса в содержимое FlowFile. Если процессор совершает исходящий запрос (PUT, POST или PATCH), содержимое FlowFile отправляется в качестве тела сообщения.
Успешно созданные при входящих запросах FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
.
Для универсального процессора, отвечающего за прием и отправление запросов (InvokeHTTP), для соединения может быть настроены несколько значений For Relationships (original
, failure
, response
и т.д.), куда FlowFiles могут быть распределены в зависимости от кода состояния ответа HTTP.
Примеры процессоров и их использования:
-
запуск HTTP- (или HTTPS-) сервера и прослушивание входящих соединений — ListenHTTP;
-
взаимодействие с настраиваемой конечной точкой HTTP с использованием метода GET, при котором тело ответа становится содержимым создаваемого FlowFile — InvokeHTTP.
Amazon Web Services
Процессоры, принадлежащие типу Amazon Web Services, отвечают за взаимодействие с системой веб-сервисов Amazon.
Процессор совершает действия в соответствии с собственной логикой — извлекает содержимое обьектов или сообщений AWS, отправляет содержимое FlowFile в качестве содержимого обьекта AWS или уведомлений. При сохранении данных из AWS в содержимое FlowFile в качестве атрибутов сохраняются параметры, описывающие данные: идентификатор, хеш-сумма и т.д.
При успешном выполнении запроса FlowFiles отправляются в очередь соединения, у которого параметр For Relationships имеет значение success
.
Примеры процессоров и их использования:
-
извлечение содержимого объекта, хранящегося в Amazon Simple Storage — FetchS3Object;
-
запись содержимого FlowFile в объект Amazon S3 — PutS3Object.