NiFi REST API usage example

This article describes how to use NiFi REST API to create and manage a simple sequence of NiFi processors using the command line tool cURL.

The following examples of using the NiFi Rest API are described below:

  1. Create processors:

    • GenerateFlowFile — for generating a FlowFile that consists of random data of a given size or user-defined data.

    • PublishKafka_2_6 — for publishing messages to a Kafka topic.

  2. Create a connection between processors.

  3. Run processors.

  4. Get parameters, attributes of Flowfile, view Flowfile contents.

  5. Stop and terminate processors.

  6. User authorization.

The following prerequisites are met:

  • The ADS cluster was installed according to the Online installation guide.

  • In the ADS cluster the following services were added and installed: NiFi (on the sov-test-1.ru-central1.internal host) and Kafka (on the sov-test-2.ru-central1.internal host).

    NOTE

    The examples described below use the ADS cluster version 3.3.2.1.b1 and the corresponding versions of the NiFi (1.20.0) and Kafka (3.3.2) services.

  • To automatically create a Kafka topic, the auto.create.topics.enable parameter is enabled in the server.properties group when configuring the Kafka service.

  • From the host on which REST requests are generated, there is an access to the 9090 port of the sov-test-1.ru-central1.internal host.

  • On the host where REST requests are generated, the ./jq utility is installed to display JSON-format responses in a human-readable form.

Before executing the User authorization example, authentication of a user who has an account in Active Directory is additionally configured on the ADS cluster.

NiFi Root Process Group

All components exist within a process group. When a user initially navigates to the NiFi UI page, they are placed in the Root Process Group.

To work with NiFi via the REST API, you should use the root process group identifier ("id") in the path of some REST requests (endpoint). That identifier can be found by running the request:

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flow/process-groups/root |jq

The response to the request displays the parameters of the root process group, including the identifier ("id").

Root Process Group parameters
{
  "permissions": {
    "canRead": true,
    "canWrite": true
  },
  "processGroupFlow": {
    "id": "4410f0eb-018c-1000-9096-472c6e6db150",
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flow/process-groups/4410f0eb-018c-1000-9096-472c6e6db150",
    "breadcrumb": {
      "id": "4410f0eb-018c-1000-9096-472c6e6db150",
      "permissions": {
        "canRead": true,
        "canWrite": true
      },
      "breadcrumb": {
        "id": "4410f0eb-018c-1000-9096-472c6e6db150",
        "name": "NiFi Flow"
      }
    },
    "flow": {
      "processGroups": [],
      "remoteProcessGroups": [],
      "processors": [],
      "inputPorts": [],
      "outputPorts": [],
      "connections": [],
      "labels": [],
      "funnels": []
    },
    "lastRefreshed": "19:32:09 UTC"
  }
}

Create processors using NiFi REST API

To create a processor, use the POST/process-groups/{id}/processors request form.

Below are the queries for creating processors. At the endpoint of the request, the root process group identifier ("id") is indicated, and the processor parameters are indicated in the body of the request in JSON format.

Create the GenerateFlowFile processor
$ curl -X POST \
  http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/processors \
  -H 'Content-Type: application/json' \
  -d  '{ \
    "revision": { \
        "version": 0 \
    }, \
     "component": { \
        "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
        "position": { \
            "x": 200, \
            "y": 0 \
        }, \
        "name": "GenerateFlowFile", \
        "type": "org.apache.nifi.processors.standard.GenerateFlowFile", \
        "bundle": { \
            "group": "org.apache.nifi", \
            "artifact": "nifi-standard-nar", \
            "version": "1.20.0" \
        }, \
        "state": "STOPPED", \
        "config": { \
            "properties": { \
                "File Size": "0B", \
                "Batch Size": "1", \
                "Data Format": "Text", \
                "Unique FlowFiles": "false", \
                "generate-ff-custom-text": "my custom message", \
                "character-set": "UTF-8", \
                "mime-type": null \
            } \
        } \
    } \
}' |jq
Create the PublishKafka_2_6 processor
$ curl -X POST \
  http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/processors \
  -H 'Content-Type: application/json' \
  -d  '{ \
    "revision": { \
        "version": 0 \
    }, \
    "component": { \
        "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
        "position": { \
            "x": 200, \
            "y": 400 \
        }, \
        "name": "PublishKafka_2_6", \
        "type": "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6", \
        "bundle": { \
        "group": "org.apache.nifi", \
        "artifact": "nifi-kafka-2-6-nar", \
        "version": "1.20.0" \
        }, \
        "state": "STOPPED", \
        "config": { \
            "properties": { \
                "bootstrap.servers": "sov-test-2.ru-central1.internal:9092", \
                "topic": "my_new_topic" \
            }, \
         "autoTerminatedRelationships":["success","failure"] \
         } \
    } \
}' |jq

The parameters in the request body are entered in the format suggested by the ProcessorEntity link for this request on the /nifi-api page .

Parameters for creating a processor
Parameters for creating a processor
Parameters for creating a processor
Parameters for creating a processor

Some processor parameters are described below:

  • "revision":

    • "version" — version number from which the version counting for the processor will begin.

  • "component":

    • "name", "type", "bundle" — parameters unique for each processor type. These parameters can be found by querying all available processors for the current version of NiFi:

      $ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flow/processor-types
    • "state" — state of the processor when created.

    • "config" — processor configuration parameters ("properties"), different from default values ​​(configuration parameters are described in the documentation of each processor, for example PublishKafka_2_6).

    • "autoTerminatedRelationships" — parameter set for the last processor in the pair. For more information, see RELATIONSHIPS tabs.

The response to a processor creation request contains the identifier of the created processor ("id") and other processor parameters.

Parameters of the created processor
{
  "revision": {
    "version": 1,
    "lastModifier": "anonymous"
  },
  "id": "5f46b4af-018c-1000-f339-ca645d27debf",
  "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/5f46b4af-018c-1000-f339-ca645d27debf",
  "position": {
    "x": 200,
    "y": 0
  },
  "permissions": {
    "canRead": true,
    "canWrite": true
  },
  "bulletins": [],
  "component": {
    "id": "5f46b4af-018c-1000-f339-ca645d27debf",
    "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "position": {
      "x": 200,
      "y": 0
    },
    "name": "GenerateFlowFile",
    "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
    "bundle": {
      "group": "org.apache.nifi",
      "artifact": "nifi-standard-nar",
      "version": "1.20.0"
    },
    "state": "STOPPED",
    "style": {},
    "relationships": [
      {
        "name": "success",
        "description": "",
        "autoTerminate": false,
        "retry": false
      }
    ],
    "supportsParallelProcessing": true,
    "supportsEventDriven": false,
    "supportsBatching": true,
    "supportsSensitiveDynamicProperties": false,
    "persistsState": false,
    "restricted": false,
    "deprecated": false,
    "executionNodeRestricted": false,
    "multipleVersionsAvailable": false,
    "inputRequirement": "INPUT_FORBIDDEN",
    "config": {
      "properties": {
        "File Size": "0B",
        "Batch Size": "1",
        "Data Format": "Text",
        "Unique FlowFiles": "false",
        "generate-ff-custom-text": "my custom message",
        "character-set": "UTF-8",
        "mime-type": null
      },
      "descriptors": {
        "File Size": {
          "name": "File Size",
          "displayName": "File Size",
          "description": "The size of the file that will be used",
          "defaultValue": "0B",
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "Batch Size": {
          "name": "Batch Size",
          "displayName": "Batch Size",
          "description": "The number of FlowFiles to be transferred in each invocation",
          "defaultValue": "1",
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "Data Format": {
          "name": "Data Format",
          "displayName": "Data Format",
          "description": "Specifies whether the data should be Text or Binary",
          "defaultValue": "Text",
          "allowableValues": [
            {
              "allowableValue": {
                "displayName": "Binary",
                "value": "Binary"
              },
              "canRead": true
            },
            {
              "allowableValue": {
                "displayName": "Text",
                "value": "Text"
              },
              "canRead": true
            }
          ],
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "Unique FlowFiles": {
          "name": "Unique FlowFiles",
          "displayName": "Unique FlowFiles",
          "description": "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput",
          "defaultValue": "false",
          "allowableValues": [
            {
              "allowableValue": {
                "displayName": "true",
                "value": "true"
              },
              "canRead": true
            },
            {
              "allowableValue": {
                "displayName": "false",
                "value": "false"
              },
              "canRead": true
            }
          ],
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "generate-ff-custom-text": {
          "name": "generate-ff-custom-text",
          "displayName": "Custom Text",
          "description": "If Data Format is text and if Unique FlowFiles is false, then this custom text will be used as content of the generated FlowFiles and the File Size will be ignored. Finally, if Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles",
          "required": false,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": true,
          "expressionLanguageScope": "Variable Registry Only",
          "dependencies": []
        },
        "character-set": {
          "name": "character-set",
          "displayName": "Character Set",
          "description": "Specifies the character set to use when writing the bytes of Custom Text to a flow file.",
          "defaultValue": "UTF-8",
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "mime-type": {
          "name": "mime-type",
          "displayName": "Mime Type",
          "description": "Specifies the value to set for the \"mime.type\" attribute.",
          "required": false,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        }
      },
      "schedulingPeriod": "1 min",
      "schedulingStrategy": "TIMER_DRIVEN",
      "executionNode": "ALL",
      "penaltyDuration": "30 sec",
      "yieldDuration": "1 sec",
      "bulletinLevel": "WARN",
      "runDurationMillis": 0,
      "concurrentlySchedulableTaskCount": 1,
      "comments": "",
      "lossTolerant": false,
      "defaultConcurrentTasks": {
        "TIMER_DRIVEN": "1",
        "EVENT_DRIVEN": "0",
        "CRON_DRIVEN": "1"
      },
      "defaultSchedulingPeriod": {
        "TIMER_DRIVEN": "0 sec",
        "CRON_DRIVEN": "* * * * * ?"
      },
      "retryCount": 10,
      "retriedRelationships": [],
      "backoffMechanism": "PENALIZE_FLOWFILE",
      "maxBackoffPeriod": "10 mins"
    },
    "validationErrors": [
      "'Relationship success' is invalid because Relationship 'success' is not connected to any component and is not auto-terminated"
    ],
    "validationStatus": "INVALID",
    "extensionMissing": false
  },
  "inputRequirement": "INPUT_FORBIDDEN",
  "status": {
    "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "id": "5f46b4af-018c-1000-f339-ca645d27debf",
    "name": "GenerateFlowFile",
    "runStatus": "Invalid",
    "statsLastRefreshed": "18:25:18 UTC",
    "aggregateSnapshot": {
      "id": "5f46b4af-018c-1000-f339-ca645d27debf",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "GenerateFlowFile",
      "type": "GenerateFlowFile",
      "runStatus": "Invalid",
      "executionNode": "ALL",
      "bytesRead": 0,
      "bytesWritten": 0,
      "read": "0 bytes",
      "written": "0 bytes",
      "flowFilesIn": 0,
      "bytesIn": 0,
      "input": "0 (0 bytes)",
      "flowFilesOut": 0,
      "bytesOut": 0,
      "output": "0 (0 bytes)",
      "taskCount": 0,
      "tasksDurationNanos": 0,
      "tasks": "0",
      "tasksDuration": "00:00:00.000",
      "activeThreadCount": 0,
      "terminatedThreadCount": 0
    }
  },
  "operatePermissions": {
    "canRead": true,
    "canWrite": true
  }
}

Create a connection between processors using NiFi REST API

To create a connection, use the POST/process-groups/{id}/connections request form.

Below is a query to create a connection. The endpoint specifies the root process group identifier ("id"), and the parameters are specified in the body of the request in JSON format.

Creating a connection between processors
$ curl -X POST \
  http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/connections \
  -H 'Content-Type: application/json' \
  -d  '{ \
    "revision": { \
        "version": 0 \
    }, \
    "component": { \
        "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
        "source": { \
            "id": "632d5f0c-018c-1000-3d78-410a76f1f451", \
            "type": "PROCESSOR", \
            "groupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
            "name": "GenerateFlowFile" \
        }, \
        "destination": { \
            "id": "632beead-018c-1000-cbec-e625a264e27a", \
            "type": "PROCESSOR", \
            "groupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
            "name": "PublishKafka_2_6" \
        }, \
        "name": "Test connection", \
        "selectedRelationships": [ \
            "success" \
        ] \
        } \
}' |jq

The parameters in the request body are entered in the format suggested by the ConnectionEntity link for this request on the /nifi-api page.

Parameters for creating a connection
Parameters for creating a connection
Parameters for creating a connection
Parameters for creating a connection

Some connection parameters are described below:

  • "revision":

    • "version" — version number from which the version count for the connection will begin.

  • "component":

    • "source" — parameters of the source processor obtained when creating the GenerateFlowFile processor.

    • "destination" — parameters of the destination processor received when creating the PublishKafka_2_6 processor.

    • "name" — connection name.

    • "selectedRelationships" — parameter that defines the relationship between the source and target components. For more information, see connection settings.

The response to a connection creation request contains the identifier of the created connection ("id") and other connection parameters.

Parameters of the created connection
{
  "revision": {
    "version": 1,
    "lastModifier": "anonymous"
  },
  "id": "6339c013-018c-1000-cabb-b497fd92db23",
  "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/connections/6339c013-018c-1000-cabb-b497fd92db23",
  "permissions": {
    "canRead": true,
    "canWrite": true
  },
  "component": {
    "id": "6339c013-018c-1000-cabb-b497fd92db23",
    "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "source": {
      "id": "632d5f0c-018c-1000-3d78-410a76f1f451",
      "type": "PROCESSOR",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "GenerateFlowFile",
      "running": false,
      "comments": ""
    },
    "destination": {
      "id": "632beead-018c-1000-cbec-e625a264e27a",
      "type": "PROCESSOR",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "PublishKafka_2_6",
      "running": false,
      "comments": ""
    },
    "name": "Test connection",
    "labelIndex": 1,
    "zIndex": 0,
    "selectedRelationships": [
      "success"
    ],
    "availableRelationships": [
      "success"
    ],
    "backPressureObjectThreshold": 10000,
    "backPressureDataSizeThreshold": "1 GB",
    "flowFileExpiration": "0 sec",
    "prioritizers": [],
    "bends": [],
    "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
    "loadBalanceCompression": "DO_NOT_COMPRESS",
    "loadBalanceStatus": "LOAD_BALANCE_NOT_CONFIGURED"
  },
  "status": {
    "id": "6339c013-018c-1000-cabb-b497fd92db23",
    "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "name": "Test connection",
    "statsLastRefreshed": "12:49:38 UTC",
    "sourceId": "632d5f0c-018c-1000-3d78-410a76f1f451",
    "sourceName": "GenerateFlowFile",
    "destinationId": "632beead-018c-1000-cbec-e625a264e27a",
    "destinationName": "PublishKafka_2_6",
    "aggregateSnapshot": {
      "id": "6339c013-018c-1000-cabb-b497fd92db23",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "Test connection",
      "sourceName": "GenerateFlowFile",
      "destinationName": "PublishKafka_2_6",
      "flowFilesIn": 0,
      "bytesIn": 0,
      "input": "0 (0 bytes)",
      "flowFilesOut": 0,
      "bytesOut": 0,
      "output": "0 (0 bytes)",
      "flowFilesQueued": 0,
      "bytesQueued": 0,
      "queued": "0 (0 bytes)",
      "queuedSize": "0 bytes",
      "queuedCount": "0",
      "percentUseCount": 0,
      "percentUseBytes": 0,
      "flowFileAvailability": "ACTIVE_QUEUE_EMPTY"
    }
  },
  "bends": [],
  "labelIndex": 1,
  "zIndex": 0,
  "sourceId": "632d5f0c-018c-1000-3d78-410a76f1f451",
  "sourceGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
  "sourceType": "PROCESSOR",
  "destinationId": "632beead-018c-1000-cbec-e625a264e27a",
  "destinationGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
  "destinationType": "PROCESSOR"
}

Once created, all processors and connections are displayed on the NiFi user interface page.

Created components showing in NiFi UI
Created components showing in NiFi UI
Created components showing in NiFi UI
Created components showing in NiFi UI

Run processors using NiFi REST API

All components placed in a process group have a unique version number ("version"), which is used in all requests when managing components. The version number is incremented after any changes are made to the component. The processor version number can be obtained upon request by specifying the processor identifier ("id"):

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632beead-018c-1000-cbec-e625a264e27a

A list and parameters of all processors located in a process group, including identifiers ("id") and current versions ("version"), can be obtained upon request by specifying the process group identifier ("id"):

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/processors |jq

To start the processor, use the PUT/processors/{id}/run-status request form.

Below is the query to start the processor. The endpoint specifies the processor identifier ("id"), obtained during processor creation, the body of the request in JSON format specifies the "RUNNING" status and the current version of the processor.

$ curl   -X PUT   \
   http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632beead-018c-1000-cbec-e625a264e27a/run-status  \
  -H 'Content-Type: application/json'  \
  -d '{"state": "RUNNING","revision":{"version":4}}' | jq

The parameters in the request body are entered in the format suggested by the ProcessorRunStatusEntity link for this request on the /nifi-api page.

Parameters for starting the processor
Parameters for starting the processor
Parameters for starting the processor
Parameters for starting the processor

The response to the request contains a new version number ("version": 5), a new status ("runStatus": "Running"), and other processor parameters.

Get parameters, attributes of Flowfile, view Flowfile contents using NiFi REST API

Create a Flowfiles list

To create a list of Flowfiles that are in the connection queue, use the POST/flowfile-queues/{id}/listing-requests request form.

Below is a request to create a Flowfiles list. The endpoint specifies the connection identifier ("id") obtained during connection creation.

$ curl -X POST http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests |jq

The response contains the request identifier ("id"), by which you can get a Flowfiles list.

Request parameters
{
  "listingRequest": {
    "id": "63a416da-018c-1000-aea5-326feb307321",
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests/63a416da-018c-1000-aea5-326feb307321",
    "submissionTime": "12/13/2023 14:45:47.354 UTC",
    "lastUpdated": "14:45:47 UTC",
    "percentCompleted": 0,
    "finished": false,
    "maxResults": 100,
    "state": "Waiting for other queue requests to complete",
    "queueSize": {
      "byteCount": 0,
      "objectCount": 0
    },
    "sourceRunning": true,
    "destinationRunning": true
  }
}

Get a Flowfiles list

To get the created list of Flowfiles, use the GET/flowfile-queues/{id}/listing-requests/{listing-request-id} request form.

Below is a request to get the created list of Flowfiles. The endpoint specifies the connection identifier ("id") obtained during connection creation and the request identifier ("id"), obtained during request to create a Flowfiles list.

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests/64a4cd16-018c-1000-c526-80916ab73224 |jq

The response to the request contains the request parameters and a list of Flowfiles that are in the queue at the time of the request, with identifiers ("uuid").

Flowfiles list
{
  "listingRequest": {
    "id": "64c154a8-018c-1000-3ffb-ef7ac87a4763",
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests/64c154a8-018c-1000-3ffb-ef7ac87a4763",
    "submissionTime": "12/13/2023 19:57:20.936 UTC",
    "lastUpdated": "19:57:20 UTC",
    "percentCompleted": 100,
    "finished": true,
    "maxResults": 100,
    "state": "Completed successfully",
    "queueSize": {
      "byteCount": 7,
      "objectCount": 1
    },
    "flowFileSummaries": [
      {
        "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196",
        "uuid": "c7400263-7613-46f6-a359-e49442995196",
        "filename": "c7400263-7613-46f6-a359-e49442995196",
        "position": 1,
        "size": 7,
        "queuedDuration": 93576,
        "lineageDuration": 93576,
        "penaltyExpiresIn": 0,
        "penalized": false
      }
    ],
    "sourceRunning": true,
    "destinationRunning": false
  }
}

Get Flowfile parameters and attributes

To get Flowfile parameters and attributes, use the GET/flowfile-queues/{id}/flowfiles/{flowfile-uuid} request form.

Below is a request for Flowfile parameters and attributes. The endpoint specifies the connection identifier ("id") obtained from connection creation and the Flowfile identifier ("uuid") obtained from Flowfiles list.

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196 |jq

The response to the request contains the parameters and attributes of the Flowfile.

Flowfile parameters and attributes
{
  "flowFile": {
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196",
    "uuid": "c7400263-7613-46f6-a359-e49442995196",
    "filename": "c7400263-7613-46f6-a359-e49442995196",
    "size": 7,
    "queuedDuration": 686167,
    "lineageDuration": 686167,
    "penaltyExpiresIn": 0,
    "attributes": {
      "path": "./",
      "filename": "c7400263-7613-46f6-a359-e49442995196",
      "uuid": "c7400263-7613-46f6-a359-e49442995196"
    },
    "contentClaimSection": "1",
    "contentClaimContainer": "repo0",
    "contentClaimIdentifier": "1702392729887-1",
    "contentClaimOffset": 427364,
    "contentClaimFileSize": "7 bytes",
    "contentClaimFileSizeBytes": 7,
    "penalized": false
  }
}

View Flowfile contents

To view the Flowfile contents, use the GET/flowfile-queues/{id}/flowfiles/{flowfile-uuid}/content request form.

Below is the Flowfile content request. The endpoint specifies the connection identifier ("id") obtained from connection creation and the Flowfile identifier ("uuid") obtained from Flowfiles list.

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196/content |jq

The response displays the contents of the Flowfile — the text specified as the value of the "generate-ff-custom-text" parameter when creating the GenerateFlowFile processor.

my custom message
NOTE

To read messages from a Kafka topic recorded via the processor PublishKafka_2_6, you can use the following methods:

  • Read data from the topic using the command line on the host where the Kafka service is installed.

  • Read data from the topic via the RESTful interface using the pre-installed Kafka REST Proxy service.

The topic name is the value of the "topic" parameter specified when creating the PublishKafka_2_6 processor.

Stop and terminate processors using NiFi REST API

Stop the processor

Similar to run the processor, to stop the processor, use the PUT/processors/{id}/run-status request form. Below is a request to stop the processor. The endpoint specifies the processor identifier ("id"), obtained during processor creation, the body of the request in JSON format specifies the status "STOPPED" and the current version of the processor.

$ curl   -X PUT   \
   http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632d5f0c-018c-1000-3d78-410a76f1f451/run-status  \
  -H 'Content-Type: application/json'  \
  -d '{"state": "STOPPED","revision":{"version":23}}' | jq

The response to the request contains a new version number ("version": 24), a new status ("runStatus": ""Stopped"), and other processor parameters.

Terminate the processor

To terminate the processor (delete all threads and active tasks), use the DELETE/processors/{id}/threads request form. Terminate can only be performed after a complete processor stop.

Below is the request to shutdown the processor. The endpoint specifies the processor identifier ("id") obtained during processor creation.

$ curl -X DELETE http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632d5f0c-018c-1000-3d78-410a76f1f451/threads |jq

The response to the request contains the new version number ("version": 24) and other processor parameters.

User authorization using NiFi REST API

After authentication is configured, connection to NiFi via REST API is performed only after user authorization. Immediately after authentication is configured, authorization can only be performed for NiFi Initial Admin — the primary administrator user. To authorize, you need to get a special key (token) and use it in each NiFi REST API request.

To get a token, run the command:

$ curl --insecure 'https://sov-test-1.ru-central1.internal:9090/nifi-api/access/token' -H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' --data 'username=ldap-user&password=ldap-password' --compressed

Where:

  • ldap-user — user registered in Active Directory.

  • ldap-password — password with which the user is registered in Active Directory.

The result is a token containing the user’s data in encrypted form, for example:

eyJraWQiOiI0NzU1NTFmYS1hZmQxLTQxNjAtYWJmOS0wM2E1YjRmZGJlYWUiLCJhbGciOiJQUzUxMiJ9.eyJzdWIiOiJwcGV0cm92IiwiYXVkIjoiTGRhcFByb3ZpZGVyIiwibmJmIjoxNzAyODkzNDM1LCJpc3MiOiJMZGFwUHJvdmlkZXIiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJwcGV0cm92IiwiZXhwIjoxNzAyOTM2NjM1LCJpYXQiOjE3MDI4OTM0MzUsImp0aSI6IjY2MTA1NGY5LWM5ZjctNGViMC1hNDMzLTdmNGYzMTVmOTMxOCJ9.kW8MPLNxKy8kCi33SQg-FKEcmIxmo3EWQlUX_mBStF9WnZ1MFAy2rSnPOaqNI6nXgV5nVXKNKYQpGrlzVUPjxImBh18Z2MgJ_YKeeJuvRpJZdZL4ejlgh62JCTuwgbS2m2swtGY2mk1_BjS4MwrTJL87dW4By6r73KKQAdVZ1XwwjIU2NHfDJDq5QVJzirRlyfNqrkscb_fE8jLnnLqFA3WCcD9DxUBnOhByoVKhSh-7qzYKuBS-sakYhCkv881XlFeVnjR3LYAsAYomqt68wkDbphAixqqDuQYKA1kEmxAlI2vWf8bdPZI9BF4CCRbDMTXCZWE2FXSrhkiJLd60Jki3FVLUt3DSoV39ne31l1FsOTFL3KnhwbiRYEhuVKQ8xAYDQfhHpU2O8y9_2bN3zRi5DHcpkEdI4a1IzqSnY8XaK8KqXYxI1c9VZEuYMuuUqRX9OIVxUtNDgs2-wDKNLw-8kOLCMZuyAff36Zvfgkp5tj7-vhJkbA07_5HOdh2FofxoeQjUwIFg4vCOXk1T38GxLuLuaCAea_YxbNP5EpU9elbwpX7dPisfr0tuCZnlawffvC_tLp9XQGLuDGBH-PEaBizttUmRDfsTEpT7Fz5WAO2QZkOFEtZYY7adEyjoAfVElz48suobLmtJiKsfc7l-mAx6upnmsIpSFmlyfv8

Next, to authorize the user, in each NiFi REST API request you need to use the received token in the -H 'Authorization: Bearer <token>' header.

For example, the request for runnning the processor by an authorized user looks like this:

$ curl   -X PUT   \
  https://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632beead-018c-1000-cbec-e625a264e27a/run-status  \
  -H 'Authorization: Bearer eyJraWQiOiI1YzY3MzJhMS1iZDhjLTQ2MTktOTYwZi0wNDdiOWNhZjUyN2MiLCJhbGciOiJQUzUxMiJ9.eyJzdWIiOiJwcGV0cm92IiwiYXVkIjoiTGRhcFByb3ZpZGVyIiwibmJmIjoxNzAyODkxMDczLCJpc3MiOiJMZGFwUHJvdmlkZXIiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJwcGV0cm92IiwiZXhwIjoxNzAyOTM0MjczLCJpYXQiOjE3MDI4OTEwNzMsImp0aSI6IjFhZDMzODZmLWUyMTQtNDkyNS1hYWMyLTkzYjdmOGIyZDlhZCJ9.a3XbITwWxFJ3XKrL1kVfCW11gjXX7hn6kWE1wh9pdSud7Q9Ro0V6qWHQVNj3tsJhu4l0M9W13nzp-R6ysUK5vqoP3zy4XOgAjUXmxBONoPj_JJPVtwebNGzNUXDUV0oS0qXJctq7uWlpIr5145l2MLBVmAHRxD-n9L1fw6HVaRdqnlKU5wWDGdKJBHifmzDvYJjKeskMoeDq6SM9nIchsUYTNJxiMrZbKzWW6apequn6q5PHwCFm5HUwt3ThAVW0ojUoQWFutxN0p5DMfgKgC5JfogkctF_wRDGxvQLJSm82v-RnR0iZExHz1qVTy-EsJdF13GVyILs4ZxT_hzVkdnILeMFjT2aPviRdibXBFSfBk8abxfUsjkM6D1PzEBBgXtMizf1Cjrn-nxu7cLAPJMwKRM0CqZKdyhltuSftR5IM0XL1YF89azkj7R1ad90-4EaEVdCn4PXKD8ZipiTiX4nZVm1R2c6SJ4LSQ3iKZBCP7W7DuKjDA_WTUij9CoYNU_6hbWBK36gMkZ9RWYxBwS_cYXVL7SeGk0v50DsRPwwioztb_DM4Wzfwdsl6wxkVK1azsW0-UeELrhIQxjL0HmYyAD_Rc3w8erIELWE8CDCwhNJ40S3q4TCM3m-oy_DW55ObR1a1IDI4um_h6QIo5Cdr1BvipRNQbZvnWBJTozM'  \
  -H 'Content-Type: application/json'  \
  -d '{"state": "RUNNING","revision":{"version":2}}' | jq
CAUTION

Authentication in the NiFi service is configured through activation of the SSL protocol, therefore you need to change the http protocol in the URL of all REST requests to the https protocol.

Found a mistake? Seleсt text and press Ctrl+Enter to report it