Kafka Connect

Kafka Connect – компонент Apache Kafka с открытым исходным кодом, является основой для подключения Kafka к внешним системам, таким как базы данных, хранилища key-value, поисковые индексы и файловые системы. С Kafka Connect можно использовать существующие реализации коннекторов для перемещения данных в сервис Kafka и из него:

  • Source Connector – принимает базы данных и обновляет таблицы потоков для топиков Kafka. Также собирает метрики со всех серверов приложений в топики Kafka, делая данные доступными для потоковой обработки с низкой задержкой;
  • Sink Connector – доставляет данные из топиков Kafka во вторичные индексы, такие как Elasticsearch, или в пакетные системы для автономного анализа, такие как Hadoop.

Kafka Connect ориентирован на потоковую передачу данных из сервиса Kafka и в него, что упрощает написание высококачественных, надежных и высокопроизводительных плагинов. Это также позволяет фреймворку давать гарантии, которые трудно достичь с помощью других структур. Kafka Connect является неотъемлемым компонентом конвейера ETL в сочетании с сервисом Kafka и потоковой обработкой.

Kafka Connect может работать либо как автономный процесс для выполнения заданий на одной машине (например, сбор журналов), либо как распределенный, масштабируемый, отказоустойчивый сервис, поддерживающий всю структуру. Это позволяет сократить масштаб до разработки, тестирования и небольших продуктовых развертываний с низким барьером для входа и низкими эксплуатационными накладными расходами, а также увеличить масштаб поддержки конвейера данных большой организации.

Основные преимущества использования Kafka Connect:

  • Data Centric Pipeline – использование значимых абстракций данных для извлечения или передачи данных в Kafka;
  • Flexibility and Scalability (гибкость и масштабируемость) – работа с потоковыми и пакетно-ориентированными системами на одном узле или масштабирование до сервиса по всей ширине организации;
  • Reusability and Extensibility (повторное использование и расширяемость) – использование существующих коннекторов и возможность расширения их для адаптации к конкретным потребностям и сокращения времени на разработку.

Connectors & Tasks

Копирование данных между сервисом Kafka и сторонней системой осуществляется посредством создаваемых пользователями инстансов Kafka Connectors. Коннекторы бывают двух видов: SourceConnectors – импортируют данные из другой системы, и SinkConnectors – экспортируют данные в другую систему. Например, JDBCSourceConnector импортирует реляционную базу данных в Kafka, а HDFSSinkConnector экспортирует содержимое топика Kafka в файлы HDFS.

Реализации класса Connector не выполняют копирование данных самостоятельно: их конфигурация описывает набор данных для копирования, и Connector отвечает за разбиение этого задания на набор задач – Tasks, которые могут быть распределены между объектами Kafka Connect. Tasks также бывают двух видов: SourceTask и SinkTask. При необходимости реализация класса Connector может отслеживать изменения данных внешних систем и запрашивать реконфигурацию задачи.

С назначением данных, которые должны быть скопированы, каждая задача Task должна скопировать свое подмножество данных в сервис Kafka или из него. Данные, которые копирует коннектор, должны быть представлены как партиционированный поток, аналогично модели топика Kafka, где каждая партиция представляет собой упорядоченную последовательность записей со смещениями. Каждой задаче назначается подмножество партиций для обработки. Порой это сопоставление очевидно: каждый файл в наборе файлов журнала можно считать партицией, каждую строку в файле – записью, а смещения – просто позициии в файле. В иных случаях сопоставление с моделью требует больше усилий: коннектор JDBC может сопоставить каждую таблицу с партицией, но смещение менее ясно. Один из возможных вариантов сопоставления это использовать в качестве смещения последнюю запрашиваемую отметку времени при генерации запросов.

Source Connector, создавший две задачи, которые копируют данные из входных партиций и записывают в сервис Kafka, приведен на Рис.149..

../../_images/DeveloperKafka_Connect_Example-connector-model.png

Рис. 149. Пример реализации Source Connector

Partitions & Records

Каждая партиция представляет собой упорядоченную последовательность записей ключ-значение, где и ключи, и значения могут иметь сложные структуры. Поддерживаются многие примитивные типы, а также массивы, структуры и вложенные структуры данных. Для большинства типов можно напрямую использовать стандартные типы Java, такие как java.lang.Integer, java.lang.Map и java.lang.Collection. Для структурированных записей следует использовать класс Struct.

На Рис.150. представлен партиционированный поток: модель данных, в которой коннекторы сопоставляют все системы source и sink. Каждая запись содержит ключи и значения (со схемами), идентификатор партиции и смещения в ней.

../../_images/DeveloperKafka_Connect_data-model.png

Рис. 150. Пример партиционированного потока

Для отслеживания структуры и совместимости записей в партициях схемы (Schemas) могут быть включены в каждую запись. Поскольку схемы обычно генерируются “на лету” на основе источника данных, класс SchemaBuilder включен, что делает их построение очень простым.

Schemas:Определение абстрактного типа данных. Типы данных могут быть примитивными типами (целочисленные типы, типы с плавающей запятой, логические, строки и байты) или сложными типами (типизированные массивы, карты с одной схемой ключей и схемами значений, а также структурами, которые имеют фиксированный набор имен полей, каждый из которых имеет схема связанных значений). Любой тип может быть указан как необязательный, что позволяет его опускать (в результате чего значения отсутствуют) и может указывать значение по умолчанию.

Такой формат данных среды выполнения не предполагает какого-либо конкретного формата сериализации; это преобразование осуществляется с помощью Converter, которые обрабатывают формат времени выполнения org.apache.kafka.connect.data и сериализованные данные byte[].

Converter:Интерфейс конвертера обеспечивает поддержку перевода между форматом данных выполнения Kafka Connect и byte[]. Внутренне это включает промежуточный шаг к формату, используемому слоем сериализации (например, JsonNode, GenericRecord, Message).

В дополнение к ключу и значению записи имеют идентификаторы партиций и смещения, которые используются фреймворком для периодической фиксации смещений обработанных данных. В случае сбоя обработка может возобновиться с последнего зафиксированного смещения, что позволяет избежать повторной обработки и дублирования событий.