Пример использования ksqlDB

В статье описан пример управления данными о событиях на сайте (например, Wikipedia) при помощи сервиса ksqlDB.

Запись событий в топик Kafka

Для записи 10 последних событий с сайта можно использовать Python-скрипт, выполняющий:

  1. Определение параметров конфигурации для производителя (producer) Kafka.

  2. Определение параметров конфигурации для работы с Wikipedia API.

  3. Создание производителя (producer) Kafka.

  4. Получение последних изменений из API Wikipedia (данные о последних 10 отредактированных статьях).

  5. Запись событий в топик Kafka в JSON-формате.

  6. Закрытие производителя (producer) Kafka.

ПРИМЕЧАНИЕ

Для работы с данным Python-скриптом необходимо предварительно установить пакеты:

Для записи событий в топик Kafka необходимо:

  1. Cоздать файл скрипта:

    $ sudo vim /tmp/events.py
  2. Заполнить events.py текстом скрипта, приведенным ниже, заменив индивидуальные значения:

    • bootstrap_servers — имя хоста для подключения к серверу Kafka;

    • topic_name — имя топика для записи событий.

      events.py
      import requests
      from kafka import KafkaProducer
      import json
      
      # Kafka producer configuration
      bootstrap_servers = 'sov-test-4.ru-central1.internal:9092'
      topic_name = 'wikipedia-topic'
      
      # Wikipedia API configuration
      api_url = 'https://en.wikipedia.org/w/api.php'
      params = {
          'action': 'query',
          'format': 'json',
          'list': 'recentchanges',
          'rcprop': 'user|title|timestamp',
          'rclimit': '10'
      }
      
      # Create a Kafka producer
      producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                               value_serializer=lambda x: json.dumps(x).encode('utf-8'))
      
      # Fetch recent changes from Wikipedia API and produce messages to Kafka
      response = requests.get(api_url, params=params)
      data = response.json()
      recent_changes = data['query']['recentchanges']
      
      for change in recent_changes:
          event = {
              'user_id': change['user'],
              'page_title': change['title'],
              'event_type': 'edit',
              'timestamp': change['timestamp']
          }
          producer.send(topic_name, value=event)
          print("Produced event:", event)
      
      # Close the Kafka producer
      producer.close()
  3. Дать разрешение на выполнение скрипта:

    $ sudo chmod +x /tmp/events.py
  4. Запустить скрипт:

    $ python /tmp/events.py

    В результате правильного выполнения скрипта выводятся события в JSON-формате.

    Пример записанных событий с сайта
    ('Produced event:', {'page_title': u'User:190.203.61.245', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'})
    ('Produced event:', {'page_title': u'User:178.221.183.209', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'})
    ('Produced event:', {'page_title': u'Carlos Alcaraz career statistics', 'user_id': u'Dddenilson', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'User:179.6.164.242', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'User:45.230.251.198', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'User:143.0.67.78', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'Vera Farmiga on screen and stage', 'user_id': u'ChaitanyaJo', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
    ('Produced event:', {'page_title': u'Personal health record', 'user_id': u'John of Reading', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
    ('Produced event:', {'page_title': u'User:189.204.180.128', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
    ('Produced event:', {'page_title': u'User:TechTwink', 'user_id': u'TechTwink', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})

    Для просмотра данных, записанных в топик Kafka, можно прочитать данные из топика Kafka при помощи командной строки.

Запуск ksqlDB

После установки ksqlDB запуск сервиса выполняется при помощи команды:

$ sudo ksql http://hostname:8088

где hostname — имя хоста , на котором установлен сервис ksqlDB.

Сервер запущен:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v6.2.1, Server v6.2.1 located at http://sov-test-4.ru-central1.internal:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Запуск функций ksqlDB производится в строке ksql>.

Создание потока из топика Kafka

Для манипулирования данными, записанными в топик Kafka, при помощи ksqlDB необходимо создать поток.

Поток (stream) — секционированная, неизменяемая, доступная только для добавления коллекция, которая представляет ряд фактов о событиях или записях.

Запрос для создания потока wikipedia_events для топика Kafka wikipedia-topic выглядит следующим образом:

CREATE STREAM wikipedia_events (\
    user_id VARCHAR,\
    page_title VARCHAR,\
    event_type VARCHAR,\
    timestamp VARCHAR\
) WITH (\
    kafka_topic='wikipedia-topic',\
    value_format='json',\
    key_format='none'\
);

где заданы:

  • наименования столбцов потока, совпадающие c ключами обьекта, описанного в топике, и типы данных SQL, которые будут помещены в столбцы;

  • свойства потока.

Поток создан:

 Message
----------------
 Stream created
----------------

Управление данными потока

При помощи различных запросов можно извлекать данные из созданного потока.

ПРИМЕЧАНИЕ

Для того чтобы топик Kafka был прочитан с начала, а не с самого последнего смещения, можно установить в рабочей строке параметр, определяющий, с какого смещения требуется читать топики:

SET 'auto.offset.reset' = 'earliest';

Без этой установки для отработки приведенных ниже примеров необходимо после запуска функции ksqlDB перезаписать данные в топик, иcпользуемый в потоке (например, заново выполнить скрипт, описанный выше).

Просмотр данных

Просмотреть вcё содержимое потока wikipedia_events можно при помощи запроса с применением оператора SELECT без указания условий выборки:

SELECT * FROM wikipedia_events EMIT CHANGES;

где EMIT CHANGE указывает на то, что следует выводить все происходящие изменения.

Результат запроса приведен ниже:

+-------------------------+--------------------------------+---------------------------+------------------------+
|USER_ID                  |PAGE_TITLE                      |EVENT_TYPE                 |TIMESTAMP               |
+-------------------------+--------------------------------+---------------------------+------------------------+
|M. Armando               |Jânio Quadros                   |edit                       |2023-05-29T10:56:21Z    |
|DepressedPer             |Talk:Bring It Back (Mates of Sta|edit                       |2023-05-29T10:56:20Z    |
|                         |te album)                       |                           |                        |
|193.137.135.5            |Víctor Muñoz                    |edit                       |2023-05-29T10:56:20Z    |
|Devokewater              |Viceroy Research                |edit                       |2023-05-29T10:56:16Z    |
|Browhatwhyamihere        |MG 08                           |edit                       |2023-05-29T10:56:16Z    |
|92.239.205.156           |Category:1946 establishments in |edit                       |2023-05-29T10:56:15Z    |
|                         |South Africa                    |                           |                        |
|92.239.205.156           |80 Air Navigation School SAAF   |edit                       |2023-05-29T10:56:15Z    |
|Ubaid ur Rehman Nisar    |User:Ubaid ur Rehman Nisar      |edit                       |2023-05-29T10:56:15Z    |
|Οἶδα                     |2023 Cannes Film Festival       |edit                       |2023-05-29T10:56:14Z    |
|Super Dromaeosaurus      |Bujoreni, Teleorman             |edit                       |2023-05-29T10:56:14Z    |

Если еще раз запустить скрипт, описанный выше, для того же топика, можно увидеть, что в поток поступают новые данные.

Подсчет событий

Для вывода количества редакций статей из выборки, записанной в топик, можно использовать операторы SELECT и COUNT c сохранением результатов в новом столбце edit_count.

Для наглядности можно расширить количество записей в топик Kafka, изменив в скрипте количество возвращаемых изменений ('rclimit') до 100 и более, и заново запустить скрипт.

Запрос для подсчета количества редакций последних 10 статей в потоке wikipedia-events и сохранения результатов в новом столбце edit_count выглядит следующим образом:

SELECT page_title, COUNT(*) AS edit_count
FROM wikipedia_events
GROUP BY page_title
EMIT CHANGES
LIMIT 10;

где применена функция GROUP BY.

Результат запроса приведен ниже:

+--------------------------------------+--------------------------------------+
|PAGE_TITLE                            |EDIT_COUNT                            |
+--------------------------------------+--------------------------------------+
|Category:Redirects from long names    |1                                     |
|Science-fiction: The Gernsback Years :|1                                     |
| a Complete Coverage of the Genre Maga|                                      |
|zines ... from 1926 Through 1936      |                                      |
|Ben Anderson (journalist)             |1                                     |
|User talk:175.142.65.166              |1                                     |
|Wikipedia:Articles for creation/Redire|1                                     |
|cts and categories                    |                                      |
|2023 North Indian Ocean cyclone season|1                                     |
|Category:Accepted AfC submissions     |6                                     |
|Category:AfC submissions by date/06 Ju|6                                     |
|ne 2023                               |                                      |
|Category:Redirect-Class AfC articles  |6                                     |
|Category:Unprintworthy redirects      |1                                     |
Limit Reached

Выборка по названию

Для осуществления выборки статей, в названии которых содержится Category:, используется оператор SELECT и функция WHERE.

SELECT page_title,
timestamp
FROM wikipedia_events
WHERE page_title LIKE 'Category:%'
EMIT CHANGES;

Результат запроса приведен ниже:

+--------------------------------------------------------+--------------------------------------------------------+
|PAGE_TITLE                                              |TIMESTAMP                                               |
+--------------------------------------------------------+--------------------------------------------------------+
|Category:Redirects from long names                      |2023-06-01T07:21:57Z                                    |
|Category:Accepted AfC submissions                       |2023-06-01T07:21:57Z                                    |
|Category:AfC submissions by date/06 June 2023           |2023-06-01T07:21:57Z                                    |
|Category:Redirect-Class AfC articles                    |2023-06-01T07:21:58Z                                    |
|Category:Unprintworthy redirects                        |2023-06-01T07:21:58Z                                    |

Запись из ksqlDB в топик Kafka

Для записи последней выборки в новый топик Kafka new_topic в формате JSON можно создать новый поток wikipedia_new:

CREATE STREAM wikipedia_new
   WITH ( kafka_topic='new_topic', value_format='json')
   AS SELECT
   page_title,
   timestamp
   FROM wikipedia_events
   WHERE page_title LIKE 'Category:%'
   EMIT CHANGES;

Поток создан:

 Message
----------------------------------------------
 Created query with ID CSAS_WIKIPEDIA_NEW_191
----------------------------------------------

В строке ksql> также возможно запустить команду для чтения сообщений, записанных в топик new_topic:

print new_topic;

Результат запроса приведен ниже:

Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirects from long names","TIMEST
AMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Accepted AfC submissions","T
IMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:AfC submissions by date/06 June 2023","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirect-Class AfC articles","TIMESTAMP":"2023-06-01T07:21:58Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Unprintworthy redirects","TIMESTAMP":"2023-06-01T07:21:58Z "}, partition: 0

В топик записаны сообщения в соответствии с выполненной выборкой.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней