Integration between ADQM and Kafka

ClickHouse supports integration with Apache Kafka through a special table engine — Kafka. It allows you to:

  • subscribe to Kafka topics to load streaming data from Kafka into ClickHouse (the most common use case) or publish data from ClickHouse to Kafka;

  • create fault-tolerant storage;

  • process streams as they become available.

This article explains how to use the Kafka table engine using the example of ADQM and ADS (Arenadata Streaming), which provides the Kafka service.


The Kafka table engine allows ClickHouse to read data from a Kafka topic directly and track the message stream, but each message can be retrieved from a Kafka-engine table only once. When data is queried from a Kafka table, it is considered already consumed from the queue: the consumer offset advances, and the data cannot be re-read without resetting that offset. Thus, it is not recommended to select data from a Kafka table directly (via the SELECT query) — use a materialized view instead. This approach requires you to create the following tables on the ClickHouse side:

  • Table based on the Kafka engine to be a Kafka consumer that reads data flow.

  • Target table with the desired structure to be a persistent storage of data received from Kafka. Typically, a target table is implemented based on an engine from the MergeTree family.

  • Materialized view to collect data from the Kafka table in the background (it will receive new records in blocks when a trigger fires on inserting data into the Kafka table), convert it into the required format, and transfer it to the previously created target table.

    Sometimes it is necessary to apply various transformations to data coming from Kafka — for example, to store raw and aggregated data. In this case, you can bind multiple materialized views to one Kafka table to store data with different levels of detail across multiple tables.
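For illustration, binding two materialized views to one Kafka table can be sketched as follows (all table and view names here are hypothetical; the aggregated target is assumed to use SummingMergeTree):

```sql
-- Sketch: two materialized views attached to the same Kafka table.
-- kafka_events, events_raw, and events_by_name are assumed names.

-- Store raw rows in a MergeTree target table:
CREATE MATERIALIZED VIEW events_raw_mv TO events_raw AS
SELECT id, name FROM kafka_events;

-- Store aggregated counts in a SummingMergeTree target table:
CREATE MATERIALIZED VIEW events_by_name_mv TO events_by_name AS
SELECT name, count() AS cnt FROM kafka_events GROUP BY name;
```

Each view consumes the same message stream once it is read from Kafka, so raw and aggregated data stay consistent with each other.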

To write data from ClickHouse to Kafka, insert it directly into a Kafka-engine table.

Kafka table engine

Create a Kafka table

The basic syntax of a query that creates a Kafka table in ADQM is:

CREATE TABLE <table_name> (<column_name> <column_type> [ALIAS <expr>], ...)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = '<host_name>:9092,...',
    kafka_topic_list = '<topic_name>,...',
    kafka_group_name = '<consumer_group_name>',
    kafka_format = '<data_format>'[,]
    [kafka_schema = '',]
    [kafka_num_consumers = <N>,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = <N>,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

The Kafka table engine has the following required parameters. You can also specify their values in parentheses when creating a table (see the example below).

kafka_broker_list
List of Kafka brokers separated by commas

kafka_topic_list
List of Kafka topics separated by commas

kafka_group_name
Group of Kafka consumers. Offsets to read messages are tracked for each group separately. If you want to avoid duplicate messages in your cluster, use the same consumer group name for all Kafka tables

kafka_format
Kafka message format. For this parameter, use the same format names as for the FORMAT clause in INSERT and SELECT queries — see available formats in the Formats for Input and Output Data article of the ClickHouse documentation

Optional parameters


kafka_schema
Use this parameter if the format requires a schema definition. For example, CapnProto requires the path to the schema file and the name of the root schema.capnp:Message object

kafka_num_consumers
Number of consumers per table. Default value is 1. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in a topic (only one consumer can be assigned to one partition) and should not be greater than the number of cores on the server where ClickHouse is deployed

kafka_max_block_size
Maximum batch size (in messages) for a poll. Default value is equal to the value of the max_insert_block_size parameter

kafka_skip_broken_messages
Maximum number of allowed schema-incompatible messages per block. If kafka_skip_broken_messages = N, the engine skips N Kafka messages that cannot be parsed (one message corresponds to one data row). Default value is 0

kafka_commit_every_batch
Specifies whether to commit each consumed and handled batch instead of the whole block. Default value is 0

kafka_client_id
Client identifier. The parameter is empty by default

kafka_poll_timeout_ms
Timeout for a single poll from Kafka. Default value is equal to the value of the stream_poll_timeout_ms parameter

kafka_poll_max_batch_size
Maximum number of messages to be polled in a single Kafka poll. Default value is equal to the value of the max_block_size parameter

kafka_flush_interval_ms
Timeout for flushing data from Kafka. Default value is equal to the value of the stream_flush_interval_ms parameter

kafka_thread_per_consumer
Specifies whether to provide an independent thread for each consumer. When this option is enabled, each consumer flushes data independently, in parallel (otherwise, rows from several consumers are squashed to form one block). Default value is 0

kafka_handle_error_mode
Specifies how to handle errors for the Kafka table engine. Possible values:

  • default — an exception will be thrown if a message cannot be parsed;

  • stream — an exception text and raw message will be saved in the _error and _raw_message virtual columns

kafka_commit_on_select
Specifies whether to commit messages when executing the SELECT query. Default value is false

kafka_max_rows_per_message
Maximum number of rows in one Kafka message for row-based formats. Default value is 1
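As an illustration, the parameters above can be specified via the SETTINGS clause instead of parentheses (the broker host, topic, and table names below are placeholders):

```sql
CREATE TABLE kafka_events (id Int32, name String)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-host:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse',
    kafka_format = 'CSV',
    kafka_num_consumers = 1;
```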

Extended configuration

In the ClickHouse configuration file (config.xml), you can specify additional settings that the Kafka table engine supports:

  • global settings in the <kafka> section;

  • topic-level settings in the <kafka_topic> section inside the <kafka> tag (specify a topic name via the <name> parameter).

For example, the following parameter allows you to enable debug logging for the Kafka library (librdkafka):

<kafka>
    <debug>all</debug>
</kafka>


See a list of possible configuration parameters in the Configuration properties article of the librdkafka library documentation. For ADQM, replace a dot in a parameter name with an underscore — for example, use <auto_offset_reset>latest</auto_offset_reset> instead of auto.offset.reset = latest.

Settings of the Kafka engine for ADQM are also available in the ADCM interface. On the configuration page of the ADQMDB service, activate the Kafka engine option and list required parameters of the Kafka engine (corresponding tags with values, without the <kafka> tag) in the Kafka Properties field.

Extended configuration of the Kafka engine

Click Save and execute the Reconfig and restart action for the ADQMDB service — after the action is completed, the <kafka> section with the specified parameters will be added to the config.xml file.

Kerberos support

For integration with Kerberos-aware Kafka, set the security_protocol parameter to sasl_plaintext. This is sufficient if a Kerberos ticket-granting ticket has already been obtained and cached by operating system facilities. ClickHouse can also maintain Kerberos credentials using a keytab file. To set up this authentication method, use the sasl_kerberos_service_name, sasl_kerberos_keytab, and sasl_kerberos_principal parameters.
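A possible <kafka> section in config.xml for Kerberos authentication could look as follows (a sketch; the keytab path and principal are placeholders):

```xml
<kafka>
    <!-- Use SASL over plaintext for a Kerberos-aware Kafka cluster -->
    <security_protocol>sasl_plaintext</security_protocol>
    <sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
    <!-- Keytab and principal are placeholder values -->
    <sasl_kerberos_keytab>/etc/security/keytabs/clickhouse.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>clickhouse/adqm-host@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
```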

Virtual columns

It can be useful to track metadata of Kafka messages being loaded into ClickHouse. For example, you may need to have coordinates of a consumed message or to know how many messages were received from a certain topic or partition. For this purpose, the Kafka engine provides the following virtual columns.

_topic (String)
Kafka topic

_key (String)
Message key

_offset (UInt64)
Message offset

_timestamp (Nullable(DateTime))
Message timestamp

_timestamp_ms (Nullable(DateTime64(3)))
Message timestamp in milliseconds

_partition (UInt64)
Partition of a Kafka topic

_headers.name (Array(String))
Array of message header keys

_headers.value (Array(String))
Array of message header values

_error (String)
Text of an exception thrown when the parsing of a message failed. The column is filled if the Kafka engine’s kafka_handle_error_mode parameter is set to stream and an exception has occurred during parsing a message (if the message was parsed successfully, the column is empty)

_raw_message (String)
Raw message that could not be parsed successfully. The column is filled if the Kafka engine’s kafka_handle_error_mode parameter is set to stream and an exception has occurred during parsing a message (if the message was parsed successfully, the column is empty)

To get values of virtual columns in ADQM, add the corresponding columns to a target table and specify them in the SELECT clause of a materialized view — see the example below. There is no need to create virtual columns in a Kafka table as they are available automatically.

Common operations

Stop and restart message consumption

To stop receiving messages from a topic, detach a Kafka-engine table:

DETACH TABLE <kafka_engine_table>;

Detaching a Kafka table does not affect an offset of the consumer group. To restart consumption and continue it from the previous offset, reattach the table:

ATTACH TABLE <kafka_engine_table>;

Modify a target table

If you need to change a target table, the following sequence of steps is recommended:

  1. Detach the Kafka table (DETACH TABLE).

  2. Change the target table (ALTER TABLE).

  3. Delete the materialized view to avoid discrepancies between the modified target table and data from the view (DROP VIEW).

  4. Re-attach the Kafka table (ATTACH TABLE).

  5. Re-create the materialized view (CREATE MATERIALIZED VIEW).

Modify a Kafka table

To change settings for a Kafka-engine table, drop it and re-create it with new settings. In this case, there is no need to modify the materialized view — message consumption will resume once the Kafka table is re-created.
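For example, to raise the number of consumers for a Kafka table, you could drop and re-create it as follows (a sketch based on the table from the example below; the new setting value is arbitrary):

```sql
DROP TABLE kafka_table;

CREATE TABLE kafka_table (id Int32, name String)
ENGINE = Kafka('<kafka_host>:9092', 'topic-kafka-to-adqm', 'clickhouse', 'CSV')
SETTINGS kafka_num_consumers = 2;
```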


Create a Kafka topic

Connect to any ADS host where the Kafka service is installed. Go to the root directory with scripts for executing commands in the Kafka environment (.sh files) and follow the steps below.

  1. Create the topic-kafka-to-adqm topic via the following command (replace <kafka_host> with a name of an ADS host where the topic is created):

    $ bin/kafka-topics.sh --create --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092

    The result:

    Created topic topic-kafka-to-adqm.

    For more information on creating and reading Kafka topics, see Quick start with Kafka in the ADS documentation.

  2. Write test messages to the topic as follows.

    Run the command that starts the message recording mode:

    $ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092

    On the next line after entering the command, enter messages — each message on a new line (use Enter to move to a new line):

    >1,one
    >2,two
    >3,three
    To exit the message recording mode, go to the next line after writing the last message and press Ctrl+C.

Create ADQM tables for integration with Kafka

On an ADQM host, run the clickhouse-client console client and follow the steps below.

  1. Create a table based on the Kafka engine:

    CREATE TABLE kafka_table (id Int32, name String) ENGINE = Kafka('<kafka_host>:9092', 'topic-kafka-to-adqm', 'clickhouse', 'CSV');
  2. Create a target table to store data from Kafka:

    CREATE TABLE kafka_data (id Int32, name String) ENGINE = MergeTree() ORDER BY id;
  3. Create a materialized view that will receive data from the Kafka table and put it into the target MergeTree table:

    CREATE MATERIALIZED VIEW kafka_data_mv TO kafka_data AS SELECT id, name FROM kafka_table;

    Now the materialized view is connected to the Kafka-engine table — it starts reading Kafka data and inserting corresponding data rows into the target table. This process will continue indefinitely — all subsequent insertions of messages into the Kafka topic will be consumed.

Read data from Kafka

  1. Verify that data from Kafka is inserted into the target table:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    └────┴───────┘
  2. Insert more messages into the Kafka topic (on the ADS host):

    $ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
    >4,four
    >5,five
  3. Check that new data is in the ADQM table:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    │  4 │ four  │
    │  5 │ five  │
    └────┴───────┘

Insert data from ADQM to Kafka

  1. On the ADQM host, insert data into the Kafka-engine table:

    INSERT INTO kafka_table VALUES (6, 'six');
  2. On the ADS host, check that the corresponding message is sent to the Kafka topic:

    $ bin/kafka-console-consumer.sh --topic topic-kafka-to-adqm --from-beginning --bootstrap-server <kafka_host>:9092
  3. Ensure also that new data is written to the target ADQM table with Kafka data:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    │  4 │ four  │
    │  5 │ five  │
    │  6 │ six   │
    └────┴───────┘

Change the target table

To include meta information about Kafka messages (for example, the topic name and message offset) into the target table, make the following changes:

  1. Detach the Kafka table:

    DETACH TABLE kafka_table;
  2. Add the topic and offset columns to the target table:

    ALTER TABLE kafka_data
        ADD COLUMN topic String,
        ADD COLUMN offset UInt64;
  3. Delete the materialized view:

    DROP VIEW kafka_data_mv;
  4. Re-attach the Kafka-engine table:

    ATTACH TABLE kafka_table;
  5. Re-create the materialized view:

    CREATE MATERIALIZED VIEW kafka_data_mv TO kafka_data AS SELECT id, name, _topic AS topic, _offset AS offset FROM kafka_table;

Now, for each newly consumed message, the topic and offset columns of the target ADQM table will contain a name of the Kafka topic and the message offset in the partition. To check this:

  1. Insert a new message into the Kafka topic on the ADS host:

    $ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
    >7,seven
  2. Query data from the kafka_data table in ADQM:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┬─topic───────────────┬─offset─┐
    │  1 │ one   │                     │      0 │
    │  2 │ two   │                     │      0 │
    │  3 │ three │                     │      0 │
    │  4 │ four  │                     │      0 │
    │  5 │ five  │                     │      0 │
    │  6 │ six   │                     │      0 │
    │  7 │ seven │ topic-kafka-to-adqm │      6 │
    └────┴───────┴─────────────────────┴────────┘