Пример использования ksqlDB
В статье описан пример управления данными о событиях на сайте (например, Wikipedia) при помощи сервиса ksqlDB.
Запись событий в топик Kafka
Для записи 10 последних событий с сайта можно использовать Python-скрипт, выполняющий:
-
Определение параметров конфигурации для производителя (producer) Kafka.
-
Определение параметров конфигурации для работы с Wikipedia API.
-
Создание производителя (producer) Kafka.
-
Получение последних изменений из API Wikipedia (данные о последних 10 отредактированных статьях).
-
Запись событий в топик Kafka в JSON-формате.
-
Закрытие производителя (producer) Kafka.
ПРИМЕЧАНИЕ
Для работы с данным Python-скриптом необходимо предварительно установить пакеты: |
Для записи событий в топик Kafka необходимо:
-
Cоздать файл скрипта:
$ sudo vim /tmp/events.py
-
Заполнить events.py текстом скрипта, приведенным ниже, заменив индивидуальные значения:
-
bootstrap_servers
— имя хоста для подключения к серверу Kafka; -
topic_name
— имя топика для записи событий.events.pyimport requests from kafka import KafkaProducer import json # Kafka producer configuration bootstrap_servers = 'sov-test-4.ru-central1.internal:9092' topic_name = 'wikipedia-topic' # Wikipedia API configuration api_url = 'https://en.wikipedia.org/w/api.php' params = { 'action': 'query', 'format': 'json', 'list': 'recentchanges', 'rcprop': 'user|title|timestamp', 'rclimit': '10' } # Create a Kafka producer producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=lambda x: json.dumps(x).encode('utf-8')) # Fetch recent changes from Wikipedia API and produce messages to Kafka response = requests.get(api_url, params=params) data = response.json() recent_changes = data['query']['recentchanges'] for change in recent_changes: event = { 'user_id': change['user'], 'page_title': change['title'], 'event_type': 'edit', 'timestamp': change['timestamp'] } producer.send(topic_name, value=event) print("Produced event:", event) # Close the Kafka producer producer.close()
-
-
Дать разрешение на выполнение скрипта:
$ sudo chmod +x /tmp/events.py
-
Запустить скрипт:
$ python /tmp/events.py
В результате правильного выполнения скрипта выводятся события в JSON-формате.
Пример записанных событий с сайта('Produced event:', {'page_title': u'User:190.203.61.245', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'}) ('Produced event:', {'page_title': u'User:178.221.183.209', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'}) ('Produced event:', {'page_title': u'Carlos Alcaraz career statistics', 'user_id': u'Dddenilson', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'}) ('Produced event:', {'page_title': u'User:179.6.164.242', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'}) ('Produced event:', {'page_title': u'User:45.230.251.198', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'}) ('Produced event:', {'page_title': u'User:143.0.67.78', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'}) ('Produced event:', {'page_title': u'Vera Farmiga on screen and stage', 'user_id': u'ChaitanyaJo', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'}) ('Produced event:', {'page_title': u'Personal health record', 'user_id': u'John of Reading', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'}) ('Produced event:', {'page_title': u'User:189.204.180.128', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'}) ('Produced event:', {'page_title': u'User:TechTwink', 'user_id': u'TechTwink', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
Для просмотра данных, записанных в топик Kafka, можно прочитать данные из топика Kafka при помощи командной строки.
Запуск ksqlDB
После установки ksqlDB запуск сервиса выполняется при помощи команды:
$ sudo ksql http://hostname:8088
где hostname
— имя хоста , на котором установлен сервис ksqlDB.
Сервер запущен:
=========================================== = _ _ ____ ____ = = | | _____ __ _| | _ \| __ ) = = | |/ / __|/ _` | | | | | _ \ = = | <\__ \ (_| | | |_| | |_) | = = |_|\_\___/\__, |_|____/|____/ = = |_| = = Event Streaming Database purpose-built = = for stream processing apps = =========================================== Copyright 2017-2021 Confluent Inc. CLI v6.2.1, Server v6.2.1 located at http://sov-test-4.ru-central1.internal:8088 Server Status: RUNNING Having trouble? Type 'help' (case-insensitive) for a rundown of how things work! ksql>
Запуск функций ksqlDB производится в строке ksql>
.
Создание потока из топика Kafka
Для манипулирования данными, записанными в топик Kafka, при помощи ksqlDB необходимо создать поток.
Поток (stream) — секционированная, неизменяемая, доступная только для добавления коллекция, которая представляет ряд фактов о событиях или записях.
Запрос для создания потока wikipedia_events
для топика Kafka wikipedia-topic
выглядит следующим образом:
CREATE STREAM wikipedia_events (\
user_id VARCHAR,\
page_title VARCHAR,\
event_type VARCHAR,\
timestamp VARCHAR\
) WITH (\
kafka_topic='wikipedia-topic',\
value_format='json',\
key_format='none'\
);
где заданы:
Поток создан:
Message ---------------- Stream created ----------------
Управление данными потока
При помощи различных запросов можно извлекать данные из созданного потока.
ПРИМЕЧАНИЕ
Для того чтобы топик Kafka был прочитан с начала, а не с самого последнего смещения, можно установить в рабочей строке параметр, определяющий, с какого смещения требуется читать топики:
Без этой установки для отработки приведенных ниже примеров необходимо после запуска функции ksqlDB перезаписать данные в топик, иcпользуемый в потоке (например, заново выполнить скрипт, описанный выше). |
Просмотр данных
Просмотреть вcё содержимое потока wikipedia_events
можно при помощи запроса с применением оператора SELECT без указания условий выборки:
SELECT * FROM wikipedia_events EMIT CHANGES;
где EMIT CHANGE
указывает на то, что следует выводить все происходящие изменения.
Результат запроса приведен ниже:
+-------------------------+--------------------------------+---------------------------+------------------------+ |USER_ID |PAGE_TITLE |EVENT_TYPE |TIMESTAMP | +-------------------------+--------------------------------+---------------------------+------------------------+ |M. Armando |Jânio Quadros |edit |2023-05-29T10:56:21Z | |DepressedPer |Talk:Bring It Back (Mates of Sta|edit |2023-05-29T10:56:20Z | | |te album) | | | |193.137.135.5 |Víctor Muñoz |edit |2023-05-29T10:56:20Z | |Devokewater |Viceroy Research |edit |2023-05-29T10:56:16Z | |Browhatwhyamihere |MG 08 |edit |2023-05-29T10:56:16Z | |92.239.205.156 |Category:1946 establishments in |edit |2023-05-29T10:56:15Z | | |South Africa | | | |92.239.205.156 |80 Air Navigation School SAAF |edit |2023-05-29T10:56:15Z | |Ubaid ur Rehman Nisar |User:Ubaid ur Rehman Nisar |edit |2023-05-29T10:56:15Z | |Οἶδα |2023 Cannes Film Festival |edit |2023-05-29T10:56:14Z | |Super Dromaeosaurus |Bujoreni, Teleorman |edit |2023-05-29T10:56:14Z |
Если еще раз запустить скрипт, описанный выше, для того же топика, можно увидеть, что в поток поступают новые данные.
Подсчет событий
Для вывода количества редакций статей из выборки, записанной в топик, можно использовать операторы SELECT и COUNT c сохранением результатов в новом столбце edit_count
.
Для наглядности можно расширить количество записей в топик Kafka, изменив в скрипте количество возвращаемых изменений ('rclimit'
) до 100 и более, и заново запустить скрипт.
Запрос для подсчета количества редакций последних 10 статей в потоке wikipedia-events
и сохранения результатов в новом столбце edit_count
выглядит следующим образом:
SELECT page_title, COUNT(*) AS edit_count
FROM wikipedia_events
GROUP BY page_title
EMIT CHANGES
LIMIT 10;
где применена функция GROUP BY.
Результат запроса приведен ниже:
+--------------------------------------+--------------------------------------+ |PAGE_TITLE |EDIT_COUNT | +--------------------------------------+--------------------------------------+ |Category:Redirects from long names |1 | |Science-fiction: The Gernsback Years :|1 | | a Complete Coverage of the Genre Maga| | |zines ... from 1926 Through 1936 | | |Ben Anderson (journalist) |1 | |User talk:175.142.65.166 |1 | |Wikipedia:Articles for creation/Redire|1 | |cts and categories | | |2023 North Indian Ocean cyclone season|1 | |Category:Accepted AfC submissions |6 | |Category:AfC submissions by date/06 Ju|6 | |ne 2023 | | |Category:Redirect-Class AfC articles |6 | |Category:Unprintworthy redirects |1 | Limit Reached
Выборка по названию
Для осуществления выборки статей, в названии которых содержится Category:
, используется оператор SELECT и функция WHERE.
SELECT page_title,
timestamp
FROM wikipedia_events
WHERE page_title LIKE 'Category:%'
EMIT CHANGES;
Результат запроса приведен ниже:
+--------------------------------------------------------+--------------------------------------------------------+ |PAGE_TITLE |TIMESTAMP | +--------------------------------------------------------+--------------------------------------------------------+ |Category:Redirects from long names |2023-06-01T07:21:57Z | |Category:Accepted AfC submissions |2023-06-01T07:21:57Z | |Category:AfC submissions by date/06 June 2023 |2023-06-01T07:21:57Z | |Category:Redirect-Class AfC articles |2023-06-01T07:21:58Z | |Category:Unprintworthy redirects |2023-06-01T07:21:58Z |
Запись из ksqlDB в топик Kafka
Для записи последней выборки в новый топик Kafka new_topic
в формате JSON можно создать новый поток wikipedia_new
:
CREATE STREAM wikipedia_new
WITH ( kafka_topic='new_topic', value_format='json')
AS SELECT
page_title,
timestamp
FROM wikipedia_events
WHERE page_title LIKE 'Category:%'
EMIT CHANGES;
Поток создан:
Message ---------------------------------------------- Created query with ID CSAS_WIKIPEDIA_NEW_191 ----------------------------------------------
В строке ksql>
также возможно запустить команду для чтения сообщений, записанных в топик new_topic
:
print new_topic;
Результат запроса приведен ниже:
Key format: ¯\_(ツ)_/¯ - no data processed Value format: JSON or KAFKA_STRING rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirects from long names","TIMEST AMP":"2023-06-01T07:21:57Z"}, partition: 0 rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Accepted AfC submissions","T IMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0 rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:AfC submissions by date/06 June 2023","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0 rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirect-Class AfC articles","TIMESTAMP":"2023-06-01T07:21:58Z"}, partition: 0 rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Unprintworthy redirects","TIMESTAMP":"2023-06-01T07:21:58Z "}, partition: 0
В топик записаны сообщения в соответствии с выполненной выборкой.