Обзор Schema Registry
Возможности Schema Registry
Schema Registry — централизованный репозиторий, использующийся для обеспечения согласованности форматов данных между производителями и потребителями сообщений в Kafka.
Основные функциональные возможности Schema Registry перечислены ниже:
-
Обеспечение сериализации и десериализации данных при помощи специальных сериализаторов, предоставляемых клиентам Kafka.
-
Взаимодействие с клиентами Kafka для записи и чтения сериализованных сообщений:
-
с отдельными менеджерами схем (например, подключаемый модуль Schema Registry Maven) как часть технологии CI/CD.
-
Регистрация и хранение схем данных для каждого субъекта.
-
Поддержка эволюции схем c хранением каждой версии с собственным идентификатором и возможностью проверки совместимости версий для каждого субьекта.
-
Обеспечение доступности для осуществления действий со схемами и субьектами по интерфейсу RESTful при помощи Schema Registry API.
Архитектура Schema Registry
Обзор
Основные понятия, используемые в Schema Registry:
-
Сериализация — процесс записи данных из формата-структуры, семантически понятной человеку, в поток/массив байтов (byte array) в формате двоичного кода для машинной обработки. Schema Registry поддерживает следующие форматы сериализации: JSON, AVRO, PROTOBUF.
Преимущества сериализации:
-
возможность использовать различные форматы данных в сообщениях Kafka;
-
возможность обмена данными между приложениями, реализованными на разных языках программирования;
-
уменьшение общего объема сохраненных данных.
-
-
Десериализация — восстановление структуры данных из потока байтов.
-
Сериализатор и десериализатор — это объекты (или классы), которые представляют собой набор функций и методов, обеспечивающих взаимодействие клиента со Schema Registry для обеспечения сериализации и десериализации. Существуют готовые встроенные сериализаторы и десериализаторы Kafka для разных форматов схем. Kafka API поддерживает также создание пользовательских сериализаторов и десериализаторов на основе интерфейса Serializer. Сериализатор определяется в конфигурации производителя при помощи параметров
key.serializer
и/илиvalue.serializer
. Десериализатор определяется в конфигурации потребителя при помощи параметровkey.deserializer
и/илиvalue.deserializer
. -
Cубъект — область, в которой могут развиваться схемы. Имя субьекта создается при первой регистрации схемы и используется:
-
при изменении (эволюции) схемы как пространство имен для различных версий;
-
для проверки совместимости версий схем;
-
для получения схемы из хранилища.
Существует три настраиваемых способа присвоения имени субъекту:
-
TopicNameStrategy
— настройка по умолчанию. Схемы, которые регистрируются производителями, будут зарегистрированы под именем топика с добавлением-key
или-value
(например, для топикаtopic
, у которого сериализуются только значения, субъект получит имяtopic-value
). -
RecordNameStrategy
— позволяет использовать разные схемы в одном топике, поскольку каждая отдельная запись будет в своей схеме. При этом придется использовать одну и ту же схему и одну и ту же версию во всех топиках кластера для этого конкретного типа записи, поскольку невозможно определить, к какому топику принадлежит запись. -
TopicRecordNameStrategy
— представляет собой комбинацию первых двух стратегий.
Имя субъекта, отличное от заданного по умолчанию, устанавливается при помощи параметра производителя при регистрации новой схемы —
key.subject.name.strategy
и/илиvalue.subject.name.strategy
. -
-
Схема — структура формата данных, подвергнутых сериализации. Cхема описывает данные, записываемые в топик, какой тип информации они содержат. Эта информация связывает производителей и потребителей. Существуют специальные требования к каждому формату, например, схема формата должна соответствовать требованиям к формату AVRO.
Ниже приведены примеры схем поддерживаемых форматов для одного и того же образца используемых данных —
{ "id":"1000", "amount":500 }
.JSON{ "type": "object", "properties": { "id": { "type": "string" }, "amount": { "type": "number" }
AVRO{ "type": "record", "name": "Transaction", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" } ] }
PROTOBUFsyntax = "proto3"; message MyRecord { string id = 1; float amount = 2; }
-
Совместимость схемы — соответствие новой версии схемы предыдущим версиям для данного субъекта. Могут быть определены несколько типов совместимости в зависимости от того, какие данные были изменены или удалены. Типы совместимости являются, по сути, шаблонами того, как может эволюционировать схема, чтобы ее новая версия успешно могла быть обработана производителями и потребителями. Тип совместимости определяется при помощи параметра сервера Schema Registry schema.compatibility.level, который указывается в файле schema-registry.properties.
Кластер Schema Registry
Каждый сервер Schema Registry является, по сути, сервером REST API. Серверы Schema Registry предоставляют API для управления схемами, а также сериализаторы и десериализаторы для чтения и записи данных в соответствии со схемами.
Все cерверы Schema Registry в составе кластера ADS равноправно осуществляют обслуживание процессов сериализации и имеют доступ к хранилищу схем. В производственных средах рекомендуется указывать URL всех cерверов Schema Registry.
Производители
Ниже на рисунке показано, как работают производители Kafka при сериализации и записи сообщений с использованием Schema Registry.
Для сериализации и записи сериализованного сообщения в Kafka к экземпляру производителя подключается сериализатор, предоставляемый Schema Registry, тип которого определяет конфигурация производителя. Сериализатор взаимодействует с сервером Schema Registry по HTTP-протоколу.
Производитель передает сообщение сериализатору. Сериализатор определяет, существует ли идентификатор схемы для данного субъекта в кеш-памяти производителя. Сериализатор получает имя субъекта на основе нескольких стратегий при первом подключении экземпляра производителя к Schema Registry. При необходимости имя субъекта может быть указано явно.
Если идентификатор схемы отсутствует в кеш-памяти производителя, сериализатор регистрирует схему в Schema Registry (или получает последнюю версию схемы для данного субьекта) и сохраняет полученный ID (идентификатор) схемы и саму схему в кеш-память производителя. При регистрации Schema Registry записывает новую схему в специальный топик _schemas
на брокере Kafka.
Перед сериализацией данные проверяются на соответствие схеме. Любое несоответствие между данными и схемой приведут к ошибке во время сериализации.
После получения ID схемы сериализатор записывает в топик Kafka сериализованные данные в соответствии с форматом сообщения, при этом добавляя перед ними специальный magic byte (номер версии формата сериализации) и ID схемы.
Если тот же экземпляр производителя сериализует новое сообщение для этого же субьекта, сериализатор обнаружит в кеш-памяти уже имеющийся ID схемы и схему и в соответствии с ней сериализует сообщение.
Для записи сообщения в новый топик при перезапуске производителя или при запросе производителя на изменение (эволюцию) схемы сериализатор выполняет новый запрос к Schema Registry для регистрации или получения схемы. При этом производится проверка на совместимость новой версии с предыдущими.
Для производителей, в том числе в приложениях Kafka Streams и коннекторах Kafka Connect, могут быть настроены следующие параметры, определяющие поведение системы при различиях между предварительно зарегистрированными и клиентскими схемами:
-
auto.register.schemas
— определяет, должен ли сериализатор пытаться зарегистрировать схему в Schema Registry. -
use.latest.version
— применяется только в том случае, если дляauto.register.schemas
установлено значениеfalse
. Если дляauto.register.schemas
установлено значениеfalse
, а дляuse.latest.version
установлено значениеtrue
, то вместо получения схемы для объекта, передаваемого клиенту для сериализации, Schema Registry будет использовать последнюю версию схемы в субъекте. -
latest.compatibility.strict
— значение по умолчанию равноtrue
, но это применимо только в том случае, еслиuse.latest.version
равноtrue
. Если оба свойства имеют значениеtrue
, во время сериализации выполняется проверка, чтобы убедиться, что последняя версия субъекта обратно совместима со схемой сериализуемого объекта. Если проверка не удалась, выдается ошибка. Если значениеlatest.compatibility.strict
равноfalse
, то для сериализации используется последняя версия субъекта без какой-либо проверки совместимости.
Подробно о нюансах настройки данных параметров можно прочитать в статье Handling differences between preregistered and client-derived schemas
Потребители
Ниже на рисунке показано, как работают потребители Kafka при десериализации и чтении сообщений с использованием Schema Registry.
Для десериализации сообщения в Kafka к экземпляру производителя подключается десериализатор, предоставляемый Schema Registry, тип которого определяет конфигурация потребителя. Десериализатор взаимодействует с сервером Schema Registry по HTTP-протоколу.
Потребитель получает данные из кластера Kafka (сериализованное сообщение) и передает их десериализатору. Десериализатор проверяет наличие magic byte и извлекает ID схемы из сообщения.
Затем десериализатор считывает идентификатор схемы и проверяет, существует ли соответствующая схема в кеш-памяти потребителя. Если существует, десериализация происходит с этой схемой. В противном случае десериализатор извлекает схему из Schema Registry на основе идентификатора схемы. Как только схема готова, десериализатор приступает к десериализации.
Если сериализованные данные не соответствуют полученной схеме (например, отсутствуют некоторые поля), десериализация завершится неудачно и возникнет ошибка.
Для потребителей при сериализации имеет значение параметр use.latest.version
, определяющий поведение системы при различиях между предварительно зарегистрированными и клиентскими схемами.
Schema Registry в ADS
Подключение
После добавления и установки сервиса Schema Registry в составе кластера ADS вы можете подключиться к серверу Schema Registry, используя его URL в формате http://<schema-registry>:<schema-registry-port>
с указанием хоста и порта, приведенных на странице информации о сервисе в интерфейсе ADCM.

Kafka
В ADS сервис Schema Registry может быть установлен только после установки сервиса Kafka. После установки Schema Registry в конфигурационном файле /etc/schema-registry/schema-registry.properties автоматически устанавливается параметр kafkastore.bootstrap.servers
для связи с брокером Kafka (расположение служебного топика _schemas
).
Для тестовых целей могут быть использованы скрипты, предназначенные для работы Kafka с сериализацией, например, kafka-avro-console-producer. После установки сервиса Schema Registry такие скрипты размещены в папке /usr/lib/schema-registry/bin/.
Ниже приведен пример запуска производителя с указанием схемы сообщения и потребителя (в данном примере Schema Registry и Kafka установлены на одном хосте).
Запись сообщения:
$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic topic-avro --property value.schema='{"type":"record","name":"message","fields":[{"name":"id","type":"string"},{"name": "amount", "type": "double"}]}'
Для записи сообщения не выводится >
, как в работе с обычными скриптами Kafka:
{ "id":"1000", "amount":500 }
Чтение сообщений из топика:
$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic topic-avro --property schema.registry.url=http://localhost:8081
ksqlDB
Сервис Schema Registry, установленный в ADS одновременно c сервисом ksqlDB, позволяет выполнять сериализацию и десериализацию данных при работе с потоками и таблицами в ksqlDB. После установки сервиса ksqlDB параметры key.converter.schema.registry.url
и value.converter.schema.registry.url
, отвечающие за взаимодействие ksqlDB и Schema Registry, можно установить на стороне ksqlDB (в конфигурационном файле /etc/ksqldb/connect.properties).
Kafka Connect
Сервис Schema Registry, установленный в ADS одновременно c сервисом Kafka Connect, предоставляет возможность преобразования данных из внутренних типов данных, используемых Kafka Connect, в данные формата AVRO и наоборот.
Для работы с данными в формате AVRO в конфигурации Kafka Connect необходимо установить специальный AvroConverter в качестве параметра key.converter
и/или value.converter
, а также для параметра key.converter.schema.registry.url
и/или value.converter.schema.registry.url
установить URL сервера Schema Registry.
Kafka REST Proxy
Сервис Schema Registry, установленный в ADS одновременно c сервисом Kafka REST Proxy, позволяет выполнять сериализацию и десериализацию данных при работе с топиками Kafka по интерфейсу RESTful. После установки обоих сервисов параметр schema.registry.url
автоматически устанавливается в конфигурационном файле Kafka REST Proxy /etc/kafka-rest/kafka-rest.properties.
Используя встроенные типы схем AVRO, PROTOBUF и JSON, вы можете напрямую встраивать данные в нужном формате вместе со схемой (или идентификатором схемы) в запрос, определяя соответствующий тип контента в пользовательских заголовках:
$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.avro.v2+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrotest"
NiFi
Некоторые службы контроллера NiFi, например, ConfluentSchemaRegistry, предоставляют доступ к схемам, хранящимся на сервере Schema Registry, указанном в параметрах службы.
Настройка Schema Registry
Настройка параметров Schema Registry в интерфейсе ADCM выполняется на странице конфигурирования сервиса Schema Registry.
Для настройки параметров конфигурационного файла /etc/schema-registry/schema-registry.properties переведите в активное состояние переключатель Show advanced, раскройте узел schema-registry.properties и введите новые значения для параметров. Для изменения параметров Schema Registry, отсутствующих в интерфейсе ADCM, используйте поле Add key,value. Выберите Add property и введите наименование параметра и его значение.
После изменения параметров при помощи интерфейса ADCM перезагрузите сервис Schema Registry. Для этого примените действие Restart, нажав на иконку
в столбце Actions.