NiFi objects

This article describes the architecture of the core NiFi objects and their interaction.

FlowFile

A FlowFile is a data package: the unit in which NiFi processes and transmits information.

Every FlowFile consists of two parts:

  • content — an array of bytes that should be converted, extracted, or transmitted using NiFi tools. The content itself is stored in Content Repository. FlowFile tracks the path to where the content is stored.

  • attributes — key/value pairs that contain information about the content and other data or parameters needed for proper processing.
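As an illustration only, the two parts of a FlowFile can be modeled as shown here. This is a minimal Python sketch, not NiFi's actual Java classes: the class names FlowFileSketch and ContentClaimSketch, the field names, and the sample attribute values are assumptions made for this example.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ContentClaimSketch:
    """Hypothetical pointer to content held in the Content Repository."""
    container: str
    section: str
    identifier: str
    offset: int
    length: int


@dataclass
class FlowFileSketch:
    """Hypothetical FlowFile: attributes plus a pointer to the content."""
    claim: ContentClaimSketch
    attributes: dict = field(default_factory=dict)


flow_file = FlowFileSketch(
    claim=ContentClaimSketch("container0", "section1", "claim-42", offset=0, length=1024),
    attributes={"filename": "orders.csv", "mime.type": "text/csv"},
)
# The FlowFile carries metadata and a reference, not the bytes themselves.
print(flow_file.attributes["filename"], flow_file.claim.offset)
```

The point of the sketch is that the FlowFile object stays small: the byte array lives elsewhere and is reached through the claim.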

Processor

Processor is the NiFi component that processes FlowFiles. The processor consists of a logical block implemented in Java in accordance with the specified purpose of the processor, and an interface with which it is possible to:

Connection

Connection is a NiFi component that connects processors to transfer FlowFiles between them. In NiFi, a connection acts as a buffer or queue of FlowFiles. After FlowFiles are processed by one processor, they accumulate in the connection queue until they are accepted for processing by another processor. Queues allow processors to communicate at different speeds.
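The buffering role of a connection can be sketched as a simple first-in, first-out queue. The class ConnectionSketch and its offer and poll methods are hypothetical names chosen for this example; real NiFi connections also support prioritizers, back pressure, and swapping, none of which is modeled here.

```python
from collections import deque


class ConnectionSketch:
    """Hypothetical FIFO buffer between two processors."""

    def __init__(self):
        self._queue = deque()

    def offer(self, flow_file):
        # Called by the upstream processor when it finishes a FlowFile.
        self._queue.append(flow_file)

    def poll(self):
        # Called by the downstream processor; returns None when empty.
        return self._queue.popleft() if self._queue else None

    def __len__(self):
        return len(self._queue)


connection = ConnectionSketch()
# A fast upstream processor emits five FlowFiles in a burst.
for i in range(5):
    connection.offer(f"flowfile-{i}")

# A slower downstream processor has taken only two so far.
taken = [connection.poll(), connection.poll()]
print(taken, len(connection))
```

The three FlowFiles left in the queue simply wait, which is what lets the two processors run at different speeds.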

A NiFi connection consists of a FlowFiles queue and an interface through which it is possible to:

Repositories

FlowFile Repository

The FlowFile Repository stores the data of each FlowFile in write-ahead log (WAL) format. Each FlowFile change is a transactional operation, which makes it possible to recover without data loss after a failure.

Information about how a FlowFile is stored in the repository is available when viewing the FlowFile data.

The system periodically creates a snapshot of the current state of each FlowFile (its attributes and the path to its content on disk) and writes it to the repository as a file with the .partial extension. These files are intermediate records of FlowFile processing transactions. When a checkpoint occurs, the latest .partial file is renamed to snapshot; the interval between checkpoints is controlled by the nifi.flowfile.repository.checkpoint.interval parameter. After a failure, any .partial files written after the last snapshot file are deleted, and the last snapshot file is used as the recovery point for the node. If no snapshot file exists, the latest .partial file is renamed to snapshot.
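The checkpoint and recovery rules described above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not NiFi's implementation: the function names are hypothetical, a single snapshot.partial file stands in for all intermediate records, and JSON is used only to keep the example readable (the real repository has its own on-disk format).

```python
import json
import os
import tempfile


def write_partial(repo_dir, state):
    """Write the current state to the intermediate snapshot.partial file."""
    partial = os.path.join(repo_dir, "snapshot.partial")
    with open(partial, "w", encoding="utf-8") as fh:
        json.dump(state, fh)
    return partial


def checkpoint(repo_dir):
    """Promote the latest snapshot.partial to snapshot."""
    os.replace(os.path.join(repo_dir, "snapshot.partial"),
               os.path.join(repo_dir, "snapshot"))


def recover(repo_dir):
    """Return the state to restart from, following the rules described above."""
    partial = os.path.join(repo_dir, "snapshot.partial")
    snapshot = os.path.join(repo_dir, "snapshot")
    if os.path.exists(snapshot):
        if os.path.exists(partial):
            os.remove(partial)  # written after the last checkpoint: discard
    elif os.path.exists(partial):
        os.replace(partial, snapshot)  # no snapshot yet: promote the partial
    else:
        return None
    with open(snapshot, encoding="utf-8") as fh:
        return json.load(fh)


repo_dir = tempfile.mkdtemp()
write_partial(repo_dir, {"ff-1": {"attrs": {"step": "1"}}})
checkpoint(repo_dir)
# A failure happens after a newer partial is written but before the next checkpoint.
write_partial(repo_dir, {"ff-1": {"attrs": {"step": "2"}}})
recovered = recover(repo_dir)
print(recovered)
```

In this run the node restarts from the state with step "1", because the newer .partial file was never promoted by a checkpoint.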

To increase performance, processed FlowFiles can be stored in JVM memory, which can lead to data loss during failures. Given the use of WAL, risks can be minimized by setting parameters according to project requirements.

The process of NiFi working with JVM memory (data storage and file swapping during processing) can be adjusted via the FlowFile Repository parameters:

  • nifi.flowfile.repository.implementation — determines the storage location of processed files;

  • nifi.flowfile.repository.always.sync — determines whether any change in the repository should be synchronized with disk;

  • nifi.queue.swap.threshold — queue threshold (number of FlowFiles) at which NiFi starts swapping FlowFile information to disk.
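To make the parameters above more concrete, the sketch below embeds a nifi.properties-style fragment and reads it with a small parser. The values are illustrative assumptions rather than settings taken from this article (the class name and numbers are what is commonly seen as defaults in NiFi 1.x and should be checked against your version), and parse_properties is a hypothetical helper, not a NiFi API.

```python
SAMPLE_PROPERTIES = """\
# Illustrative values only; check them against your NiFi version.
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
nifi.flowfile.repository.always.sync=false
nifi.flowfile.repository.checkpoint.interval=2 mins
nifi.queue.swap.threshold=20000
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


props = parse_properties(SAMPLE_PROPERTIES)
swap_threshold = int(props["nifi.queue.swap.threshold"])
always_sync = props["nifi.flowfile.repository.always.sync"] == "true"
print(swap_threshold, always_sync)
```

With these sample values, a queue would start swapping once it holds more than 20000 FlowFiles, and repository changes would not be forced to disk on every update.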

Content Repository

The content of each FlowFile is stored in a Content Repository. A repository consists of a set of files on disk. Each of the files has its own identifier. The contents of each FlowFile are stored in a file at a certain offset.

Below is the hierarchy of data storage in Content Repository:

base path
       |__Container 0
                    |__Section 1
                               |__identifier
                                           |__offset
                    |__Section 2
                               |__identifier
                                           |__offset
                    |__________ ...
       |__Container 1
                    |__Section 1
                    |__Section 2
                    |__ ...
       |__...

This repository uses the immutability and copy-on-write paradigms. This means that data written to the repository is never modified, and its location on disk does not change.

Content location is tracked by Resource Claim Java objects, which specify the path to the content on disk (Container → Section → identifier → offset). This path is passed on as a Content Claim object, which is part of the FlowFile metadata stored in the FlowFile Repository.

A Resource Claim tracks the same path for multiple FlowFiles that have the same content. If a processor changes the content, the new data is written to a new fragment on disk, and the Resource Claim begins tracking the new storage path and passes it to the Content Claim.

If the content references created by Resource Claim objects are no longer used in any Content Claim, the content is deleted or archived. Archiving is enabled by the nifi.content.repository.archive.enabled parameter, and the retention time of archived data is set by the nifi.content.repository.archive.max.retention.period parameter.

Unused references are detected each time a checkpoint is reached in the FlowFile Repository.
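The interplay of shared content, copy-on-write, and cleanup can be sketched with a reference-counted store. The class ContentRepositorySketch and its method names are hypothetical and exist only for this example; archiving is not modeled, so unreferenced content is simply deleted.

```python
class ContentRepositorySketch:
    """Hypothetical append-only store with per-claim reference counts."""

    def __init__(self):
        self._data = {}        # claim id -> immutable bytes
        self._claimants = {}   # claim id -> number of FlowFiles using it
        self._next_id = 0

    def write(self, content):
        # Content is never modified in place; each write gets a new claim.
        claim_id = self._next_id
        self._next_id += 1
        self._data[claim_id] = bytes(content)
        self._claimants[claim_id] = 1
        return claim_id

    def share(self, claim_id):
        # A second FlowFile with the same content reuses the existing claim.
        self._claimants[claim_id] += 1
        return claim_id

    def release(self, claim_id):
        self._claimants[claim_id] -= 1

    def read(self, claim_id):
        return self._data[claim_id]

    def cleanup(self):
        # Run at a checkpoint: drop content that no FlowFile references.
        unused = [c for c, n in self._claimants.items() if n == 0]
        for claim_id in unused:
            del self._data[claim_id]
            del self._claimants[claim_id]
        return unused


repo = ContentRepositorySketch()
original = repo.write(b"hello")
clone = repo.share(original)            # two FlowFiles, one copy on disk

# One FlowFile is modified: new content goes to a new claim (copy-on-write).
modified = repo.write(repo.read(clone).upper())
repo.release(clone)
removed_first = repo.cleanup()          # original still has one claimant

repo.release(original)
removed_second = repo.cleanup()         # now nothing references it
print(removed_first, removed_second, repo.read(modified))
```

The first cleanup removes nothing because one FlowFile still points at the original content; only after the last reference is released does the content become eligible for removal.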

Provenance Repository

The Provenance Repository stores a snapshot of a FlowFile's data each time the FlowFile is modified by a processor. This allows you to track the chronology of FlowFile changes.

In NiFi, such a snapshot is called a provenance event.

All provenance events are displayed on the Data Provenance page of the global UI menu.

Old versions of FlowFile content may be deleted before the provenance events that reference them are deleted. In this case, the user can still access the FlowFile metadata and attributes to analyze the flow of data.

The storage period for data provenance information is controlled by the nifi.provenance.repository.max.storage.time parameter.

Process FlowFile

The figure below shows how data is processed and modified in NiFi.

[Figure: NiFi objects]

The FlowFile processing sequence is described below:

  1. Snapshots of FlowFile data are written to FlowFile Repository (F1 — F4) at a certain frequency.

  2. If the processor logic expects the content to change, FlowFile requests the content (C1 — Cn) from Content Repository according to Content Claim and reads it into JVM memory.

  3. The changed contents (N1 — Nn) are written to a new disk fragment.

  4. The Resource Claim object starts tracking the new content path and updates the data in the Content Claim.

  5. A new snapshot of the FlowFile metadata and attributes (provenance event) is written to Provenance Repository.

  6. FlowFile enters the connection queue for processing in the next processor. A new snapshot is written to the FlowFile Repository with updated FlowFile data.

If a failure occurs during one of the steps, the system reverts to the state recorded in the latest snapshot file in the FlowFile Repository. New data written to disk is then deleted, because the reference to it was not updated before the failure, while the old data remains accessible and unchanged thanks to copy-on-write and immutability.
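The processing sequence and the rollback behavior can be sketched together. Everything in this example is a simplification under stated assumptions: NodeSketch, its fields, and the fail_before_commit flag are hypothetical, in-memory dictionaries stand in for the three repositories, and the event label is only illustrative.

```python
import copy


class NodeSketch:
    """Hypothetical node state: content store plus FlowFile snapshots."""

    def __init__(self):
        self.content = {}      # claim id -> bytes (never overwritten)
        self.live = {}         # in-memory FlowFile records
        self.snapshot = {}     # last state persisted in the FlowFile Repository
        self.provenance = []   # recorded provenance events
        self._next_claim = 0

    def store(self, data):
        claim = self._next_claim
        self._next_claim += 1
        self.content[claim] = data
        return claim

    def checkpoint(self):
        self.snapshot = copy.deepcopy(self.live)

    def process(self, ff_id, transform, fail_before_commit=False):
        record = self.live[ff_id]
        old = self.content[record["claim"]]          # step 2: read content
        new_claim = self.store(transform(old))       # step 3: write new fragment
        if fail_before_commit:
            raise RuntimeError("simulated failure")
        record["claim"] = new_claim                  # step 4: repoint the claim
        self.provenance.append((ff_id, "CONTENT_MODIFIED", dict(record)))  # step 5
        self.checkpoint()                            # step 6: persist new state

    def recover(self):
        # Revert to the last snapshot and drop content nothing refers to.
        self.live = copy.deepcopy(self.snapshot)
        referenced = {r["claim"] for r in self.live.values()}
        for claim in list(self.content):
            if claim not in referenced:
                del self.content[claim]


node = NodeSketch()
node.live["ff-1"] = {"claim": node.store(b"abc"), "attrs": {"filename": "a.txt"}}
node.checkpoint()

try:
    node.process("ff-1", bytes.upper, fail_before_commit=True)
except RuntimeError:
    node.recover()

after_failure = node.content[node.live["ff-1"]["claim"]]
node.process("ff-1", bytes.upper)
after_success = node.content[node.live["ff-1"]["claim"]]
print(after_failure, after_success)
```

After the simulated failure the FlowFile still points at the original bytes and the half-written fragment is gone; only the second, successful run repoints the claim and records a provenance event.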

Process Group

Process Group is a NiFi component that combines processors and other NiFi elements into a single component that behaves, in effect, like a new processor. To exchange FlowFiles with the surrounding flow, a process group uses an Input Port and an Output Port; it can contain any number of NiFi elements and connections between them.

A process group has an interface that allows you to:

[Figure: Process Group]

Flow Controller

Flow Controller is the entity that ties together all processes in NiFi. It acts as a broker for processors: it manages NiFi extensions and schedules the resources that processors and extensions need to run.

The flow controller allows you to add controller services.

Controller services are shared services that hold connection settings for external systems and make them available to multiple processors or process groups.

Configuring the flow controller and controller services is done on the Controller Settings page of the global user interface menu.

Configure NiFi

After adding and installing the NiFi service as part of an ADS cluster, you can configure NiFi parameters on the configuration page of the NiFi service in ADCM.

To configure the parameters of the NiFi service, set the Show advanced switch to active, expand the nifi.properties node, and enter new values for the parameters. To change NiFi parameters that are not available in the ADCM interface, use the Add key,value field in the nifi.properties node. Select Add property and enter the name of the parameter and its value.
