Schema Registry overview

Schema Registry features

Schema Registry is a centralized repository used to ensure consistency of data formats between message producers and consumers in Kafka.

Schema Registry features

The main functionality of Schema Registry is listed below:

  • Ensuring serialization and deserialization of data via special serializers provided to Kafka clients.

  • Interaction with Kafka clients to write and read serialized messages.

  • Registration and storage of data schemas for each subject.

  • Support for the evolution of schemas with storage of each version with its own identifier and the ability to check version compatibility for each subject.

  • Providing access to operations on schemas and subjects through a RESTful interface, the Schema Registry API.

Schema Registry architecture

Overview

Basic Schema Registry terminology:

  • Serialization is the process of converting data from a structured, human-readable format into a byte stream or byte array for machine processing. Schema Registry supports the following serialization formats: JSON, AVRO, PROTOBUF.

    Benefits of serialization:

    • ability to use different data formats in Kafka messages;

    • ability to exchange data between applications implemented in different programming languages;

    • reduction in the total volume of stored data.

  • Deserialization is the restoration of a data structure from a byte stream.

  • Serializer and deserializer are objects (or classes) whose methods allow the client to interact with Schema Registry to serialize and deserialize data. There are built-in Kafka serializers and deserializers for different schema formats. The Kafka API also supports custom serializers and deserializers based on the Serializer and Deserializer interfaces. The serializer is defined in the producer configuration using the key.serializer and/or value.serializer parameters. The deserializer is defined in the consumer configuration using the key.deserializer and/or value.deserializer parameters.

  • Subject is the scope in which schemas can evolve. The subject name is created when the schema is first registered and is used:

    • as a namespace for different versions when the schema changes (evolves);

    • to check the compatibility of schema versions;

    • to get the schema from the repository.

    There are three configurable ways to name a subject:

    • TopicNameStrategy is the default setting. Schemas registered by producers are placed under the topic name with the suffix -key or -value (for example, for a topic named topic that serializes only values, the subject is named topic-value).

    • RecordNameStrategy — allows you to use different schemas in one topic, since each individual record will have its own schema. In this case, you will have to use the same schema and the same version in all topics of the cluster for this particular record type, since it is impossible to determine which topic the record belongs to.

    • TopicRecordNameStrategy — a combination of the first two strategies.

    A non-default subject naming strategy is set with the producer parameters key.subject.name.strategy and/or value.subject.name.strategy when a new schema is registered.

  • Schema is the format structure of the serialized data. The schema describes the structure of the data written to the topic and the type of information it contains. This information connects producers and consumers. Each format has specific requirements; for example, an AVRO schema must comply with the AVRO format requirements.

    Below are example schemas in each supported format for the same sample data: { "id":"1000", "amount":500 }.

    JSON
    {
      "type": "object",
      "properties": {
        "id": {
          "type": "string"
        },
        "amount": {
          "type": "number"
        }
      }
    }
    AVRO
    {
        "type": "record",
        "name": "Transaction",
        "fields": [
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "amount",
                "type": "double"
            }
        ]
    }
    PROTOBUF
    syntax = "proto3";
    
    message MyRecord {
      string id = 1;
      float amount = 2;
    }
  • Schema compatibility is the compliance of a new schema version with previous versions for a given subject. Several compatibility types can be defined, depending on which data has been changed or deleted. Compatibility types are essentially templates for how a schema can evolve so that a new version can still be processed by producers and consumers. The compatibility type is set with the schema.compatibility.level Schema Registry server parameter in the schema-registry.properties file.
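The three subject naming strategies described in the terminology above can be illustrated with a minimal sketch. It assumes the commonly documented naming rules (topic plus -key or -value suffix, record name alone, or topic plus record name); the function and argument names are hypothetical and are not part of any Schema Registry client.

```python
def subject_name(strategy: str, topic: str, record_name: str, is_key: bool = False) -> str:
    """Return the subject name that a naming strategy would produce (illustrative only)."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":
        # Default: <topic>-key or <topic>-value
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":
        # The subject depends only on the record name, not on the topic
        return record_name
    if strategy == "TopicRecordNameStrategy":
        # Combination of the topic name and the record name
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown strategy: {strategy}")


if __name__ == "__main__":
    for name in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
        print(name, "->", subject_name(name, "topic", "Transaction"))
```

With RecordNameStrategy the topic does not appear in the subject at all, which is why the same schema version has to be shared by every topic that carries that record type.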

Schema Registry cluster

Every Schema Registry server is essentially a REST API server. Schema Registry servers provide APIs for managing schemas, as well as serializers and deserializers for reading and writing data according to schemas.

All Schema Registry servers within the ADS cluster serve serialization processes equally and have access to the schema storage. In production environments, it is recommended to specify the URLs of all Schema Registry servers.

Producers

The figure below shows how Kafka producers work when serializing and writing messages using Schema Registry.

Serializing and recording messages

To serialize a message and write it to Kafka, a serializer provided by Schema Registry is attached to the producer instance. The serializer type is determined by the producer configuration. The serializer interacts with the Schema Registry server over HTTP.
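As an illustration of how the producer configuration selects the serializer, here is a minimal sketch that assembles producer properties. It assumes the Confluent Avro serializer class for values and the standard Kafka string serializer for keys; the Schema Registry host names are placeholders, and the helper functions are hypothetical.

```python
from typing import Dict, List


def producer_properties(bootstrap_servers: str, registry_urls: List[str]) -> Dict[str, str]:
    """Build producer settings that attach a Schema Registry serializer (illustrative)."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # Keys are plain strings in this sketch; values are serialized as AVRO
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
        # Several Schema Registry servers are passed as a comma-separated list
        "schema.registry.url": ",".join(registry_urls),
    }


def to_properties_text(props: Dict[str, str]) -> str:
    """Render the settings in .properties file syntax."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


if __name__ == "__main__":
    # The host names below are placeholders, not real ADS hosts
    urls = ["http://sr1.example.com:8081", "http://sr2.example.com:8081"]
    print(to_properties_text(producer_properties("localhost:9092", urls)))
```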

The producer sends the message to the serializer. The serializer checks whether the producer cache holds a schema ID for the given subject. The serializer derives the subject name from the configured naming strategy the first time the producer instance connects to Schema Registry. If necessary, the subject name can be specified explicitly.

If the schema ID is not in the producer cache, the serializer registers the schema in the Schema Registry (or obtains the latest version of the schema for a given subject) and stores the resulting schema ID and the schema itself in the producer cache. When registering, Schema Registry writes the new schema to a special topic _schemas on the Kafka broker.

Before serialization, the data is checked against the schema. Any discrepancy or inconsistency between the data and the schema will result in an error during serialization.

After receiving the schema ID, the serializer writes the serialized data to the Kafka topic in accordance with the message format, prepending a special magic byte (the serialization format version number) and the schema ID.
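The framing step can be sketched as follows. This is a minimal illustration that assumes the commonly documented layout: a single magic byte equal to 0 followed by the schema ID as a 4-byte big-endian integer. The byte sizes are not stated in this article, and the function name is hypothetical. Some formats add further framing after the schema ID, which the sketch does not cover.

```python
import struct

MAGIC_BYTE = 0  # assumed serialization format version number


def frame_message(schema_id: int, payload: bytes) -> bytes:
    """Prepend the magic byte and the schema ID to already serialized data."""
    # ">BI" = big-endian, one unsigned byte followed by one unsigned 32-bit integer
    header = struct.pack(">BI", MAGIC_BYTE, schema_id)
    return header + payload


if __name__ == "__main__":
    framed = frame_message(42, b"serialized-record")
    print(framed[:5].hex())
```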

If the same producer instance serializes a new message for the same subject, the serializer will find an existing ID and schema in the memory cache and serialize the message according to it.

The serializer makes a new request to Schema Registry to register or retrieve the schema when a message is written to a new topic, when the producer is restarted, or when the producer requests a schema change (evolution). In this case, the new version is checked for compatibility with the previous ones.
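The caching behavior described above can be sketched with an in-memory stand-in for the registry. Both classes are hypothetical and greatly simplified: the fake registry only hands out IDs, and no compatibility checks are performed.

```python
class FakeRegistry:
    """In-memory stand-in for a Schema Registry server (hypothetical)."""

    def __init__(self):
        self._ids = {}
        self.calls = 0  # number of simulated HTTP requests

    def register(self, subject: str, schema: str) -> int:
        self.calls += 1
        # Assign one ID per distinct schema text
        if schema not in self._ids:
            self._ids[schema] = len(self._ids) + 1
        return self._ids[schema]


class CachingSerializer:
    """Keeps schema IDs in a local cache, as the producer-side serializer does."""

    def __init__(self, registry: FakeRegistry):
        self._registry = registry
        self._cache = {}

    def schema_id(self, subject: str, schema: str) -> int:
        key = (subject, schema)
        if key not in self._cache:
            # Cache miss: ask the registry and remember the answer
            self._cache[key] = self._registry.register(subject, schema)
        return self._cache[key]


if __name__ == "__main__":
    registry = FakeRegistry()
    serializer = CachingSerializer(registry)
    serializer.schema_id("topic-value", '{"type": "string"}')
    serializer.schema_id("topic-value", '{"type": "string"}')
    print("registry requests:", registry.calls)
```

A second message for the same subject and schema is served from the cache, while a new subject (for example, a new topic) triggers another request to the registry.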

For producers, including Kafka Streams applications and Kafka Connect connectors, the following parameters can be configured to determine how the system behaves when there are differences between pre-registered and client-side schemas:

  • auto.register.schemas determines whether the serializer should attempt to register a schema with Schema Registry.

  • use.latest.version applies only if auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving the schema from the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject.

  • latest.compatibility.strict defaults to true and applies only if use.latest.version is true. If both properties are set to true, a check is performed during serialization to ensure that the latest version of the subject is backward compatible with the schema of the object being serialized. If the check fails, an error is thrown. If latest.compatibility.strict is false, the latest version of the subject is used for serialization without any compatibility check.

You can read more about the nuances of setting these parameters in the Handling differences between preregistered and client-derived schemas article.
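How the three parameters combine can be summarized in a small decision function. This is a simplified sketch, not the serializer's actual code. The outcome for the case where both auto.register.schemas and use.latest.version are false is an assumption that is not described in this article, and the function name is hypothetical.

```python
def serializer_action(auto_register: bool, use_latest: bool, latest_strict: bool = True) -> str:
    """Describe what the serializer does for a given combination of settings."""
    if auto_register:
        # use.latest.version applies only when auto.register.schemas is false
        return "register the schema derived from the object"
    if use_latest:
        if latest_strict:
            return "use the latest subject version after a compatibility check"
        return "use the latest subject version without a compatibility check"
    # Assumption: the derived schema must already be registered in the subject
    return "look up the derived schema and fail if it is not registered"


if __name__ == "__main__":
    for flags in [(True, False, True), (False, True, True), (False, True, False), (False, False, True)]:
        print(flags, "->", serializer_action(*flags))
```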

Consumers

The figure below shows how Kafka consumers work when deserializing and reading messages using Schema Registry.

Deserializing and reading messages

To deserialize a message from Kafka, a deserializer provided by Schema Registry is attached to the consumer instance. The deserializer type is determined by the consumer configuration. The deserializer interacts with the Schema Registry server over HTTP.

The consumer receives data (a serialized message) from the Kafka cluster and passes it to the deserializer. The deserializer checks for the presence of the magic byte and extracts the schema ID from the message.

The deserializer then checks whether a matching schema exists in the consumer cache. If it exists, deserialization uses this schema. Otherwise, the deserializer retrieves the schema from the registry by its schema ID. Once the schema is available, the deserializer deserializes the message.

If the serialized data does not match the expected schema (e.g. missing fields), deserialization will fail and an error will be raised.
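The consumer-side steps can be sketched in the same spirit. The sketch assumes a magic byte equal to 0 and a 4-byte big-endian schema ID, which this article does not state explicitly. The fetch_schema callable is a hypothetical stand-in for the HTTP request that retrieves a schema by ID, and the actual decoding of the payload is left out.

```python
import struct


def split_message(message: bytes):
    """Check the magic byte and return (schema_id, payload)."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("message does not start with the expected magic byte")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


class CachingDeserializer:
    """Resolves schemas by ID and caches them on the consumer side."""

    def __init__(self, fetch_schema):
        self._fetch_schema = fetch_schema  # callable: schema ID -> schema
        self._cache = {}

    def schema_for(self, message: bytes):
        schema_id, payload = split_message(message)
        if schema_id not in self._cache:
            # Cache miss: retrieve the schema from the registry by its ID
            self._cache[schema_id] = self._fetch_schema(schema_id)
        return self._cache[schema_id], payload


if __name__ == "__main__":
    deserializer = CachingDeserializer(lambda schema_id: f"schema #{schema_id}")
    print(deserializer.schema_for(b"\x00\x00\x00\x00\x07data"))
```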

For consumers, the use.latest.version parameter is available. It determines how the system behaves when there are differences between pre-registered and client schemas.

Schema Registry in ADS

Connection

After the Schema Registry service is added to an ADS cluster and installed, you can connect to the Schema Registry server using its URL in the http://<schema-registry>:<schema-registry-port> format, with the host and port shown on the service information page in the ADCM interface.

Information about the Schema Registry service in the ADCM interface
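A quick way to check the connection is to request the list of subjects through the Schema Registry REST API. The sketch below uses only the Python standard library; the host and port are placeholders to be replaced with the values from the ADCM service page, and the helper names are hypothetical.

```python
import json
import urllib.request


def registry_url(host: str, port: int, path: str = "") -> str:
    """Build a Schema Registry URL in the http://<host>:<port> format."""
    return f"http://{host}:{port}{path}"


def list_subjects(host: str, port: int) -> list:
    """Return the subjects registered on the server (GET /subjects)."""
    with urllib.request.urlopen(registry_url(host, port, "/subjects"), timeout=5) as response:
        return json.load(response)


if __name__ == "__main__":
    # Placeholder address; requires a reachable Schema Registry server
    print(list_subjects("localhost", 8081))
```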

Kafka

In ADS, the Schema Registry service can be installed only after installing the Kafka service. After installing Schema Registry, the kafkastore.bootstrap.servers parameter is automatically set in the /etc/schema-registry/schema-registry.properties configuration file to communicate with the Kafka broker (location of the _schemas service topic).

For test purposes, scripts designed to work with Kafka with serialization can be used, for example, kafka-avro-console-producer. After installing the Schema Registry service, such scripts are located in the /usr/lib/schema-registry/bin/ folder.

Below are examples of running a producer with a specified message schema and running a consumer (in this example, Schema Registry and Kafka are installed on the same host).

Recording a message:

$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic topic-avro --property value.schema='{"type":"record","name":"message","fields":[{"name":"id","type":"string"},{"name": "amount", "type": "double"}]}'

When you enter a message, the > prompt is not printed, unlike in regular Kafka scripts:

{ "id":"1000", "amount":500 }

Reading messages from a topic:

$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic topic-avro --property schema.registry.url=http://localhost:8081

ksqlDB

The Schema Registry service, installed in ADS simultaneously with the ksqlDB service, allows you to serialize and deserialize data when working with streams and tables in ksqlDB. After installing the ksqlDB service, the key.converter.schema.registry.url and value.converter.schema.registry.url parameters, which are responsible for the interaction between ksqlDB and Schema Registry, are set in ksqlDB in the /etc/ksqldb/connect.properties configuration file.

Kafka Connect

The Schema Registry service, installed in ADS simultaneously with the Kafka Connect service, provides the ability to convert data from internal data types used by Kafka Connect to AVRO format data and back.

To work with AVRO data, set the special AvroConverter as the value of the key.converter and/or value.converter parameter in the Kafka Connect configuration, and set the key.converter.schema.registry.url and/or value.converter.schema.registry.url parameter to the Schema Registry server URL.
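The converter settings described above might look like the following sketch, which builds them as a dictionary. The fully qualified converter class name is the one shipped with the Confluent Avro converter and is an assumption here, since this article only names AvroConverter. The helper function is hypothetical.

```python
def avro_converter_settings(registry_url: str) -> dict:
    """Return Kafka Connect settings that enable AVRO conversion for keys and values."""
    converter = "io.confluent.connect.avro.AvroConverter"  # assumed class name
    return {
        "key.converter": converter,
        "key.converter.schema.registry.url": registry_url,
        "value.converter": converter,
        "value.converter.schema.registry.url": registry_url,
    }


if __name__ == "__main__":
    for key, value in avro_converter_settings("http://localhost:8081").items():
        print(f"{key}={value}")
```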

Kafka REST Proxy

The Schema Registry service, installed in ADS simultaneously with the Kafka REST Proxy service, allows you to serialize and deserialize data when working with Kafka topics via the RESTful interface. After installing both services, the schema.registry.url parameter is automatically set in Kafka REST Proxy in the /etc/kafka-rest/kafka-rest.properties configuration file.

Using the built-in types AVRO, PROTOBUF, and JSON, you can embed data in the desired format together with a schema (or schema ID) directly into a request by specifying the appropriate content type in the request headers:

$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.avro.v2+json" \
      --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
      "http://localhost:8082/topics/avrotest"

NiFi

Some NiFi controller services, for example ConfluentSchemaRegistry, provide access to schemas stored on the Schema Registry server specified in the service parameters.

Configure Schema Registry

Configuring Schema Registry parameters in the ADCM interface is performed on the configuration page of the Schema Registry service.

To configure the parameters of the /etc/schema-registry/schema-registry.properties configuration file, set the Show advanced switch to active, expand the schema-registry.properties node, and enter new values for the parameters. To change Schema Registry parameters that are not available in the ADCM interface, use the Add key,value field. Select Add property and enter the name of the parameter and its value.

After changing the parameters using the ADCM interface, restart the Schema Registry service. To do this, apply the Restart action by clicking the actions icon in the Actions column.
