ksqlDB usage example

This article describes an example of managing website event data (using Wikipedia as the event source) with the ksqlDB service.

Write events to a Kafka topic

To write the last 10 events from the site to Kafka, you can use a Python script that:

  1. Sets the configuration options for the Kafka producer.

  2. Sets the configuration parameters to work with the Wikipedia API.

  3. Creates a Kafka producer.

  4. Gets the latest changes from the Wikipedia API (data about the last 10 edited articles).

  5. Writes the events to a Kafka topic in the JSON format.

  6. Closes the Kafka producer.

NOTE

To run this Python script, you first need to install the packages it imports (requests and kafka-python):
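
$ pip install requests kafka-python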

To write events to a Kafka topic, you need to:

  1. Create a script file:

    $ sudo vim /tmp/events.py
  2. Fill events.py with the script text below, replacing these values with your own:

    • bootstrap_servers — host and port used to connect to the Kafka server;

    • topic_name — name of the topic to write events to.

      events.py
      import requests
      from kafka import KafkaProducer
      import json
      
      # Kafka producer configuration
      bootstrap_servers = 'sov-test-4.ru-central1.internal:9092'
      topic_name = 'wikipedia-topic'
      
      # Wikipedia API configuration
      api_url = 'https://en.wikipedia.org/w/api.php'
      params = {
          'action': 'query',
          'format': 'json',
          'list': 'recentchanges',
          'rcprop': 'user|title|timestamp',
          'rclimit': '10'
      }
      
      # Create a Kafka producer
      producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                               value_serializer=lambda x: json.dumps(x).encode('utf-8'))
      
      # Fetch recent changes from Wikipedia API and produce messages to Kafka
      response = requests.get(api_url, params=params)
      data = response.json()
      recent_changes = data['query']['recentchanges']
      
      for change in recent_changes:
          event = {
              'user_id': change['user'],
              'page_title': change['title'],
              'event_type': 'edit',
              'timestamp': change['timestamp']
          }
          producer.send(topic_name, value=event)
          print("Produced event:", event)
      
      # Close the Kafka producer
      producer.close()
  3. Give permission to execute the script:

    $ sudo chmod +x /tmp/events.py
  4. Run the script:

    $ python /tmp/events.py

    If the script runs successfully, the produced events are printed in the JSON format.

    Example of recorded events from the site
    ('Produced event:', {'page_title': u'User:190.203.61.245', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'})
    ('Produced event:', {'page_title': u'User:178.221.183.209', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'})
    ('Produced event:', {'page_title': u'Carlos Alcaraz career statistics', 'user_id': u'Dddenilson', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'User:179.6.164.242', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'User:45.230.251.198', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'User:143.0.67.78', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
    ('Produced event:', {'page_title': u'Vera Farmiga on screen and stage', 'user_id': u'ChaitanyaJo', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
    ('Produced event:', {'page_title': u'Personal health record', 'user_id': u'John of Reading', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
    ('Produced event:', {'page_title': u'User:189.204.180.128', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
    ('Produced event:', {'page_title': u'User:TechTwink', 'user_id': u'TechTwink', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})

    To verify the result, you can also read the data written to the Kafka topic from the command line.
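
    For example, you can use the kafka-console-consumer utility shipped with Kafka (depending on the installation, the tool may be named kafka-console-consumer.sh; substitute your own bootstrap server):

    $ kafka-console-consumer --bootstrap-server sov-test-4.ru-central1.internal:9092 --topic wikipedia-topic --from-beginning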

Start ksqlDB

After installing ksqlDB, start the ksqlDB CLI and connect it to the server:

$ sudo ksql http://hostname:8088

where hostname is the name of the host where the ksqlDB server is running.
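
If the CLI cannot connect, you can first check that the ksqlDB server is reachable through its REST API; the /info endpoint returns basic server metadata:

$ curl http://hostname:8088/info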

If the connection succeeds, the CLI displays a banner:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v6.2.1, Server v6.2.1 located at http://sov-test-4.ru-central1.internal:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

ksqlDB statements are entered at the ksql> prompt.

Create a stream from a Kafka topic

To manipulate data written to a Kafka topic using ksqlDB, you need to create a stream.

A stream is a partitioned, immutable, append-only collection that represents a set of facts about events or records.

The statement that creates a wikipedia_events stream over the wikipedia-topic Kafka topic is as follows:

CREATE STREAM wikipedia_events (
    user_id VARCHAR,
    page_title VARCHAR,
    event_type VARCHAR,
    timestamp VARCHAR
) WITH (
    kafka_topic='wikipedia-topic',
    value_format='json',
    key_format='none'
);

This statement defines:

  • the stream columns, whose names match the keys of the JSON objects written to the topic, along with the SQL data types of the values placed in those columns;

  • the stream properties (the source topic, value format, and key format).

The stream is created:

 Message
----------------
 Stream created
----------------
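
To check that the stream is registered, you can list all streams or inspect the new one using standard ksqlDB statements:

SHOW STREAMS;
DESCRIBE wikipedia_events;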

Stream data management

You can retrieve data from the created stream with various queries.

NOTE

To read a Kafka topic from the beginning rather than from the latest offset, you can set a property at the ksql> prompt that determines the offset from which topics are read:

SET 'auto.offset.reset' = 'earliest';

Without this setting, to reproduce the examples below you would have to write new data to the topic behind the stream after starting each query (for example, by re-running the script described above).

View data

You can view the entire contents of the wikipedia_events stream using a SELECT query without selection conditions:

SELECT * FROM wikipedia_events EMIT CHANGES;

where EMIT CHANGES indicates that this is a push query that emits rows as new changes occur.

The query result is shown below:

+-------------------------+--------------------------------+---------------------------+------------------------+
|USER_ID                  |PAGE_TITLE                      |EVENT_TYPE                 |TIMESTAMP               |
+-------------------------+--------------------------------+---------------------------+------------------------+
|M. Armando               |Jânio Quadros                   |edit                       |2023-05-29T10:56:21Z    |
|DepressedPer             |Talk:Bring It Back (Mates of Sta|edit                       |2023-05-29T10:56:20Z    |
|                         |te album)                       |                           |                        |
|193.137.135.5            |Víctor Muñoz                    |edit                       |2023-05-29T10:56:20Z    |
|Devokewater              |Viceroy Research                |edit                       |2023-05-29T10:56:16Z    |
|Browhatwhyamihere        |MG 08                           |edit                       |2023-05-29T10:56:16Z    |
|92.239.205.156           |Category:1946 establishments in |edit                       |2023-05-29T10:56:15Z    |
|                         |South Africa                    |                           |                        |
|92.239.205.156           |80 Air Navigation School SAAF   |edit                       |2023-05-29T10:56:15Z    |
|Ubaid ur Rehman Nisar    |User:Ubaid ur Rehman Nisar      |edit                       |2023-05-29T10:56:15Z    |
|Οἶδα                     |2023 Cannes Film Festival       |edit                       |2023-05-29T10:56:14Z    |
|Super Dromaeosaurus      |Bujoreni, Teleorman             |edit                       |2023-05-29T10:56:14Z    |

If you run the script described above again for the same topic, you can see new data arriving in the stream.

Count events

To display the number of edits for each article recorded in the topic, you can use the SELECT operator with the COUNT function, saving the results in a new edit_count column.

For a richer sample, you can increase the number of records in the Kafka topic by raising the number of returned changes ('rclimit') in the script to 100 or more and re-running it.

The query that counts edits per article in the wikipedia_events stream, stores the results in the new edit_count column, and outputs the first 10 rows looks like this:

SELECT page_title, COUNT(*) AS edit_count
FROM wikipedia_events
GROUP BY page_title
EMIT CHANGES
LIMIT 10;

where GROUP BY groups the rows by page title before the aggregation is applied.

The query result is shown below:

+--------------------------------------+--------------------------------------+
|PAGE_TITLE                            |EDIT_COUNT                            |
+--------------------------------------+--------------------------------------+
|Category:Redirects from long names    |1                                     |
|Science-fiction: The Gernsback Years :|1                                     |
| a Complete Coverage of the Genre Maga|                                      |
|zines ... from 1926 Through 1936      |                                      |
|Ben Anderson (journalist)             |1                                     |
|User talk:175.142.65.166              |1                                     |
|Wikipedia:Articles for creation/Redire|1                                     |
|cts and categories                    |                                      |
|2023 North Indian Ocean cyclone season|1                                     |
|Category:Accepted AfC submissions     |6                                     |
|Category:AfC submissions by date/06 Ju|6                                     |
|ne 2023                               |                                      |
|Category:Redirect-Class AfC articles  |6                                     |
|Category:Unprintworthy redirects      |1                                     |
Limit Reached
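
To keep these counts continuously updated, you can materialize them as a table using a CREATE TABLE ... AS SELECT statement. A minimal sketch (the wikipedia_edit_counts name is illustrative):

CREATE TABLE wikipedia_edit_counts AS
    SELECT page_title, COUNT(*) AS edit_count
    FROM wikipedia_events
    GROUP BY page_title
    EMIT CHANGES;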

Select by name

To select articles whose titles begin with Category:, use the SELECT operator with the WHERE clause:

SELECT page_title, timestamp
FROM wikipedia_events
WHERE page_title LIKE 'Category:%'
EMIT CHANGES;

The query result is shown below:

+--------------------------------------------------------+--------------------------------------------------------+
|PAGE_TITLE                                              |TIMESTAMP                                               |
+--------------------------------------------------------+--------------------------------------------------------+
|Category:Redirects from long names                      |2023-06-01T07:21:57Z                                    |
|Category:Accepted AfC submissions                       |2023-06-01T07:21:57Z                                    |
|Category:AfC submissions by date/06 June 2023           |2023-06-01T07:21:57Z                                    |
|Category:Redirect-Class AfC articles                    |2023-06-01T07:21:58Z                                    |
|Category:Unprintworthy redirects                        |2023-06-01T07:21:58Z                                    |

Write from ksqlDB to a Kafka topic

To write the results of this selection to a new Kafka topic new_topic in the JSON format, you can create a new stream wikipedia_new:

CREATE STREAM wikipedia_new
   WITH ( kafka_topic='new_topic', value_format='json')
   AS SELECT
   page_title,
   timestamp
   FROM wikipedia_events
   WHERE page_title LIKE 'Category:%'
   EMIT CHANGES;

The stream is created:

 Message
----------------------------------------------
 Created query with ID CSAS_WIKIPEDIA_NEW_191
----------------------------------------------

At the ksql> prompt, you can also run a command that reads the messages written to the new_topic topic:

print new_topic;

The command output is shown below:

Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirects from long names","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Accepted AfC submissions","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:AfC submissions by date/06 June 2023","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirect-Class AfC articles","TIMESTAMP":"2023-06-01T07:21:58Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Unprintworthy redirects","TIMESTAMP":"2023-06-01T07:21:58Z"}, partition: 0

The topic contains messages that match the selection.
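
When the continuous query behind wikipedia_new is no longer needed, you can stop it and delete the stream; the query ID to terminate is the one reported when the stream was created (CSAS_WIKIPEDIA_NEW_191 in the output above):

TERMINATE CSAS_WIKIPEDIA_NEW_191;
DROP STREAM wikipedia_new;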
