Пример использования NiFi REST API

В статье описывается, как использовать NiFi REST API для создания и управления простой последовательностью процессоров NiFi с помощью инструмента командной строки cURL.

Ниже описаны следующие примеры использования NiFi Rest API:

  1. Создание процессоров:

    • GenerateFlowFile — предназначен для генерации FlowFile, состоящего из случайных данных заданного размера или данных, определяемых пользователем.

    • PublishKafka_2_6 — предназначен для публикации сообщений в топик Kafka.

  2. Создание соединения между процессорами.

  3. Запуск процессоров.

  4. Получение параметров, атрибутов, просмотр содержимого Flowfile.

  5. Остановка и завершение работы процессоров.

  6. Авторизация пользователя.

При выполнении всех примеров соблюдены следующие условия:

  • Кластер ADS установлен согласно руководству Online-установка.

  • В кластере ADS добавлены и установлены сервисы NiFi (на хосте sov-test-1.ru-central1.internal) и Kafka (на хосте sov-test-2.ru-central1.internal).

    ПРИМЕЧАНИЕ

    В описанных ниже примерах используется кластер ADS версии 3.3.2.1.b1 и соответствующие ему версии сервисов NiFi (1.20.0) и Kafka (3.3.2).

  • Для автоматического создания топика Kafka включен параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

  • С хоста, на котором формируются REST-запросы, имеется доступ к порту 9090 хоста sov-test-1.ru-central1.internal.

  • На хосте, на котором формируются REST-запросы, установлена утилита ./jq для вывода ответов JSON-формата в удобочитаемой форме.

Перед выполнением примера Авторизация пользователя в кластере ADS дополнительно настраивается аутентификация пользователя, имеющего учетную запись в Active Directory.

Корневая группа процессов NiFi

Все компоненты существуют внутри группы процессов. Когда пользователь первоначально переходит на страницу пользовательского интерфеса NiFi, он попадает в корневую группу процессов (Root Process Group).

Для работы с NiFi через REST API в пути некоторых REST-запросов (конечной точке) используется идентификатор корневой группы процессов ("id"), который можно узнать, выполнив запрос:

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flow/process-groups/root |jq

В ответе на запрос отображаются параметры корневой группы процессов, в том числе идентификатор ("id").

Параметры корневой группы процессов
{
  "permissions": {
    "canRead": true,
    "canWrite": true
  },
  "processGroupFlow": {
    "id": "4410f0eb-018c-1000-9096-472c6e6db150",
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flow/process-groups/4410f0eb-018c-1000-9096-472c6e6db150",
    "breadcrumb": {
      "id": "4410f0eb-018c-1000-9096-472c6e6db150",
      "permissions": {
        "canRead": true,
        "canWrite": true
      },
      "breadcrumb": {
        "id": "4410f0eb-018c-1000-9096-472c6e6db150",
        "name": "NiFi Flow"
      }
    },
    "flow": {
      "processGroups": [],
      "remoteProcessGroups": [],
      "processors": [],
      "inputPorts": [],
      "outputPorts": [],
      "connections": [],
      "labels": [],
      "funnels": []
    },
    "lastRefreshed": "19:32:09 UTC"
  }
}

Создание процессоров с использованием NiFi REST API

Для создания процессора используется форма запроса типа POST/process-groups/{id}/processors.

Ниже представлены запросы для создания процессоров. В конечной точке запроса указывается идентификатор корневой группы процессов ("id"), в теле запроса в JSON-формате указываются параметры процессоров.

Создание процессора GenerateFlowFile
$ curl -X POST \
  http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/processors \
  -H 'Content-Type: application/json' \
  -d  '{ \
    "revision": { \
        "version": 0 \
    }, \
     "component": { \
        "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
        "position": { \
            "x": 200, \
            "y": 0 \
        }, \
        "name": "GenerateFlowFile", \
        "type": "org.apache.nifi.processors.standard.GenerateFlowFile", \
        "bundle": { \
            "group": "org.apache.nifi", \
            "artifact": "nifi-standard-nar", \
            "version": "1.20.0" \
        }, \
        "state": "STOPPED", \
        "config": { \
            "properties": { \
                "File Size": "0B", \
                "Batch Size": "1", \
                "Data Format": "Text", \
                "Unique FlowFiles": "false", \
                "generate-ff-custom-text": "my custom message", \
                "character-set": "UTF-8", \
                "mime-type": null \
            } \
        } \
    } \
}' |jq
Создание процессора PublishKafka_2_6
$ curl -X POST \
  http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/processors \
  -H 'Content-Type: application/json' \
  -d  '{ \
    "revision": { \
        "version": 0 \
    }, \
    "component": { \
        "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
        "position": { \
            "x": 200, \
            "y": 400 \
        }, \
        "name": "PublishKafka_2_6", \
        "type": "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6", \
        "bundle": { \
        "group": "org.apache.nifi", \
        "artifact": "nifi-kafka-2-6-nar", \
        "version": "1.20.0" \
        }, \
        "state": "STOPPED", \
        "config": { \
            "properties": { \
                "bootstrap.servers": "sov-test-2.ru-central1.internal:9092", \
                "topic": "my_new_topic" \
            }, \
         "autoTerminatedRelationships":["success","failure"] \
         } \
    } \
}' |jq

Параметры в теле запроса вводятся в формате, предложенном по ссылке ProcessorEntity для данного запроса на странице /nifi-api.

Параметры для создания процессора
Параметры для создания процессора
Параметры для создания процессора
Параметры для создания процессора

Ниже описаны некоторые параметры процессоров:

  • "revision":

    • "version" — номер версии, с которой начнется отсчет версий для процессора.

  • "component":

    • "name", "type", "bundle" — параметры, уникальные для каждого типа процессора. Эти параметры можно узнать, выполнив запрос всех доступных процессоров для текущей версии NiFi:

      $ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flow/processor-types
    • "state" — состояние процессора при создании.

    • "config" — конфигурационные параметры процессора ("properties"), отличающиеся от значений по умолчанию (конфигурационные параметры описаны в документации каждого процессора, например PublishKafka_2_6).

    • "autoTerminatedRelationships" — параметр, устанавливаемый для последнего процессора в связке. Подробнее см. в описании вкладки RELATIONSHIPS.

Ответ на запрос о создании процессора содержит идентификатор созданного процессора ("id") и другие параметры процессора.

Параметры созданного процессора
{
  "revision": {
    "version": 1,
    "lastModifier": "anonymous"
  },
  "id": "5f46b4af-018c-1000-f339-ca645d27debf",
  "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/5f46b4af-018c-1000-f339-ca645d27debf",
  "position": {
    "x": 200,
    "y": 0
  },
  "permissions": {
    "canRead": true,
    "canWrite": true
  },
  "bulletins": [],
  "component": {
    "id": "5f46b4af-018c-1000-f339-ca645d27debf",
    "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "position": {
      "x": 200,
      "y": 0
    },
    "name": "GenerateFlowFile",
    "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
    "bundle": {
      "group": "org.apache.nifi",
      "artifact": "nifi-standard-nar",
      "version": "1.20.0"
    },
    "state": "STOPPED",
    "style": {},
    "relationships": [
      {
        "name": "success",
        "description": "",
        "autoTerminate": false,
        "retry": false
      }
    ],
    "supportsParallelProcessing": true,
    "supportsEventDriven": false,
    "supportsBatching": true,
    "supportsSensitiveDynamicProperties": false,
    "persistsState": false,
    "restricted": false,
    "deprecated": false,
    "executionNodeRestricted": false,
    "multipleVersionsAvailable": false,
    "inputRequirement": "INPUT_FORBIDDEN",
    "config": {
      "properties": {
        "File Size": "0B",
        "Batch Size": "1",
        "Data Format": "Text",
        "Unique FlowFiles": "false",
        "generate-ff-custom-text": "my custom message",
        "character-set": "UTF-8",
        "mime-type": null
      },
      "descriptors": {
        "File Size": {
          "name": "File Size",
          "displayName": "File Size",
          "description": "The size of the file that will be used",
          "defaultValue": "0B",
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "Batch Size": {
          "name": "Batch Size",
          "displayName": "Batch Size",
          "description": "The number of FlowFiles to be transferred in each invocation",
          "defaultValue": "1",
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "Data Format": {
          "name": "Data Format",
          "displayName": "Data Format",
          "description": "Specifies whether the data should be Text or Binary",
          "defaultValue": "Text",
          "allowableValues": [
            {
              "allowableValue": {
                "displayName": "Binary",
                "value": "Binary"
              },
              "canRead": true
            },
            {
              "allowableValue": {
                "displayName": "Text",
                "value": "Text"
              },
              "canRead": true
            }
          ],
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "Unique FlowFiles": {
          "name": "Unique FlowFiles",
          "displayName": "Unique FlowFiles",
          "description": "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput",
          "defaultValue": "false",
          "allowableValues": [
            {
              "allowableValue": {
                "displayName": "true",
                "value": "true"
              },
              "canRead": true
            },
            {
              "allowableValue": {
                "displayName": "false",
                "value": "false"
              },
              "canRead": true
            }
          ],
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "generate-ff-custom-text": {
          "name": "generate-ff-custom-text",
          "displayName": "Custom Text",
          "description": "If Data Format is text and if Unique FlowFiles is false, then this custom text will be used as content of the generated FlowFiles and the File Size will be ignored. Finally, if Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles",
          "required": false,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": true,
          "expressionLanguageScope": "Variable Registry Only",
          "dependencies": []
        },
        "character-set": {
          "name": "character-set",
          "displayName": "Character Set",
          "description": "Specifies the character set to use when writing the bytes of Custom Text to a flow file.",
          "defaultValue": "UTF-8",
          "required": true,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        },
        "mime-type": {
          "name": "mime-type",
          "displayName": "Mime Type",
          "description": "Specifies the value to set for the \"mime.type\" attribute.",
          "required": false,
          "sensitive": false,
          "dynamic": false,
          "supportsEl": false,
          "expressionLanguageScope": "Not Supported",
          "dependencies": []
        }
      },
      "schedulingPeriod": "1 min",
      "schedulingStrategy": "TIMER_DRIVEN",
      "executionNode": "ALL",
      "penaltyDuration": "30 sec",
      "yieldDuration": "1 sec",
      "bulletinLevel": "WARN",
      "runDurationMillis": 0,
      "concurrentlySchedulableTaskCount": 1,
      "comments": "",
      "lossTolerant": false,
      "defaultConcurrentTasks": {
        "TIMER_DRIVEN": "1",
        "EVENT_DRIVEN": "0",
        "CRON_DRIVEN": "1"
      },
      "defaultSchedulingPeriod": {
        "TIMER_DRIVEN": "0 sec",
        "CRON_DRIVEN": "* * * * * ?"
      },
      "retryCount": 10,
      "retriedRelationships": [],
      "backoffMechanism": "PENALIZE_FLOWFILE",
      "maxBackoffPeriod": "10 mins"
    },
    "validationErrors": [
      "'Relationship success' is invalid because Relationship 'success' is not connected to any component and is not auto-terminated"
    ],
    "validationStatus": "INVALID",
    "extensionMissing": false
  },
  "inputRequirement": "INPUT_FORBIDDEN",
  "status": {
    "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "id": "5f46b4af-018c-1000-f339-ca645d27debf",
    "name": "GenerateFlowFile",
    "runStatus": "Invalid",
    "statsLastRefreshed": "18:25:18 UTC",
    "aggregateSnapshot": {
      "id": "5f46b4af-018c-1000-f339-ca645d27debf",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "GenerateFlowFile",
      "type": "GenerateFlowFile",
      "runStatus": "Invalid",
      "executionNode": "ALL",
      "bytesRead": 0,
      "bytesWritten": 0,
      "read": "0 bytes",
      "written": "0 bytes",
      "flowFilesIn": 0,
      "bytesIn": 0,
      "input": "0 (0 bytes)",
      "flowFilesOut": 0,
      "bytesOut": 0,
      "output": "0 (0 bytes)",
      "taskCount": 0,
      "tasksDurationNanos": 0,
      "tasks": "0",
      "tasksDuration": "00:00:00.000",
      "activeThreadCount": 0,
      "terminatedThreadCount": 0
    }
  },
  "operatePermissions": {
    "canRead": true,
    "canWrite": true
  }
}

Создание соединения между процессорами с использованием NiFi REST API

Для создания соединения используется форма запроса POST/process-groups/{id}/connections.

Ниже представлен запрос для создания соединения. В конечной точке указывается идентификатор корневой группы процессов ("id"), в теле запроса в JSON-формате указываются параметры.

Создание соединения между процессорами
$ curl -X POST \
  http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/connections \
  -H 'Content-Type: application/json' \
  -d  '{ \
    "revision": { \
        "version": 0 \
    }, \
    "component": { \
        "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
        "source": { \
            "id": "632d5f0c-018c-1000-3d78-410a76f1f451", \
            "type": "PROCESSOR", \
            "groupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
            "name": "GenerateFlowFile" \
        }, \
        "destination": { \
            "id": "632beead-018c-1000-cbec-e625a264e27a", \
            "type": "PROCESSOR", \
            "groupId": "4410f0eb-018c-1000-9096-472c6e6db150", \
            "name": "PublishKafka_2_6" \
        }, \
        "name": "Test connection", \
        "selectedRelationships": [ \
            "success" \
        ] \
        } \
}' |jq

Параметры в теле запроса вводятся в формате, предложенном по ссылке ConnectionEntity для данного запроса на странице /nifi-api.

Параметры для создания соединения
Параметры для создания соединения
Параметры для создания соединения
Параметры для создания соединения

Ниже описаны некоторые параметры соединения:

  • "revision":

    • "version" — номер версии, с которой начнется отсчет версий для соединения.

  • "component":

    • "source" — параметры процессора-источника, полученные при создании процессора GenerateFlowFile.

    • "destination" — параметры процессора-приемника, полученные при создании процессора PublishKafka_2_6.

    • "name" — имя соединения.

    • "selectedRelationships" — параметр, определяющий отношения исходного и целевого компонента. Подробнее см. в описании настройки соединения.

Ответ на запрос о создании соединения содержит идентификатор созданного соединения ("id") и другие параметры соединения.

Параметры созданного соединения
{
  "revision": {
    "version": 1,
    "lastModifier": "anonymous"
  },
  "id": "6339c013-018c-1000-cabb-b497fd92db23",
  "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/connections/6339c013-018c-1000-cabb-b497fd92db23",
  "permissions": {
    "canRead": true,
    "canWrite": true
  },
  "component": {
    "id": "6339c013-018c-1000-cabb-b497fd92db23",
    "parentGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "source": {
      "id": "632d5f0c-018c-1000-3d78-410a76f1f451",
      "type": "PROCESSOR",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "GenerateFlowFile",
      "running": false,
      "comments": ""
    },
    "destination": {
      "id": "632beead-018c-1000-cbec-e625a264e27a",
      "type": "PROCESSOR",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "PublishKafka_2_6",
      "running": false,
      "comments": ""
    },
    "name": "Test connection",
    "labelIndex": 1,
    "zIndex": 0,
    "selectedRelationships": [
      "success"
    ],
    "availableRelationships": [
      "success"
    ],
    "backPressureObjectThreshold": 10000,
    "backPressureDataSizeThreshold": "1 GB",
    "flowFileExpiration": "0 sec",
    "prioritizers": [],
    "bends": [],
    "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
    "loadBalanceCompression": "DO_NOT_COMPRESS",
    "loadBalanceStatus": "LOAD_BALANCE_NOT_CONFIGURED"
  },
  "status": {
    "id": "6339c013-018c-1000-cabb-b497fd92db23",
    "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
    "name": "Test connection",
    "statsLastRefreshed": "12:49:38 UTC",
    "sourceId": "632d5f0c-018c-1000-3d78-410a76f1f451",
    "sourceName": "GenerateFlowFile",
    "destinationId": "632beead-018c-1000-cbec-e625a264e27a",
    "destinationName": "PublishKafka_2_6",
    "aggregateSnapshot": {
      "id": "6339c013-018c-1000-cabb-b497fd92db23",
      "groupId": "4410f0eb-018c-1000-9096-472c6e6db150",
      "name": "Test connection",
      "sourceName": "GenerateFlowFile",
      "destinationName": "PublishKafka_2_6",
      "flowFilesIn": 0,
      "bytesIn": 0,
      "input": "0 (0 bytes)",
      "flowFilesOut": 0,
      "bytesOut": 0,
      "output": "0 (0 bytes)",
      "flowFilesQueued": 0,
      "bytesQueued": 0,
      "queued": "0 (0 bytes)",
      "queuedSize": "0 bytes",
      "queuedCount": "0",
      "percentUseCount": 0,
      "percentUseBytes": 0,
      "flowFileAvailability": "ACTIVE_QUEUE_EMPTY"
    }
  },
  "bends": [],
  "labelIndex": 1,
  "zIndex": 0,
  "sourceId": "632d5f0c-018c-1000-3d78-410a76f1f451",
  "sourceGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
  "sourceType": "PROCESSOR",
  "destinationId": "632beead-018c-1000-cbec-e625a264e27a",
  "destinationGroupId": "4410f0eb-018c-1000-9096-472c6e6db150",
  "destinationType": "PROCESSOR"
}

После создания все процессоры и соединения отображаются на странице пользовательского интерфеса NiFi.

Созданные компоненты, отображающиеся в UI NiFi
Созданные компоненты, отображающиеся в UI NiFi
Созданные компоненты, отображающиеся в UI NiFi
Созданные компоненты, отображающиеся в UI NiFi

Запуск процессоров с использованием NiFi REST API

Все компоненты, размещенные в группе процессов, имеют уникальный номер версии ("version"), который используется во всех запросах при управлении компонентами. Номер версии увеличивается после внесения любых изменений в компонент. Номер версии процессора можно получить по запросу, указав идентификатор процессора ("id"):

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632beead-018c-1000-cbec-e625a264e27a

Список и параметры всех процессоров, размещенных в группе процессов, включая идентификаторы ("id") и текущие версии ("version"), можно получить по запросу, указав идентификатор группы процессов ("id"):

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/process-groups/4410f0eb-018c-1000-9096-472c6e6db150/processors |jq

Для запуска процессора используется форма запроса PUT/processors/{id}/run-status.

Ниже представлен запрос для запуска процессора. В конечной точке указывается идентификатор процессора ("id"), полученный при создании процессора, в теле запроса в JSON-формате указываются статус "RUNNING" и текущая версия процессора.

$ curl   -X PUT   \
   http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632beead-018c-1000-cbec-e625a264e27a/run-status  \
  -H 'Content-Type: application/json'  \
  -d '{"state": "RUNNING","revision":{"version":4}}' | jq

