Интеграция ADQM и Kafka

ClickHouse поддерживает возможность интеграции с Apache Kafka через специальный табличный движок — Kafka. Этот движок позволяет:

  • подписываться на топики Kafka, чтобы организовать потоковую загрузку данных из Kafka в ClickHouse (наиболее частый сценарий использования интеграции) или публиковать данные из ClickHouse в Kafka;

  • создавать отказоустойчивые хранилища;

  • обрабатывать потоки по мере их появления.

В данной статье описываются особенности работы с табличным движком Kafka на примере ADQM и ADS (Arenadata Streaming), который предоставляет сервис Kafka.

Обзор

Движок Kafka позволяет ClickHouse читать данные напрямую из топика Kafka и отслеживать поток сообщений, но каждое сообщение из таблицы Kafka можно получить только один раз. Как только данные запрашиваются из таблицы Kafka, они считаются уже полученными (consumed) из очереди и перед возвратом результата увеличивается смещение для потребителя (offset). Данные невозможно повторно считать без сброса этих смещений, поэтому не следует делать выборку данных из таблицы Kafka напрямую (через запрос SELECT) — вместо этого рекомендуется использовать материализованное представление. Такой подход предполагает создание следующих таблиц на стороне ClickHouse:

  • Таблица на базе движка Kafka — потребитель Kafka, который подписывается на топик и читает поток данных.

  • Целевая таблица с нужной структурой для постоянного хранения данных, полученных из Kafka. Обычно это таблица на базе движка семейства MergeTree.

  • Материализованное представление, которое в фоновом режиме собирает данные из таблицы Kafka (получает новые данные блоками при срабатывании триггера на вставку данных в таблицу Kafka), преобразует их в необходимый формат и помещает в заранее созданную целевую таблицу.

    Иногда необходимо применять различные преобразования к данным, поступающим из Kafka — например, для хранения необработанных и агрегированных данных. В этом случае к одной таблице Kafka можно привязать несколько материализованных представлений, чтобы сохранять данные с разным уровнем детализации в несколько таблиц.

Чтобы записывать данные из ClickHouse в Kafka, можно вставлять их напрямую в таблицу на базе движка Kafka.

Табличный движок Kafka

Создание таблицы Kafka

Базовый синтаксис запроса для создания таблицы Kafka в ADQM:

CREATE TABLE <table_name> (<column_name> <column_type> [ALIAS <expr>], ...)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = '<host_name>:9092,...',
    kafka_topic_list = '<topic_name>,...',
    kafka_group_name = '<consumer_group_name>',
    kafka_format = '<data_format>'[,]
    [kafka_schema = '',]
    [kafka_num_consumers = <N>,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = <N>,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

Табличный движок Kafka имеет следующие обязательные параметры. Эти параметры можно указывать в круглых скобках при создании таблицы (см. пример).

kafka_broker_list

Список брокеров (разделяются запятыми)

kafka_topic_list

Список топиков Kafka (разделяются запятыми)

kafka_group_name

Группа потребителей Kafka. Смещения для чтения отслеживаются для каждой группы отдельно. Если нужно, чтобы сообщения в кластере не дублировались, используйте одно имя группы потребителей для всех таблиц Kafka

kafka_format

Формат сообщений Kafka. Для определения этого параметра используются такие же названия форматов, как для выражения FORMAT в запросах INSERT и SELECT — подробнее в статье документации ClickHouse Formats for Input and Output Data

Опциональные параметры

kafka_schema

Параметр, который необходимо использовать, если формат требует определения схемы. Например, CapnProto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message

kafka_num_consumers

Количество потребителей (consumer) на таблицу. По умолчанию: 1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике (так как на одну партицию может быть назначено не более одного потребителя) и количество ядер на сервере, где развернут ClickHouse

kafka_max_block_size

Максимальный размер блока (в сообщениях) для poll. Значение по умолчанию равно значению параметра max_insert_block_size

kafka_skip_broken_messages

Максимальное количество допустимых некорректных (несовместимых со схемой) сообщений в блоке. Если kafka_skip_broken_messages = N, то движок пропускает N сообщений Kafka, которые не получилось обработать (одно сообщение соответствует одной строке данных). Значение по умолчанию: 0

kafka_commit_every_batch

Включает или отключает режим коммита (commit) каждого принятого и обработанного пакета по отдельности вместо одного коммита после записи целого блока. Значение по умолчанию: 0

kafka_client_id

Идентификатор клиента. По умолчанию не указывается

kafka_poll_timeout_ms

Тайм-аут для одного опроса (poll) Kafka. Значение по умолчанию равно значению параметра stream_poll_timeout_ms

kafka_poll_max_batch_size

Максимальное количество сообщений, опрашиваемых в одном вызове опроса (poll) Kafka. Значение по умолчанию равно значению параметра max_block_size

kafka_flush_interval_ms

Тайм-аут для сброса данных из Kafka. Значение по умолчанию равно значению параметра stream_flush_interval_ms

kafka_thread_per_consumer

Включает или отключает предоставление отдельного потока каждому потребителю. При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки данных от нескольких потребителей собираются в один блок. Значение по умолчанию: 0

kafka_handle_error_mode

Способ обработки ошибок для движка Kafka. Возможные значения:

  • default — выдается исключение, если сообщение не удалось обработать (parse);

  • stream — текст ошибки и сообщение, которое не удалось обработать, будут сохранены в виртуальных столбцах _error и _raw_message

kafka_commit_on_select

Включает или отключает коммит (commit) сообщений при выполнении запроса SELECT. Значение по умолчанию: false

kafka_max_rows_per_message

Максимальное количество строк в одном сообщении Kafka для форматов row-based. Значение по умолчанию: 1

Расширенная настройка

В файле конфигурации ClickHouse можно указать дополнительные настройки для таблиц Kafka:

  • глобальные настройки в секции <kafka>;

  • настройки на уровне топика в секции <kafka_topic> внутри тега <kafka> (имя топика указывается через параметр <name>).

Например, ведение журнала отладки (debug) для библиотеки Kafka (librdkafka) активируется с помощью следующего параметра в файле config.xml:

<kafka>
   <debug>all</debug>
</kafka>

Список всех возможных параметров конфигурации можно посмотреть в статье Configuration properties документации библиотеки librdkafka. В ADQM используйте нижнее подчеркивание вместо точки в названии параметра — например, <auto_offset_reset>latest</auto_offset_reset> вместо auto.offset.reset = latest.

Настройки для таблиц Kafka в ADQM можно указать через интерфейс ADCM. Для этого на странице конфигурации сервиса ADQMDB активируйте опцию Kafka engine и в поле Kafka Properties перечислите нужные параметры движка Kafka (соответствующие теги со значениями, без тега <kafka>).

Дополнительные настройки движка Kafka
Дополнительные настройки движка Kafka

Нажмите Save и выполните действие Reconfig and restart для сервиса ADQMDB — после того, как выполнение действия завершится, секция <kafka> с указанными параметрами будет добавлена в файл config.xml.

Поддержка Kerberos

Для работы с Kafka с поддержкой Kerberos назначьте параметру security_protocol значение sasl_plaintext. Этого достаточно, если тикет выдачи тикетов Kerberos (ticket-granting ticket) получен и кешируется средствами операционной системы. ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Используйте также параметры sasl_kerberos_service_name, sasl_kerberos_keytab и sasl_kerberos_principal для настройки аутентификации через Kerberos.

Виртуальные столбцы

Иногда может быть полезно иметь метаданные сообщений Kafka при их загрузке в ClickHouse. Например, может потребоваться знать координаты потребленного сообщения в топике или сколько сообщений получено из определенного топика или партиции. Для этой цели движок Kafka предоставляет следующие виртуальные столбцы.

Название столбца Тип данных Описание

_topic

LowCardinality(String)

Топик Kafka

_key

String

Ключ сообщения

_offset

UInt64

Смещение (offset) сообщения

_timestamp

Nullable(DateTime)

Временная метка сообщения

_timestamp_ms

Nullable(DateTime64(3))

Временная метка сообщения в миллисекундах

_partition

UInt64

Партиция топика Kafka

_headers.name

Array(String)

Массив ключей заголовков сообщений

_headers.value

Array(String)

Массив значений заголовков сообщений

_error

String

Текст исключения, которое было выдано, если сообщение не удалось обработать. Столбец заполняется, если параметр kafka_handle_error_mode установлен в stream и произошла ошибка во время синтаксического анализа сообщения (если сообщение было проанализировано успешно, столбец остается пустым)

_raw_message

String

Cообщение, которое не удалось успешно обработать. Столбец заполняется, если параметр kafka_handle_error_mode установлен в stream и произошла ошибка во время синтаксического анализа сообщения (если сообщение было проанализировано успешно, столбец остается пустым)

Чтобы получать значения виртуальных столбцов, необходимо добавить соответствующие столбцы в структуру целевой таблицы и указать их в выражении SELECT материализованного представления — см. пример ниже. Виртуальные столбцы в таблице Kafka создавать не нужно, так как они доступны автоматически.

Общие операции

Остановка и перезапуск потребления сообщений

Чтобы остановить получение сообщений из топика, отсоедините таблицу Kafka:

DETACH TABLE <kafka_engine_table>;

Отсоединение таблицы Kafka не влияет на смещение для группы потребителей. Чтобы возобновить чтение данных и продолжить с предыдущего смещения, снова присоедините таблицу:

ATTACH TABLE <kafka_engine_table>;

Изменение целевой таблицы

Если необходимо изменить целевую таблицу, рекомендуется следующая последовательность действий:

  1. Отключите таблицу Kafka (DETACH TABLE).

  2. Измените целевую таблицу (ALTER TABLE).

  3. Удалите материализованное представление, чтобы избежать несоответствия между измененной целевой таблицей и данными из представления (DROP VIEW).

  4. Снова присоедините таблицу Kafka (ATTACH TABLE).

  5. Пересоздайте материализованное представление (CREATE MATERIALIZED VIEW).

Изменение таблицы Kafka

Чтобы изменить настройки таблицы Kafka, удалите ее и заново создайте с новыми настройками. При этом материализованное представление изменять не нужно — потребление сообщений возобновится после пересоздания таблицы Kafka.

Пример

Создание топика Kafka в ADS

Подключитесь к любому хосту кластера ADS, на котором установлен сервис Kafka. Перейдите в корневую директорию со скриптами для выполнения команд в среде Kafka (файлы c расширением .sh) и следуйте инструкциям ниже.

  1. Создайте топик topic-kafka-to-adqm с помощью следующей команды (замените <kafka_host> на имя хоста ADS, где создается топик):

    $ bin/kafka-topics.sh --create --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092

    Результат:

    Created topic topic-kafka-to-adqm.
    ПРИМЕЧАНИЕ

    Дополнительную информацию о создании и чтении топиков в Kafka можно получить в статье документации ADS Начало работы c Kafka.

  2. Запишите тестовые сообщения в топик следующим образом.

    Выполните команду, которая запускает режим записи сообщений:

    $ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092

    На следующей строке после ввода команды введите сообщения, каждое на новой строке (используйте Enter для перехода на новую строку):

    >1,"one"
    >2,"two"
    >3,"three"

    Чтобы выйти из режима записи сообщений, перейдите на следующую строку после записи последнего сообщения и нажмите Ctrl+C.

Создание таблиц в ADQM для интеграции с Kafka

На хосте ADQM запустите консольный клиент clickhouse-client и выполните следующие шаги.

  1. Создайте таблицу на базе движка Kafka:

    CREATE TABLE kafka_table (id Int32, name String) ENGINE = Kafka ('<kafka_host>:9092', 'topic-kafka-to-adqm', 'clickhouse', 'CSV');
  2. Создайте целевую таблицу, в которую будут сохраняться данные:

    CREATE TABLE kafka_data (id Int32, name String) ENGINE = MergeTree() ORDER BY id;
  3. Создайте материализованное представление, которое будет получать данные из таблицы Kafka и помещать их в целевую таблицу MergeTree:

    CREATE MATERIALIZED VIEW kafka_data_mv TO kafka_data AS SELECT id, name FROM kafka_table;

    В момент создания материализованное представление подключается к таблице Kafka, начинает чтение данных и вставку строк в целевую таблицу. Этот процесс продолжаться постоянно — все последующие вставки сообщений в топик Kafka будут считываться.

Чтение данных из Kafka

  1. Убедитесь, что данные Kafka вставлены в целевую таблицу:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    └────┴───────┘
  2. Вставьте новые сообщения в топик Kafka (на хосте ADS):

    $ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
    >4,"four"
    >5,"five"
  3. Проверьте, что новые данные попали в таблицу ADQM:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    │  4 │ four  │
    │  5 │ five  │
    └────┴───────┘

Вставка данных из ADQM в Kafka

  1. На хосте ADQM вставьте данные в таблицу на базе движка Kafka:

    INSERT INTO kafka_table VALUES (6, 'six');
  2. На хосте ADS проверьте, что соответствующее сообщение записалось в топик Kafka:

    $ bin/kafka-console-consumer.sh --topic topic-kafka-to-adqm --from-beginning --bootstrap-server <kafka_host>:9092
    1,"one"
    2,"two"
    3,"three"
    4,"four"
    5,"five"
    6,"six"
  3. Убедитесь также, что новые данные записались в целевую таблицу ADQM с данными Kafka:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    │  4 │ four  │
    │  5 │ five  │
    │  6 │ six   │
    └────┴───────┘

Изменение целевой таблицы

Чтобы целевая таблица включала метаинформацию о сохраняемых в нее сообщениях Kafka (например, название топика и смещение сообщения), сделайте следующие изменения.

  1. Отключите таблицу Kafka:

    DETACH TABLE kafka_table;
  2. Добавьте в целевую таблицу столбцы topic и offset:

    ALTER TABLE kafka_data
        ADD COLUMN topic String,
        ADD COLUMN offset UInt64;
  3. Удалите материализованное представление:

    DROP VIEW kafka_data_mv;
  4. Снова подключите таблицу Kafka:

    ATTACH TABLE kafka_table;
  5. Пересоздайте материализованное представление:

    CREATE MATERIALIZED VIEW kafka_data_mv TO kafka_data AS SELECT id, name, _topic as topic, _offset as offset FROM kafka_table;

Теперь для новых сообщений, поступающих в целевую таблицу ADQM, в столбцах topic и offset будет показываться название топика Kafka и смещение сообщения в партиции. Чтобы это проверить:

  1. Вставьте новое сообщение в топик Kafka на хосте ADS:

    $ bin/kafka-console-producer.sh --topic topic-kafka-to-adqm --bootstrap-server <kafka_host>:9092
    >7,"seven"
  2. Запросите данные из таблицы kafka_data в ADQM:

    SELECT * FROM kafka_data;
    ┌─id─┬─name──┐
    │  1 │ one   │
    │  2 │ two   │
    │  3 │ three │
    │  4 │ four  │
    │  5 │ five  │
    │  6 │ six   │
    └────┴───────┘
    ┌─id─┬─name──┬─topic───────────────┬─offset─┐
    │  7 │ seven │ topic-kafka-to-adqm │      6 │
    └────┴───────┴─────────────────────┴────────┘
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней