Интеграция NiFi и Kafka
Обзор
Начиная с ADS 3.9.1.1.b1 существует возможность автоматической настройки интеграции между NiFi и Kafka/Schema Registry при помощи сервисного действия Kafka integration, запускаемого для NiFi.
После запуска действия в зависимости от текущей топологии кластера автоматически создаются или обновляются:
-
контекст параметров
platform-integration, содержащий параметры для подключения к Kafka и Schema Registry; -
экземпляр сервиса контроллера ConfluentSchemaRegistry, настроенный для подключения к Schema Registry и связанный с SSL-контекстом;
-
экземпляр сервиса контроллера StandardSSLContextService, отражающий текущие настройки безопасности (SSL-контекст).
Ниже рассмотрены примеры интеграции с Kafka для нескольких случаев топологии кластера.
Базовое подключение к Kafka
Для описанного в разделе подключения в ADS установлены сервисы NiFi, Kafka, ZooKeeper. Дополнительные настройки безопасности отсутствуют.
Созданный во время выполнения действия Kafka integration контекст параметров platform-integration отображается в окне Parameter Contexts, вызываемом в глобальном меню интерфейса NiFi Server.
При нажатии на иконку можно открыть содержимое контекста параметров. Сейчас он содержит только созданный параметр
bootstrap_servers, отражающий список серверов Kafka.
Для работы с созданным контекстом platform-integration распространите его для корневой группы процессов. Также распространить контекст можно для любой группы процессов через окно Configure, вызываемое в контекстном меню группы.
Параметр bootstrap_servers может быть применен в качестве значения для свойства Kafka Brokers у процессоров, публикующих в Kafka или читающих из нее.
Ниже показано применение параметра bootstrap_servers для процессора ConsumeKafka.
После настройки свойства Kafka Brokers процессор автоматически подключается к серверам Kafka в соответствии с актуальной топологией кластера ADS.
Подключение к Kafka и Schema Registry
Для подключения, описанного в разделе, в ADS установлены сервисы NiFi, Kafka, Schema Registry, ZooKeeper. Дополнительные настройки безопасности отсутствуют.
Во время выполнения действия Kafka integration в NiFi создаются объекты:
-
Kонтекст параметров
platform-integration, содержащий параметрыbootstrap_serversиschema_registry_url, которые отражают текущую топологию кластера.
Параметры, созданные ADCM
Параметры, созданные ADCMРаспространите контекст параметров на группу процессов для использования параметров при настройке процессоров и сервисов контроллера.
Параметр
schema_registry_urlможет быть использован при настройке сервисов, использующих сериализацию и десериализацию данных для указания места хранения схем, например, сервисов CSVRecordSetWriter, JsonRecordSetWriter и других. -
Экземпляр сервиса контроллера ConfluentSchemaRegistry. Сервис контроллера отображается на вкладке CONTROLLER SERVICES окна Configure, вызываемого в контекстном меню группы.
Сервис контроллера на странице CONTROLLER SERVICES
Сервис контроллера на странице CONTROLLER SERVICESПри нажатии на иконку
открываются страница конфигурации контроллера, где на вкладке RPOPERTIES в качестве значения параметра Schema Registry URLs установлено значение, соответствующее параметру
schema_registry_urlи текущей топологии кластера.
Параметры ConfluentSchemaRegistry
Параметры ConfluentSchemaRegistryСозданный и автоматически настроенный экземпляр сервиса контроллера ConfluentSchemaRegistry взаимодействует с хранилищем схем сервиса Schema Registry для извлечения и использования схем в NiFi.
ConfluentSchemaRegistry может быть использован при чтении сериализованных сообщений из Kafka при помощи процессора ConsumeKafkaRecord, если схема для сообщений хранится в Schema-Registry. Один из вариантов настройки такого подключения:
-
Для ConsumeKafkaRecord в качестве значения Value Record Reader используется AvroReader, который десериализует данные Avro и возвращает каждую запись Avro как отдельный объект Record.
-
Для AvroReader в качестве значения Schema Access Strategy выбирается значение
Confluent Content-Encoded Schema Reference, тогда для параметра Schema Registry может быть установлен автоматически созданный экземпляр сервиса ConfluentSchemaRegistry.
-
Подключение к Kafka и Schema Registry с SSL
Для подключения, описанного в разделе, в ADS выполнено:
Во время следующего выполнения действия Kafka integration в NiFi выполняется:
-
Создание экземпляра сервиса контроллера StandardSSLContextService с конфигурациями SSL, соответствующими настройкам безопасности кластера.
Параметры StandardSSLContextService
Параметры StandardSSLContextService+Этот сервис контроллера может быть указан в качестве значения параметра SSL Context Service для процессоров, подключающихся к Kafka, например, ConsumeKafkaRecord, использование которого описано выше.
-
Обновление объектов:
-
Kонтекст параметров
platform-integration, который содержит отражающие текущую топологию кластера параметрыbootstrap_serversиschema_registry_url. -
ConfluentSchemaRegistry, подключенный к StandardSSLContextService для связи с SSL-контекстом.
-
Изменение топологии кластера
Для подключения, описанного в разделе, выполнено:
-
Установлены сервисы NiFi, Kafka, Schema Registry, ZooKeeper.
-
Выполнена интеграция с Kafka и Schema-Registry, созданы
platform-integrationи ConfluentSchemaRegistry, как описано выше. -
Для изменения топологии кластера:
-
B Kafka при помощи действия Add/Remove components добавлены новые компоненты Kafka Broker.
-
В Schema-Registry при помощи действия Add/Remove components изменен хост для компонента Schema-Registry.
-
Во время следующего выполнения действия Kafka integration в NiFi выполняется:
-
Обновление параметров
bootstrap_serversиschema_registry_urlв контексте параметровplatform-integration. -
Обновление конфигурации сервиса контроллера ConfluentSchemaRegistry.
После обновления идентификаторы объектов в потоке NiFi не изменены.