Параметры в теле запроса вводятся в формате, предложенном по ссылке ProcessorRunStatusEntity для данного запроса на странице /nifi-api.

Параметры для запуска процессора
Параметры для запуска процессора
Параметры для запуска процессора
Параметры для запуска процессора

Ответ на запрос содержит новый номер версии ("version": 5), новый статус ("runStatus": "Running") и другие параметры процессора.

Получение параметров, атрибутов, просмотр содержимого Flowfile с использованием NiFi REST API

Создание списка Flowfiles

Для создания списка Flowfiles, которые находятся в очереди соединения, используется форма запроса POST/flowfile-queues/{id}/listing-requests.

Ниже представлен запрос на создание списка Flowfiles. В конечной точке указывается идентификатор соединения ("id"), полученный при создании соединения.

$ curl -X POST http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests |jq

Ответ содержит идентификатор запроса ("id"), по которому можно получить список Flowfiles.

Параметры запроса
{
  "listingRequest": {
    "id": "63a416da-018c-1000-aea5-326feb307321",
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests/63a416da-018c-1000-aea5-326feb307321",
    "submissionTime": "12/13/2023 14:45:47.354 UTC",
    "lastUpdated": "14:45:47 UTC",
    "percentCompleted": 0,
    "finished": false,
    "maxResults": 100,
    "state": "Waiting for other queue requests to complete",
    "queueSize": {
      "byteCount": 0,
      "objectCount": 0
    },
    "sourceRunning": true,
    "destinationRunning": true
  }
}

Получение списка Flowfiles

Для получения созданного списка Flowfiles используется форма запроса GET/flowfile-queues/{id}/listing-requests/{listing-request-id}.

Ниже представлен запрос на получение созданного списка Flowfiles. В конечной точке указывается идентификатор соединения ("id"), полученный при создании соединения и идентификатор запроса ("id"), полученный при запросе на создание списка Flowfiles.

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests/64a4cd16-018c-1000-c526-80916ab73224 |jq

Ответ на запрос содержит параметры запроса и список Flowfiles, находящихся в очереди в момент запроса, c идентификаторами ("uuid").

Cписок Flowfiles
{
  "listingRequest": {
    "id": "64c154a8-018c-1000-3ffb-ef7ac87a4763",
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/listing-requests/64c154a8-018c-1000-3ffb-ef7ac87a4763",
    "submissionTime": "12/13/2023 19:57:20.936 UTC",
    "lastUpdated": "19:57:20 UTC",
    "percentCompleted": 100,
    "finished": true,
    "maxResults": 100,
    "state": "Completed successfully",
    "queueSize": {
      "byteCount": 7,
      "objectCount": 1
    },
    "flowFileSummaries": [
      {
        "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196",
        "uuid": "c7400263-7613-46f6-a359-e49442995196",
        "filename": "c7400263-7613-46f6-a359-e49442995196",
        "position": 1,
        "size": 7,
        "queuedDuration": 93576,
        "lineageDuration": 93576,
        "penaltyExpiresIn": 0,
        "penalized": false
      }
    ],
    "sourceRunning": true,
    "destinationRunning": false
  }
}

Получение параметров и атрибутов Flowfile

Для получения параметров и атрибутов Flowfile используется форма запроса GET/flowfile-queues/{id}/flowfiles/{flowfile-uuid}.

Ниже представлен запрос параметров и атрибутов Flowfile. В конечной точке указывается идентификатор соединения ("id"), полученный при создании соединения и идентификатор Flowfile ("uuid"), полученный в списке Flowfiles.

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196 |jq

Ответ на запрос содержит параметры и атрибуты Flowfile.

Параметры и атрибуты Flowfile
{
  "flowFile": {
    "uri": "http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196",
    "uuid": "c7400263-7613-46f6-a359-e49442995196",
    "filename": "c7400263-7613-46f6-a359-e49442995196",
    "size": 7,
    "queuedDuration": 686167,
    "lineageDuration": 686167,
    "penaltyExpiresIn": 0,
    "attributes": {
      "path": "./",
      "filename": "c7400263-7613-46f6-a359-e49442995196",
      "uuid": "c7400263-7613-46f6-a359-e49442995196"
    },
    "contentClaimSection": "1",
    "contentClaimContainer": "repo0",
    "contentClaimIdentifier": "1702392729887-1",
    "contentClaimOffset": 427364,
    "contentClaimFileSize": "7 bytes",
    "contentClaimFileSizeBytes": 7,
    "penalized": false
  }
}

Просмотр содержимого Flowfile

Для просмотра содержимого Flowfile используется форма запроса GET/flowfile-queues/{id}/flowfiles/{flowfile-uuid}/content.

Ниже представлен запрос содержимого Flowfile. В конечной точке указывается идентификатор соединения ("id"), полученный при создании соединения и идентификатор Flowfile ("uuid"), полученный в списке Flowfiles.

$ curl -X GET http://sov-test-1.ru-central1.internal:9090/nifi-api/flowfile-queues/633b2b86-018c-1000-8b77-d77831cb7057/flowfiles/c7400263-7613-46f6-a359-e49442995196/content |jq

В ответе выводится содержимое Flowfile — текст, указанный как значение параметра "generate-ff-custom-text" при создании процессора GenerateFlowFile.

my custom message
ПРИМЕЧАНИЕ

Прочитать сообщения из топика Kafka, записанные при помощи процессора PublishKafka_2_6, можно следующими способами:

  • Прочитать данные из топика при помощи командной строки на хосте, где установлен сервис Kafka.

  • Прочитать данные из топика по интерфейсу RESTful с использованием предустановленного сервиса Kafka REST Proxy.

Именем топика является значение параметра "topic", заданное при создании процессора PublishKafka_2_6.

Остановка и завершение работы процессоров с использованием NiFi REST API

Остановка процессора

Аналогично запуску процессора для остановки процессора используется форма запроса PUT/processors/{id}/run-status.

Ниже представлен запрос для остановки процессора. В конечной точке указывается идентификатор процессора ("id"), полученный при создании процессора, в теле запроса в JSON-формате указываются статус "STOPPED" и текущая версия процессора.

$ curl   -X PUT   \
   http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632d5f0c-018c-1000-3d78-410a76f1f451/run-status  \
  -H 'Content-Type: application/json'  \
  -d '{"state": "STOPPED","revision":{"version":23}}' | jq

Ответ на запрос содержит новый номер версии ("version": 24), новый статус ("runStatus": ""Stopped") и другие параметры процессора.

Завершение работы процессора

Для завершения работы процессора (удаления всех потоков и активных задач) используется форма запроса DELETE/processors/{id}/threads. Завершение работы можно выполнить только после полной остановки процессора.

Ниже представлен запрос для завершения работы процессора. В конечной точке указывается идентификатор процессора ("id"), полученный при создании процессора.

$ curl -X DELETE http://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632d5f0c-018c-1000-3d78-410a76f1f451/threads |jq

Ответ на запрос содержит новый номер версии ("version": 24) и другие параметры процессора.

Авторизация пользователя с использованием NiFi REST API

После настройки аутентификации подключение к NiFi по REST API выполняется только после авторизации пользователя. Сразу после настройки аутентификации авторизацию можно выполнить только для NiFi Initial Admin — первичного пользователя-администратора. Для авторизации необходимо получить специальный ключ (токен) и использовать его в каждом запросе NiFi REST API.

Для получения токена выполните команду:

$ curl --insecure 'https://sov-test-1.ru-central1.internal:9090/nifi-api/access/token' -H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' --data 'username=ldap-user&password=ldap-password' --compressed

где:

  • ldap-user — пользователь, зарегистрированый в Active Directory.

  • ldap-password — пароль, c которым пользователь зарегистрирован в Active Directory.

В результате выводится токен, содержащий данные пользователя в зашифрованном виде, например:

eyJraWQiOiI0NzU1NTFmYS1hZmQxLTQxNjAtYWJmOS0wM2E1YjRmZGJlYWUiLCJhbGciOiJQUzUxMiJ9.eyJzdWIiOiJwcGV0cm92IiwiYXVkIjoiTGRhcFByb3ZpZGVyIiwibmJmIjoxNzAyODkzNDM1LCJpc3MiOiJMZGFwUHJvdmlkZXIiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJwcGV0cm92IiwiZXhwIjoxNzAyOTM2NjM1LCJpYXQiOjE3MDI4OTM0MzUsImp0aSI6IjY2MTA1NGY5LWM5ZjctNGViMC1hNDMzLTdmNGYzMTVmOTMxOCJ9.kW8MPLNxKy8kCi33SQg-FKEcmIxmo3EWQlUX_mBStF9WnZ1MFAy2rSnPOaqNI6nXgV5nVXKNKYQpGrlzVUPjxImBh18Z2MgJ_YKeeJuvRpJZdZL4ejlgh62JCTuwgbS2m2swtGY2mk1_BjS4MwrTJL87dW4By6r73KKQAdVZ1XwwjIU2NHfDJDq5QVJzirRlyfNqrkscb_fE8jLnnLqFA3WCcD9DxUBnOhByoVKhSh-7qzYKuBS-sakYhCkv881XlFeVnjR3LYAsAYomqt68wkDbphAixqqDuQYKA1kEmxAlI2vWf8bdPZI9BF4CCRbDMTXCZWE2FXSrhkiJLd60Jki3FVLUt3DSoV39ne31l1FsOTFL3KnhwbiRYEhuVKQ8xAYDQfhHpU2O8y9_2bN3zRi5DHcpkEdI4a1IzqSnY8XaK8KqXYxI1c9VZEuYMuuUqRX9OIVxUtNDgs2-wDKNLw-8kOLCMZuyAff36Zvfgkp5tj7-vhJkbA07_5HOdh2FofxoeQjUwIFg4vCOXk1T38GxLuLuaCAea_YxbNP5EpU9elbwpX7dPisfr0tuCZnlawffvC_tLp9XQGLuDGBH-PEaBizttUmRDfsTEpT7Fz5WAO2QZkOFEtZYY7adEyjoAfVElz48suobLmtJiKsfc7l-mAx6upnmsIpSFmlyfv8

Далее, для авторизации пользователя, в каждом запросе NiFi REST API необходимо использовать полученный токен в заголовке -H 'Authorization: Bearer <token>'.

Например, запрос для запуска процессора авторизованным пользователем выглядит следующим образом:

$ curl   -X PUT   \
  https://sov-test-1.ru-central1.internal:9090/nifi-api/processors/632beead-018c-1000-cbec-e625a264e27a/run-status  \
  -H 'Authorization: Bearer eyJraWQiOiI1YzY3MzJhMS1iZDhjLTQ2MTktOTYwZi0wNDdiOWNhZjUyN2MiLCJhbGciOiJQUzUxMiJ9.eyJzdWIiOiJwcGV0cm92IiwiYXVkIjoiTGRhcFByb3ZpZGVyIiwibmJmIjoxNzAyODkxMDczLCJpc3MiOiJMZGFwUHJvdmlkZXIiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJwcGV0cm92IiwiZXhwIjoxNzAyOTM0MjczLCJpYXQiOjE3MDI4OTEwNzMsImp0aSI6IjFhZDMzODZmLWUyMTQtNDkyNS1hYWMyLTkzYjdmOGIyZDlhZCJ9.a3XbITwWxFJ3XKrL1kVfCW11gjXX7hn6kWE1wh9pdSud7Q9Ro0V6qWHQVNj3tsJhu4l0M9W13nzp-R6ysUK5vqoP3zy4XOgAjUXmxBONoPj_JJPVtwebNGzNUXDUV0oS0qXJctq7uWlpIr5145l2MLBVmAHRxD-n9L1fw6HVaRdqnlKU5wWDGdKJBHifmzDvYJjKeskMoeDq6SM9nIchsUYTNJxiMrZbKzWW6apequn6q5PHwCFm5HUwt3ThAVW0ojUoQWFutxN0p5DMfgKgC5JfogkctF_wRDGxvQLJSm82v-RnR0iZExHz1qVTy-EsJdF13GVyILs4ZxT_hzVkdnILeMFjT2aPviRdibXBFSfBk8abxfUsjkM6D1PzEBBgXtMizf1Cjrn-nxu7cLAPJMwKRM0CqZKdyhltuSftR5IM0XL1YF89azkj7R1ad90-4EaEVdCn4PXKD8ZipiTiX4nZVm1R2c6SJ4LSQ3iKZBCP7W7DuKjDA_WTUij9CoYNU_6hbWBK36gMkZ9RWYxBwS_cYXVL7SeGk0v50DsRPwwioztb_DM4Wzfwdsl6wxkVK1azsW0-UeELrhIQxjL0HmYyAD_Rc3w8erIELWE8CDCwhNJ40S3q4TCM3m-oy_DW55ObR1a1IDI4um_h6QIo5Cdr1BvipRNQbZvnWBJTozM'  \
  -H 'Content-Type: application/json'  \
  -d '{"state": "RUNNING","revision":{"version":2}}' | jq
ВНИМАНИЕ

Так как аутентификация в сервисе NiFi настраивается через активацию протокола SSL, в URL всех REST-запросов протокол http должен быть изменен на протокол https.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней