Schema Registry usage example

This article describes a usage example of the Schema Registry service: serializing data written to Kafka in the Avro format and reading that data back using the registered schema.

Basic concepts used in Schema Registry:

  • Serialization is the translation of data from a human-readable structured format into a binary representation for machine processing.

  • Schema is the structure of the data that is subject to serialization.

  • Subject is a scope within which schemas can evolve.

To work with Avro schemas in Kafka, this article uses Python3 scripts based on the confluent_kafka API library.

In the example described in this article, the Python3 scripts are run on one of the ADS cluster hosts with the Kafka and Schema Registry services pre-installed.

Preparation

NOTE

In this section, all commands are given as reference information for CentOS 7. To install software packages on other operating systems, refer to the documentation of the corresponding operating system.

Install Python3

Before installing Python3, you should first install:

  • packages for working with libraries in Python:

    $ sudo yum install gcc yum-utils zlib-devel python-tools cmake git pkgconfig -y --skip-broken
  • Development Tools package:

    $ sudo yum groupinstall -y "Development Tools" --skip-broken
  • libraries for working with SSL:

    $ sudo yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel zlib* libffi-devel readline-devel tk-devel --skip-broken
  • library for working with compressed LZMA and XZ files:

    $ sudo yum install -y xz-devel python-backports-lzma
  • wget utility, if missing:

    $ sudo yum install wget

    You can download and install the necessary Python3 version using the commands in the following sequence:

    $ cd /usr/src
    $ wget https://www.python.org/ftp/python/3.9.1/Python-3.9.1.tgz
    $ tar xvzf Python-3.9.1.tgz
    $ cd Python-3.9.1
    $ sudo ./configure
    $ sudo make install

    You can check the version of Python3 with the command:

    $ python3 --version

Install libraries

You can install the confluent_kafka API library using the command:

$ pip3 install confluent_kafka

You can install the requests library using the command:

$ pip3 install requests

You can install the rest of the necessary libraries using the commands:

$ pip3 install fastavro
$ pip3 install urllib3==1.26.6
$ pip3 install avro
$ pip3 install backports.lzma
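
To make sure that the confluent_kafka library and the underlying librdkafka are installed correctly, you can print their versions (the exact output depends on the installed versions):

$ python3 -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"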
NOTE

When running Python3 scripts, you may get the No module named <module_name> error, which means that you should install the missing module using the command:

$ pip3 install <module_name>
Solving the "ModuleNotFoundError: No module named '_lzma'" error

If you get the ModuleNotFoundError: No module named '_lzma' error after installing all the libraries, do the following:

  1. Open file /usr/local/lib/python3.9/lzma.py.

  2. Change lines:

    from _lzma import *
    from _lzma import _encode_filter_properties, _decode_filter_properties

    to the lines:

    try:
        from _lzma import *
        from _lzma import _encode_filter_properties, _decode_filter_properties
    except ImportError:
        from backports.lzma import *
        from backports.lzma import _encode_filter_properties, _decode_filter_properties

Get data

The example uses Wikimedia event streams as the data source.

In order to get the data from which the Avro schema will be created, use a Python3 script that:

  1. Reads 10 events from the stream.

  2. Decodes each data string as UTF-8.

  3. Removes the data: prefix.

  4. Converts the string to a valid JSON object.

To receive events, do the following:

  1. Create a script file:

    $ sudo vim /tmp/events.py
  2. Fill events.py with the script text below.

    events.py
    import requests
    import json
    
    init_string = 'data: '
    source_url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    
    def avro_producer(source_url):
        s = requests.Session()
    
        with s.get(source_url, headers=None, stream=True) as resp:
            times = 10
            for line in resp.iter_lines():
                if line:
                    decoded_line = line.decode()
                    if decoded_line.find(init_string) >= 0:
                        print(str(11 - times) + ")")
                        # remove data: to create a valid json
                        decoded_line = decoded_line.replace(init_string, "")
                        # convert to json
                        decoded_json = json.loads(decoded_line)
                        print(decoded_json)
                        times -= 1
                if times <= 0: break
    
    avro_producer(source_url)
  3. Give permission to execute the script:

    $ sudo chmod +x /tmp/events.py
  4. Run the script:

    $ python3 /tmp/events.py

The result is listed below. The data of one event is shown.

{'$schema': '/mediawiki/recentchange/1.0.0', 'meta': {'uri': 'https://www.wikidata.org/wiki/Q91975510', 'request_id': '6d958229-0f1f-459b-b12b-e2724d816f1c', 'id': 'ec50d547-adfb-4cc5-beda-c3f8251d646c', 'dt': '2023-08-30T14:51:32Z', 'domain': 'www.wikidata.org', 'stream': 'mediawiki.recentchange', 'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 4903433673}, 'id': 2024634666, 'type': 'edit', 'namespace': 0, 'title': 'Q91975510', 'title_url': 'https://www.wikidata.org/wiki/Q91975510', 'comment': '/* wbeditentity-update-languages-short:0||an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo */ BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo', 'timestamp': 1693407092, 'user': 'Emijrpbot', 'bot': True, 'notify_url': 'https://www.wikidata.org/w/index.php?diff=1964666701&oldid=1869851195&rcid=2024634666', 'minor': False, 'patrolled': True, 'length': {'old': 13772, 'new': 15400}, 'revision': {'old': 1869851195, 'new': 1964666701}, 'server_url': 'https://www.wikidata.org', 'server_name': 'www.wikidata.org', 'server_script_path': '/w', 'wiki': 'wikidatawiki', 'parsedcomment': '\u200e<span dir="auto"><span class="autocomment">Changed label, description and/or aliases in an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo: </span></span> BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo'}

Create and register a schema

Create a schema

Based on the received data, you need to create a schema in one of the serialization formats supported by Schema Registry, in this case the Avro format.

The schema can be created manually, according to Avro format requirements. You can also use dedicated sites such as Data tools.AVRO SCHEMA.

For this example, the schema is created using only a few of the fields present in the event, so that more fields can be added later as a demonstration of schema evolution.
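
For reference, the same schema is shown below in an expanded, human-readable JSON form; this is only a readable view of the string that is placed into constants.py in the next step:

{
  "type": "record",
  "name": "value_wikimedia",
  "namespace": "wikimedia",
  "fields": [
    {"name": "bot", "type": ["null", "boolean"], "default": null},
    {"name": "comment", "type": ["null", "string"], "default": null},
    {"name": "id", "type": ["null", "int"], "default": null}
  ]
}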

After the schema has been created, it must be added to a separate script that is imported when the schema is registered. For this, do the following:

  1. Create a script file:

    $ sudo vim /tmp/constants.py
  2. Fill constants.py with the created Avro schema as shown below.

    SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
  3. Give permission to execute the script:

    $ sudo chmod +x /tmp/constants.py

Register a schema

In order to register a schema in the Schema Registry, use a Python3 script that:

  1. Sets the name of the Kafka topic.

  2. Specifies the schema subject name: the -value suffix is added to the topic name (in this example, the schema is created for values, not for keys).

  3. Creates a SchemaRegistryClient instance.

  4. Defines a function that gets the latest version of the schema for the wikimedia-value subject.

  5. Defines a function that registers a new Avro schema in the Schema Registry using the data from the constants.py script and returns the ID of the new schema.

  6. Defines a function that deletes the old schema versions and registers a new schema for the subject.

  7. Calls the schema registration function.

  8. Displays the latest registered schema.

To register a schema for the wikimedia-value subject, do the following:

  1. Create a script file:

    $ sudo vim /tmp/schema_registry.py
  2. Fill in schema_registry.py with the script text below.

    schema_registry.py
    # 3rd party library imported
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry import Schema
    
    # import from constants
    from constants import SCHEMA_STR
    
    schema_registry_url = 'http://localhost:8081'
    kafka_topic = 'wikimedia'
    schema_registry_subject = f"{kafka_topic}-value"
    
    def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        latest_version = sr.get_latest_version(schema_registry_subject)
    
        return sr, latest_version
    
    def register_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        schema = Schema(schema_str, schema_type="AVRO")
        schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema)
    
        return schema_id
    
    def update_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        versions_deleted_list = sr.delete_subject(schema_registry_subject)
        print(f"versions of schema deleted list: {versions_deleted_list}")
    
        schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str)
        return schema_id
    
    schema_id = register_schema(schema_registry_url, schema_registry_subject, SCHEMA_STR)
    print(schema_id)
    
    sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    print(latest_version.schema.schema_str)
  3. Give permission to execute the script:

    $ sudo chmod +x /tmp/schema_registry.py
  4. Run the script:

    $ python3 /tmp/schema_registry.py

As a result, the identifier of the newly registered schema and the schema itself, registered under this identifier, are displayed:

1
{"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null}]}
NOTE

The returned identifier is used to retrieve this schema from the schemas resource and is different from the schema version associated with the subject. If the same schema is registered under a different subject, the same identifier is returned. However, the schema version can differ between subjects.
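
The difference between the schema identifier and the subject version can be illustrated with a short sketch that queries the Schema Registry both ways. This is a minimal example that assumes the schema identifier 1 returned above and the Schema Registry address used in this article:

from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Global lookup by schema ID (the value printed by schema_registry.py)
schema = sr.get_schema(schema_id=1)
print(schema.schema_str)

# Lookup by subject name and version
latest = sr.get_latest_version('wikimedia-value')
print(latest.schema_id, latest.version, latest.subject)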

Write data to a Kafka topic using a schema

NOTE

To automatically create a wikimedia topic, you must enable the auto.create.topics.enable option in the server.properties group when configuring the Kafka service.
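
If automatic topic creation is disabled, the wikimedia topic can be created in advance. Below is a minimal sketch based on the AdminClient class from the confluent_kafka library; the partition count and replication factor are illustrative values and should match your cluster configuration:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# The partition count and replication factor below are illustrative values
futures = admin.create_topics([NewTopic('wikimedia', num_partitions=1, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # raises an exception if the topic was not created
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")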

In order to write data to a Kafka topic using a schema, use a Python3 script that:

  1. Defines the delivery_report function that is called when a message is delivered. If the message is not delivered, an error is logged. If the message is delivered, its key, topic, partition, and offset are logged.

  2. Defines the avro_producer function, which writes data in the Avro format:

    • Creates an AvroSerializer using the latest registered schema.

    • Configures a Kafka producer with serialization capabilities, including the options:

      • delivery.timeout.ms: the maximum time during which the producer can deliver a message (including retries);

      • enable.idempotence: when this option is set to true, messages are delivered exactly once and in the original order.

    • Handles the streaming data in the same way as the events.py script described above.

  3. Defines the functions to get, register, and update the schema, in the same way as the schema_registry.py script described above.

  4. Calls the avro_producer function.

To write event messages to the wikimedia topic in Avro format, do the following:

  1. Create a script file:

    $ sudo vim /tmp/avro_producer.py
  2. Fill in avro_producer.py with the text of the script below.

    avro_producer.py
    import requests
    import json
    
    # 3rd party library imported
    from confluent_kafka import SerializingProducer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroSerializer
    from confluent_kafka.schema_registry import Schema
    
    # import from constants
    from constants import SCHEMA_STR
    
    init_string = 'data: '
    source_url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    kafka_url = 'localhost:9092'
    schema_registry_url = 'http://localhost:8081'
    kafka_topic = 'wikimedia'
    schema_registry_subject = f"{kafka_topic}-value"
    
    def delivery_report(errmsg, msg):
        if errmsg is not None:
            print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
            return
        print('Message: {} successfully produced to Topic: {} Partition: [{}] at offset {}'.format(msg.key(), msg.topic(), msg.partition(), msg.offset()))
    
    def avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject):
        # schema registry
        sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    
    
        value_avro_serializer = AvroSerializer(schema_registry_client = sr,
                                              schema_str = latest_version.schema.schema_str,
                                              conf={
                                                  'auto.register.schemas': False
                                                }
                                              )
    
        # Kafka Producer
        producer = SerializingProducer({
            'bootstrap.servers': kafka_url,
            'security.protocol': 'plaintext',
            'value.serializer': value_avro_serializer,
            'delivery.timeout.ms': 120000, # set it to 2 mins
            'enable.idempotence': 'true'
        })
    
        s = requests.Session()
    
        with s.get(source_url, headers=None, stream=True) as resp:
            for line in resp.iter_lines():
                if line:
                    decoded_line = line.decode()
                    if decoded_line.find(init_string) >= 0:
                        # remove data: to create a valid json
                        decoded_line = decoded_line.replace(init_string, "")
                        # convert to json
                        decoded_json = json.loads(decoded_line)
    
                        try:
                            # print(decoded_line + '\n')
                            producer.produce(topic=kafka_topic, value=decoded_json, on_delivery=delivery_report)
    
                            # Trigger any available delivery report callbacks from previous produce() calls
                            events_processed = producer.poll(1)
                            print(f"events_processed: {events_processed}")
    
                            messages_in_queue = producer.flush(1)
                            print(f"messages_in_queue: {messages_in_queue}")
                        except Exception as e:
                            print(e)
    
    
    def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        latest_version = sr.get_latest_version(schema_registry_subject)
    
        return sr, latest_version
    
    def register_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        schema = Schema(schema_str, schema_type="AVRO")
        schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema)
    
        return schema_id
    
    def update_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        versions_deleted_list = sr.delete_subject(schema_registry_subject)
        print(f"versions of schema deleted list: {versions_deleted_list}")
    
        schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str)
        return schema_id
    
    
    avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject)
  3. Give permission to execute the script:

    $ sudo chmod +x /tmp/avro_producer.py
  4. Run the script:

    $ python3 /tmp/avro_producer.py

As a result, a message about the successful delivery of each message to the topic, together with the partition and offset, is displayed (the data of one event is shown):

Message: None successfully produced to Topic: wikimedia Partition: [0] at offset 284
events_processed: 1
messages_in_queue: 0
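
Each value written by the script is serialized in the Confluent wire format: a zero magic byte, followed by the four-byte schema identifier, followed by the Avro-encoded payload. The following minimal sketch, assuming the same local Kafka address, reads one raw message and prints the embedded schema identifier:

import struct
from confluent_kafka import Consumer

# A plain consumer without a deserializer: msg.value() returns raw bytes
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'raw-check',  # an illustrative group id
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['wikimedia'])

msg = consumer.poll(10)
if msg is not None and msg.error() is None:
    raw = msg.value()
    magic_byte, schema_id = struct.unpack('>bI', raw[:5])
    print(f"magic byte: {magic_byte}, schema id: {schema_id}")
consumer.close()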

Read messages according to the schema

In order to read the serialized data from a Kafka topic using a schema, use a Python3 script that:

  1. Defines the avro_consumer function, which reads data using the Avro format schema:

    • Sets the consumer configuration.

    • Creates an AvroConsumer instance and subscribes it to the topic.

    • Defines the format of the output messages.

  2. Calls the avro_consumer function.

To read event messages from a wikimedia topic, do the following:

  1. Create a script file:

    $ sudo nano /tmp/avro_consumer.py
  2. Fill in avro_consumer.py with the text of the script below.

    avro_consumer.py
    import json
    from confluent_kafka import Producer, KafkaException
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka import avro
    
    def avro_consumer():
        consumer_config = {
            "bootstrap.servers": "localhost:9092",
            "schema.registry.url": "http://localhost:8081",
            "group.id": "1",
            "auto.offset.reset": "earliest"
        }
    
        #print(value_schema)
    
        consumer = AvroConsumer(consumer_config)
        consumer.subscribe(['wikimedia'])
    
        while True:
          try:
            msg = consumer.poll(1)
    
            if msg is None:
              continue
    
            print("Value is :" + json.dumps(msg.value()))
            print("-------------------------")
    
          except KafkaException as e:
            print('Kafka failure ' + str(e))
    
        consumer.close()
    
    def main():
        avro_consumer()
    
    if __name__ == "__main__":
        main()
  3. Give permission to execute the script:

    $ sudo chmod +x /tmp/avro_consumer.py
  4. Run the script:

    $ python3 /tmp/avro_consumer.py

    As a result, the data stream is read according to the given schema (the data of one event is shown):

-------------------------
Value is :{"bot": false, "comment": "[[:Category:2010s elections in Spain]] added to category", "id": 21173636}
-------------------------
NOTE

Reading with data deserialization can also be done using the kcat client, which supports deserialization of Avro messages via the Confluent Schema Registry.
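
For example, a possible kcat invocation for this topic, assuming kcat is built with Avro and Schema Registry support and the addresses used in this article:

$ kcat -C -b localhost:9092 -t wikimedia -s value=avro -r http://localhost:8081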

Schema evolution

To demonstrate schema evolution, do the following:

  1. Add a new field to the constants.py script described above, for example, length:

    {\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}

    The constants.py script after the change looks like this:

    SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}]}"
  2. Run the schema_registry.py script.

    As a result, the identifier of the newly registered schema and the schema itself, registered under this identifier, are displayed:

    2
    {"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null},{"name":"length","type":["null",{"type":"record","name":"Length","fields":[{"name":"new","type":["null","int"],"default":null},{"name":"old","type":["null","int"],"default":null}]}],"default":null}]}
  3. Run the avro_producer.py and avro_consumer.py scripts described above one after another.

    As a result, the data stream is read according to the changed schema, now including the new length field (the data of one event is shown):

Value is :{"bot": false, "comment": "/* wbcreateclaim-create:1| */ [[Property:P279]]: [[Q11173]], [[:toollabs:quickstatements/#/batch/211994|batch #211994]]", "id": 2025996909, "length": {"new": 6175, "old": 5747}}
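
Before re-registering a changed schema, you can also check whether it is compatible with the latest version already stored for the subject. Below is a minimal sketch that reuses SCHEMA_STR from constants.py and the Schema Registry address used in this article:

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# import from constants
from constants import SCHEMA_STR

sr = SchemaRegistryClient({'url': 'http://localhost:8081'})
new_schema = Schema(SCHEMA_STR, schema_type="AVRO")

# True if the new schema satisfies the compatibility level configured for the subject
is_compatible = sr.test_compatibility('wikimedia-value', new_schema)
print(f"compatible with the latest version: {is_compatible}")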