Интеграция NiFi и Kafka

Обзор

Начиная с ADS 3.9.1.1.b1 существует возможность автоматической настройки интеграции между NiFi и Kafka/Schema Registry при помощи сервисного действия Kafka integration, запускаемого для NiFi.

После запуска действия в зависимости от текущей топологии кластера автоматически создаются или обновляются:

  • контекст параметров platform-integration, содержащий параметры для подключения к Kafka и Schema Registry;

  • экземпляр сервиса контроллера ConfluentSchemaRegistry, настроенный для подключения к Schema Registry и связанный с SSL-контекстом;

  • экземпляр сервиса контроллера StandardSSLContextService, отражающий текущие настройки безопасности (SSL-контекст).

Принципы и ограничения Kafka integration

 
Каждый запуск действия Kafka integration влияет только на перечисленные объекты NiFi в соответствии со следующими принципами:

  • Значения полей созданных объектов при каждом следующем запуске действия перезаписываются на актуальные для кластера, даже если пользователем в них были внесены изменения.

  • В случае удаления сервиса Schema Registry из кластера, при следующем запуске действия параметр schema_registry_url в контексте параметров и сервис контроллера ConfluentSchemaRegistry удаляются.

  • Действие обновляет поле версии каждого объекта ("revision": {"version"}) в соответствии с требованиями API NiFi.

  • Действие обновляет конфигурацию существующих объектов с сохранением идентификаторов (id) без удаления и последующего создания. Отсутствующие объекты создаются заново.

Объекты, не изменяющиеся во время действия:

  • Параметры, специфичные для пользователя (топики, принципалы, учетные данные).

  • Созданные процессоры и потоки данных.

Ниже рассмотрены примеры интеграции с Kafka для нескольких случаев топологии кластера.

Базовое подключение к Kafka

Для описанного в разделе подключения в ADS установлены сервисы NiFi, Kafka, ZooKeeper. Дополнительные настройки безопасности отсутствуют.

Созданный во время выполнения действия Kafka integration контекст параметров platform-integration отображается в окне Parameter Contexts, вызываемом в глобальном меню интерфейса NiFi Server.

Контекст параметров, созданный ADCM
Контекст параметров, созданный ADCM
Контекст параметров, созданный ADCM
Контекст параметров, созданный ADCM

При нажатии на иконку info можно открыть содержимое контекста параметров. Сейчас он содержит только созданный параметр bootstrap_servers, отражающий список серверов Kafka.

Параметр, созданный ADCM
Параметр, созданный ADCM
Параметр, созданный ADCM
Параметр, созданный ADCM

Для работы с созданным контекстом platform-integration распространите его для корневой группы процессов. Также распространить контекст можно для любой группы процессов через окно Configure, вызываемое в контекстном меню группы.

Параметр bootstrap_servers может быть применен в качестве значения для свойства Kafka Brokers у процессоров, публикующих в Kafka или читающих из нее.

Ниже показано применение параметра bootstrap_servers для процессора ConsumeKafka.

Применение параметра bootstrap_servers
Применение параметра bootstrap_servers
Применение параметра bootstrap_servers
Применение параметра bootstrap_servers

После настройки свойства Kafka Brokers процессор автоматически подключается к серверам Kafka в соответствии с актуальной топологией кластера ADS.

Подключение к Kafka и Schema Registry

Для подключения, описанного в разделе, в ADS установлены сервисы NiFi, Kafka, Schema Registry, ZooKeeper. Дополнительные настройки безопасности отсутствуют.

Во время выполнения действия Kafka integration в NiFi создаются объекты:

  • Kонтекст параметров platform-integration, содержащий параметры bootstrap_servers и schema_registry_url, которые отражают текущую топологию кластера.

    Параметры, созданные ADCM
    Параметры, созданные ADCM
    Параметры, созданные ADCM
    Параметры, созданные ADCM

    Распространите контекст параметров на группу процессов для использования параметров при настройке процессоров и сервисов контроллера.

    Параметр schema_registry_url может быть использован при настройке сервисов, использующих сериализацию и десериализацию данных для указания места хранения схем, например, сервисов CSVRecordSetWriter, JsonRecordSetWriter и других.

  • Экземпляр сервиса контроллера ConfluentSchemaRegistry. Сервис контроллера отображается на вкладке CONTROLLER SERVICES окна Configure, вызываемого в контекстном меню группы.

    Сервис контроллера на странице CONTROLLER SERVICES
    Сервис контроллера на странице CONTROLLER SERVICES
    Сервис контроллера на странице CONTROLLER SERVICES
    Сервис контроллера на странице CONTROLLER SERVICES

    При нажатии на иконку config открываются страница конфигурации контроллера, где на вкладке RPOPERTIES в качестве значения параметра Schema Registry URLs установлено значение, соответствующее параметру schema_registry_url и текущей топологии кластера.

    Параметры сервиса контроллера ConfluentSchemaRegistry
    Параметры ConfluentSchemaRegistry
    Параметры сервиса контроллера ConfluentSchemaRegistry
    Параметры ConfluentSchemaRegistry

    Созданный и автоматически настроенный экземпляр сервиса контроллера ConfluentSchemaRegistry взаимодействует с хранилищем схем сервиса Schema Registry для извлечения и использования схем в NiFi.

    ConfluentSchemaRegistry может быть использован при чтении сериализованных сообщений из Kafka при помощи процессора ConsumeKafkaRecord, если схема для сообщений хранится в Schema-Registry. Один из вариантов настройки такого подключения:

    • Для ConsumeKafkaRecord в качестве значения Value Record Reader используется AvroReader, который десериализует данные Avro и возвращает каждую запись Avro как отдельный объект Record.

    • Для AvroReader в качестве значения Schema Access Strategy выбирается значение Confluent Content-Encoded Schema Reference, тогда для параметра Schema Registry может быть установлен автоматически созданный экземпляр сервиса ConfluentSchemaRegistry.

Подключение к Kafka и Schema Registry с SSL

Для подключения, описанного в разделе, в ADS выполнено:

  • Установлены сервисы NiFi, Kafka, Schema Registry, ZooKeeper.

  • Выполнена интеграция с Kafka и Schema-Registry, созданы platform-integration и ConfluentSchemaRegistry, как описано выше.

  • Включен SSL.

Во время следующего выполнения действия Kafka integration в NiFi выполняется:

  • Создание экземпляра сервиса контроллера StandardSSLContextService с конфигурациями SSL, соответствующими настройкам безопасности кластера.

    Параметры сервиса контроллера StandardSSLContextService
    Параметры StandardSSLContextService
    Параметры сервиса контроллера StandardSSLContextService
    Параметры StandardSSLContextService+

    Этот сервис контроллера может быть указан в качестве значения параметра SSL Context Service для процессоров, подключающихся к Kafka, например, ConsumeKafkaRecord, использование которого описано выше.

  • Обновление объектов:

    • Kонтекст параметров platform-integration, который содержит отражающие текущую топологию кластера параметры bootstrap_servers и schema_registry_url.

    • ConfluentSchemaRegistry, подключенный к StandardSSLContextService для связи с SSL-контекстом.

Параметры сервиса контроллера ConfluentSchemaRegistry
Параметры ConfluentSchemaRegistry
Параметры сервиса контроллера ConfluentSchemaRegistry
Параметры ConfluentSchemaRegistry

Изменение топологии кластера

Для подключения, описанного в разделе, выполнено:

  • Установлены сервисы NiFi, Kafka, Schema Registry, ZooKeeper.

  • Выполнена интеграция с Kafka и Schema-Registry, созданы platform-integration и ConfluentSchemaRegistry, как описано выше.

  • Для изменения топологии кластера:

    • B Kafka при помощи действия Add/Remove components добавлены новые компоненты Kafka Broker.

    • В Schema-Registry при помощи действия Add/Remove components изменен хост для компонента Schema-Registry.

Во время следующего выполнения действия Kafka integration в NiFi выполняется:

  • Обновление параметров bootstrap_servers и schema_registry_url в контексте параметров platform-integration.

  • Обновление конфигурации сервиса контроллера ConfluentSchemaRegistry.

После обновления идентификаторы объектов в потоке NiFi не изменены.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней