Интеграция ADQM и Kafka
ClickHouse поддерживает возможность интеграции с Apache Kafka через специальный табличный движок — Kafka. Этот движок позволяет:
-
подписываться на топики Kafka, чтобы организовать потоковую загрузку данных из Kafka в ClickHouse (наиболее частый сценарий использования интеграции) или публиковать данные из ClickHouse в Kafka;
-
создавать отказоустойчивые хранилища;
-
обрабатывать потоки по мере их появления.
В данной статье описываются особенности работы с табличным движком Kafka на примере ADQM и ADS (Arenadata Streaming), который предоставляет сервис Kafka.
Обзор
Движок Kafka позволяет ClickHouse читать данные напрямую из топика Kafka и отслеживать поток сообщений, но каждое сообщение из таблицы Kafka можно получить только один раз. Как только данные запрашиваются из таблицы Kafka, они считаются уже полученными (consumed) из очереди и перед возвратом результата увеличивается смещение для потребителя (offset). Данные невозможно повторно считать без сброса этих смещений, поэтому не следует делать выборку данных из таблицы Kafka напрямую (через запрос SELECT
) — вместо этого рекомендуется использовать материализованное представление. Такой подход предполагает создание следующих таблиц на стороне ClickHouse:
-
Таблица на базе движка Kafka — потребитель Kafka, который подписывается на топик и читает поток данных.
-
Целевая таблица с нужной структурой для постоянного хранения данных, полученных из Kafka. Обычно это таблица на базе движка семейства MergeTree.
-
Материализованное представление, которое в фоновом режиме собирает данные из таблицы Kafka (получает новые данные блоками при срабатывании триггера на вставку данных в таблицу Kafka), преобразует их в необходимый формат и помещает в заранее созданную целевую таблицу.
Иногда необходимо применять различные преобразования к данным, поступающим из Kafka — например, для хранения необработанных и агрегированных данных. В этом случае к одной таблице Kafka можно привязать несколько материализованных представлений, чтобы сохранять данные с разным уровнем детализации в несколько таблиц.
Чтобы записывать данные из ClickHouse в Kafka, можно вставлять их напрямую в таблицу на базе движка Kafka.
Табличный движок Kafka
Создание таблицы Kafka
Базовый синтаксис запроса для создания таблицы Kafka в ADQM:
CREATE TABLE <table_name> (<column_name> <column_type> [ALIAS <expr>], ...)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = '<host_name>:9092,...',
kafka_topic_list = '<topic_name>,...',
kafka_group_name = '<consumer_group_name>',
kafka_format = '<data_format>'[,]
[kafka_schema = '',]
[kafka_num_consumers = <N>,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = <N>,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
Табличный движок Kafka имеет следующие обязательные параметры. Эти параметры можно указывать в круглых скобках при создании таблицы (см. пример).
kafka_broker_list |
Список брокеров (разделяются запятыми) |
kafka_topic_list |
Список топиков Kafka (разделяются запятыми) |
kafka_group_name |
Группа потребителей Kafka. Смещения для чтения отслеживаются для каждой группы отдельно. Если нужно, чтобы сообщения в кластере не дублировались, используйте одно имя группы потребителей для всех таблиц Kafka |
kafka_format |
Формат сообщений Kafka. Для определения этого параметра используются такие же названия форматов, как для выражения |
kafka_schema |
Параметр, который необходимо использовать, если формат требует определения схемы. Например, CapnProto требует путь к файлу со схемой и название корневого объекта |
kafka_num_consumers |
Количество потребителей (consumer) на таблицу. По умолчанию: |
kafka_max_block_size |
Максимальный размер блока (в сообщениях) для poll. Значение по умолчанию равно значению параметра max_insert_block_size |
kafka_skip_broken_messages |
Максимальное количество допустимых некорректных (несовместимых со схемой) сообщений в блоке. Если |
kafka_commit_every_batch |
Включает или отключает режим коммита (commit) каждого принятого и обработанного пакета по отдельности вместо одного коммита после записи целого блока. Значение по умолчанию: |
kafka_client_id |
Идентификатор клиента. По умолчанию не указывается |
kafka_poll_timeout_ms |
Тайм-аут для одного опроса (poll) Kafka. Значение по умолчанию равно значению параметра stream_poll_timeout_ms |
kafka_poll_max_batch_size |
Максимальное количество сообщений, опрашиваемых в одном вызове опроса (poll) Kafka. Значение по умолчанию равно значению параметра max_block_size |
kafka_flush_interval_ms |
Тайм-аут для сброса данных из Kafka. Значение по умолчанию равно значению параметра stream_flush_interval_ms |
kafka_thread_per_consumer |
Включает или отключает предоставление отдельного потока каждому потребителю. При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки данных от нескольких потребителей собираются в один блок. Значение по умолчанию: |
kafka_handle_error_mode |
Способ обработки ошибок для движка Kafka. Возможные значения:
|
kafka_commit_on_select |
Включает или отключает коммит (commit) сообщений при выполнении запроса |
kafka_max_rows_per_message |
Максимальное количество строк в одном сообщении Kafka для форматов row-based. Значение по умолчанию: |
Расширенная настройка
В файле конфигурации ClickHouse можно указать дополнительные настройки для таблиц Kafka:
-
глобальные настройки в секции
<kafka>
; -
настройки на уровне топика в секции
<kafka_topic>
внутри тега<kafka>
(имя топика указывается через параметр<name>
).
Например, ведение журнала отладки (debug) для библиотеки Kafka (librdkafka) активируется с помощью следующего параметра в файле config.xml:
<kafka>
<debug>all</debug>
</kafka>
Список всех возможных параметров конфигурации можно посмотреть в статье Configuration properties документации библиотеки librdkafka. В ADQM используйте нижнее подчеркивание вместо точки в названии параметра — например, <auto_offset_reset>latest</auto_offset_reset>
вместо auto.offset.reset = latest
.
Настройки для таблиц Kafka в ADQM можно указать через интерфейс ADCM. Для этого на странице конфигурации сервиса ADQMDB активируйте опцию Kafka engine и в поле Kafka Properties перечислите нужные параметры движка Kafka (соответствующие теги со значениями, без тега <kafka>
).
Нажмите Save и выполните действие Reconfig and restart для сервиса ADQMDB — после того, как выполнение действия завершится, секция <kafka>
с указанными параметрами будет добавлена в файл config.xml.
Поддержка Kerberos
Для работы с Kafka с поддержкой Kerberos назначьте параметру security_protocol
значение sasl_plaintext
. Этого достаточно, если тикет выдачи тикетов Kerberos (ticket-granting ticket) получен и кешируется средствами операционной системы. ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Используйте также параметры sasl_kerberos_service_name
, sasl_kerberos_keytab
и sasl_kerberos_principal
для настройки аутентификации через Kerberos.
Виртуальные столбцы
Иногда может быть полезно иметь метаданные сообщений Kafka при их загрузке в ClickHouse. Например, может потребоваться знать координаты потребленного сообщения в топике или сколько сообщений получено из определенного топика или партиции. Для этой цели движок Kafka предоставляет следующие виртуальные столбцы.
Название столбца | Тип данных | Описание |
---|---|---|
_topic |
LowCardinality(String) |
Топик Kafka |
_key |
String |
Ключ сообщения |
_offset |
UInt64 |
Смещение (offset) сообщения |
_timestamp |
Nullable(DateTime) |
Временная метка сообщения |
_timestamp_ms |
Nullable(DateTime64(3)) |
Временная метка сообщения в миллисекундах |
_partition |
UInt64 |
Партиция топика Kafka |
_headers.name |
Array(String) |
Массив ключей заголовков сообщений |
_headers.value |
Array(String) |
Массив значений заголовков сообщений |
_error |
String |
Текст исключения, которое было выдано, если сообщение не удалось обработать. Столбец заполняется, если параметр |
_raw_message |
String |
Cообщение, которое не удалось успешно обработать. Столбец заполняется, если параметр |
Чтобы получать значения виртуальных столбцов, необходимо добавить соответствующие столбцы в структуру целевой таблицы и указать их в выражении SELECT
материализованного представления — см. пример ниже. Виртуальные столбцы в таблице Kafka создавать не нужно, так как они доступны автоматически.
Общие операции
Остановка и перезапуск потребления сообщений
Чтобы остановить получение сообщений из топика, отсоедините таблицу Kafka:
DETACH TABLE <kafka_engine_table>;
Отсоединение таблицы Kafka не влияет на смещение для группы потребителей. Чтобы возобновить чтение данных и продолжить с предыдущего смещения, снова присоедините таблицу:
ATTACH TABLE <kafka_engine_table>;
Изменение целевой таблицы
Если необходимо изменить целевую таблицу, рекомендуется следующая последовательность действий:
-
Отключите таблицу Kafka (
DETACH TABLE
). -
Измените целевую таблицу (
ALTER TABLE
). -
Удалите материализованное представление, чтобы избежать несоответствия между измененной целевой таблицей и данными из представления (
DROP VIEW
). -
Снова присоедините таблицу Kafka (
ATTACH TABLE
). -
Пересоздайте материализованное представление (
CREATE MATERIALIZED VIEW
).
Пример
Создание топика Kafka в ADS
Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka. Перейдите в корневую директорию со скриптами для выполнения команд в среде Kafka (файлы c расширением .sh) и следуйте инструкциям ниже.
-
Создайте топик
topic-kafka-to-adqm
с помощью следующей команды (замените<kafka_host>
на имя хоста ADS, где создается топик):$ bin/kafka-topics.sh --create --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
Результат:
Created topic topic-kafka-to-adqm.
ПРИМЕЧАНИЕДополнительную информацию о создании и чтении топиков в Kafka можно получить в статье документации ADS Начало работы c Kafka.
-
Запишите тестовые сообщения в топик следующим образом.
Выполните команду, которая запускает режим записи сообщений:
$ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
На следующей строке после ввода команды введите сообщения, каждое на новой строке (используйте
Enter
для перехода на новую строку):>1,"one" >2,"two" >3,"three"
Чтобы выйти из режима записи сообщений, перейдите на следующую строку после записи последнего сообщения и нажмите
Ctrl+C
.
Создание таблиц в ADQM для интеграции с Kafka
На хосте ADQM запустите консольный клиент clickhouse-client и выполните следующие шаги.
-
Создайте таблицу на базе движка Kafka:
CREATE TABLE kafka_table (id Int32, name String) ENGINE = Kafka ('<kafka_host>:9092', 'topic-kafka-to-adqm', 'clickhouse', 'CSV');
-
Создайте целевую таблицу, в которую будут сохраняться данные:
CREATE TABLE kafka_data (id Int32, name String) ENGINE = MergeTree() ORDER BY id;
-
Создайте материализованное представление, которое будет получать данные из таблицы Kafka и помещать их в целевую таблицу MergeTree:
CREATE MATERIALIZED VIEW kafka_data_mv TO kafka_data AS SELECT id, name FROM kafka_table;
В момент создания материализованное представление подключается к таблице Kafka, начинает чтение данных и вставку строк в целевую таблицу. Этот процесс продолжаться постоянно — все последующие вставки сообщений в топик Kafka будут считываться.
Чтение данных из Kafka
-
Убедитесь, что данные Kafka вставлены в целевую таблицу:
SELECT * FROM kafka_data;
┌─id─┬─name──┐ │ 1 │ one │ │ 2 │ two │ │ 3 │ three │ └────┴───────┘
-
Вставьте новые сообщения в топик Kafka (на хосте ADS):
$ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
>4,"four" >5,"five"
-
Проверьте, что новые данные попали в таблицу ADQM:
SELECT * FROM kafka_data;
┌─id─┬─name──┐ │ 1 │ one │ │ 2 │ two │ │ 3 │ three │ │ 4 │ four │ │ 5 │ five │ └────┴───────┘
Вставка данных из ADQM в Kafka
-
На хосте ADQM вставьте данные в таблицу на базе движка Kafka:
INSERT INTO kafka_table VALUES (6, 'six');
-
На хосте ADS проверьте, что соответствующее сообщение записалось в топик Kafka:
$ bin/kafka-console-consumer.sh --topic topic-kafka-to-adqm --from-beginning --bootstrap-server <kafka_host>:9092
1,"one" 2,"two" 3,"three" 4,"four" 5,"five" 6,"six"
-
Убедитесь также, что новые данные записались в целевую таблицу ADQM с данными Kafka:
SELECT * FROM kafka_data;
┌─id─┬─name──┐ │ 1 │ one │ │ 2 │ two │ │ 3 │ three │ │ 4 │ four │ │ 5 │ five │ │ 6 │ six │ └────┴───────┘
Изменение целевой таблицы
Чтобы целевая таблица включала метаинформацию о сохраняемых в нее сообщениях Kafka (например, название топика и смещение сообщения), сделайте следующие изменения.
-
Отключите таблицу Kafka:
DETACH TABLE kafka_table;
-
Добавьте в целевую таблицу столбцы
topic
иoffset
:ALTER TABLE kafka_data ADD COLUMN topic String, ADD COLUMN offset UInt64;
-
Удалите материализованное представление:
DROP VIEW kafka_data_mv;
-
Снова подключите таблицу Kafka:
ATTACH TABLE kafka_table;
-
Пересоздайте материализованное представление:
CREATE MATERIALIZED VIEW kafka_data_mv TO kafka_data AS SELECT id, name, _topic as topic, _offset as offset FROM kafka_table;
Теперь для новых сообщений, поступающих в целевую таблицу ADQM, в столбцах topic
и offset
будет показываться название топика Kafka и смещение сообщения в партиции. Чтобы это проверить:
-
Вставьте новое сообщение в топик Kafka на хосте ADS:
$ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
>7,"seven"
-
Запросите данные из таблицы
kafka_data
в ADQM:SELECT * FROM kafka_data;
┌─id─┬─name──┐ │ 1 │ one │ │ 2 │ two │ │ 3 │ three │ │ 4 │ four │ │ 5 │ five │ │ 6 │ six │ └────┴───────┘ ┌─id─┬─name──┬─topic───────────────┬─offset─┐ │ 7 │ seven │ topic-kafka-to-adqm │ 6 │ └────┴───────┴─────────────────────┴────────┘