Пример использования Schema Registry

В статье описан пример использования сервиса Schema Registry — сериализация данных, записываемых в Kafka, при помощи формата Avro и чтение данных с использованием созданной схемы.

Основные понятия, используемые в Schema Registry:

  • Сериализация — перевод из формата-структуры, семантически понятной человеку, в двоичное представление для машинной обработки.

  • Схема — структура формата данных, подвергнутых сериализации.

  • Cубъект — область, в которой могут развиваться схемы.

Для работы со схемами Avro в Kafka в данной статье используются скрипты для Python3 с применением библиотеки confluent_kafka API.

В описанном в данной статье примере запуск Python-скриптов производится на одном из хостов с предустановленными сервисами Kafka и Schema Registry кластера ADS.

Подготовка к работе

ПРИМЕЧАНИЕ

В данном разделе все команды приведены как справочная информация для ОС Centos 7. Для установки программных пакетов в других ОС необходимо обратиться к документации соответстующей ОС.

Установка Python3

Перед установкой Python3 необходимо предварительно установить:

  • пакеты для работы с библиотеками в Python:

    $ sudo yum install gcc yum-utils zlib-devel python-tools cmake git pkgconfig -y --skip-broken
  • пакет Development Tools:

    $ sudo yum groupinstall -y "Development Tools" --skip-broken
  • библиотеки для работы с SSL:

    $ sudo yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel zlib* libffi-devel readline-devel tk-devel --skip-broken
  • библиотеку для работы со сжатыми файлами LZMA и XZ:

    $ yum install -y xz-devel python-backports-lzma
  • утилиту wget, если отсутствует:

    $ sudo yum install wget

    Загрузить и установить необходимую версию Python3 можно при помощи команд в следующей последовательности:

    $ cd /usr/src
    $ wget https://www.python.org/ftp/python/3.9.1/Python-3.9.1.tgz
    $ tar xvzf Python-3.9.1.tgz
    $ cd Python-3.9.1
    $ sudo ./configure
    $ sudo make install

    Проверить версию Python3 можно при помощи команды:

    $ python3 --version

Установка библиотек

Установить библиотеку confluent_kafka API можно при помощи команды:

$ pip3 install confluent_kafka

Установить библиотеку requests можно при помощи команды:

$ pip3 install requests

Установить остальные необходимые библиотеки можно при помощи команд:

$ pip3 install fastavro
$ pip3 install urllib3==1.26.6
$ pip3 install avro
$ pip3 install backports.lzma
ПРИМЕЧАНИЕ

При запуске скриптов Python3 существует вероятность возникновения ошибки No module named <module_name>, при этом необходимо установить требуемый модуль при помощи команды:

$ pip3 install <module_name>
Решение проблемы с ошибкой "ModuleNotFoundError: No module named '_lzma'"

Если после установки всех библиотек при запуске скрипта появляется ошибка ModuleNotFoundError: No module named '_lzma', необходимо выполнить:

  1. Открыть файл /usr/local/lib/python3.9/lzma.py.

  2. Изменить строки:

    from _lzma import *
    from _lzma import _encode_filter_properties, _decode_filter_properties
    +

    на строки:

    try:
        from _lzma import *
        from _lzma import _encode_filter_properties, _decode_filter_properties
    except ImportError:
        from backports.lzma import *
        from backports.lzma import _encode_filter_properties, _decode_filter_properties

Получение данных

В примере в качестве источника данных используются потоки событий Wikimedia.

Для того чтобы получить данные, на основе которых будет создана схема Avro, используется скрипт Python3, выполняющий:

  1. Запись 10 событий.

  2. Удаление data:.

  3. Декодирование строки с данными в UTF-8.

  4. Преобразование данных в действительный объект JSON.

Для получения событий необходимо:

  1. Cоздать файл скрипта:

    $ sudo vim /tmp/events.py
  2. Заполнить events.py текстом скрипта, приведенным ниже.

    events.py
    import requests
    import json
    
    init_string = 'data: '
    source_url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    
    def avro_producer(source_url):
        s = requests.Session()
    
        with s.get(source_url, headers=None, stream=True) as resp:
            times = 10
            for line in resp.iter_lines():
                if line:
                    decoded_line = line.decode()
                    if decoded_line.find(init_string) >= 0:
                        print(str(11 - times) + ")")
                        # remove data: to create a valid json
                        decoded_line = decoded_line.replace(init_string, "")
                        # convert to json
                        decoded_json = json.loads(decoded_line)
                        print(decoded_json)
                        times -= 1
                if times <= 0: break
    
    avro_producer(source_url)
  3. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /tmp/events.py
  4. Запустить скрипт:

    $ python3 /tmp/events.py

Результат приведен ниже. Показаны данные одного события.

{'$schema': '/mediawiki/recentchange/1.0.0', 'meta': {'uri': 'https://www.wikidata.org/wiki/Q91975510', 'request_id': '6d958229-0f1f-459b-b12b-e2724d816f1c', 'id': 'ec50d547-adfb-4cc5-beda-c3f8251d646c', 'dt': '2023-08-30T14:51:32Z', 'domain': 'www.wikidata.org', 'stream': 'mediawiki.recentchange', 'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 4903433673}, 'id': 2024634666, 'type': 'edit', 'namespace': 0, 'title': 'Q91975510', 'title_url': 'https://www.wikidata.org/wiki/Q91975510', 'comment': '/* wbeditentity-update-languages-short:0||an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo */ BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo', 'timestamp': 1693407092, 'user': 'Emijrpbot', 'bot': True, 'notify_url': 'https://www.wikidata.org/w/index.php?diff=1964666701&oldid=1869851195&rcid=2024634666', 'minor': False, 'patrolled': True, 'length': {'old': 13772, 'new': 15400}, 'revision': {'old': 1869851195, 'new': 1964666701}, 'server_url': 'https://www.wikidata.org', 'server_name': 'www.wikidata.org', 'server_script_path': '/w', 'wiki': 'wikidatawiki', 'parsedcomment': '\u200e<span dir="auto"><span class="autocomment">Changed label, description and/or aliases in an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo: </span></span> BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo'}

Создание и регистрация схемы AVRO

Создание схемы

На основании полученных данных необходимо создать схему в одном из поддерживаемых Schema Registry форматов сериализации, в данном случае формате Avro.

Схема может быть создана вручную, в соответствии с требованиями к формату Avro. Также можно использовать специальные сайты, например Data tools.AVRO SCHEMA.

Для данного примера схема создана с использованием лишь нескольких использующихся в событии полей (fields). При этом можно добавить поля в качестве демонстрации изменения схемы.

После создания схему необходимо внести в специальный скрипт, к которому будет обращаться Schema Registry при регистрации схемы. Для этого необходимо выполнить:

  1. Cоздать файл скрипта:

    $ sudo vim /tmp/constants.py
  2. Заполнить constants.py созданной схемой Avro, как показано ниже.

    SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
  3. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /tmp/constants.py

Регистрация схемы

Для того чтобы зарегистрировать схему в Schema Registry, используется скрипт Python3, выполняющий:

  1. Указание наименования топика Kafka.

  2. Указание имени субьекта схем — к наименованию топика добавляется суффикс -value (в данном примере создается схема для значений, а не для ключей).

  3. Cоздание экземпляра клиента Schema Registry.

  4. Определение функции получения последней версии схемы для субъекта wikimedia-value.

  5. Определение функции регистрации в Schema Registry новой схемы AVRO с использованием данных скрипта constants.py и возвращение идентификатора новой схемы.

  6. Определение функции удаления старой схемы и регистрации новой схемы для субьекта.

  7. Запуск функции регистрации схемы.

  8. Отображение последней зарегистрированной схемы.

Для регистрации схемы для субъекта wikimedia-value необходимо:

  1. Cоздать файл скрипта:

    $ sudo vim /tmp/schema_registry.py
  2. Заполнить schema_registry.py текстом скрипта, приведенным ниже.

    schema_registry.py
    # 3rd party library imported
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry import Schema
    
    # import from constants
    from constants import SCHEMA_STR
    
    schema_registry_url = 'http://localhost:8081'
    kafka_topic = 'wikimedia'
    schema_registry_subject = f"{kafka_topic}-value"
    
    def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        latest_version = sr.get_latest_version(schema_registry_subject)
    
        return sr, latest_version
    
    def register_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        schema = Schema(schema_str, schema_type="AVRO")
        schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema)
    
        return schema_id
    
    def update_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        versions_deleted_list = sr.delete_subject(schema_registry_subject)
        print(f"versions of schema deleted list: {versions_deleted_list}")
    
        schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str)
        return schema_id
    
    schema_id = register_schema(schema_registry_url, schema_registry_subject, SCHEMA_STR)
    print(schema_id)
    
    sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    print(latest_version.schema.schema_str)
  3. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /tmp/schema_registry.py
  4. Запустить скрипт:

    $ python3 /tmp/schema_registry.py

В результате выводится идентификатор новой записанной схемы и сама схема, зарегистрированная под этим идентификатором:

1
{"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null}]}
ПРИМЕЧАНИЕ

Возвращенный идентификатор используется для извлечения этой схемы из ресурса схем и отличается от версии схемы, связанной с субъектом. Если одна и та же схема зарегистрирована под другим субъектом, будет возвращен тот же идентификатор. Однако версия схемы может быть разной для разных субъектов.

Запись данных в топик Kafka с использованием схемы

ПРИМЕЧАНИЕ

Для автоматического создания топика wikimedia необходимо включить параметр auto.create.topics.enable в группе server.properties при конфигурировании сервиса Kafka.

Для того чтобы записать сериализованные данные в топик Kafka, используется скрипт Python3, выполняющий:

  1. Определение функции delivery_report, которая вызывается при доставке сообщения. Если сообщение не доставлено, регистрируется ошибка. В случае, если сообщение доставлено, регистрируется ключ, значение, топик, партиция и смещение.

  2. Определение функции avro_producer — записи данных в формате Avro:

    • Создание сериализатора Avro с использованием последней зарегистированной схемы.

    • Настройка производителя Kafka с возможностями сериализации, в том числе функций:

      • delivery.timeout.ms — максимальное время, в течение которого производитель может доставить сообщение (включая повторные попытки);

      • enable.idempotence — при установке данного параметра в значение true сообщения создаются ровно один раз и в исходном порядке.

    • Обработка потоковых данных в том виде, как это выполняется в скрипте для получения данных events.py, описанном выше.

  3. Определение функций получения, регистрации и обновления схемы в том виде, как это выполняется в скрипте для регистрации схемы schema_registry.py, описанном выше.

  4. Запуск функции avro_producer.

Для записи в топик wikimedia сообщений о событиях в формате Avro необходимо выполнить:

  1. Cоздать файл скрипта:

    $ sudo vim /tmp/avro_producer.py
  2. Заполнить avro_producer.py текстом скрипта, приведенным ниже.

    avro_producer.py
    import requests
    import json
    
    # 3rd party library imported
    from confluent_kafka import SerializingProducer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroSerializer
    from confluent_kafka.schema_registry import Schema
    
    # import from constants
    from constants import SCHEMA_STR
    
    init_string = 'data: '
    source_url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    kafka_url = 'localhost:9092'
    schema_registry_url = 'http://localhost:8081'
    kafka_topic = 'wikimedia'
    schema_registry_subject = f"{kafka_topic}-value"
    
    def delivery_report(errmsg, msg):
        if errmsg is not None:
            print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
            return
        print('Message: {} unsuccessfully produced to Topic: {} Partition: [{}] at offset {}'.format(msg.key(), msg.topic(), msg.partition(), msg.offset()))
    
    def avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject):
        # schema registry
        sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    
    
        value_avro_serializer = AvroSerializer(schema_registry_client = sr,
                                              schema_str = latest_version.schema.schema_str,
                                              conf={
                                                  'auto.register.schemas': False
                                                }
                                              )
    
        # Kafka Producer
        producer = SerializingProducer({
            'bootstrap.servers': kafka_url,
            'security.protocol': 'plaintext',
            'value.serializer': value_avro_serializer,
            'delivery.timeout.ms': 120000, # set it to 2 mins
            'enable.idempotence': 'true'
        })
    
        s = requests.Session()
    
        with s.get(source_url, headers=None, stream=True) as resp:
            for line in resp.iter_lines():
                if line:
                    decoded_line = line.decode()
                    if decoded_line.find(init_string) >= 0:
                        # remove data: to create a valid json
                        decoded_line = decoded_line.replace(init_string, "")
                        # convert to json
                        decoded_json = json.loads(decoded_line)
    
                        try:
                            # print(decoded_line + '\n')
                            producer.produce(topic=kafka_topic, value=decoded_json, on_delivery=delivery_report)
    
                            # Trigger any available delivery report callbacks from previous produce() calls
                            events_processed = producer.poll(1)
                            print(f"events_processed: {events_processed}")
    
                            messages_in_queue = producer.flush(1)
                            print(f"messages_in_queue: {messages_in_queue}")
                        except Exception as e:
                            print(e)
    
    
    def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        latest_version = sr.get_latest_version(schema_registry_subject)
    
        return sr, latest_version
    
    def register_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        schema = Schema(schema_str, schema_type="AVRO")
        schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema)
    
        return schema_id
    
    def update_schema(schema_registry_url, schema_registry_subject, schema_str):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        versions_deleted_list = sr.delete_subject(schema_registry_subject)
        print(f"versions of schema deleted list: {versions_deleted_list}")
    
        schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str)
        return schema_id
    
    
    avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject)
  3. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /tmp/avro_producer.py
  4. Запустить скрипт:

    $ python3 /tmp/avro_producer.py

    В результате выводится сообщение об отсутствии неудачных записей в топик и номер смещения (offset) (показаны данные одного события):

