Объекты NiFi
В данной статье описана архитектура основных обьектов NiFi и их взаимодействие.
FlowFile
FlowFile — компонент NiFi, представляющий собой пакет данных. Это основная единица обработки и передачи информации в NiFi.
Каждый FlowFile состоит из двух частей:
-
содержимое (сontent) — массив байтов, который должен быть преобразован, извлечен или передан при помощи инструментов NiFi. Само содержимое хранится в Content Repository, и FlowFile отслеживает путь к месту его хранения.
-
атрибуты (attributes) — пары ключ/значение, которые содержат сведения о содержимом и другие данные или параметры, необходимые для правильной обработки.
Процессор
Процессор (Processor) — компонент NiFi, осуществляющий обработку FlowFiles. Процессор состоит из логического блока, реализованного на Java в соответствии с заданным назначением процессора, и интерфейса, при помощи которого возможно:
-
выполнять настройку конфигурационных параметров процессора, указывающих, как будут обработаны данные, и определяющих связь с внешними процессами;
-
управлять процессором;
-
осуществлять диагностику состояния процессора.
Соединение
Соединение (сonnection) — компонент NiFi, соединяющий процессоры для передачи FlowFiles между ними. В NiFi cоединение действует как буфер или очередь FlowFiles. После обработки FlowFiles одним процессором они накапливаются в очереди соединения, пока не будут приняты на обработку другим процессором. Очереди позволяют процессорам взаимодействовать с разной скоростью.
Соединение NiFi состоит из очереди FlowFiles и интерфейса, при помощи которого возможно:
-
выполнять настройку параметров соединения, которые определяют, как будут обрабатываться очереди, в том числе:
-
настройку применения backpressure;
-
настройку приоритетности FlowFiles;
-
-
управлять соединением;
-
осуществлять диагностику очереди.
Репозитории
FlowFile Repository
Репозиторий FlowFile Repository сохраняет данные каждого FlowFile в формате журнала опережающей записи (Write Ahead Log, WAL). Каждое изменение FlowFile является транзакционной операцией, что гарантирует возможность восстановления без потери данных при сбоях.
Информация о хранении Flowfile в репозитории доступна при просмотре данных Flowfile.
Система периодически создает моментальный снимок (снепшот) текущего состояния FlowFile (атрибуты и путь к содержимому на диске) и записывает данные снимка в репозиторий в виде файла с расширением .partial. Эти файлы являются промежуточными записями транзакций обработки FlowFile. В момент наступления контрольной точки (checkpoint) последний снепшот .partial переименовывается в snapshot (интервал между контрольными точками управляется параметром nifi.flowfile.repository.checkpoint.interval
). В случае сбоя файлы .partial, записанные после последнего файла snapshot, удаляются, а последний файл snapshot считается точкой восстановления для узла. Если файла snapshot не существует, последний .partial переименовывается в snapshot.
Для увеличения быстродействия обрабатываемые FlowFile могут храниться в памяти JVM, что может привести к потере данных при сбоях. С учетом использования WAL риски могут быть минимизированы при соответствующей требованиям проекта настройке параметров.
Процесс работы NiFi с памятью JVM (хранение данных и подкачка файлов при обработке) может быть отрегулирован при помощи параметров FlowFile Repository:
-
nifi.flowfile.repository.implementation
— определяет место хранения обрабатываемых файлов; -
nifi.flowfile.repository.always.sync
— определяет, нужно ли синхронизировать любое изменение в репозитории с диском; -
nifi.queue.swap.threshold
— порог очереди (количество FlowFile), при котором NiFi начинает подкачку (swap) информации FlowFile на диск.
Content Repository
Содержимое каждого FlowFile хранится в Content Repository. Репозиторий состоит из набора файлов на диске, каждый из которых имеет свой идентификатор (identifier). Содержимое каждого FlowFile хранится в файле под определенным смещением (offset).
Ниже приведена иерархия хранения данных в Content Repository:
base path |__Container 0 |__Section 1 |__identifier |__offset |__Section 2 |__identifier |__offset |__________ ... |__Container 1 |__Section 1 |__Section 2 |__ ... |__...
Этот репозиторий использует парадигмы неизменяемости (immutability) и копирования (copy-on-write). Это значит, что данные, записанные в репозиторий, не меняются и не меняют места размещения на диске.
Местонахождение содержимого отслеживают объекты Java Resource Claims, которые указывают путь к содержимому на диске (Container → Section → identifier → offset) и передают эти данные в качестве объекта Content Claim, который является частью метаданных FlowFile при хранении в FlowFile Repository.
Resource Claim отслеживает один и тот же путь для нескольких FlowFile, для которых содержимое является одинаковым. Если процессор меняет содержимое, новые данные записываются в новый фрагмент на диске и Resource Claim начинает отслеживать новый путь хранения данных для передачи в Content Claim.
Если ссылки на содержимое, созданные объектами Resource Claim, не используются ни в одном Content Claim, данное содержимое будет удалено или архивировано (архивирование включается параметром nifi.content.repository.archive.enabled
, а время хранения архивных данных параметром nifi.content.repository.archive.max.retention.period
).
Отслеживание неиспользующихся ссылок происходит каждый раз в момент достижения контрольной точки в FlowFile Repository.
Provenance Repository
Репозиторий Provenance Repository сохраняет снепшоты данных каждого FlowFile каждый раз после того, как FlowFile был изменен процессором. Это позволяет проследить хронологию изменения FlowFile.
Такой снимок в NiFi называется событием происхождения (Provenance Event).
Все события происхождения отображаются на странице Data Provenance глобального меню пользовательского интерфейса.
Старые версии содержимого FlowFile могут быть удалены до того, как будут удалены события происхождения, ссылающиеся на них. При этом пользователю будут доступны данные FlowFile для анализа прохождения данных.
Период хранения информации о происхождении данных регулируется параметром nifi.provenance.repository.max.storage.time
.
Обработка FlowFile
Ниже на рисунке показано, как происходит обработка и изменение данных в NiFi.
Ниже описана последовательность обработки FlowFile:
-
Снепшоты данных FlowFile с определенной периодичностью записываются в FlowFile Repository (
F1 — F4
). -
Если логика процессора предполагает изменение содержимого, FlowFile запрашивает содержимое (
С1 — Сn
) из Content Repository в соответствии с Content Claim и считывает его в память JVM. -
Измененное содержимое (
N1 — Nn
) записывается в новый фрагмент диска. -
Обьект Resource Claim начинает отслеживать новый путь к содержимому и обновляет данные в Content Claim.
-
В Provenance Repository записывается новый снепшот метаданных и атрибутов FlowFile (событие происхождения).
-
FlowFile поступает в очередь соединения для передачи в обработку следующему процессору. В FlowFile Repository записывается новый снепшот данных FlowFile.
Если во время выполнения одного из этапов возникает сбой, система вернется к состоянию, соответствующему последнему файлу snapshot в FlowFile Repository. При этом новые данные, записанные на диск, будут удалены, так как ссылка на них не успеет обновиться, а доступ к старым данным сохранится благодаря копированию при записи и неизменяемости.
Process Group
Группа процессов (Process Group) — компонент NiFi, позволяющий обьединять функциональные возможности разных процессоров и других элементов NiFi в один компонент, являющийся по сути новым процессором. Группа процессов должна содержать входной порт (Input Port), выходной порт (Output Port) и может содержать любое количество элементов NiFi и соединений между ними.
Группа процессов имеет интерфейс, при помощи которого возможно:
-
выполнять настройку конфигурационных параметров группы процессов;
-
управлять группой процессов;
-
осуществлять диагностику состояния группы процессов.
Flow Controller
Koнтроллер потока (Flow Controller) — сущность, объединяющая все процессы в NiFi. Является брокером для работы процессоров, контролирует работу расширений Nifi и планирует выделение ресурсов для работы процессоров и расширений.
Контроллер потока позволяет добавлять службы контроллера.
Службы контроллеров (Controller Service) — службы, при помощи которых могут быть определены общие настройки подключения к внешним системам и распространены на множество процессоров или групп процессов.
Настройка контроллера потока и сервисов контроллера выполняется на странице Controller Settings глобального меню пользовательского интерфейса.
Настройка NiFi
После добавления и установки сервиса NiFi в составе кластера ADS настройка параметров выполняется на странице конфигурирования сервиса NiFi в интерфейсе ADCM.
Для настройки параметров работы сервиса NiFi переведите в активное состояние переключатель Show advanced, раскройте узел nifi.properties и введите новые значения для параметров. Для изменения параметров NiFi, отсутствующих в интерфейсе ADCM, используйте поле Add key,value в узле nifi.properties. Выберите Add property и введите наименование параметра и его значение.