Schema Registry usage example
This article describes a usage example of the Schema Registry service: serializing data written to Kafka in the Avro format and reading that data back using the registered schema.
Basic concepts used in Schema Registry:
- Serialization is the translation of data from a human-readable structured format into a binary representation for machine processing.
- Schema is the structure of the data format that is subject to serialization.
- Subject is a scope in which schemas can evolve.
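As a quick illustration of these concepts, the sketch below (a supplementary example, not one of the scripts used later in this article) serializes one record to the binary Avro representation according to a schema and reads it back. It uses the fastavro library, which is installed further in this article; the schema and record shown are hypothetical.

import io
from fastavro import schemaless_writer, schemaless_reader

# A hypothetical schema and record used only for illustration
schema = {
    "type": "record",
    "name": "example",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "comment", "type": "string"},
    ],
}

buf = io.BytesIO()
# Serialization: translate the record into the binary Avro form
schemaless_writer(buf, schema, {"id": 1, "comment": "test"})
print(buf.getvalue())

# Deserialization: read the binary data back using the same schema
buf.seek(0)
print(schemaless_reader(buf, schema))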
To work with Avro schemas in Kafka, this article uses Python3 scripts based on the confluent_kafka API library.
In the described example, the Python scripts are launched on one of the ADS cluster hosts with the Kafka and Schema Registry services installed.
Prepare for work
NOTE
In this section, all commands are given as reference information for CentOS 7. To install software packages on other operating systems, refer to the documentation of the corresponding operating system.
Install Python3
Before installing Python3, you should first install:
- packages for working with libraries in Python:
$ sudo yum install gcc yum-utils zlib-devel python-tools cmake git pkgconfig -y --skip-broken
- the Development Tools package:
$ sudo yum groupinstall -y "Development Tools" --skip-broken
- libraries for working with SSL:
$ sudo yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel zlib* libffi-devel readline-devel tk-devel --skip-broken
- a library for working with compressed LZMA and XZ files:
$ yum install -y xz-devel python-backports-lzma
- the wget utility, if it is missing:
$ sudo yum install wget
You can download and install the necessary Python3 version using the commands in the following sequence:
$ cd /usr/src
$ wget https://www.python.org/ftp/python/3.9.1/Python-3.9.1.tgz
$ tar xvzf Python-3.9.1.tgz
$ cd Python-3.9.1
$ sudo ./configure
$ sudo make install
You can check the version of Python3 with the command:
$ python3 --version
Install libraries
You can install the confluent_kafka API library using the command:
$ pip3 install confluent_kafka
You can install the requests library using the command:
$ pip3 install requests
You can install the rest of the necessary libraries using the commands:
$ pip3 install fastavro
$ pip3 install urllib3==1.26.6
$ pip3 install avro
$ pip3 install backports.lzma
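As an optional check (not part of the original instructions), you can verify that all libraries import correctly into the installed Python3:
$ python3 -c "import confluent_kafka, fastavro, avro, requests; print(confluent_kafka.version())"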
NOTE
When running Python3 scripts, there is a possibility of the ModuleNotFoundError: No module named '_lzma' error.
If you get this error after installing all the libraries, do the following:
- Open the file /usr/local/lib/python3.9/lzma.py.
- Change the lines:
from _lzma import *
from _lzma import _encode_filter_properties, _decode_filter_properties
to the lines:
try:
    from _lzma import *
    from _lzma import _encode_filter_properties, _decode_filter_properties
except ImportError:
    from backports.lzma import *
    from backports.lzma import _encode_filter_properties, _decode_filter_properties
Get data
The example uses Wikimedia event streams as the data source.
In order to get the data from which the Avro schema will be created, use a Python3 script that does the following:
- Records 10 events.
- Decodes each data string in UTF-8.
- Removes the data: prefix from the string.
- Converts the data to a valid JSON object.
To receive events, do the following:
- Create a script file:
$ sudo vim /tmp/events.py
- Fill events.py with the script text below.
events.py
import requests
import json

init_string = 'data: '
source_url = 'https://stream.wikimedia.org/v2/stream/recentchange'


def avro_producer(source_url):
    s = requests.Session()
    with s.get(source_url, headers=None, stream=True) as resp:
        times = 10
        for line in resp.iter_lines():
            if line:
                decoded_line = line.decode()
                if decoded_line.find(init_string) >= 0:
                    print(str(11 - times) + ")")
                    # remove data: to create a valid json
                    decoded_line = decoded_line.replace(init_string, "")
                    # convert to json
                    decoded_json = json.loads(decoded_line)
                    print(decoded_json)
                    times -= 1
                    if times <= 0:
                        break


avro_producer(source_url)
- Give permission to execute the script:
$ sudo chmod +x /tmp/events.py
- Run the script:
$ python3 /tmp/events.py
The result is listed below. The data of one event is shown.
{'$schema': '/mediawiki/recentchange/1.0.0', 'meta': {'uri': 'https://www.wikidata.org/wiki/Q91975510', 'request_id': '6d958229-0f1f-459b-b12b-e2724d816f1c', 'id': 'ec50d547-adfb-4cc5-beda-c3f8251d646c', 'dt': '2023-08-30T14:51:32Z', 'domain': 'www.wikidata.org', 'stream': 'mediawiki.recentchange', 'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 4903433673}, 'id': 2024634666, 'type': 'edit', 'namespace': 0, 'title': 'Q91975510', 'title_url': 'https://www.wikidata.org/wiki/Q91975510', 'comment': '/* wbeditentity-update-languages-short:0||an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo */ BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo', 'timestamp': 1693407092, 'user': 'Emijrpbot', 'bot': True, 'notify_url': 'https://www.wikidata.org/w/index.php?diff=1964666701&oldid=1869851195&rcid=2024634666', 'minor': False, 'patrolled': True, 'length': {'old': 13772, 'new': 15400}, 'revision': {'old': 1869851195, 'new': 1964666701}, 'server_url': 'https://www.wikidata.org', 'server_name': 'www.wikidata.org', 'server_script_path': '/w', 'wiki': 'wikidatawiki', 'parsedcomment': '\u200e<span dir="auto"><span class="autocomment">Changed label, description and/or aliases in an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo: </span></span> BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo'}
Create and register a schema
Create a schema
Based on the received data, you need to create a schema in one of the serialization formats supported by Schema Registry, in this case the Avro format.
The schema can be created manually, according to the Avro format requirements. You can also use dedicated sites such as Data tools.AVRO SCHEMA.
For this example, the schema is created using only a few of the fields present in the event, so that more fields can be added later to demonstrate a schema change.
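As an illustration, the same schema can be assembled as a Python dictionary and serialized with json.dumps to obtain the escaped string that is placed into constants.py below (a sketch, not part of the original instructions):

import json

# The Avro schema with a few of the event fields, written as a Python dictionary
schema = {
    "type": "record",
    "name": "value_wikimedia",
    "namespace": "wikimedia",
    "fields": [
        {"name": "bot", "type": ["null", "boolean"], "default": None},
        {"name": "comment", "type": ["null", "string"], "default": None},
        {"name": "id", "type": ["null", "int"], "default": None},
    ],
}

# Prints the escaped JSON string expected in constants.py as SCHEMA_STR
print(json.dumps(schema))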
After the schema has been created, it must be placed into a separate script that is used when registering the schema. For this, do the following:
- Create a script file:
$ sudo vim /tmp/constants.py
- Fill constants.py with the created Avro schema as shown below.
SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
- Give permission to execute the script:
$ sudo chmod +x /tmp/constants.py
Register a schema
In order to register a schema in the Schema Registry, use a Python3 script that does the following:
- Sets the name of the Kafka topic.
- Specifies the schema subject name: the -value suffix is added to the topic name (in this example, the schema is created for values, not for keys).
- Creates a SchemaRegistryClient instance.
- Defines a function that gets the latest version of the schema for the wikimedia-value subject.
- Defines a function that registers a new Avro schema in the Schema Registry using the data from the constants.py script and returns the ID of the new schema.
- Defines a function that deletes the old schema and registers a new schema for the subject.
- Launches the schema registration function.
- Displays the last registered schema.
To register a schema for the wikimedia-value subject, do the following:
- Create a script file:
$ sudo vim /tmp/schema_registry.py
- Fill in schema_registry.py with the script text below.
schema_registry.py
# 3rd party library imported
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry import Schema

# import from constants
from constants import SCHEMA_STR

schema_registry_url = 'http://localhost:8081'
kafka_topic = 'wikimedia'
schema_registry_subject = f"{kafka_topic}-value"


def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)
    return sr, latest_version


def register_schema(schema_registry_url, schema_registry_subject, schema_str):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    schema = Schema(schema_str, schema_type="AVRO")
    schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema)
    return schema_id


def update_schema(schema_registry_url, schema_registry_subject, schema_str):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    versions_deleted_list = sr.delete_subject(schema_registry_subject)
    print(f"versions of schema deleted list: {versions_deleted_list}")
    schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str)
    return schema_id


schema_id = register_schema(schema_registry_url, schema_registry_subject, SCHEMA_STR)
print(schema_id)

sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
print(latest_version.schema.schema_str)
- Give permission to execute the script:
$ sudo chmod +x /tmp/schema_registry.py
- Run the script:
$ python3 /tmp/schema_registry.py
As a result, the identifier of the newly registered schema and the schema itself, registered under this identifier, are displayed:
1 {"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null}]}
NOTE
The returned identifier is used to retrieve this schema from the schemas resource and is different from the schema version associated with the subject. If the same schema is registered under a different subject, the same identifier is returned. However, the schema version can be different for different subjects.
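To see the difference between the schema identifier and the subject version in practice, you can query the registry from Python (a sketch assuming the Schema Registry is available at http://localhost:8081 and the identifier returned earlier is 1):

from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Lookup by the global schema identifier returned by register_schema()
by_id = sr.get_schema(1)
print(by_id.schema_str)

# Lookup by subject name and version number
by_version = sr.get_version('wikimedia-value', 1)
print(by_version.schema_id, by_version.version)

The same information is available through the Schema Registry REST API, for example:
$ curl http://localhost:8081/schemas/ids/1
$ curl http://localhost:8081/subjects/wikimedia-value/versions/latest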
Write data to a Kafka topic using a schema
NOTE
To automatically create a wikimedia topic when the first message is written to it, the auto.create.topics.enable parameter must be enabled in the Kafka settings.
In order to write data to a Kafka topic using a schema, use a Python3 script that does the following:
- Defines the delivery_report function that is called when a message is delivered. If the message is not delivered, an error is logged. If the message is delivered, the key, topic, partition, and offset are logged.
- Defines the avro_producer function that writes data in the Avro format:
  - Creates an Avro serializer using the latest registered schema.
  - Configures a Kafka producer with serialization capabilities, including the options:
    - delivery.timeout.ms: the maximum time during which the producer can deliver a message (including retries);
    - enable.idempotence: when this option is set to true, messages are produced exactly once and in the original order.
  - Handles the streaming data in the same way as the events.py script described above.
- Defines the functions to get, register, and update the schema, in the same way as in the schema_registry.py script described above.
- Launches the avro_producer function.
To write event messages to the wikimedia topic in Avro format, do the following:
- Create a script file:
$ sudo vim /tmp/avro_producer.py
- Fill in avro_producer.py with the text of the script below.
avro_producer.py
import requests
import json

# 3rd party library imported
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry import Schema

# import from constants
from constants import SCHEMA_STR

init_string = 'data: '
source_url = 'https://stream.wikimedia.org/v2/stream/recentchange'
kafka_url = 'localhost:9092'
schema_registry_url = 'http://localhost:8081'
kafka_topic = 'wikimedia'
schema_registry_subject = f"{kafka_topic}-value"


def delivery_report(errmsg, msg):
    if errmsg is not None:
        print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
        return
    print('Message: {} successfully produced to Topic: {} Partition: [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject):
    # schema registry
    sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)

    value_avro_serializer = AvroSerializer(schema_registry_client=sr,
                                           schema_str=latest_version.schema.schema_str,
                                           conf={'auto.register.schemas': False})

    # Kafka Producer
    producer = SerializingProducer({
        'bootstrap.servers': kafka_url,
        'security.protocol': 'plaintext',
        'value.serializer': value_avro_serializer,
        'delivery.timeout.ms': 120000,  # set it to 2 mins
        'enable.idempotence': 'true'
    })

    s = requests.Session()
    with s.get(source_url, headers=None, stream=True) as resp:
        for line in resp.iter_lines():
            if line:
                decoded_line = line.decode()
                if decoded_line.find(init_string) >= 0:
                    # remove data: to create a valid json
                    decoded_line = decoded_line.replace(init_string, "")
                    # convert to json
                    decoded_json = json.loads(decoded_line)
                    try:
                        # print(decoded_line + '\n')
                        producer.produce(topic=kafka_topic, value=decoded_json, on_delivery=delivery_report)
                        # Trigger any available delivery report callbacks from previous produce() calls
                        events_processed = producer.poll(1)
                        print(f"events_processed: {events_processed}")
                        messages_in_queue = producer.flush(1)
                        print(f"messages_in_queue: {messages_in_queue}")
                    except Exception as e:
                        print(e)


def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)
    return sr, latest_version


def register_schema(schema_registry_url, schema_registry_subject, schema_str):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    schema = Schema(schema_str, schema_type="AVRO")
    schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema)
    return schema_id


def update_schema(schema_registry_url, schema_registry_subject, schema_str):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    versions_deleted_list = sr.delete_subject(schema_registry_subject)
    print(f"versions of schema deleted list: {versions_deleted_list}")
    schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str)
    return schema_id


avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject)
- Give permission to execute the script:
$ sudo chmod +x /tmp/avro_producer.py
- Run the script:
$ python3 /tmp/avro_producer.py
As a result, a message is displayed confirming successful delivery of each record to the topic, together with the offset number (the data of one event is shown):
Message: None successfully produced to Topic: wikimedia Partition: [0] at offset 284
events_processed: 1
messages_in_queue: 0
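Each value produced this way is stored in Kafka in the Confluent wire format: a zero magic byte, followed by the 4-byte big-endian identifier of the schema used for serialization, followed by the Avro-encoded payload. The sketch below (an optional check with an assumed broker address and a hypothetical consumer group name) reads one raw message from the topic and prints the schema identifier from its header:

import struct

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'raw-check',          # hypothetical group name
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['wikimedia'])

msg = None
for _ in range(10):
    msg = consumer.poll(5)
    if msg is not None:
        break

if msg is not None and msg.error() is None:
    # First 5 bytes: magic byte (0) and the big-endian schema identifier
    magic, schema_id = struct.unpack('>bI', msg.value()[:5])
    print(f"magic byte: {magic}, schema id: {schema_id}")
consumer.close()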
Read messages according to the schema
In order to read the serialized data from a Kafka topic using a schema, use a Python3 script that does the following:
- Defines the avro_consumer function that reads data using the Avro format schema:
  - Sets the consumer configuration.
  - Creates an AvroConsumer.
  - Determines the format of the output messages.
- Launches the avro_consumer function.
To read event messages from the wikimedia topic, do the following:
- Create a script file:
$ sudo nano /tmp/avro_consumer.py
- Fill in avro_consumer.py with the text of the script below.
avro_consumer.py
import json

from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroConsumer


def avro_consumer():
    consumer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "1",
        "auto.offset.reset": "earliest"
    }

    consumer = AvroConsumer(consumer_config)
    consumer.subscribe(['wikimedia'])

    while True:
        try:
            msg = consumer.poll(1)
            if msg is None:
                continue
            print("Value is :" + json.dumps(msg.value()))
            print("-------------------------")
        except KafkaException as e:
            print('Kafka failure ' + str(e))

    consumer.close()


def main():
    avro_consumer()


if __name__ == "__main__":
    main()
- Give permission to execute the script:
$ sudo chmod +x /tmp/avro_consumer.py
- Run the script:
$ python3 /tmp/avro_consumer.py
As a result, the data stream is read according to the given schema (the data of one event is shown):
-------------------------
Value is :{"bot": false, "comment": "[[:Category:2010s elections in Spain]] added to category", "id": 21173636}
-------------------------
NOTE
Reading with data deserialization can also be done using the kcat client, which supports deserialization of Avro messages using the Confluent Schema Registry.
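For example, assuming the kcat build includes Avro and Schema Registry support, a command of the following form reads the wikimedia topic and deserializes the values using the registered schema:
$ kcat -C -b localhost:9092 -t wikimedia -s value=avro -r http://localhost:8081 -e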
Schema evolution
To demonstrate schema evolution, do the following (an optional compatibility check is sketched after these steps):
- Add a new field to constants.py, described above, for example length:
{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}
The constants.py script after the change looks like this:
SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}]}"
- Run the schema_registry.py script.
As a result, the identifier of the newly registered schema and the schema itself, registered under this identifier, are displayed:
2 {"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null},{"name":"length","type":["null",{"type":"record","name":"Length","fields":[{"name":"new","type":["null","int"],"default":null},{"name":"old","type":["null","int"],"default":null}]}],"default":null}]}
- Run the avro_producer.py and avro_consumer.py scripts one after the other.
As a result, the data stream is read according to the changed schema, with the new length field (the data of one event is shown):
Value is :{"bot": false, "comment": "/* wbcreateclaim-create:1| */ [[Property:P279]]: [[Q11173]], [[:toollabs:quickstatements/#/batch/211994|batch #211994]]", "id": 2025996909, "length": {"new": 6175, "old": 5747}}
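As the optional addition mentioned above (a sketch, not part of the original instructions), before re-registering a changed schema you can check whether it is compatible with the latest version already stored for the wikimedia-value subject:

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# import from constants (run from the /tmp directory, as with the other scripts)
from constants import SCHEMA_STR

sr = SchemaRegistryClient({'url': 'http://localhost:8081'})

# True if the changed schema can be registered under the current compatibility level
is_compatible = sr.test_compatibility('wikimedia-value', Schema(SCHEMA_STR, schema_type="AVRO"))
print(f"compatible with the latest registered version: {is_compatible}")

In the Confluent Schema Registry the default compatibility level is BACKWARD, under which adding an optional field with a null default, as in this example, is an allowed change.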