ksqlDB usage example
This article describes an example of managing event data from a website (for example, Wikipedia) using the ksqlDB service.
Write events to a Kafka topic
To write the last 10 events from the site to Kafka, you can use a Python script that:
- Sets the configuration options for the Kafka producer.
- Sets the configuration parameters for working with the Wikipedia API.
- Creates a Kafka producer.
- Gets the latest changes from the Wikipedia API (data about the last 10 edited articles).
- Writes the events to a Kafka topic in JSON format.
- Closes the Kafka producer.
NOTE
To run this Python script, you first need to install the packages it imports: requests and kafka-python.
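For example, with pip (kafka-python is the package that provides the kafka module imported by the script):

$ pip install requests kafka-python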
To write events to a Kafka topic, you need to:
- Create a script file:

$ sudo vim /tmp/events.py

- Fill events.py with the script text below, replacing individual values:
  - bootstrap_servers — hostname to connect to the Kafka server;
  - topic_name — name of the topic to write events to.

events.py:

import requests
from kafka import KafkaProducer
import json

# Kafka producer configuration
bootstrap_servers = 'sov-test-4.ru-central1.internal:9092'
topic_name = 'wikipedia-topic'

# Wikipedia API configuration
api_url = 'https://en.wikipedia.org/w/api.php'
params = {
    'action': 'query',
    'format': 'json',
    'list': 'recentchanges',
    'rcprop': 'user|title|timestamp',
    'rclimit': '10'
}

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

# Fetch recent changes from the Wikipedia API and produce messages to Kafka
response = requests.get(api_url, params=params)
data = response.json()
recent_changes = data['query']['recentchanges']

for change in recent_changes:
    event = {
        'user_id': change['user'],
        'page_title': change['title'],
        'event_type': 'edit',
        'timestamp': change['timestamp']
    }
    producer.send(topic_name, value=event)
    print("Produced event:", event)

# Close the Kafka producer
producer.close()
- Give permission to execute the script:

$ sudo chmod +x /tmp/events.py

- Run the script:

$ python /tmp/events.py
If the script executes correctly, the produced events are printed in JSON format.
Example of recorded events from the site:

('Produced event:', {'page_title': u'User:190.203.61.245', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'})
('Produced event:', {'page_title': u'User:178.221.183.209', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:48Z'})
('Produced event:', {'page_title': u'Carlos Alcaraz career statistics', 'user_id': u'Dddenilson', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
('Produced event:', {'page_title': u'User:179.6.164.242', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
('Produced event:', {'page_title': u'User:45.230.251.198', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
('Produced event:', {'page_title': u'User:143.0.67.78', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:47Z'})
('Produced event:', {'page_title': u'Vera Farmiga on screen and stage', 'user_id': u'ChaitanyaJo', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
('Produced event:', {'page_title': u'Personal health record', 'user_id': u'John of Reading', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
('Produced event:', {'page_title': u'User:189.204.180.128', 'user_id': u'ST47ProxyBot', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
('Produced event:', {'page_title': u'User:TechTwink', 'user_id': u'TechTwink', 'event_type': 'edit', 'timestamp': u'2023-05-31T15:17:46Z'})
To verify the data written to the Kafka topic, you can read it back from the topic using the command line.
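For example, with the console consumer shipped with Kafka (the exact script name and path depend on your installation; kafka-console-consumer.sh is assumed here):

$ kafka-console-consumer.sh --bootstrap-server sov-test-4.ru-central1.internal:9092 --topic wikipedia-topic --from-beginning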
Start ksqlDB
After installing ksqlDB, the service is started using the command:
$ sudo ksql http://hostname:8088
where hostname is the name of the host on which the ksqlDB service is installed.
Server started:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      | <\__ \ (_| | | |_| | |_) |       =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v6.2.1, Server v6.2.1 located at http://sov-test-4.ru-central1.internal:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
ksqlDB commands are entered at the ksql> prompt.
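For example, you can check that the topic with events is visible to ksqlDB by listing the topics in the cluster with the standard SHOW TOPICS statement:

SHOW TOPICS;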
Create a stream from a Kafka topic
To manipulate data written to a Kafka topic using ksqlDB, you need to create a stream.
A stream is a partitioned, immutable, append-only collection that represents a set of facts about events or records.
The query that creates a wikipedia_events stream for the Kafka topic wikipedia-topic is as follows:
CREATE STREAM wikipedia_events (
    user_id VARCHAR,
    page_title VARCHAR,
    event_type VARCHAR,
    timestamp VARCHAR
) WITH (
    kafka_topic='wikipedia-topic',
    value_format='json',
    key_format='none'
);
where the following are defined:
- the names of the stream columns, which match the keys of the objects written to the topic, and the SQL data types of the values placed in those columns;
- the stream properties.
The stream is created:

 Message
----------------
 Stream created
----------------
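To verify the result, you can view the stream's columns and properties with the standard DESCRIBE statement:

DESCRIBE wikipedia_events;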
Stream data management
You can retrieve data from the created stream with various queries.
NOTE
For a Kafka topic to be read from the beginning rather than from the latest offset, you can set a parameter at the ksql> prompt that determines the offset from which topics are read:
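For example, using ksqlDB's standard auto.offset.reset query property:

SET 'auto.offset.reset' = 'earliest';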
Without this setting, to work through the examples below you must rewrite the data to the topic used by the stream after starting each ksqlDB query (for example, by re-running the script described above).
View data
You can view the entire contents of the wikipedia_events stream with a SELECT query that specifies no selection conditions:
SELECT * FROM wikipedia_events EMIT CHANGES;
where EMIT CHANGES indicates that any changes that occur should be emitted as they happen.
The query result is shown below:
+-------------------------+--------------------------------+---------------------------+------------------------+
|USER_ID                  |PAGE_TITLE                      |EVENT_TYPE                 |TIMESTAMP               |
+-------------------------+--------------------------------+---------------------------+------------------------+
|M. Armando               |Jânio Quadros                   |edit                       |2023-05-29T10:56:21Z    |
|DepressedPer             |Talk:Bring It Back (Mates of Sta|edit                       |2023-05-29T10:56:20Z    |
|                         |te album)                       |                           |                        |
|193.137.135.5            |Víctor Muñoz                    |edit                       |2023-05-29T10:56:20Z    |
|Devokewater              |Viceroy Research                |edit                       |2023-05-29T10:56:16Z    |
|Browhatwhyamihere        |MG 08                           |edit                       |2023-05-29T10:56:16Z    |
|92.239.205.156           |Category:1946 establishments in |edit                       |2023-05-29T10:56:15Z    |
|                         |South Africa                    |                           |                        |
|92.239.205.156           |80 Air Navigation School SAAF   |edit                       |2023-05-29T10:56:15Z    |
|Ubaid ur Rehman Nisar    |User:Ubaid ur Rehman Nisar      |edit                       |2023-05-29T10:56:15Z    |
|Οἶδα                     |2023 Cannes Film Festival       |edit                       |2023-05-29T10:56:14Z    |
|Super Dromaeosaurus      |Bujoreni, Teleorman             |edit                       |2023-05-29T10:56:14Z    |
If you run the script described above again for the same topic, you can see new data arriving in the stream.
Count events
To display the number of edits for each article recorded in the topic, you can use the SELECT and COUNT operators, saving the results in a new edit_count column.
For clarity, you can increase the number of records in the Kafka topic by changing the number of returned changes ('rclimit') in the script to 100 or more and re-running the script.
The query that counts the number of edits per article in the wikipedia_events stream, stores the results in a new edit_count column, and outputs the first 10 rows looks like this:
SELECT page_title, COUNT(*) AS edit_count
FROM wikipedia_events
GROUP BY page_title
EMIT CHANGES
LIMIT 10;
where the GROUP BY clause groups the records by page_title before counting.
The query result is shown below:
+--------------------------------------+--------------------------------------+
|PAGE_TITLE                            |EDIT_COUNT                            |
+--------------------------------------+--------------------------------------+
|Category:Redirects from long names    |1                                     |
|Science-fiction: The Gernsback Years :|1                                     |
| a Complete Coverage of the Genre Maga|                                      |
|zines ... from 1926 Through 1936      |                                      |
|Ben Anderson (journalist)             |1                                     |
|User talk:175.142.65.166              |1                                     |
|Wikipedia:Articles for creation/Redire|1                                     |
|cts and categories                    |                                      |
|2023 North Indian Ocean cyclone season|1                                     |
|Category:Accepted AfC submissions     |6                                     |
|Category:AfC submissions by date/06 Ju|6                                     |
|ne 2023                               |                                      |
|Category:Redirect-Class AfC articles  |6                                     |
|Category:Unprintworthy redirects      |1                                     |
Limit Reached
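The aggregation above runs as a transient push query. If you want the counts maintained continuously and written to their own topic, you could materialize them as a table with ksqlDB's CREATE TABLE AS SELECT statement; the table name wikipedia_edit_counts here is illustrative:

CREATE TABLE wikipedia_edit_counts AS
    SELECT page_title, COUNT(*) AS edit_count
    FROM wikipedia_events
    GROUP BY page_title
    EMIT CHANGES;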
Select by name
To select events whose page titles begin with a given substring, you can use the LIKE operator in the WHERE clause:
SELECT page_title,
timestamp
FROM wikipedia_events
WHERE page_title LIKE 'Category:%'
EMIT CHANGES;
The query result is shown below:
+--------------------------------------------------------+--------------------------------------------------------+
|PAGE_TITLE                                              |TIMESTAMP                                               |
+--------------------------------------------------------+--------------------------------------------------------+
|Category:Redirects from long names                      |2023-06-01T07:21:57Z                                    |
|Category:Accepted AfC submissions                       |2023-06-01T07:21:57Z                                    |
|Category:AfC submissions by date/06 June 2023           |2023-06-01T07:21:57Z                                    |
|Category:Redirect-Class AfC articles                    |2023-06-01T07:21:58Z                                    |
|Category:Unprintworthy redirects                        |2023-06-01T07:21:58Z                                    |
Write from ksqlDB to a Kafka topic
To write the latest selection to a new Kafka topic new_topic in JSON format, you can create a new wikipedia_new stream:
CREATE STREAM wikipedia_new
WITH (kafka_topic='new_topic', value_format='json')
AS SELECT
page_title,
timestamp
FROM wikipedia_events
WHERE page_title LIKE 'Category:%'
EMIT CHANGES;
The stream is created:

 Message
----------------------------------------------
 Created query with ID CSAS_WIKIPEDIA_NEW_191
----------------------------------------------
At the ksql> prompt, you can also run a command that reads the messages written to the new_topic topic:
print new_topic;
The command output is shown below:
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirects from long names","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Accepted AfC submissions","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:AfC submissions by date/06 June 2023","TIMESTAMP":"2023-06-01T07:21:57Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Redirect-Class AfC articles","TIMESTAMP":"2023-06-01T07:21:58Z"}, partition: 0
rowtime: 2023/06/01 07:21:59.522 Z, key: <null>, value: {"PAGE_TITLE":"Category:Unprintworthy redirects","TIMESTAMP":"2023-06-01T07:21:58Z"}, partition: 0
The topic contains messages that match the selection.
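When you have finished experimenting, you can stop the persistent query and delete the wikipedia_new stream with the standard TERMINATE and DROP STREAM statements (the query ID in your environment may differ from the one shown above):

TERMINATE CSAS_WIKIPEDIA_NEW_191;
DROP STREAM wikipedia_new;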