Message: None unsuccessfully produced to Topic: wikimedia Partition: [0] at offset 284
events_processed: 1
messages_in_queue: 0

Чтение сообщений в соответствии со схемой

Для того чтобы прочитать сериализированные данные из топика Kafka, используется скрипт Python3, выполняющий:

  1. Определение функции avro_consumer — чтения данных с использованием схемы формата Avro:

    • Настройка потребителя сообщений.

    • Создание AvroConsumer.

    • Определение формы вывода сообщений.

  2. Запуск потребителя сообщений.

Для чтения из топика wikimedia сообщений о событиях с использованием схемы формата Avro необходимо выполнить:

  1. Cоздать файл скрипта:

    $ sudo nano /tmp/avro_consumer.py
  2. Заполнить avro_consumer.py текстом скрипта, приведенным ниже.

    avro_consumer.py
    import json
    from confluent_kafka import Producer, KafkaException
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka import avro
    
    def avro_consumer():
        consumer_config = {
            "bootstrap.servers": "localhost:9092",
            "schema.registry.url": "http://localhost:8081",
            "group.id": "1",
            "auto.offset.reset": "earliest"
        }
    
        #print(value_schema)
    
        consumer = AvroConsumer(consumer_config)
        consumer.subscribe(['wikimedia'])
    
        while True:
          try:
            msg = consumer.poll(1)
    
            if msg is None:
              continue
    
            print("Value is :" + json.dumps(msg.value()))
            print("-------------------------")
    
          except KafkaException as e:
            print('Kafka failure ' + e)
    
        consumer.close()
    
    def main():
        avro_consumer()
    
    if __name__ == "__main__":
        main()
  3. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /tmp/avro_consumer.py
  4. Запустить скрипт:

    $ python3 /tmp/avro_consumer.py

    В результате чтение потока данных выполняется в соответствии с заданной схемой (показаны данные одного события):

-------------------------
Value is :{"bot": false, "comment": "[[:Category:2010s elections in Spain]] added to category", "id": 21173636}
-------------------------
ПРИМЕЧАНИЕ

Чтение с десериализацией данных можно также выполнить при помощи клиента kcat, поддерживающего десериализацию сообщений Avro с использованием Confluent Schema Registry.

Эволюция схемы

Для демонстрации эволюции схемы можно выполнить следующее:

  1. Добавить в constants.py, описанный выше, еще одно поле, например length:

    {\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}

    Скрипт constants.py после изменения выглядит следующим образом:

    SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}]}"
  2. Запустить скрипт schema_registry.py.

    В результате выводится идентификатор новой записанной схемы и сама схема, зарегистрированная под этим идентификатором:

    2
    {"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null},{"name":"length","type":["null",{"type":"record","name":"Length","fields":[{"name":"new","type":["null","int"],"default":null},{"name":"old","type":["null","int"],"default":null}]}],"default":null}]}
  3. Поочередно запустить скрипты:

    В результате поток данных считывается по измененной схеме — появилось новое поле length (показаны данные одного события):

Value is :{"bot": false, "comment": "/* wbcreateclaim-create:1| */ [[Property:P279]]: [[Q11173]], [[:toollabs:quickstatements/#/batch/211994|batch #211994]]", "id": 2025996909, "length": {"new": 6175, "old": 5747}}
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней