Пример использования Schema Registry
В статье описан пример использования сервиса Schema Registry — сериализация данных, записываемых в Kafka, при помощи формата Avro и чтение данных с использованием созданной схемы.
Основные понятия, используемые в Schema Registry:
-
Сериализация — перевод из формата-структуры, семантически понятной человеку, в двоичное представление для машинной обработки.
-
Схема — структура формата данных, подвергнутых сериализации.
-
Cубъект — область, в которой могут развиваться схемы.
Для работы со схемами Avro в Kafka в данной статье используются скрипты для Python3 с применением библиотеки confluent_kafka API.
В описанном в данной статье примере запуск Python-скриптов производится на одном из хостов с предустановленными сервисами Kafka и Schema Registry кластера ADS.
Подготовка к работе
ПРИМЕЧАНИЕ
В данном разделе все команды приведены как справочная информация для ОС Centos 7. Для установки программных пакетов в других ОС необходимо обратиться к документации соответстующей ОС. |
Установка Python3
Перед установкой Python3 необходимо предварительно установить:
-
пакеты для работы с библиотеками в Python:
$ sudo yum install gcc yum-utils zlib-devel python-tools cmake git pkgconfig -y --skip-broken
-
пакет Development Tools:
$ sudo yum groupinstall -y "Development Tools" --skip-broken
-
библиотеки для работы с SSL:
$ sudo yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel zlib* libffi-devel readline-devel tk-devel --skip-broken
-
библиотеку для работы со сжатыми файлами LZMA и XZ:
$ yum install -y xz-devel python-backports-lzma
-
утилиту
wget
, если отсутствует:$ sudo yum install wget
Загрузить и установить необходимую версию Python3 можно при помощи команд в следующей последовательности:
$ cd /usr/src $ wget https://www.python.org/ftp/python/3.9.1/Python-3.9.1.tgz $ tar xvzf Python-3.9.1.tgz $ cd Python-3.9.1 $ sudo ./configure $ sudo make install
Проверить версию Python3 можно при помощи команды:
$ python3 --version
Установка библиотек
Установить библиотеку confluent_kafka API можно при помощи команды:
$ pip3 install confluent_kafka
Установить библиотеку requests можно при помощи команды:
$ pip3 install requests
Установить остальные необходимые библиотеки можно при помощи команд:
$ pip3 install fastavro
$ pip3 install urllib3==1.26.6
$ pip3 install avro
$ pip3 install backports.lzma
ПРИМЕЧАНИЕ
При запуске скриптов Python3 существует вероятность возникновения ошибки
|
Если после установки всех библиотек при запуске скрипта появляется ошибка ModuleNotFoundError: No module named '_lzma'
, необходимо выполнить:
-
Открыть файл /usr/local/lib/python3.9/lzma.py.
-
Изменить строки:
from _lzma import * from _lzma import _encode_filter_properties, _decode_filter_properties +
на строки:
try: from _lzma import * from _lzma import _encode_filter_properties, _decode_filter_properties except ImportError: from backports.lzma import * from backports.lzma import _encode_filter_properties, _decode_filter_properties
Получение данных
В примере в качестве источника данных используются потоки событий Wikimedia.
Для того чтобы получить данные, на основе которых будет создана схема Avro, используется скрипт Python3, выполняющий:
-
Запись 10 событий.
-
Удаление
data:
. -
Декодирование строки с данными в UTF-8.
-
Преобразование данных в действительный объект JSON.
Для получения событий необходимо:
-
Cоздать файл скрипта:
$ sudo vim /tmp/events.py
-
Заполнить events.py текстом скрипта, приведенным ниже.
events.pyimport requests import json init_string = 'data: ' source_url = 'https://stream.wikimedia.org/v2/stream/recentchange' def avro_producer(source_url): s = requests.Session() with s.get(source_url, headers=None, stream=True) as resp: times = 10 for line in resp.iter_lines(): if line: decoded_line = line.decode() if decoded_line.find(init_string) >= 0: print(str(11 - times) + ")") # remove data: to create a valid json decoded_line = decoded_line.replace(init_string, "") # convert to json decoded_json = json.loads(decoded_line) print(decoded_json) times -= 1 if times <= 0: break avro_producer(source_url)
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /tmp/events.py
-
Запустить скрипт:
$ python3 /tmp/events.py
Результат приведен ниже. Показаны данные одного события.
{'$schema': '/mediawiki/recentchange/1.0.0', 'meta': {'uri': 'https://www.wikidata.org/wiki/Q91975510', 'request_id': '6d958229-0f1f-459b-b12b-e2724d816f1c', 'id': 'ec50d547-adfb-4cc5-beda-c3f8251d646c', 'dt': '2023-08-30T14:51:32Z', 'domain': 'www.wikidata.org', 'stream': 'mediawiki.recentchange', 'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 4903433673}, 'id': 2024634666, 'type': 'edit', 'namespace': 0, 'title': 'Q91975510', 'title_url': 'https://www.wikidata.org/wiki/Q91975510', 'comment': '/* wbeditentity-update-languages-short:0||an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo */ BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo', 'timestamp': 1693407092, 'user': 'Emijrpbot', 'bot': True, 'notify_url': 'https://www.wikidata.org/w/index.php?diff=1964666701&oldid=1869851195&rcid=2024634666', 'minor': False, 'patrolled': True, 'length': {'old': 13772, 'new': 15400}, 'revision': {'old': 1869851195, 'new': 1964666701}, 'server_url': 'https://www.wikidata.org', 'server_name': 'www.wikidata.org', 'server_script_path': '/w', 'wiki': 'wikidatawiki', 'parsedcomment': '\u200e<span dir="auto"><span class="autocomment">Changed label, description and/or aliases in an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo: </span></span> BOT - Adding labels (21 languages): an, ast, ca, de, eo, es, eu, ext, fr, ga, gl, ia, ie, io, it, la, oc, pl, pt, pt-br, vo'}
Создание и регистрация схемы AVRO
Создание схемы
На основании полученных данных необходимо создать схему в одном из поддерживаемых Schema Registry форматов сериализации, в данном случае формате Avro.
Схема может быть создана вручную, в соответствии с требованиями к формату Avro. Также можно использовать специальные сайты, например Data tools.AVRO SCHEMA.
Для данного примера схема создана с использованием лишь нескольких использующихся в событии полей (fields
). При этом можно добавить поля в качестве демонстрации изменения схемы.
После создания схему необходимо внести в специальный скрипт, к которому будет обращаться Schema Registry при регистрации схемы. Для этого необходимо выполнить:
-
Cоздать файл скрипта:
$ sudo vim /tmp/constants.py
-
Заполнить constants.py созданной схемой Avro, как показано ниже.
SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /tmp/constants.py
Регистрация схемы
Для того чтобы зарегистрировать схему в Schema Registry, используется скрипт Python3, выполняющий:
-
Указание наименования топика Kafka.
-
Указание имени субьекта схем — к наименованию топика добавляется суффикс
-value
(в данном примере создается схема для значений, а не для ключей). -
Cоздание экземпляра клиента Schema Registry.
-
Определение функции получения последней версии схемы для субъекта
wikimedia-value
. -
Определение функции регистрации в Schema Registry новой схемы AVRO с использованием данных скрипта constants.py и возвращение идентификатора новой схемы.
-
Определение функции удаления старой схемы и регистрации новой схемы для субьекта.
-
Запуск функции регистрации схемы.
-
Отображение последней зарегистрированной схемы.
Для регистрации схемы для субъекта wikimedia-value
необходимо:
-
Cоздать файл скрипта:
$ sudo vim /tmp/schema_registry.py
-
Заполнить schema_registry.py текстом скрипта, приведенным ниже.
schema_registry.py# 3rd party library imported from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry import Schema # import from constants from constants import SCHEMA_STR schema_registry_url = 'http://localhost:8081' kafka_topic = 'wikimedia' schema_registry_subject = f"{kafka_topic}-value" def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject): sr = SchemaRegistryClient({'url': schema_registry_url}) latest_version = sr.get_latest_version(schema_registry_subject) return sr, latest_version def register_schema(schema_registry_url, schema_registry_subject, schema_str): sr = SchemaRegistryClient({'url': schema_registry_url}) schema = Schema(schema_str, schema_type="AVRO") schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema) return schema_id def update_schema(schema_registry_url, schema_registry_subject, schema_str): sr = SchemaRegistryClient({'url': schema_registry_url}) versions_deleted_list = sr.delete_subject(schema_registry_subject) print(f"versions of schema deleted list: {versions_deleted_list}") schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str) return schema_id schema_id = register_schema(schema_registry_url, schema_registry_subject, SCHEMA_STR) print(schema_id) sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject) print(latest_version.schema.schema_str)
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /tmp/schema_registry.py
-
Запустить скрипт:
$ python3 /tmp/schema_registry.py
В результате выводится идентификатор новой записанной схемы и сама схема, зарегистрированная под этим идентификатором:
1 {"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null}]}
ПРИМЕЧАНИЕ
Возвращенный идентификатор используется для извлечения этой схемы из ресурса схем и отличается от версии схемы, связанной с субъектом. Если одна и та же схема зарегистрирована под другим субъектом, будет возвращен тот же идентификатор. Однако версия схемы может быть разной для разных субъектов. |
Запись данных в топик Kafka с использованием схемы
ПРИМЕЧАНИЕ
Для автоматического создания топика |
Для того чтобы записать сериализованные данные в топик Kafka, используется скрипт Python3, выполняющий:
-
Определение функции
delivery_report
, которая вызывается при доставке сообщения. Если сообщение не доставлено, регистрируется ошибка. В случае, если сообщение доставлено, регистрируется ключ, значение, топик, партиция и смещение. -
Определение функции
avro_producer
— записи данных в формате Avro:-
Создание сериализатора Avro с использованием последней зарегистированной схемы.
-
Настройка производителя Kafka с возможностями сериализации, в том числе функций:
-
delivery.timeout.ms
— максимальное время, в течение которого производитель может доставить сообщение (включая повторные попытки); -
enable.idempotence
— при установке данного параметра в значениеtrue
сообщения создаются ровно один раз и в исходном порядке.
-
-
Обработка потоковых данных в том виде, как это выполняется в скрипте для получения данных events.py, описанном выше.
-
-
Определение функций получения, регистрации и обновления схемы в том виде, как это выполняется в скрипте для регистрации схемы schema_registry.py, описанном выше.
-
Запуск функции
avro_producer
.
Для записи в топик wikimedia
сообщений о событиях в формате Avro необходимо выполнить:
-
Cоздать файл скрипта:
$ sudo vim /tmp/avro_producer.py
-
Заполнить avro_producer.py текстом скрипта, приведенным ниже.
avro_producer.pyimport requests import json # 3rd party library imported from confluent_kafka import SerializingProducer from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer from confluent_kafka.schema_registry import Schema # import from constants from constants import SCHEMA_STR init_string = 'data: ' source_url = 'https://stream.wikimedia.org/v2/stream/recentchange' kafka_url = 'localhost:9092' schema_registry_url = 'http://localhost:8081' kafka_topic = 'wikimedia' schema_registry_subject = f"{kafka_topic}-value" def delivery_report(errmsg, msg): if errmsg is not None: print("Delivery failed for Message: {} : {}".format(msg.key(), errmsg)) return print('Message: {} unsuccessfully produced to Topic: {} Partition: [{}] at offset {}'.format(msg.key(), msg.topic(), msg.partition(), msg.offset())) def avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject): # schema registry sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject) value_avro_serializer = AvroSerializer(schema_registry_client = sr, schema_str = latest_version.schema.schema_str, conf={ 'auto.register.schemas': False } ) # Kafka Producer producer = SerializingProducer({ 'bootstrap.servers': kafka_url, 'security.protocol': 'plaintext', 'value.serializer': value_avro_serializer, 'delivery.timeout.ms': 120000, # set it to 2 mins 'enable.idempotence': 'true' }) s = requests.Session() with s.get(source_url, headers=None, stream=True) as resp: for line in resp.iter_lines(): if line: decoded_line = line.decode() if decoded_line.find(init_string) >= 0: # remove data: to create a valid json decoded_line = decoded_line.replace(init_string, "") # convert to json decoded_json = json.loads(decoded_line) try: # print(decoded_line + '\n') producer.produce(topic=kafka_topic, value=decoded_json, on_delivery=delivery_report) # Trigger any available delivery report callbacks from previous produce() calls events_processed = producer.poll(1) print(f"events_processed: {events_processed}") messages_in_queue = producer.flush(1) print(f"messages_in_queue: {messages_in_queue}") except Exception as e: print(e) def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject): sr = SchemaRegistryClient({'url': schema_registry_url}) latest_version = sr.get_latest_version(schema_registry_subject) return sr, latest_version def register_schema(schema_registry_url, schema_registry_subject, schema_str): sr = SchemaRegistryClient({'url': schema_registry_url}) schema = Schema(schema_str, schema_type="AVRO") schema_id = sr.register_schema(subject_name=schema_registry_subject, schema=schema) return schema_id def update_schema(schema_registry_url, schema_registry_subject, schema_str): sr = SchemaRegistryClient({'url': schema_registry_url}) versions_deleted_list = sr.delete_subject(schema_registry_subject) print(f"versions of schema deleted list: {versions_deleted_list}") schema_id = register_schema(schema_registry_url, schema_registry_subject, schema_str) return schema_id avro_producer(source_url, kafka_url, schema_registry_url, schema_registry_subject)
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /tmp/avro_producer.py
-
Запустить скрипт:
$ python3 /tmp/avro_producer.py
В результате выводится сообщение об отсутствии неудачных записей в топик и номер смещения (
offset
) (показаны данные одного события):
Message: None unsuccessfully produced to Topic: wikimedia Partition: [0] at offset 284 events_processed: 1 messages_in_queue: 0
Чтение сообщений в соответствии со схемой
Для того чтобы прочитать сериализированные данные из топика Kafka, используется скрипт Python3, выполняющий:
-
Определение функции
avro_consumer
— чтения данных с использованием схемы формата Avro:-
Настройка потребителя сообщений.
-
Создание AvroConsumer.
-
Определение формы вывода сообщений.
-
-
Запуск потребителя сообщений.
Для чтения из топика wikimedia
сообщений о событиях с использованием схемы формата Avro необходимо выполнить:
-
Cоздать файл скрипта:
$ sudo nano /tmp/avro_consumer.py
-
Заполнить avro_consumer.py текстом скрипта, приведенным ниже.
avro_consumer.pyimport json from confluent_kafka import Producer, KafkaException from confluent_kafka.avro import AvroConsumer from confluent_kafka import avro def avro_consumer(): consumer_config = { "bootstrap.servers": "localhost:9092", "schema.registry.url": "http://localhost:8081", "group.id": "1", "auto.offset.reset": "earliest" } #print(value_schema) consumer = AvroConsumer(consumer_config) consumer.subscribe(['wikimedia']) while True: try: msg = consumer.poll(1) if msg is None: continue print("Value is :" + json.dumps(msg.value())) print("-------------------------") except KafkaException as e: print('Kafka failure ' + e) consumer.close() def main(): avro_consumer() if __name__ == "__main__": main()
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /tmp/avro_consumer.py
-
Запустить скрипт:
$ python3 /tmp/avro_consumer.py
В результате чтение потока данных выполняется в соответствии с заданной схемой (показаны данные одного события):
------------------------- Value is :{"bot": false, "comment": "[[:Category:2010s elections in Spain]] added to category", "id": 21173636} -------------------------
ПРИМЕЧАНИЕ
Чтение с десериализацией данных можно также выполнить при помощи клиента kcat, поддерживающего десериализацию сообщений Avro с использованием Confluent Schema Registry. |
Эволюция схемы
Для демонстрации эволюции схемы можно выполнить следующее:
-
Добавить в constants.py, описанный выше, еще одно поле, например
length
:{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}
Скрипт constants.py после изменения выглядит следующим образом:
SCHEMA_STR="{\"type\":\"record\",\"name\":\"value_wikimedia\",\"namespace\":\"wikimedia\",\"fields\":[{\"name\":\"bot\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Length\",\"fields\":[{\"name\":\"new\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"old\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}]}"
-
Запустить скрипт schema_registry.py.
В результате выводится идентификатор новой записанной схемы и сама схема, зарегистрированная под этим идентификатором:
2 {"type":"record","name":"value_wikimedia","namespace":"wikimedia","fields":[{"name":"bot","type":["null","boolean"],"default":null},{"name":"comment","type":["null","string"],"default":null},{"name":"id","type":["null","int"],"default":null},{"name":"length","type":["null",{"type":"record","name":"Length","fields":[{"name":"new","type":["null","int"],"default":null},{"name":"old","type":["null","int"],"default":null}]}],"default":null}]}
-
Поочередно запустить скрипты:
В результате поток данных считывается по измененной схеме — появилось новое поле
length
(показаны данные одного события):
Value is :{"bot": false, "comment": "/* wbcreateclaim-create:1| */ [[Property:P279]]: [[Q11173]], [[:toollabs:quickstatements/#/batch/211994|batch #211994]]", "id": 2025996909, "length": {"new": 6175, "old": 5747}}