Интеграция ADQM и HDFS
Для интеграции с распределенной файловой системой HDFS (Hadoop Distributed File System) в ClickHouse реализованы табличный движок HDFS и специальные табличные функции, которые позволяют осуществлять чтение и запись данных HDFS из ClickHouse. Данная статья описывает особенности и способы использования этих инструментов на примере ADQM и ADH (Arenadata Hadoop), который предоставляет сервис HDFS.
Для интеграции с HDFS запросы из ADQM направляются к хосту кластера ADH c NameNode в состоянии Active (в приведенных ниже примерах замените <namenode_ip> в URI файлов HDFS на IP-адрес соответствующего хоста ADH). При подключении к NameNode в статусе Standby ADQM может вернуть сообщение об ошибке следующего содержания:
Exception: Unable to connect to HDFS: Operation category READ is not supported in state standby.
Табличный движок HDFS
Табличный движок HDFS позволяет управлять данными, хранящимися в файлах HDFS, из ADQM. Этот тип таблиц поддерживает многопоточное чтение и запись данных HDFS, но имеет ряд ограничений (репликация, индексы, операции ALTER
и SELECT…SAMPLE
не поддерживаются).
Базовый синтаксис запроса для создания таблицы HDFS:
CREATE TABLE <table_name> (<column_name> <column_type>, ...) ENGINE = HDFS(<URI>, <format>);
Параметры табличного движка HDFS:
-
<URI>
— полный URI файла в HDFS. Часть URI, содержащая путь к файлу в HDFS (имя каталога, имя файла), может содержать символы подстановки, чтобы получить данные из нескольких файлов — в этом случае таблица может быть использована только для чтения. -
<format>
— формат, в котором ADQM будет принимать при выполнении запросовSELECT
и отдавать при выполнении запросовINSERT
табличные данные (см. форматы в столбцах Input и Output таблицы Formats for Input and Output Data соответственно).
При экспорте данных из таблицы ADQM в файловую систему HDFS используйте параметры hdfs_create_new_file_on_insert
и hdfs_truncate_on_insert
, которые позволяют определить, как будут сохраняться данные в файлы HDFS — будут создаваться новые файлы или перезапиcываться существующие. По умолчанию обе опции выставлены в 0
, и при вставке данных в таблицу ADQM возвращается следующее сообщение:
Exception: File with path <file_path> already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert.
где <file_path>
— путь к файлу в HDFS, на основе которого была создана таблица HDFS в ADQM.
В файле конфигурации ADQM можно указать дополнительные настройки для таблиц HDFS: глобальные (указываются в секции настроек <hdfs>
) и настройки на уровне пользователя (указываются в секции hdfs_<user_name>
, где <user_name>
— имя пользователя). Список параметров, которые можно настроить для движка HDFS, приведен в соответствующем разделе статьи HDFS документации ClickHouse.
Пример
Создание таблицы HDFS
-
В HDFS в каталоге adqm_data_sources создайте файл test_1.txt с тестовыми данными для последующей вставки в таблицу ADQM:
"one",1 "two",2 "three",3
URI файла для обращения из ADQM — hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt, где <namenode_ip> — IP-адрес хоста ADH с NameNode в состоянии Active.
-
Запустите консольный клиент clickhouse-client и создайте таблицу на базе движка HDFS со структурой, соответствующей данным, которые будут импортироваться из HDFS:
CREATE TABLE hdfs_table (name String, value UInt32) ENGINE=HDFS('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV');
-
Запросите данные из таблицы:
SELECT * FROM hdfs_table;
┌─name──┬─value─┐ │ one │ 1 │ │ two │ 2 │ │ three │ 3 │ └───────┴───────┘
Вставка данных в таблицу HDFS
-
Включите настройку
hdfs_create_new_file_on_insert
, чтобы при каждой вставке данных создавался новый файл в HDFS (например, если таблица была создана на основе файла file_name.txt, новым файлам названия будут назначаться по шаблону — file_name.1.txt, file_name.2.txt и так далее):SET hdfs_create_new_file_on_insert = 1;
-
Выполните вставку данных, повторив два запроса
INSERT INTO
:INSERT INTO hdfs_table VALUES('four', 4);
INSERT INTO hdfs_table VALUES('five', 5);
-
Проверьте содержимое каталога adqm_data_sources в HDFS:
$ hdfs dfs -ls /adqm_data_sources
В каталоге создано два новых файла test_1.1.txt и test_1.2.txt:
Found 3 items -rwxrwxrwx 3 clickhouse hadoop 9 2023-10-25 13:40 /adqm_data_sources/test_1.1.txt -rwxrwxrwx 3 clickhouse hadoop 9 2023-10-25 13:40 /adqm_data_sources/test_1.2.txt -rwxrwxrwx 3 clickhouse hadoop 23 2023-10-25 13:25 /adqm_data_sources/test_1.txt
Посмотрите содержимое этих файлов:
$ hdfs dfs -cat /adqm_data_sources/test_1.1.txt
"four",4
$ hdfs dfs -cat /adqm_data_sources/test_1.2.txt
"five",5
При последующем чтении таблицы можно убедиться, что в выборку будут включены данные из всех файлов:
SELECT * FROM hdfs_table;
┌─name──┬─value─┐ │ one │ 1 │ │ two │ 2 │ │ three │ 3 │ └───────┴───────┘ ┌─name─┬─value─┐ │ five │ 5 │ └──────┴───────┘ ┌─name─┬─value─┐ │ four │ 4 │ └──────┴───────┘
-
Активируйте опцию
hdfs_truncate_on_insert
и выполните еще одну вставку данных в таблицу — в этом случае последний файл с данными будет перезаписан:SET hdfs_truncate_on_insert = 1;
INSERT INTO hdfs_table VALUES('six', 6);
-
Проверьте содержимое каталога adqm_data_sources в HDFS:
$ hdfs dfs -ls /adqm_data_sources
Новый файл не был добавлен, но файл test_1.2.txt обновлен:
Found 3 items -rwxrwxrwx 3 clickhouse hadoop 9 2023-10-25 13:40 /adqm_data_sources/test_1.1.txt -rwxrwxrwx 3 clickhouse hadoop 9 2023-10-25 13:45 /adqm_data_sources/test_1.2.txt -rwxrwxrwx 3 clickhouse hadoop 23 2023-10-25 13:25 /adqm_data_sources/test_1.txt
$ hdfs dfs -cat /adqm_data_sources/test_1.2.txt
"six",6
Если настройка
hdfs_truncate_on_insert
включена, новые данные будут заменять текущее содержимое существующего файла в любом случае, независимо от значенияhdfs_create_new_file_on_insert
.
Табличные функции
ADQM также предоставляет две табличные функции для интеграции с HDFS:
-
hdfs — создает таблицу для чтения/записи данных в HDFS.
Синтаксис функции:
hdfs(<URI>, <format>, <structure>)
где:
-
<URI>
— URI файла в HDFS (в режиме использования функции для чтения можно указать путь к нескольким файлам с помощью символов подстановки); -
<format>
— формат, в котором функция будет принимать или отдавать данные (подробная информация о форматах для импорта и экспорта данных ADQM/ClickHouse представлена в статье Formats for Input and Output Data документации ClickHouse); -
<structure>
— структура таблицы в форматеcolumn_name1 data_type1, column_name2 data_type2, …
.
-
-
hdfsCluster — позволяет обрабатывать файлы HDFS параллельно из нескольких узлов в указанном кластере. На узле-инициаторе функция создает соединение со всеми узлами в кластере, заменяет символы
*
в пути к файлу HDFS и динамически отправляет каждый файл. На рабочем узле функция запрашивает у инициатора следующую задачу и обрабатывает ее. Это повторяется до тех пор, пока все задачи не будут завершены.Синтаксис функции:
hdfs(<cluster_name>, <URI>, <format>, <structure>)
где
<cluster_name>
— имя кластера, используемое для создания набора адресов и параметров подключения к удаленным и локальным серверам; остальные параметры аналогичны параметрам функцииhdfs
.
Пример
Чтение данных из файла HDFS
Выполните следующий запрос, чтобы получить в ADQM данные из файла HDFS test_1.txt с помощью функции hdfs
:
SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32');
┌─column1─┬─column2─┐ │ one │ 1 │ │ two │ 2 │ │ three │ 3 │ └─────────┴─────────┘
Запись данных в HDFS
Если настройка hdfs_truncate_on_insert
включена, при вставке данных через функцию hdfs
существующие данные в файле, на которую ссылается функция, будут заменяться новыми данными. В этом случае необходимо, чтобы у пользователя clickhouse
в HDFS были права на изменение этого файла.
Например, выполните следующий запрос (значение hdfs_truncate_on_insert
было установлено в 1
в приведенном выше примере для табличного движка HDFS):
INSERT INTO FUNCTION hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32')
VALUES ('hdfs_func_test_string_1', 100);
В HDFS проверьте, что содержимое файла test_1.txt обновилось:
$ hdfs dfs -cat /adqm_data_sources/test_1.txt
"hdfs_func_test_string_1",100
Если значение параметра hdfs_truncate_on_insert
установлено в 0
и при этом активирована настройка hdfs_create_new_file_on_insert
, при вставке данных в HDFS через функцию hdfs
, которая ссылается на файл file_name.txt, будет создан (или перезаписан, если уже существует) файл с именем file_name.1.txt.
Например, выполните запрос на запись данных в файл test_1.txt:
SET hdfs_truncate_on_insert = 0, hdfs_create_new_file_on_insert = 1;
INSERT INTO FUNCTION hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32')
VALUES ('hdfs_func_test_string_2', 200);
В результате новые данные запишутся в файл test_1.1.txt:
$ hdfs dfs -cat /adqm_data_sources/test_1.1.txt
"hdfs_func_test_string_2",200
Символы подстановки в пути к файлу HDFS
Если таблица ADQM будет принимать данные из нескольких файлов HDFS и использоваться только для чтения, путь к файлам HDFS в параметре URI
табличного движка HDFS или табличной функции hdfs
/hdfsCluster
можно указать с использованием символов подстановки:
-
*
— заменяет любое количество любых символов кроме разделителя пути/
, включая отсутствие символов; -
?
— заменяет любой одиночный символ; -
{first_string,second_string,third_one}
-- заменяет любую из строк'first_string'
,'second_string'
,'third_one'
(в этом шаблоне также можно использовать числа — например,{1,3,5}
); -
{n..m}
— заменяет любое число в диапазоне[n, m]
.
Символы подстановки могут содержаться в нескольких компонентах пути (например, в названии каталога и в названии файла). Обрабатываются только файлы, существующие в файловой системе, пути и названия которых полностью соответствуют шаблону. Список файлов определяется во время выполнения операции SELECT
(не CREATE
).
Пример
-
Создайте несколько файлов со следующими URI в HDFS:
-
hdfs://<namenode_ip>:8020/first_dir/file_1.txt
-
hdfs://<namenode_ip>:8020/first_dir/file_2.txt
-
hdfs://<namenode_ip>:8020/second_dir/file_3.txt
-
hdfs://<namenode_ip>:8020/second_dir/file_4.txt
Данные во всех файлах должны согласовываться с форматом и схемой, которые будут указываться в запросах при подключении из ADQM.
-
-
С помощью функции
hdfs
импортируйте в ADQM данные из всех приведенных выше файлов одним из способов:SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/file_{1..4}.txt', 'CSV', 'column1 String, column2 UInt32');
SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/file_?.txt', 'CSV', 'column1 String, column2 UInt32');
Следующий запрос считывает данные из всех файлов в каталогах first_dir и second_dir:
SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/*', 'CSV', 'column1 String, column2 UInt32');
-
Если список файлов содержит числовые интервалы с ведущими нулями в названиях файлов, можно использовать общий шаблон
{0n..0m}
или отдельный шаблон{n..m}
(либо знак?
) для каждой цифры в названии файла. Например, следующие запросы создают таблицу с данными из файлов file_000.txt, file_001.txt, …, file_999.txt:CREATE TABLE hdfs_table_1 (column1 String, column2 UInt32) ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_{001..999}.txt', 'CSV');
CREATE TABLE hdfs_table_2 (column1 String, column2 UInt32) ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_{0..9}{0..9}{0..9}.txt', 'CSV');
CREATE TABLE hdfs_table_3 (column1 String, column2 UInt32) ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_???.txt', 'CSV');
Режим высокой доступности (HA) HDFS
Если в кластере ADH для HDFS включен режим высокой доступности (High Availability, HA), выполните настройку:
-
Скопируйте файл /etc/hadoop/conf/hdfs-site.xml с хоста ADH на хост ADQM в папку /etc/clickhouse-server/.
-
Добавьте следующую секцию в файл конфигурации ClickHouse /etc/clickhouse-server/config.xml:
<hdfs> <libhdfs3_conf>/etc/clickhouse-server/hdfs-site.xml</libhdfs3_conf> </hdfs>
-
Перезапустите сервер ClickHouse:
$ sudo systemctl restart clickhouse-server
После этого при подключения к HDFS через табличный движок HDFS или табличные функции можно использовать значение тега dfs.nameservices
в файле hdfs-site.xml вместо <namenode_ip>:8020
в URI файла HDFS. Например, если значение тега dfs.nameservices
— adhcluster
, то запрос на создание таблицы hdfs_table и вызов функции hdfs, приведенные в примерах выше, можно изменить следующим образом:
CREATE TABLE hdfs_table (name String, value UInt32) ENGINE=HDFS('hdfs://adhcluster/adqm_data_sources/test_1.txt', 'CSV');
SELECT * FROM hdfs('hdfs://adhcluster/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32');