Обзор Kafka Connect

Возможности Kafka Connect

Kafka Connect — инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Работа Kafka Connect основана на создании специальных коннекторов, перемещающих данные в Kafka или наборот.

Возможности Kafka Connect
Возможности Kafka Connect
Возможности Kafka Connect
Возможности Kafka Connect

Основные преимущества Kafka Connect перечислены ниже:

  • Копирование данных (создание коннекторов и задач, управление ими) выполняется при помощи REST-интерфейса.

  • Широкие возможности масштабируемости: изменение количества исполнителей, подключение новых коннекторов к имеющимся платформам, управление количеством запущенных задач.

  • Автоматическое управление смещением данных при работе коннекторов.

  • Широкий выбор уже существующих плагинов коннекторов, а также возможность создать свой для нужд вашей системы.

  • Интеграция потоковой/пакетной обработки данных.

Архитектура Kafka Connect

Архитектура Kafka Connect описана ниже для случая, когда требуется перенос данных из внешней системы (source) в Kafka. Для копирования данных осуществляется запуск source-коннектора в кластере Kafka Connect с двумя компонентами Kafka Connect Worker и двумя задачами, созданными коннектором.

Архитектура Kafka Connect
Архитектура Kafka Connect
Архитектура Kafka Connect
Архитектура Kafka Connect

Кластер Kafka Connect состоит из нескольких исполнителей (worker).

Для создания экземпляра коннектора (connector instance) запрос может быть направлен в REST API любого из исполнителей. Созданный коннектор подключается к хранилищу данных, создает необходимое (указанное в конфигурации коннектора) количество задач (task) и равномерно разделяет объем копируемых данных между ними. После создания коннектора исполнители (участники кластера) запускают созданные задачи, а также равномерно перебалансируют все созданные коннекторы и задачи так, чтобы каждый исполнитель имел равную нагрузку.

Данные, копируемые каждой задачей, в процессе прохождения данных через Kafka Connect могут быть изменены при помощи модуля преобразования одиночных сообщений (Single Message Transform, SMT). Необходимые преобразования данных указываются в конфигурации коннектора при создании.

В процессе прохождения через Kafka Connect данные перед записью в Kafka подвергаются сериализации при помощи специального конвертера (converter). Тип конвертера может быть указан для всего сервиса Kafka Connect, а также для каждого конкретного коннектора.

Worker

Исполнитель (worker) — контейнер, запускающий коннекторы и выполняющий задачи. Каждый исполнитель представляет собой Java-процесс, выполняемый внутри JVM.

Работа исполнителя может осуществляться в двух режимах:

  • Автономный режим (standalone mode) — режим работы Kafka Connect, когда используется только один исполнитель. При этом все данные о коннекторах хранятся внутри Kafka Connect. Автономный режим имеет ограниченную функциональность и не обеспечивает отказоустойчивость.

  • Распределенный режим (distributed mode) — режим, при котором используется несколько исполнителей. Для объединения нескольких исполнителей в кластер используется параметр group.id. В данном режиме для создания, настройки и управления коннекторами используется REST API каждого исполнителя. Запросы могут быть отправлены любому члену кластера.

В распределенном режиме исполнители выполняют функции:

  • Обработка REST-запросов для создания и настройки коннекторов при помощи специальных плагинов коннекторов.

  • Запись и хранение данных о коннекторах в специальных топиках Kafka, определяемых следующими параметрами:

    • config.storage.topic — наименование топика, который хранит настройки коннекторов.

    • offset.storage.topic — наименование топика, который хранит смещения для данных, уже отработанных коннекторами. Подробнее об этои описано ниже в описании архитектуры задачи.

    • status.storage.topic — наименование топика, который хранит информацию о состоянии коннекторов.

      Для сохранения данных в брокере Kafka исполнители используют API Kafka.

  • Управление коннекторами и задачами.

  • Обеспечение масштабируемости и автоматической отказоустойчивости для Kafka Connect:

    • при добавлении исполнителей задачи перераспределяются между всеми исполнителями;

    • при сокращении количества исполнителей (управляемом или в случае сбоя) выполняется перебалансировка задач между доступными исполнителями.

Параметры исполнителей Kafka Connect для распределенного режима устанавливаются в конфигурационном файле connect-distributed.properties.

Ниже на рисунке показан пример распределения задач между несколькими исполнителями и взаимодействие исполнителя с Kafka.

Kafka Connect workers
Kafka Connect workers
Kafka Connect workers
Kafka Connect workers

Connector

Экземпляр коннектора (connector instance) — это логическое задание, осуществляющее копирование сообщений из входного потока в выходной поток, при этом топик (или несколько топиков) Kafka являются источником данных или целевой точкой копирования. В зависимости от того, чем является Kafka в этом процессе, определяется вид коннектора:

  • Source-коннектор — коннектор, который взаимодействует с API системы-источника данных, извлекает данные и схему данных и передает их в Kafka. Такие коннекторы могут принимать целые базы данных и записывать новые данные в топики Kafka при обновлениях таблиц.

  • Sink-коннектор — коннектор, который принимает данные из Kafka и записывает их в целевую систему с помощью своего API. Такие коннекторы могут доставлять данные из топиков Kafka в такие системы, как Elasticsearch или Hadoop.

Ниже на рисунке показан процесс создания экземпляра коннектора.

Создание экземпляра коннектора
Создание экземпляра коннектора
Создание экземпляра коннектора
Создание экземпляра коннектора

Исполнитель создает и запускает экземпляр коннектора на основании REST-запроса. Для создания коннектора необходимо следующее:

  • Плагин коннектора — JAR-файл, содержащий исполняемые class-файлы Java. Плагин коннектора не читает и не записывает сообщения в Kafka, а предоставляют архитектуру интерфейса между Kafka и внешней системой. Место хранения плагинов определяет параметр Kafka Connect plugin.path.

    Классы, которые должен содержать плагин:

    • Connector class — класс коннектора, основанный на интерфейсе SourceConnector или SinkConnector, используется для создания самого экземпляра коннектора.

    • Task class — класс задачи, основанный на интерфейсе SourceTask или SinkTask, используется для создания экземпляра задачи. Методы, использующиеся в данных классах, обеспечивают взаимодействие экземпляра коннектора с Kafka через API Kafka (осуществляют запись или чтение сообщений).

  • набор конфигурационных параметров коннектора — параметры source-коннектора или sink-коннектора, указывающие источник и цель копирования данных, количество задач, параметры сериализации и преобразования и т.д.

Экземпляр коннектора выполняет:

  • мониторинг входных данных на предмет изменений и уведомление среды выполнения Kafka Connect об этом для запуска задач;

  • создание необходимого количества экземпляров задач и разделение работы по копированию данных между задачами;

  • получение от исполнителей настроек для задач.

Минимальные параметры, которые необходимо предоставить в запросе для успешного создания коннектора:

  • name — уникальное имя коннектора. Это имя не должно повторяться для коннекторов, созданных в этом кластере.

  • connector.class — наименование класса коннектора.

  • tasks.max — максимальное количество потоков (задач), в которых может работать коннектор (по умолчанию — 1).

Ниже описаны примеры создания коннектора при помощи нескольких технологий.

Kafka Connect REST API

 

Каждый исполнитель Kafka Connect предоставлет собственный REST API для осуществления запросов на создание и управление коннекторами, а также управление задачами.

Например, запрос на создание простого коннектора FileStreamSource (копирует данные из локального файла в топик Kafka) может выглядеть следующим образом:

$ curl -X POST --data-binary "@source.json" -H "Content-Type: application/json" http://localhost:8083/connectors | jq

где @source.json — файл, в котором находятся все параметры, описывающие коннектор, в JSON-формате:

{
    "name":"FileStream-Kafka",
    "config": {
        "connector.class": "FileStreamSource",
        "file": "/data/source.csv",
        "topic": "data"
    }
}
ksqlDB

 

В среде ksqlDB коннекторы создаются при помощи специальных операторов CREATE CONNECTOR. Например, команда для создания Debezium-коннектора для связи с Postgres выглядит следующим образом:

 CREATE SOURCE CONNECTOR `debezium` WITH (
    'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname' = 'postgres',
    'database.port' = '5432',
    'database.user' = 'postgres',
    'database.password' = 'password',
    'database.dbname' = 'postgres',
    'database.server.name' = 'postgres',
    'table.whitelist' = 'public.customers',
    'topic.prefix' = 'postgres',
    'transforms' = 'unwrap',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones' = 'false',
    'transforms.unwrap.delete.handling.mode' = 'rewrite',
    'plugin.name' = 'pgoutput',
    'tasks.max' = '1'
 );
ADS Control

 

Для ADS существует возможность создавать коннекторы на странице Kafka Connect пользовательского интерфейса ADS Control.

Доступны к использованию следующие плагины:

  • MirrorCheckpointConnector, MirrorHeartbeatConnector, MirrorSourceConnector — коннекторы, предназаченные для репликации топиков с использованием механизма Mirror Maker 2. Для получения информации о создании этих коннекторов при помощи ADS Control обратитесь к статье Mirror Maker 2 в ADS Control.

  • PostgreSQL — коннектор Debezium для PostgreSQL Server.

  • MS SQL — коннектор Debezium для MS SQL Server.

  • Iceberg Sink Connector — коннектор для записи данных из Kafka в таблицы Iceberg.

Task

Задача (task) — реализация копирования данных между внешней системой и Kafka. Каждый экземляр задачи создается внутри экземпляра коннектора при помощи специального класса задачи и параметров, установленных при создании коннектора.

Каждая задача выполняется в своем собственном потоке (Java thread).

Ниже на рисунке показано, как данные проходят через source-задачу в Kafka Connect.

Kafka Connect task
Kafka Connect task
Kafka Connect task
Kafka Connect Task

Экземпляр задачи (SourceTask) для каждой записи создает экземпляр записи SourceRecord, который передает сообщение (ключ, значение) в платформу Kafka Connect (исполнителю) для записи в Kafka, при этом осуществляет для сообщения все преобразования (при необходимости) и сериализацию. Экземпляр записи передает исполнителю при помощи специальных полей информацию о том, где хранятся данные в Kafka (топик, kafkaPartition), а также может передавать данные об источнике записи при помощи методов sourcePartition (передает источник записи, например, имя файла, имя таблицы и т.д.) и sourceOffset (передает позицию в sourcePartition). Эти данные могут быть использованы для возобновления копирования данных из источника с предыдущего положения в случае сбоев или перезапуска для обслуживания.

Сообщение и данные о смещении, переданные SourceRecord исполнителю, исполнитель записывает в топики Kafka при помощи Kafka API.

Сами задачи не имеют сохраненного в них состояния. Состояние задачи хранится в специальных топиках Kafka, определяемых параметрами config.storage.topic и status.storage.topic, и управляется соответствующим коннектором. Задачи можно запускать, останавливать или перезапускать в любое время при помощи REST API исполнителя.

Converter

Конвертер (converter) — готовый компонент, преобразующий форматы данных. Конвертеры предоставляют механизм преобразования данных из внутренних типов данных, используемых Kafka Connect, в типы данных, представленные как AVRO, PROTOBUF или JSON Schema.

Конвертеры выполняют:

  • сериализацию и десериализацию данных, подключая сериализаторы и десериализаторы из клиентских библиотек Kafka;

  • сохранение и извлечение схем AVRO, PROTOBUF или JSON из Schema Registry;

  • взаимодействие коннектора с сервисом Schema Registry для сохранения и извлечения схем.

Класс конвертера указывается в конфигурационных параметрах исполнителей Kafka Connect:

Также класс конвертера может быть определен отдельно для каждого коннектора при создании.

Ниже на рисунке показано, как осуществляется сериализация данных при использовании source-коннектора.

Kafka Connect converter
Kafka Connect converter
Kafka Connect converter
Kafka Connect converter

При первом подключении коннекторы создают схемы для входящих данных и при помощи конвертера регистрируют схему в Schema Registry. В процессе копирования данных конверторы извлекают нужную схему и сериализуют данные перед записью в топик Kafka.

Transform

Модуль преобразования одиночных сообщений (single message transform) — готовый компонент с простой логикой изменения одного сообщения.

SMT-преобразования не являются обязательными, а используются для изменения данных перед их записью в Kafka, а также для изменения данных, считанных из Kafka, перед их записью в sink.

Необходимые параметры для SMT-преобразования настраиваются при помощи конфигурационного параметра коннектора transforms.

Примеры использования SMT-преобразований:

  • Удаление при приеме данных таких полей, как личная информация (PII), если это указано системными требованиями.

  • Добавление информации метаданных, такой как происхождение, к данным, полученным через Kafka Connect.

  • Изменение типов данных полей.

  • Изменение названия топика для включения отметки времени.

  • Переименование полей.

Для более сложных преобразований, например, агрегации или объединения с другими топиками, рекомендуется использовать обработку потока в ksqlDB или Kafka Streams.

Обработка ошибок

Ниже описаны возможные варианты обработки ошибок:

  • Fail fast — стратегия обработки ошибок, при которой любая ошибка при работе задачи и коннекторов Kafka Connect приводит к сбою коннектора.

    Такая стратегия настроена по умолчанию при помощи параметра коннектора errors.tolerance, если его значение равно none. Если поменять значение параметра на all, при возникновении ошибки работа коннектора будет продолжена. При этом возможны два сценария работы с ошибками: игнорирование ошибок и перенаправление проблемных записей в очередь недоставленных сообщений (dead letter queue).

  • Dead letter queue — очередь недоставленных сообщений, место хранения сообщений, во время обработки которых возникла ошибка. Недоставленные сообщения записываются в специальный лог. Настройка очереди недоставленных сообщений выполняется при помощи параметров:

    • errors.log.enable — если значение параметра равно true, проблемные записи, вызывающие ошибку, записываются в специальный лог.

    • errors.log.include.messages — если значение параметра равно true, проблемные записи, вызывающие ошибку, записываются в специальный лог. При этом для записей также будут регистрироваться и метаданные (для sink-коннектора: топик, партиция, смещение и метка времени; для source-коннектора: ключ и значение (и их схемы), все заголовки, а также временная метка, топик Kafka, партиция Kafka, партиция и смещение источника).

Kafka Connect в ADS

Подключение

После добавления и установки сервиса Kafka Connect в составе кластера ADS вы можете подключиться к Kafka Connect, используя хост, где размещен компонент Kafka Connect Worker, и порт, указанный в качестве параметра rest.port (по умолчанию 8083) в группе connect-distributed.properties на странице конфигурирования сервиса Kafka Connect.

Информация о сервисе Kafka Connect в интерфейсе ADCM
Информация о сервисе Kafka Connect в интерфейсе ADCM

Kafka

В ADS сервис Kafka Connect может быть установлен только после установки сервиса Kafka. После установки Kafka Connect в конфигурационном файле /etc/kafka-connect/config/connect-distributed.properties автоматически устанавливается параметр bootstrap.servers для связи с брокером Kafka, а также другие параметры для взаимодействия Kafka и Kafka Connect (например, для внутренних топиков Kafka Connect, создаваемых в Kafka).

Schema Registry

Сервис Schema Registry, установленный в ADS одновременно c сервисом Kafka Connect, предоставляет возможность преобразования данных из внутренних типов данных, используемых Kafka Connect, в данные формата AVRO, PROTOBUF или JSON и наоборот.

Например, для работы с данными в формате AVRO в конфигурационном файле connect-distributed.properties необходимо установить специальный AvroConverter в качестве параметра key.converter и/или value.converter, а также для параметра key.converter.schema.registry.url и/или value.converter.schema.registry.url установить URL сервера Schema Registry.

ksqlDB

Сервис ksqlDB, установленный в ADS одновременно c сервисом Kafka Connect, предоставляет возможность управлять коннекторами Kafka Connect. Настройка взаимодействия двух сервисов выполняется на стороне cервиса ksqlDB. Подробнее см. в статье Обзор ksqlDB.

Настройка

Настройка параметров Kafka Connect в интерфейсе ADCM выполняется на странице конфигурирования сервиса Kafka Connect.

Для настройки параметров конфигурационного файла /etc/kafka-connect/config/connect-distributed.properties переведите в активное состояние переключатель Show advanced, раскройте узел connect-distributed.properties и введите новые значения для параметров. Для изменения параметров Kafka Connect, отсутствующих в интерфейсе ADCM, используйте поле Add key,value. Выберите Add property и введите наименование параметра и его значение.

После изменения параметров при помощи интерфейса ADCM перезагрузите сервис Kafka Connect. Для этого примените действие Restart, нажав на иконку actions default dark actions default light в столбце Actions.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней