Интеграция ADQM и HDFS

Для интеграции с распределенной файловой системой HDFS (Hadoop Distributed File System) в ClickHouse реализованы табличный движок HDFS и специальные табличные функции, которые позволяют осуществлять чтение и запись данных HDFS из ClickHouse. Данная статья описывает особенности и способы использования этих инструментов на примере ADQM и ADH (Arenadata Hadoop), который предоставляет сервис HDFS.

Для интеграции с HDFS запросы из ADQM направляются к хосту кластера ADH c NameNode в состоянии Active (в приведенных ниже примерах замените <namenode_ip> в URI файлов HDFS на IP-адрес соответствующего хоста ADH). При подключении к NameNode в статусе Standby ADQM может вернуть сообщение об ошибке следующего содержания:

Exception: Unable to connect to HDFS: Operation category READ is not supported in state standby.

Табличный движок HDFS

Табличный движок HDFS позволяет управлять данными, хранящимися в файлах HDFS, из ADQM. Этот тип таблиц поддерживает многопоточное чтение и запись данных HDFS, но имеет ряд ограничений (репликация, индексы, операции ALTER и SELECT…​SAMPLE не поддерживаются).

Базовый синтаксис запроса для создания таблицы HDFS:

CREATE TABLE <table_name> (<column_name> <column_type>, ...) ENGINE = HDFS(<URI>, <format>);

Параметры табличного движка HDFS:

  • <URI> — полный URI файла в HDFS. Часть URI, содержащая путь к файлу в HDFS (имя каталога, имя файла), может содержать символы подстановки, чтобы получить данные из нескольких файлов — в этом случае таблица может быть использована только для чтения.

  • <format> — формат, в котором ADQM будет принимать при выполнении запросов SELECT и отдавать при выполнении запросов INSERT табличные данные (см. форматы в столбцах Input и Output таблицы Formats for Input and Output Data соответственно).

При экспорте данных из таблицы ADQM в файловую систему HDFS используйте параметры hdfs_create_new_file_on_insert и hdfs_truncate_on_insert, которые позволяют определить, как будут сохраняться данные в файлы HDFS — будут создаваться новые файлы или перезапиcываться существующие. По умолчанию обе опции выставлены в 0, и при вставке данных в таблицу ADQM возвращается следующее сообщение:

Exception: File with path <file_path> already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert,
if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert.

где <file_path> — путь к файлу в HDFS, на основе которого была создана таблица HDFS в ADQM.

В файле конфигурации ADQM можно указать дополнительные настройки для таблиц HDFS: глобальные (указываются в секции настроек <hdfs>) и настройки на уровне пользователя (указываются в секции hdfs_<user_name>, где <user_name> — имя пользователя). Список параметров, которые можно настроить для движка HDFS, приведен в соответствующем разделе статьи HDFS документации ClickHouse.

Пример

Создание таблицы HDFS

  1. В HDFS в каталоге adqm_data_sources создайте файл test_1.txt с тестовыми данными для последующей вставки в таблицу ADQM:

    "one",1
    "two",2
    "three",3

    URI файла для обращения из ADQM — hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt, где <namenode_ip> — IP-адрес хоста ADH с NameNode в состоянии Active.

  2. Запустите консольный клиент clickhouse-client и создайте таблицу на базе движка HDFS со структурой, соответствующей данным, которые будут импортироваться из HDFS:

    CREATE TABLE hdfs_table (name String, value UInt32) ENGINE=HDFS('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV');
  3. Запросите данные из таблицы:

    SELECT * FROM hdfs_table;
    ┌─name──┬─value─┐
    │ one   │     1 │
    │ two   │     2 │
    │ three │     3 │
    └───────┴───────┘

Вставка данных в таблицу HDFS

  1. Включите настройку hdfs_create_new_file_on_insert, чтобы при каждой вставке данных создавался новый файл в HDFS (например, если таблица была создана на основе файла file_name.txt, новым файлам названия будут назначаться по шаблону — file_name.1.txt, file_name.2.txt и так далее):

    SET hdfs_create_new_file_on_insert = 1;
  2. Выполните вставку данных, повторив два запроса INSERT INTO:

    INSERT INTO hdfs_table VALUES('four', 4);
    INSERT INTO hdfs_table VALUES('five', 5);
  3. Проверьте содержимое каталога adqm_data_sources в HDFS:

    $ hdfs dfs -ls /adqm_data_sources

    В каталоге создано два новых файла test_1.1.txt и test_1.2.txt:

    Found 3 items
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:40 /adqm_data_sources/test_1.1.txt
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:40 /adqm_data_sources/test_1.2.txt
    -rwxrwxrwx   3 clickhouse hadoop         23 2023-10-25 13:25 /adqm_data_sources/test_1.txt

    Посмотрите содержимое этих файлов:

    $ hdfs dfs -cat /adqm_data_sources/test_1.1.txt
    "four",4
    $ hdfs dfs -cat /adqm_data_sources/test_1.2.txt
    "five",5

    При последующем чтении таблицы можно убедиться, что в выборку будут включены данные из всех файлов:

    SELECT * FROM hdfs_table;
    ┌─name──┬─value─┐
    │ one   │     1 │
    │ two   │     2 │
    │ three │     3 │
    └───────┴───────┘
    ┌─name─┬─value─┐
    │ five │     5 │
    └──────┴───────┘
    ┌─name─┬─value─┐
    │ four │     4 │
    └──────┴───────┘
  4. Активируйте опцию hdfs_truncate_on_insert и выполните еще одну вставку данных в таблицу — в этом случае последний файл с данными будет перезаписан:

    SET hdfs_truncate_on_insert = 1;
    INSERT INTO hdfs_table VALUES('six', 6);
  5. Проверьте содержимое каталога adqm_data_sources в HDFS:

    $ hdfs dfs -ls /adqm_data_sources

    Новый файл не был добавлен, но файл test_1.2.txt обновлен:

    Found 3 items
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:40 /adqm_data_sources/test_1.1.txt
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:45 /adqm_data_sources/test_1.2.txt
    -rwxrwxrwx   3 clickhouse hadoop         23 2023-10-25 13:25 /adqm_data_sources/test_1.txt
    $ hdfs dfs -cat /adqm_data_sources/test_1.2.txt
    "six",6

    Если настройка hdfs_truncate_on_insert включена, новые данные будут заменять текущее содержимое существующего файла в любом случае, независимо от значения hdfs_create_new_file_on_insert.

Табличные функции

ADQM также предоставляет две табличные функции для интеграции с HDFS:

  • hdfs — создает таблицу для чтения/записи данных в HDFS.

    Синтаксис функции:

    hdfs(<URI>, <format>, <structure>)

    где:

    • <URI> — URI файла в HDFS (в режиме использования функции для чтения можно указать путь к нескольким файлам с помощью символов подстановки);

    • <format> — формат, в котором функция будет принимать или отдавать данные (подробная информация о форматах для импорта и экспорта данных ADQM/ClickHouse представлена в статье Formats for Input and Output Data документации ClickHouse);

    • <structure> — структура таблицы в формате column_name1 data_type1, column_name2 data_type2, …​.

  • hdfsCluster — позволяет обрабатывать файлы HDFS параллельно из нескольких узлов в указанном кластере. На узле-инициаторе функция создает соединение со всеми узлами в кластере, заменяет символы * в пути к файлу HDFS и динамически отправляет каждый файл. На рабочем узле функция запрашивает у инициатора следующую задачу и обрабатывает ее. Это повторяется до тех пор, пока все задачи не будут завершены.

    Синтаксис функции:

    hdfs(<cluster_name>, <URI>, <format>, <structure>)

    где <cluster_name> — имя кластера, используемое для создания набора адресов и параметров подключения к удаленным и локальным серверам; остальные параметры аналогичны параметрам функции hdfs.

Пример

Чтение данных из файла HDFS

Выполните следующий запрос, чтобы получить в ADQM данные из файла HDFS test_1.txt с помощью функции hdfs:

SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32');
┌─column1─┬─column2─┐
│ one     │       1 │
│ two     │       2 │
│ three   │       3 │
└─────────┴─────────┘

Запись данных в HDFS

Если настройка hdfs_truncate_on_insert включена, при вставке данных через функцию hdfs существующие данные в файле, на которую ссылается функция, будут заменяться новыми данными. В этом случае необходимо, чтобы у пользователя clickhouse в HDFS были права на изменение этого файла.

Например, выполните следующий запрос (значение hdfs_truncate_on_insert было установлено в 1 в приведенном выше примере для табличного движка HDFS):

INSERT INTO FUNCTION hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32')
VALUES ('hdfs_func_test_string_1', 100);

В HDFS проверьте, что содержимое файла test_1.txt обновилось:

$ hdfs dfs -cat /adqm_data_sources/test_1.txt
"hdfs_func_test_string_1",100

Если значение параметра hdfs_truncate_on_insert установлено в 0 и при этом активирована настройка hdfs_create_new_file_on_insert, при вставке данных в HDFS через функцию hdfs, которая ссылается на файл file_name.txt, будет создан (или перезаписан, если уже существует) файл с именем file_name.1.txt.

Например, выполните запрос на запись данных в файл test_1.txt:

SET hdfs_truncate_on_insert = 0, hdfs_create_new_file_on_insert = 1;
INSERT INTO FUNCTION hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32')
VALUES ('hdfs_func_test_string_2', 200);

В результате новые данные запишутся в файл test_1.1.txt:

$ hdfs dfs -cat /adqm_data_sources/test_1.1.txt
"hdfs_func_test_string_2",200

Символы подстановки в пути к файлу HDFS

Если таблица ADQM будет принимать данные из нескольких файлов HDFS и использоваться только для чтения, путь к файлам HDFS в параметре URI табличного движка HDFS или табличной функции hdfs/hdfsCluster можно указать с использованием символов подстановки:

  • * — заменяет любое количество любых символов кроме разделителя пути /, включая отсутствие символов;

  • ? — заменяет любой одиночный символ;

  • {first_string,second_string,third_one}-- заменяет любую из строк 'first_string', 'second_string', 'third_one' (в этом шаблоне также можно использовать числа — например, {1,3,5});

  • {n..m} — заменяет любое число в диапазоне [n, m].

Символы подстановки могут содержаться в нескольких компонентах пути (например, в названии каталога и в названии файла). Обрабатываются только файлы, существующие в файловой системе, пути и названия которых полностью соответствуют шаблону. Список файлов определяется во время выполнения операции SELECT (не CREATE).

Пример

  1. Создайте несколько файлов со следующими URI в HDFS:

    • hdfs://<namenode_ip>:8020/first_dir/file_1.txt

    • hdfs://<namenode_ip>:8020/first_dir/file_2.txt

    • hdfs://<namenode_ip>:8020/second_dir/file_3.txt

    • hdfs://<namenode_ip>:8020/second_dir/file_4.txt

    Данные во всех файлах должны согласовываться с форматом и схемой, которые будут указываться в запросах при подключении из ADQM.

  2. С помощью функции hdfs импортируйте в ADQM данные из всех приведенных выше файлов одним из способов:

    SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/file_{1..4}.txt', 'CSV', 'column1 String, column2 UInt32');
    SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/file_?.txt', 'CSV', 'column1 String, column2 UInt32');

    Следующий запрос считывает данные из всех файлов в каталогах first_dir и second_dir:

    SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/*', 'CSV', 'column1 String, column2 UInt32');
  3. Если список файлов содержит числовые интервалы с ведущими нулями в названиях файлов, можно использовать общий шаблон {0n..0m} или отдельный шаблон {n..m} (либо знак ?) для каждой цифры в названии файла. Например, следующие запросы создают таблицу с данными из файлов file_000.txt, file_001.txt, …​, file_999.txt:

    CREATE TABLE hdfs_table_1 (column1 String, column2 UInt32)
    ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_{001..999}.txt', 'CSV');
    CREATE TABLE hdfs_table_2 (column1 String, column2 UInt32)
    ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_{0..9}{0..9}{0..9}.txt', 'CSV');
    CREATE TABLE hdfs_table_3 (column1 String, column2 UInt32)
    ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_???.txt', 'CSV');

Режим высокой доступности (HA) HDFS

Если в кластере ADH для HDFS включен режим высокой доступности (High Availability, HA), выполните настройку:

  1. Скопируйте файл /etc/hadoop/conf/hdfs-site.xml с хоста ADH на хост ADQM в папку /etc/clickhouse-server/.

  2. Добавьте следующую секцию в файл конфигурации ClickHouse /etc/clickhouse-server/config.xml:

    <hdfs>
        <libhdfs3_conf>/etc/clickhouse-server/hdfs-site.xml</libhdfs3_conf>
    </hdfs>
  3. Перезапустите сервер ClickHouse:

    $ sudo systemctl restart clickhouse-server

После этого при подключения к HDFS через табличный движок HDFS или табличные функции можно использовать значение тега dfs.nameservices в файле hdfs-site.xml вместо <namenode_ip>:8020 в URI файла HDFS. Например, если значение тега dfs.nameservices — adhcluster, то запрос на создание таблицы hdfs_table и вызов функции hdfs, приведенные в примерах выше, можно изменить следующим образом:

CREATE TABLE hdfs_table (name String, value UInt32) ENGINE=HDFS('hdfs://adhcluster/adqm_data_sources/test_1.txt', 'CSV');
SELECT * FROM hdfs('hdfs://adhcluster/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32');
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней