Обзор Kafka Connect
Возможности Kafka Connect
Kafka Connect — инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Работа Kafka Connect основана на создании специальных коннекторов, перемещающих данные в Kafka или наборот.
Основные преимущества Kafka Connect перечислены ниже:
-
Копирование данных (создание коннекторов и задач, управление ими) выполняется при помощи REST-интерфейса.
-
Широкие возможности масштабируемости: изменение количества исполнителей, подключение новых коннекторов к имеющимся платформам, управление количеством запущенных задач.
-
Автоматическое управление смещением данных при работе коннекторов.
-
Широкий выбор уже существующих плагинов коннекторов, а также возможность создать свой для нужд вашей системы.
-
Интеграция потоковой/пакетной обработки данных.
Архитектура Kafka Connect
Архитектура Kafka Connect описана ниже для случая, когда требуется перенос данных из внешней системы (source) в Kafka. Для копирования данных осуществляется запуск source-коннектора в кластере Kafka Connect с двумя компонентами Kafka Connect Worker и двумя задачами, созданными коннектором.
Кластер Kafka Connect состоит из нескольких исполнителей (worker).
Для создания экземпляра коннектора (connector instance) запрос может быть направлен в REST API любого из исполнителей. Созданный коннектор подключается к хранилищу данных, создает необходимое (указанное в конфигурации коннектора) количество задач (task) и равномерно разделяет объем копируемых данных между ними. После создания коннектора исполнители (участники кластера) запускают созданные задачи, а также равномерно перебалансируют все созданные коннекторы и задачи так, чтобы каждый исполнитель имел равную нагрузку.
Данные, копируемые каждой задачей, в процессе прохождения данных через Kafka Connect могут быть изменены при помощи модуля преобразования одиночных сообщений (Single Message Transform, SMT). Необходимые преобразования данных указываются в конфигурации коннектора при создании.
В процессе прохождения через Kafka Connect данные перед записью в Kafka подвергаются сериализации при помощи специального конвертера (converter). Тип конвертера может быть указан для всего сервиса Kafka Connect, а также для каждого конкретного коннектора.
Worker
Исполнитель (worker) — контейнер, запускающий коннекторы и выполняющий задачи. Каждый исполнитель представляет собой Java-процесс, выполняемый внутри JVM.
Работа исполнителя может осуществляться в двух режимах:
-
Автономный режим (standalone mode) — режим работы Kafka Connect, когда используется только один исполнитель. При этом все данные о коннекторах хранятся внутри Kafka Connect. Автономный режим имеет ограниченную функциональность и не обеспечивает отказоустойчивость.
-
Распределенный режим (distributed mode) — режим, при котором используется несколько исполнителей. Для объединения нескольких исполнителей в кластер используется параметр group.id. В данном режиме для создания, настройки и управления коннекторами используется REST API каждого исполнителя. Запросы могут быть отправлены любому члену кластера.
В распределенном режиме исполнители выполняют функции:
-
Обработка REST-запросов для создания и настройки коннекторов при помощи специальных плагинов коннекторов.
-
Запись и хранение данных о коннекторах в специальных топиках Kafka, определяемых следующими параметрами:
-
config.storage.topic — наименование топика, который хранит настройки коннекторов.
-
offset.storage.topic — наименование топика, который хранит смещения для данных, уже отработанных коннекторами. Подробнее об этои описано ниже в описании архитектуры задачи.
-
status.storage.topic — наименование топика, который хранит информацию о состоянии коннекторов.
Для сохранения данных в брокере Kafka исполнители используют API Kafka.
-
-
Управление коннекторами и задачами.
-
Обеспечение масштабируемости и автоматической отказоустойчивости для Kafka Connect:
-
при добавлении исполнителей задачи перераспределяются между всеми исполнителями;
-
при сокращении количества исполнителей (управляемом или в случае сбоя) выполняется перебалансировка задач между доступными исполнителями.
-
Параметры исполнителей Kafka Connect для распределенного режима устанавливаются в конфигурационном файле connect-distributed.properties.
Ниже на рисунке показан пример распределения задач между несколькими исполнителями и взаимодействие исполнителя с Kafka.
Connector
Экземпляр коннектора (connector instance) — это логическое задание, осуществляющее копирование сообщений из входного потока в выходной поток, при этом топик (или несколько топиков) Kafka являются источником данных или целевой точкой копирования. В зависимости от того, чем является Kafka в этом процессе, определяется вид коннектора:
-
Source-коннектор — коннектор, который взаимодействует с API системы-источника данных, извлекает данные и схему данных и передает их в Kafka. Такие коннекторы могут принимать целые базы данных и записывать новые данные в топики Kafka при обновлениях таблиц.
-
Sink-коннектор — коннектор, который принимает данные из Kafka и записывает их в целевую систему с помощью своего API. Такие коннекторы могут доставлять данные из топиков Kafka в такие системы, как Elasticsearch или Hadoop.
Ниже на рисунке показан процесс создания экземпляра коннектора.
Исполнитель создает и запускает экземпляр коннектора на основании REST-запроса. Для создания коннектора необходимо следующее:
-
Плагин коннектора — JAR-файл, содержащий исполняемые class-файлы Java. Плагин коннектора не читает и не записывает сообщения в Kafka, а предоставляют архитектуру интерфейса между Kafka и внешней системой. Место хранения плагинов определяет параметр Kafka Connect plugin.path.
Классы, которые должен содержать плагин:
-
Connector class — класс коннектора, основанный на интерфейсе SourceConnector или SinkConnector, используется для создания самого экземпляра коннектора.
-
Task class — класс задачи, основанный на интерфейсе SourceTask или SinkTask, используется для создания экземпляра задачи. Методы, использующиеся в данных классах, обеспечивают взаимодействие экземпляра коннектора с Kafka через API Kafka (осуществляют запись или чтение сообщений).
-
-
набор конфигурационных параметров коннектора — параметры source-коннектора или sink-коннектора, указывающие источник и цель копирования данных, количество задач, параметры сериализации и преобразования и т.д.
Экземпляр коннектора выполняет:
-
мониторинг входных данных на предмет изменений и уведомление среды выполнения Kafka Connect об этом для запуска задач;
-
создание необходимого количества экземпляров задач и разделение работы по копированию данных между задачами;
-
получение от исполнителей настроек для задач.
Минимальные параметры, которые необходимо предоставить в запросе для успешного создания коннектора:
-
name — уникальное имя коннектора. Это имя не должно повторяться для коннекторов, созданных в этом кластере.
-
connector.class — наименование класса коннектора.
-
tasks.max — максимальное количество потоков (задач), в которых может работать коннектор (по умолчанию —
1
).
Ниже описаны примеры создания коннектора при помощи нескольких технологий.
Каждый исполнитель Kafka Connect предоставлет собственный REST API для осуществления запросов на создание и управление коннекторами, а также управление задачами.
Например, запрос на создание простого коннектора FileStreamSource (копирует данные из локального файла в топик Kafka) может выглядеть следующим образом:
$ curl -X POST --data-binary "@source.json" -H "Content-Type: application/json" http://localhost:8083/connectors | jq
где @source.json
— файл, в котором находятся все параметры, описывающие коннектор, в JSON-формате:
{
"name":"FileStream-Kafka",
"config": {
"connector.class": "FileStreamSource",
"file": "/data/source.csv",
"topic": "data"
}
}
В среде ksqlDB коннекторы создаются при помощи специальных операторов CREATE CONNECTOR. Например, команда для создания Debezium-коннектора для связи с Postgres выглядит следующим образом:
CREATE SOURCE CONNECTOR `debezium` WITH (
'connector.class' = 'io.debezium.connector.postgresql.PostgresConnector',
'database.hostname' = 'postgres',
'database.port' = '5432',
'database.user' = 'postgres',
'database.password' = 'password',
'database.dbname' = 'postgres',
'database.server.name' = 'postgres',
'table.whitelist' = 'public.customers',
'topic.prefix' = 'postgres',
'transforms' = 'unwrap',
'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones' = 'false',
'transforms.unwrap.delete.handling.mode' = 'rewrite',
'plugin.name' = 'pgoutput',
'tasks.max' = '1'
);
Для ADS существует возможность создавать коннекторы на странице Kafka Connect пользовательского интерфейса ADS Control.
Доступны к использованию следующие плагины:
-
MirrorCheckpointConnector, MirrorHeartbeatConnector, MirrorSourceConnector — коннекторы, предназаченные для репликации топиков с использованием механизма Mirror Maker 2. Для получения информации о создании этих коннекторов при помощи ADS Control обратитесь к статье Mirror Maker 2 в ADS Control.
-
PostgreSQL — коннектор Debezium для PostgreSQL Server.
-
MS SQL — коннектор Debezium для MS SQL Server.
-
Iceberg Sink Connector — коннектор для записи данных из Kafka в таблицы Iceberg.
Task
Задача (task) — реализация копирования данных между внешней системой и Kafka. Каждый экземляр задачи создается внутри экземпляра коннектора при помощи специального класса задачи и параметров, установленных при создании коннектора.
Каждая задача выполняется в своем собственном потоке (Java thread).
Ниже на рисунке показано, как данные проходят через source-задачу в Kafka Connect.
Экземпляр задачи (SourceTask) для каждой записи создает экземпляр записи SourceRecord, который передает сообщение (ключ, значение) в платформу Kafka Connect (исполнителю) для записи в Kafka, при этом осуществляет для сообщения все преобразования (при необходимости) и сериализацию. Экземпляр записи передает исполнителю при помощи специальных полей информацию о том, где хранятся данные в Kafka (топик, kafkaPartition), а также может передавать данные об источнике записи при помощи методов sourcePartition
(передает источник записи, например, имя файла, имя таблицы и т.д.) и sourceOffset
(передает позицию в sourcePartition
). Эти данные могут быть использованы для возобновления копирования данных из источника с предыдущего положения в случае сбоев или перезапуска для обслуживания.
Сообщение и данные о смещении, переданные SourceRecord исполнителю, исполнитель записывает в топики Kafka при помощи Kafka API.
Сами задачи не имеют сохраненного в них состояния. Состояние задачи хранится в специальных топиках Kafka, определяемых параметрами config.storage.topic и status.storage.topic, и управляется соответствующим коннектором. Задачи можно запускать, останавливать или перезапускать в любое время при помощи REST API исполнителя.
Converter
Конвертер (converter) — готовый компонент, преобразующий форматы данных. Конвертеры предоставляют механизм преобразования данных из внутренних типов данных, используемых Kafka Connect, в типы данных, представленные как AVRO, PROTOBUF или JSON Schema.
Конвертеры выполняют:
-
сериализацию и десериализацию данных, подключая сериализаторы и десериализаторы из клиентских библиотек Kafka;
-
сохранение и извлечение схем AVRO, PROTOBUF или JSON из Schema Registry;
-
взаимодействие коннектора с сервисом Schema Registry для сохранения и извлечения схем.
Класс конвертера указывается в конфигурационных параметрах исполнителей Kafka Connect:
Также класс конвертера может быть определен отдельно для каждого коннектора при создании.
Ниже на рисунке показано, как осуществляется сериализация данных при использовании source-коннектора.
При первом подключении коннекторы создают схемы для входящих данных и при помощи конвертера регистрируют схему в Schema Registry. В процессе копирования данных конверторы извлекают нужную схему и сериализуют данные перед записью в топик Kafka.
Transform
Модуль преобразования одиночных сообщений (single message transform) — готовый компонент с простой логикой изменения одного сообщения.
SMT-преобразования не являются обязательными, а используются для изменения данных перед их записью в Kafka, а также для изменения данных, считанных из Kafka, перед их записью в sink.
Необходимые параметры для SMT-преобразования настраиваются при помощи конфигурационного параметра коннектора transforms.
Примеры использования SMT-преобразований:
-
Удаление при приеме данных таких полей, как личная информация (PII), если это указано системными требованиями.
-
Добавление информации метаданных, такой как происхождение, к данным, полученным через Kafka Connect.
-
Изменение типов данных полей.
-
Изменение названия топика для включения отметки времени.
-
Переименование полей.
Для более сложных преобразований, например, агрегации или объединения с другими топиками, рекомендуется использовать обработку потока в ksqlDB или Kafka Streams.
Обработка ошибок
Ниже описаны возможные варианты обработки ошибок:
-
Fail fast — стратегия обработки ошибок, при которой любая ошибка при работе задачи и коннекторов Kafka Connect приводит к сбою коннектора.
Такая стратегия настроена по умолчанию при помощи параметра коннектора errors.tolerance, если его значение равно
none
. Если поменять значение параметра наall
, при возникновении ошибки работа коннектора будет продолжена. При этом возможны два сценария работы с ошибками: игнорирование ошибок и перенаправление проблемных записей в очередь недоставленных сообщений (dead letter queue). -
Dead letter queue — очередь недоставленных сообщений, место хранения сообщений, во время обработки которых возникла ошибка. Недоставленные сообщения записываются в специальный лог. Настройка очереди недоставленных сообщений выполняется при помощи параметров:
-
errors.log.enable — если значение параметра равно
true
, проблемные записи, вызывающие ошибку, записываются в специальный лог. -
errors.log.include.messages — если значение параметра равно
true
, проблемные записи, вызывающие ошибку, записываются в специальный лог. При этом для записей также будут регистрироваться и метаданные (для sink-коннектора: топик, партиция, смещение и метка времени; для source-коннектора: ключ и значение (и их схемы), все заголовки, а также временная метка, топик Kafka, партиция Kafka, партиция и смещение источника).
-
Kafka Connect в ADS
Подключение
После добавления и установки сервиса Kafka Connect в составе кластера ADS вы можете подключиться к Kafka Connect, используя хост, где размещен компонент Kafka Connect Worker, и порт, указанный в качестве параметра rest.port (по умолчанию 8083
) в группе connect-distributed.properties на странице конфигурирования сервиса Kafka Connect.

Kafka
В ADS сервис Kafka Connect может быть установлен только после установки сервиса Kafka. После установки Kafka Connect в конфигурационном файле /etc/kafka-connect/config/connect-distributed.properties автоматически устанавливается параметр bootstrap.servers
для связи с брокером Kafka, а также другие параметры для взаимодействия Kafka и Kafka Connect (например, для внутренних топиков Kafka Connect, создаваемых в Kafka).
Schema Registry
Сервис Schema Registry, установленный в ADS одновременно c сервисом Kafka Connect, предоставляет возможность преобразования данных из внутренних типов данных, используемых Kafka Connect, в данные формата AVRO, PROTOBUF или JSON и наоборот.
Например, для работы с данными в формате AVRO в конфигурационном файле connect-distributed.properties необходимо установить специальный AvroConverter в качестве параметра key.converter
и/или value.converter
, а также для параметра key.converter.schema.registry.url
и/или value.converter.schema.registry.url
установить URL сервера Schema Registry.
ksqlDB
Сервис ksqlDB, установленный в ADS одновременно c сервисом Kafka Connect, предоставляет возможность управлять коннекторами Kafka Connect. Настройка взаимодействия двух сервисов выполняется на стороне cервиса ksqlDB. Подробнее см. в статье Обзор ksqlDB.
Настройка
Настройка параметров Kafka Connect в интерфейсе ADCM выполняется на странице конфигурирования сервиса Kafka Connect.
Для настройки параметров конфигурационного файла /etc/kafka-connect/config/connect-distributed.properties переведите в активное состояние переключатель Show advanced, раскройте узел connect-distributed.properties и введите новые значения для параметров. Для изменения параметров Kafka Connect, отсутствующих в интерфейсе ADCM, используйте поле Add key,value. Выберите Add property и введите наименование параметра и его значение.
После изменения параметров при помощи интерфейса ADCM перезагрузите сервис Kafka Connect. Для этого примените действие Restart, нажав на иконку
в столбце Actions.