Schema Registry overview
Schema Registry features
Schema Registry is a centralized repository used to ensure consistency of data formats between message producers and consumers in Kafka.
The main functionality of Schema Registry is listed below:
- Ensuring serialization and deserialization of data via special serializers provided to Kafka clients.
- Interaction:
  - with Kafka clients to write and read serialized messages;
  - with separate schema managers (e.g. the Schema Registry Maven plugin) as part of a CI/CD pipeline.
- Registration and storage of data schemas for each subject.
- Support for schema evolution: each version is stored with its own identifier, and version compatibility can be checked within each subject.
- Access to operations on schemas and subjects via the RESTful interface using the Schema Registry API.
Schema Registry architecture
Overview
Basic Schema Registry terminology:
- Serialization is the process of converting a data structure that is semantically understandable to humans into a byte stream/array in binary form for machine processing. Schema Registry supports the following serialization formats: JSON, AVRO, PROTOBUF.
  Benefits of serialization:
  - ability to use different data formats in Kafka messages;
  - ability to exchange data between applications implemented in different programming languages;
  - reduction in the total volume of stored data.
- Deserialization is the restoration of a data structure from a byte stream.
- Serializer and deserializer are objects (or classes) that provide a set of functions and methods allowing the client to interact with Schema Registry for serialization and deserialization. There are built-in Kafka serializers and deserializers for different schema formats. The Kafka API also supports the creation of custom serializers and deserializers based on the Serializer interface. The serializer is defined in the producer configuration using the key.serializer and/or value.serializer parameters. The deserializer is defined in the consumer configuration using the key.deserializer and/or value.deserializer parameters (see the configuration sketch after this list).
- Subject is the area in which schemas can evolve. The subject name is created when the schema is first registered and is used:
  - when changing (evolving) the schema, as a namespace for different versions;
  - to check the compatibility of schema versions;
  - to get the schema from the repository.
  There are three configurable ways to name a subject:
  - TopicNameStrategy is the default setting. Schemas registered by producers are registered under the topic name with the addition of -key or -value (for example, for a topic topic that only serializes values, the subject will be named topic-value).
  - RecordNameStrategy — allows you to use different schemas in one topic, since each individual record type has its own schema. In this case, you will have to use the same schema and the same version for this record type in all topics of the cluster, since it is impossible to determine which topic the record belongs to.
  - TopicRecordNameStrategy — a combination of the first two strategies.
  A non-default subject naming strategy is set when registering a new schema using the producer parameters key.subject.name.strategy and/or value.subject.name.strategy, as shown in the sketch after this list.
- Schema is the format structure of the serialized data. The schema describes the structure of the data written to the topic and what type of information it contains. This information connects producers and consumers. There are specific requirements for each format; for example, an AVRO schema must comply with the AVRO format requirements.
  Below are examples of schemas in the supported formats for the same sample data — { "id":"1000", "amount":500 }.

  JSON

  {
    "type": "object",
    "properties": {
      "id": { "type": "string" },
      "amount": { "type": "number" }
    }
  }

  AVRO

  {
    "type": "record",
    "name": "Transaction",
    "fields": [
      { "name": "id", "type": "string" },
      { "name": "amount", "type": "double" }
    ]
  }

  PROTOBUF

  syntax = "proto3";

  message MyRecord {
    string id = 1;
    float amount = 2;
  }
- Schema compatibility — compliance of a new version of a schema with previous versions for a given subject. Multiple compatibility types may be defined depending on what data has been changed or deleted. Compatibility types are essentially templates for how a schema can evolve so that a new version of it can be successfully processed by producers and consumers. The compatibility type is specified using the schema.compatibility.level Schema Registry server parameter in the schema-registry.properties file.
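To tie these terms together, below is a minimal configuration sketch for a producer that sets the serializers, the Schema Registry URL, and a non-default subject naming strategy. The class names follow the Confluent-style client libraries and are given here as an assumption; adjust them to the libraries actually used in your project.

import java.util.Properties;

public class ProducerSerializationConfig {
    public static Properties serializationProps() {
        Properties props = new Properties();
        // Built-in serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Schema Registry server used by the serializer
        props.put("schema.registry.url", "http://localhost:8081");
        // Non-default subject naming strategy for message values
        props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");
        return props;
    }
}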
Schema Registry cluster
Every Schema Registry server is essentially a REST API server. Schema Registry servers provide APIs for managing schemas, as well as serializers and deserializers for reading and writing data according to schemas.
All Schema Registry servers within the ADS cluster serve serialization processes equally and have access to the schema storage. In production environments, it is recommended to specify the URLs of all Schema Registry servers in the client configuration.
Producers
The figure below shows how Kafka producers work when serializing and writing messages using Schema Registry.
To serialize a message and write it to Kafka, a serializer provided by Schema Registry is connected to the producer instance; its type is determined by the producer configuration. The serializer interacts with the Schema Registry server over the HTTP protocol.
The producer sends the message to the serializer. The serializer checks whether a schema ID for the given subject exists in the producer cache. The subject name is derived according to the configured naming strategy when the producer instance first connects to Schema Registry; if necessary, the subject name can be specified explicitly.
If the schema ID is not in the producer cache, the serializer registers the schema in Schema Registry (or obtains the latest version of the schema for the given subject) and stores the resulting schema ID and the schema itself in the producer cache. During registration, Schema Registry writes the new schema to the special topic _schemas on the Kafka broker.
Before serialization, the data is checked against the schema. Any discrepancy or inconsistency between the data and the schema will result in an error during serialization.
After receiving the schema identifier, the serializer writes the serialized data to the Kafka topic in accordance with the message format, prepending a special magic byte (the serialization format version number) and the schema ID to the payload.
If the same producer instance serializes a new message for the same subject, the serializer will find an existing ID and schema in the memory cache and serialize the message according to it.
To write a message to a new topic, when the producer is restarted, or when the producer requests a schema change (evolution), the serializer makes a new request to the Schema Registry to register or retrieve the schema. In this case, a check is made for compatibility of the new version with the previous ones.
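To illustrate this sequence, below is a minimal Java producer sketch that writes the sample Transaction record from the AVRO example above. It assumes Confluent-compatible client libraries and localhost addresses, which may differ in your cluster.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // Kafka broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry server

        // The Transaction schema from the AVRO example above
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Transaction\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"amount\",\"type\":\"double\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "1000");
        record.put("amount", 500.0);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            // On the first send, the serializer registers the schema (or looks it up),
            // caches the returned schema ID, and prepends it to the serialized payload
            producer.send(new ProducerRecord<>("topic-avro", "1000", record));
        }
    }
}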
For producers, including Kafka Streams applications and Kafka Connect connectors, the following parameters can be configured to determine how the system behaves when there are differences between pre-registered and client-side schemas:
- auto.register.schemas — determines whether the serializer should attempt to register a schema with Schema Registry.
- use.latest.version — only applies if auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of obtaining a schema from the object passed to the client for serialization, Schema Registry will use the latest version of the schema in the subject.
- latest.compatibility.strict — defaults to true, but only applies if use.latest.version is true. If both properties are set to true, a check is performed during serialization to ensure that the latest version of the subject is backwards compatible with the schema of the object being serialized. If the check fails, an error is thrown. If latest.compatibility.strict is false, then the latest version of the subject is used for serialization without any compatibility checking.
You can read more about the nuances of setting these parameters in the Handling differences between preregistered and client-derived schemas article.
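A sketch of how these parameters could look in a producer configuration is shown below; the values are illustrative, not ADS defaults, and the comma-separated URL list follows the recommendation to specify all Schema Registry servers.

import java.util.Properties;

public class SchemaHandlingConfig {
    public static Properties producerOverrides() {
        Properties props = new Properties();
        props.put("auto.register.schemas", "false");       // do not register schemas from the client
        props.put("use.latest.version", "true");           // serialize with the latest registered version of the subject
        props.put("latest.compatibility.strict", "true");  // check backwards compatibility during serialization
        // In production, list all Schema Registry servers of the cluster:
        props.put("schema.registry.url", "http://sr-host-1:8081,http://sr-host-2:8081");
        return props;
    }
}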
Consumers
The figure below shows how Kafka consumers work when deserializing and reading messages using Schema Registry.
To deserialize a message in Kafka, a deserializer provided by Schema Registry is connected to the consumer instance; its type is determined by the consumer configuration. The deserializer interacts with the Schema Registry server over the HTTP protocol.
The consumer receives data (a serialized message) from the Kafka cluster and passes it to the deserializer. The deserializer checks for the presence of the magic byte and extracts the schema identifier from the message.
The deserializer then reads the schema ID and checks whether a matching schema exists in the consumer cache. If it exists, deserialization is performed with this schema. Otherwise, the deserializer retrieves the schema from the registry based on the schema ID. Once the schema is ready, the deserializer starts deserializing.
If the serialized data does not match the expected schema (e.g. missing fields), deserialization will fail and an error will be raised.
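For illustration, below is a minimal sketch of how a deserializer reads this header. It assumes the widely used wire format of one magic byte followed by a 4-byte big-endian schema ID; the exact layout is an assumption and may differ between serializer implementations.

import java.nio.ByteBuffer;

public class WireFormatReader {

    // Extracts the schema ID from a serialized Kafka message,
    // assuming a 5-byte header: 1 magic byte + 4-byte big-endian schema ID
    public static int extractSchemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magic = buffer.get();  // serialization format version number, 0 in the current format
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt();     // schema ID used to look up the schema in the registry
        // the bytes remaining in the buffer are the serialized payload itself
    }
}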
For consumers, the use.latest.version parameter is also available; it determines how the system behaves when there are differences between pre-registered and client-side schemas.
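Below is a minimal Java consumer sketch that reads the messages written by the producer example above, again assuming Confluent-compatible client libraries and localhost addresses.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "avro-example-group");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-avro"));
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, GenericRecord> rec : records) {
                // The deserializer has already fetched (and cached) the writer schema by its ID
                System.out.println(rec.value().get("id") + " -> " + rec.value().get("amount"));
            }
        }
    }
}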
Schema Registry in ADS
Connection
After the Schema Registry service is added and installed as part of an ADS cluster, you can connect to the Schema Registry server using its URL in the http://<schema-registry>:<schema-registry-port> format, with the host and port specified on the service information page in the ADCM interface.
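As a quick check, this URL can be queried via the Schema Registry API, for example by requesting the list of registered subjects. The sketch below uses Java's built-in HTTP client; the host and port are placeholders.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaRegistryPing {
    public static void main(String[] args) throws Exception {
        // Replace host and port with the values shown on the service page in ADCM
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/subjects"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // a JSON array of subjects, e.g. ["topic-avro-value"]
    }
}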
Kafka
In ADS, the Schema Registry service can be installed only after the Kafka service is installed. After installing Schema Registry, the kafkastore.bootstrap.servers parameter is automatically set in the /etc/schema-registry/schema-registry.properties configuration file to communicate with the Kafka broker (the location of the _schemas service topic).
For test purposes, you can use the scripts designed to work with Kafka with serialization, for example, kafka-avro-console-producer. After installing the Schema Registry service, these scripts are located in the /usr/lib/schema-registry/bin/ folder.
Below are examples of running a producer (with the message schema specified) and a consumer; in these examples, Schema Registry and Kafka are installed on the same host.
Recording a message:
$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-producer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic topic-avro \
--property value.schema='{"type":"record","name":"message","fields":[{"name":"id","type":"string"},{"name": "amount", "type": "double"}]}'
When writing a message, the > prompt is not printed, unlike with regular Kafka scripts:
{ "id":"1000", "amount":500 }
Reading messages from a topic:
$ sudo /usr/lib/schema-registry/bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic topic-avro \
--property schema.registry.url=http://localhost:8081
ksqlDB
The Schema Registry service, installed in ADS simultaneously with the ksqlDB service, allows you to serialize and deserialize data when working with streams and tables in ksqlDB. After installing the ksqlDB service, the key.converter.schema.registry.url and value.converter.schema.registry.url parameters, which are responsible for the interaction between ksqlDB and Schema Registry, are set in the /etc/ksqldb/connect.properties configuration file.
Kafka Connect
The Schema Registry service, installed in ADS simultaneously with the Kafka Connect service, provides the ability to convert data from internal data types used by Kafka Connect to AVRO format data and back.
To work with data in the AVRO format, set the special AvroConverter as the key.converter and/or value.converter parameter in the Kafka Connect configuration, and set the Schema Registry server URL in the key.converter.schema.registry.url and/or value.converter.schema.registry.url parameters.
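A minimal excerpt of such a configuration might look as follows; the AvroConverter class name follows the Confluent-style converter and is given as an assumption, and the URL is a placeholder:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081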
Kafka REST Proxy
The Schema Registry service, installed in ADS simultaneously with the Kafka REST Proxy service, allows you to serialize and deserialize data when working with Kafka topics via the RESTful interface. After installing both services, the schema.registry.url parameter is automatically set in Kafka REST Proxy in the /etc/kafka-rest/kafka-rest.properties configuration file.
Using the built-in AVRO, PROTOBUF, and JSON types, you can directly embed data in the desired format, along with a schema (or schema identifier), into a request by setting the appropriate content type in the request headers:
$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.avro.v2+json" \
--data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
"http://localhost:8082/topics/avrotest"
NiFi
Some NiFi controller services, for example ConfluentSchemaRegistry, provide access to schemas stored on the Schema Registry server specified in the service parameters.
Configure Schema Registry
Configuring Schema Registry parameters in the ADCM interface is performed on the configuration page of the Schema Registry service.
To configure the parameters of the /etc/schema-registry/schema-registry.properties configuration file, set the Show advanced switch to active, expand the schema-registry.properties node, and enter new values for the parameters. To change Schema Registry parameters that are not available in the ADCM interface, use the Add key,value field. Select Add property and enter the name of the parameter and its value.
After changing the parameters using the ADCM interface, restart the Schema Registry service. To do this, apply the Restart action by clicking in the Actions column.