Обзор Schema Registry

Возможности Schema Registry

Schema Registry — централизованный репозиторий, использующийся для обеспечения согласованности форматов данных между производителями и потребителями сообщений в Kafka.

Возможности Schema Registry
Возможности Schema Registry
Возможности Schema Registry
Возможности Schema Registry

Основные функциональные возможности Schema Registry перечислены ниже:

  • Обеспечение сериализации и десериализации данных при помощи специальных сериализаторов, предоставляемых клиентам Kafka.

  • Взаимодействие с клиентами Kafka для записи и чтения сериализованных сообщений:

  • Регистрация и хранение схем данных для каждого субъекта.

  • Поддержка эволюции схем c хранением каждой версии с собственным идентификатором и возможностью проверки совместимости версий для каждого субьекта.

  • Обеспечение доступности для осуществления действий со схемами и субьектами по интерфейсу RESTful при помощи Schema Registry API.

Архитектура Schema Registry

Обзор

Основные понятия, используемые в Schema Registry:

  • Сериализация — процесс записи данных из формата-структуры, семантически понятной человеку, в поток/массив байтов (byte array) в формате двоичного кода для машинной обработки. Schema Registry поддерживает следующие форматы сериализации: JSON, AVRO, PROTOBUF.

    Преимущества сериализации:

    • возможность использовать различные форматы данных в сообщениях Kafka;

    • возможность обмена данными между приложениями, реализованными на разных языках программирования;

    • уменьшение общего объема сохраненных данных.

  • Десериализация — восстановление структуры данных из потока байтов.

  • Сериализатор и десериализатор — это объекты (или классы), которые представляют собой набор функций и методов, обеспечивающих взаимодействие клиента со Schema Registry для обеспечения сериализации и десериализации. Существуют готовые встроенные сериализаторы и десериализаторы Kafka для разных форматов схем. Kafka API поддерживает также создание пользовательских сериализаторов и десериализаторов на основе интерфейса Serializer. Сериализатор определяется в конфигурации производителя при помощи параметров key.serializer и/или value.serializer. Десериализатор определяется в конфигурации потребителя при помощи параметров key.deserializer и/или value.deserializer.

  • Cубъект — область, в которой могут развиваться схемы. Имя субьекта создается при первой регистрации схемы и используется:

    • при изменении (эволюции) схемы как пространство имен для различных версий;

    • для проверки совместимости версий схем;

    • для получения схемы из хранилища.

    Существует три настраиваемых способа присвоения имени субъекту:

    • TopicNameStrategy — настройка по умолчанию. Схемы, которые регистрируются производителями, будут зарегистрированы под именем топика с добавлением -key или -value (например, для топика topic, у которого сериализуются только значения, субъект получит имя topic-value).

    • RecordNameStrategy — позволяет использовать разные схемы в одном топике, поскольку каждая отдельная запись будет в своей схеме. При этом придется использовать одну и ту же схему и одну и ту же версию во всех топиках кластера для этого конкретного типа записи, поскольку невозможно определить, к какому топику принадлежит запись.

    • TopicRecordNameStrategy — представляет собой комбинацию первых двух стратегий.

    Имя субъекта, отличное от заданного по умолчанию, устанавливается при помощи параметра производителя при регистрации новой схемы — key.subject.name.strategy и/или value.subject.name.strategy.

  • Схема — структура формата данных, подвергнутых сериализации. Cхема описывает данные, записываемые в топик, какой тип информации они содержат. Эта информация связывает производителей и потребителей. Существуют специальные требования к каждому формату, например, схема формата должна соответствовать требованиям к формату AVRO.

    Ниже приведены примеры схем поддерживаемых форматов для одного и того же образца используемых данных — { "id":"1000", "amount":500 }.

    JSON
    {
      "type": "object",
      "properties": {
        "id": {
          "type": "string"
        },
        "amount": {
          "type": "number"
        }
    AVRO
    {
        "type": "record",
        "name": "Transaction",
        "fields": [
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "amount",
                "type": "double"
            }
        ]
    }
    PROTOBUF
    syntax = "proto3";
    
    message MyRecord {
      string id = 1;
      float amount = 2;
    }
  • Совместимость схемы — соответствие новой версии схемы предыдущим версиям для данного субъекта. Могут быть определены несколько типов совместимости в зависимости от того, какие данные были изменены или удалены. Типы совместимости являются, по сути, шаблонами того, как может эволюционировать схема, чтобы ее новая версия успешно могла быть обработана производителями и потребителями. Тип совместимости определяется при помощи параметра сервера Schema Registry schema.compatibility.level, который указывается в файле schema-registry.properties.

Кластер Schema Registry

Каждый сервер Schema Registry является, по сути, сервером REST API. Серверы Schema Registry предоставляют API для управления схемами, а также сериализаторы и десериализаторы для чтения и записи данных в соответствии со схемами.

Все cерверы Schema Registry в составе кластера ADS равноправно осуществляют обслуживание процессов сериализации и имеют доступ к хранилищу схем. В производственных средах рекомендуется указывать URL всех cерверов Schema Registry.

Производители

Ниже на рисунке показано, как работают производители Kafka при сериализации и записи сообщений с использованием Schema Registry.

Сериализация и запись сообщений
Сериализация и запись сообщений
Сериализация и запись сообщений
Сериализация и запись сообщений

Для сериализации и записи сериализованного сообщения в Kafka к экземпляру производителя подключается сериализатор, предоставляемый Schema Registry, тип которого определяет конфигурация производителя. Сериализатор взаимодействует с сервером Schema Registry по HTTP-протоколу.

Производитель передает сообщение сериализатору. Сериализатор определяет, существует ли идентификатор схемы для данного субъекта в кеш-памяти производителя. Сериализатор получает имя субъекта на основе нескольких стратегий при первом подключении экземпляра производителя к Schema Registry. При необходимости имя субъекта может быть указано явно.

Если идентификатор схемы отсутствует в кеш-памяти производителя, сериализатор регистрирует схему в Schema Registry (или получает последнюю версию схемы для данного субьекта) и сохраняет полученный ID (идентификатор) схемы и саму схему в кеш-память производителя. При регистрации Schema Registry записывает новую схему в специальный топик _schemas на брокере Kafka.

Перед сериализацией данные проверяются на соответствие схеме. Любое несоответствие между данными и схемой приведут к ошибке во время сериализации.

После получения ID схемы сериализатор записывает в топик Kafka сериализованные данные в соответствии с форматом сообщения, при этом добавляя перед ними специальный magic byte (номер версии формата сериализации) и ID схемы.

Если тот же экземпляр производителя сериализует новое сообщение для этого же субьекта, сериализатор обнаружит в кеш-памяти уже имеющийся ID схемы и схему и в соответствии с ней сериализует сообщение.

Для записи сообщения в новый топик при перезапуске производителя или при запросе производителя на изменение (эволюцию) схемы сериализатор выполняет новый запрос к Schema Registry для регистрации или получения схемы. При этом производится проверка на совместимость новой версии с предыдущими.

Для производителей, в том числе в приложениях Kafka Streams и коннекторах Kafka Connect, могут быть настроены следующие параметры, определяющие поведение системы при различиях между предварительно зарегистрированными и клиентскими схемами:

  • auto.register.schemas — определяет, должен ли сериализатор пытаться зарегистрировать схему в Schema Registry.

  • use.latest.version — применяется только в том случае, если для auto.register.schemas установлено значение false. Если для auto.register.schemas установлено значение false, а для use.latest.version установлено значение true, то вместо получения схемы для объекта, передаваемого клиенту для сериализации, Schema Registry будет использовать последнюю версию схемы в субъекте.

  • latest.compatibility.strict — значение по умолчанию равно true, но это применимо только в том случае, если use.latest.version равно true. Если оба свойства имеют значение true, во время сериализации выполняется проверка, чтобы убедиться, что последняя версия субъекта обратно совместима со схемой сериализуемого объекта. Если проверка не удалась, выдается ошибка. Если значение latest.compatibility.strict равно false, то для сериализации используется последняя версия субъекта без какой-либо проверки совместимости.

Подробно о нюансах настройки данных параметров можно прочитать в статье Handling differences between preregistered and client-derived schemas

Потребители

Ниже на рисунке показано, как работают потребители Kafka при десериализации и чтении сообщений с использованием Schema Registry.

Десериализация и чтение сообщений
Десериализация и чтение сообщений
Десериализация и чтение сообщений
Десериализация и чтение сообщений

Для десериализации сообщения в Kafka к экземпляру производителя подключается десериализатор, предоставляемый Schema Registry, тип которого определяет конфигурация потребителя. Десериализатор взаимодействует с сервером Schema Registry по HTTP-протоколу.

Потребитель получает данные из кластера Kafka (сериализованное сообщение) и передает их десериализатору. Десериализатор проверяет наличие magic byte и извлекает ID схемы из сообщения.

Затем десериализатор считывает идентификатор схемы и проверяет, существует ли соответствующая схема в кеш-памяти потребителя. Если существует, десериализация происходит с этой схемой. В противном случае десериализатор извлекает схему из Schema Registry на основе идентификатора схемы. Как только схема готова, десериализатор приступает к десериализации.

Если сериализованные данные не соответствуют полученной схеме (например, отсутствуют некоторые поля), десериализация завершится неудачно и возникнет ошибка.

Для потребителей при сериализации имеет значение параметр use.latest.version, определяющий поведение системы при различиях между предварительно зарегистрированными и клиентскими схемами.

Schema Registry в ADS

Подключение

После добавления и установки сервиса Schema Registry в составе кластера ADS вы можете подключиться к серверу Schema Registry, используя его URL в формате http://<schema-registry>:<schema-registry-port> с указанием хоста и порта, приведенных на странице информации о сервисе в интерфейсе ADCM.

Информация о сервисе Schema Registry в интерфейсе ADCM
Информация о сервисе Schema Registry в интерфейсе ADCM

Kafka

В ADS сервис Schema Registry может быть установлен только после установки сервиса Kafka. После установки Schema Registry в конфигурационном файле /etc/schema-registry/schema-registry.properties автоматически устанавливается параметр kafkastore.bootstrap.servers для связи с брокером Kafka (расположение служебного топика _schemas).

Для тестовых целей могут быть использованы скрипты, предназначенные для работы Kafka с сериализацией, например, kafka-avro-console-producer. После установки сервиса Schema Registry такие скрипты размещены в папке /usr/lib/schema-registry/bin/.

Ниже приведен пример запуска производителя с указанием схемы сообщения и потребителя (в данном примере Schema Registry и Kafka установлены на одном хосте).

Запись сообщения:

$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic topic-avro --property value.schema='{"type":"record","name":"message","fields":[{"name":"id","type":"string"},{"name": "amount", "type": "double"}]}'

Для записи сообщения не выводится >, как в работе с обычными скриптами Kafka:

{ "id":"1000", "amount":500 }

Чтение сообщений из топика:

$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic topic-avro --property schema.registry.url=http://localhost:8081

ksqlDB

Сервис Schema Registry, установленный в ADS одновременно c сервисом ksqlDB, позволяет выполнять сериализацию и десериализацию данных при работе с потоками и таблицами в ksqlDB. После установки сервиса ksqlDB параметры key.converter.schema.registry.url и value.converter.schema.registry.url, отвечающие за взаимодействие ksqlDB и Schema Registry, можно установить на стороне ksqlDB (в конфигурационном файле /etc/ksqldb/connect.properties).

Kafka Connect

Сервис Schema Registry, установленный в ADS одновременно c сервисом Kafka Connect, предоставляет возможность преобразования данных из внутренних типов данных, используемых Kafka Connect, в данные формата AVRO и наоборот.

Для работы с данными в формате AVRO в конфигурации Kafka Connect необходимо установить специальный AvroConverter в качестве параметра key.converter и/или value.converter, а также для параметра key.converter.schema.registry.url и/или value.converter.schema.registry.url установить URL сервера Schema Registry.

Kafka REST Proxy

Сервис Schema Registry, установленный в ADS одновременно c сервисом Kafka REST Proxy, позволяет выполнять сериализацию и десериализацию данных при работе с топиками Kafka по интерфейсу RESTful. После установки обоих сервисов параметр schema.registry.url автоматически устанавливается в конфигурационном файле Kafka REST Proxy /etc/kafka-rest/kafka-rest.properties.

Используя встроенные типы схем AVRO, PROTOBUF и JSON, вы можете напрямую встраивать данные в нужном формате вместе со схемой (или идентификатором схемы) в запрос, определяя соответствующий тип контента в пользовательских заголовках:

$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.avro.v2+json" \
      --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
      "http://localhost:8082/topics/avrotest"

NiFi

Некоторые службы контроллера NiFi, например, ConfluentSchemaRegistry, предоставляют доступ к схемам, хранящимся на сервере Schema Registry, указанном в параметрах службы.

Настройка Schema Registry

Настройка параметров Schema Registry в интерфейсе ADCM выполняется на странице конфигурирования сервиса Schema Registry.

Для настройки параметров конфигурационного файла /etc/schema-registry/schema-registry.properties переведите в активное состояние переключатель Show advanced, раскройте узел schema-registry.properties и введите новые значения для параметров. Для изменения параметров Schema Registry, отсутствующих в интерфейсе ADCM, используйте поле Add key,value. Выберите Add property и введите наименование параметра и его значение.

После изменения параметров при помощи интерфейса ADCM перезагрузите сервис Schema Registry. Для этого примените действие Restart, нажав на иконку actions default dark actions default light в столбце Actions.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней