Integration between ADQM and HDFS

For integration with the Hadoop Distributed File System (HDFS), ClickHouse implements the HDFS table engine and special table functions, which allow reading and writing HDFS data from ClickHouse. This article describes how to use these tools, using ADQM and ADH (Arenadata Hadoop), which provides the HDFS service, as an example.

To integrate with HDFS, send ADQM queries to an ADH host with a NameNode in the Active state (in the examples below, replace <namenode_ip> in URIs of HDFS files with the IP address of the corresponding host in your ADH cluster). When connecting to a NameNode in the Standby state, ADQM can return the following error message:

Exception: Unable to connect to HDFS: Operation category READ is not supported in state standby.
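
If HDFS high availability is enabled in the ADH cluster, you can check which NameNode is currently in the Active state, for example, with the following command on an ADH host (a sketch that uses the standard HDFS HA administration utility):

$ hdfs haadmin -getAllServiceState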

HDFS table engine

The HDFS table engine allows you to manage data stored in HDFS files from ADQM. This table type supports parallel reads and writes, but has a number of limitations (replication, indexes, ALTER and SELECT ... SAMPLE operations are not supported).

Below is the basic syntax of a query that creates an HDFS table:

CREATE TABLE <table_name> (<column_name> <column_type>, ...) ENGINE = HDFS(<URI>, <format>);

HDFS table engine parameters:

  • <URI> — whole URI to access a file in HDFS. The file path part of the URI (the directory and file names) may include wildcards to address multiple files — in this case, the table will be read-only.

  • <format> — format that ADQM will use to receive table data when executing SELECT queries and return data when executing INSERT queries (see available formats in the Input and Output columns of the Formats for Input and Output Data table, respectively).

When exporting data from an ADQM table to HDFS, use the hdfs_create_new_file_on_insert and hdfs_truncate_on_insert settings, which control how data is saved to HDFS files: whether new files are created or existing files are overwritten. By default, both options are set to 0, and an attempt to insert data into an ADQM table fails with the following message:

Exception: File with path <file_path> already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert,
if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert.

where <file_path> is the HDFS path to the file on which the HDFS table in ADQM is based.

In the ADQM configuration file, you can set up extended configuration for HDFS tables: specify global settings in the <hdfs> section and user-level settings in the hdfs_<user_name> section (replace <user_name> with a user name). See the list of parameters that can be configured for the HDFS table engine in the corresponding section of the HDFS article in the ClickHouse documentation.
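
Below is a minimal sketch of what such a configuration might look like (the Kerberos parameters are illustrative and are only needed if your HDFS cluster uses Kerberos authentication; clickhouse in the user-level section name is an example user name):

<hdfs>
    <hadoop_kerberos_keytab>/etc/clickhouse-server/clickhouse.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>clickhouse@EXAMPLE.COM</hadoop_kerberos_principal>
    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- User-level settings override the global <hdfs> settings for the clickhouse user -->
<hdfs_clickhouse>
    <hadoop_kerberos_principal>clickhouse@EXAMPLE.COM</hadoop_kerberos_principal>
</hdfs_clickhouse>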

Example

Create an HDFS table

  1. In the adqm_data_sources HDFS directory, create the test_1.txt file with test data to be later inserted into ADQM tables:

    "one",1
    "two",2
    "three",3

    The URI used to access this file from ADQM is hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt, where <namenode_ip> is the IP address of the ADH host on which the active NameNode runs.
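
    To upload the file to HDFS, you can run, for example, the following commands on an ADH host (a sketch; the local source path /tmp/test_1.txt is an assumption, adjust it to your environment):

    $ hdfs dfs -mkdir -p /adqm_data_sources
    $ hdfs dfs -put /tmp/test_1.txt /adqm_data_sources/test_1.txt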

  2. Run the clickhouse-client console client and create a table based on the HDFS engine with a structure that corresponds to the data to be imported from HDFS:

    CREATE TABLE hdfs_table (name String, value UInt32) ENGINE=HDFS('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV');
  3. Select data from the table:

    SELECT * FROM hdfs_table;
    ┌─name──┬─value─┐
    │ one   │     1 │
    │ two   │     2 │
    │ three │     3 │
    └───────┴───────┘

Insert data into an HDFS table

  1. Enable the hdfs_create_new_file_on_insert setting so that each insert into an ADQM table based on the HDFS engine creates a new file in HDFS (for example, if an HDFS table has been created for the file_name.txt file, names of new files are generated according to the pattern file_name.1.txt, file_name.2.txt, and so on):

    SET hdfs_create_new_file_on_insert = 1;
  2. Insert data into the table by running two INSERT INTO queries:

    INSERT INTO hdfs_table VALUES('four', 4);
    INSERT INTO hdfs_table VALUES('five', 5);
  3. Check the content of the adqm_data_sources directory in HDFS:

    $ hdfs dfs -ls /adqm_data_sources

    As you can see, two new files (test_1.1.txt and test_1.2.txt) have been created in the directory:

    Found 3 items
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:40 /adqm_data_sources/test_1.1.txt
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:40 /adqm_data_sources/test_1.2.txt
    -rwxrwxrwx   3 clickhouse hadoop         23 2023-10-25 13:25 /adqm_data_sources/test_1.txt

    View the content of these files:

    $ hdfs dfs -cat /adqm_data_sources/test_1.1.txt
    "four",4
    $ hdfs dfs -cat /adqm_data_sources/test_1.2.txt
    "five",5

    When you subsequently read from the table, you can make sure that the result includes data from all the files:

    SELECT * FROM hdfs_table;
    ┌─name──┬─value─┐
    │ one   │     1 │
    │ two   │     2 │
    │ three │     3 │
    └───────┴───────┘
    ┌─name─┬─value─┐
    │ five │     5 │
    └──────┴───────┘
    ┌─name─┬─value─┐
    │ four │     4 │
    └──────┴───────┘
  4. Enable the hdfs_truncate_on_insert option and perform another data insertion into the table; this will overwrite the last created data file:

    SET hdfs_truncate_on_insert = 1;
    INSERT INTO hdfs_table VALUES('six', 6);
  5. Check the content of the adqm_data_sources directory in HDFS:

    $ hdfs dfs -ls /adqm_data_sources

    No new file was added, but the test_1.2.txt file was overwritten:

    Found 3 items
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:40 /adqm_data_sources/test_1.1.txt
    -rwxrwxrwx   3 clickhouse hadoop          9 2023-10-25 13:45 /adqm_data_sources/test_1.2.txt
    -rwxrwxrwx   3 clickhouse hadoop         23 2023-10-25 13:25 /adqm_data_sources/test_1.txt
    $ hdfs dfs -cat /adqm_data_sources/test_1.2.txt
    "six",6

    If the hdfs_truncate_on_insert setting is enabled, new data will replace the current contents of an existing file in any case, regardless of the hdfs_create_new_file_on_insert value.

Table functions

ADQM also provides two table functions for integration with HDFS:

  • hdfs — creates a table for reading/writing data in HDFS.

    Function syntax:

    hdfs(<URI>, <format>, <structure>)

    where:

    • <URI> — URI to a file in HDFS (when using the function in the read-only mode, you can specify the path to multiple files with wildcards);

    • <format> — format in which the function will receive or output data (see detailed information about formats to import and export ADQM/ClickHouse data in the Formats for Input and Output Data article of the ClickHouse documentation);

    • <structure> — table structure in the column_name1 data_type1, column_name2 data_type2, ... format.

  • hdfsCluster — allows processing HDFS files in parallel from multiple nodes of a specified cluster. On the initiator node, the function creates a connection to all nodes in the cluster, resolves the wildcards in the HDFS file path, and dispatches each file dynamically. On a worker node, it requests the next task from the initiator and processes it. This repeats until all tasks are completed.

    Function syntax:

    hdfsCluster(<cluster_name>, <URI>, <format>, <structure>)

    where <cluster_name> is the name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers; the other parameters are the same as for the hdfs function.
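
    For example, a query that reads all files from the adqm_data_sources directory in parallel on cluster nodes might look as follows (a sketch; adqm_cluster is a placeholder for a cluster name defined in the remote_servers section of the ADQM server configuration):

    SELECT * FROM hdfsCluster('adqm_cluster', 'hdfs://<namenode_ip>:8020/adqm_data_sources/*', 'CSV', 'name String, value UInt32');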

Example

Read data from an HDFS file

Run the following query to read data from the test_1.txt HDFS file with the hdfs function:

SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32');
┌─column1─┬─column2─┐
│ one     │       1 │
│ two     │       2 │
│ three   │       3 │
└─────────┴─────────┘

Write data to HDFS

When inserting data through the hdfs function with the hdfs_truncate_on_insert setting enabled, existing data in a file referenced by the function will be replaced with new data. In this case, the clickhouse user in HDFS should have permissions to modify this file.
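
If the clickhouse user does not have write access to the file, you can grant it on the HDFS side (a sketch; adjust the path, owner, and permission mode to your environment):

$ hdfs dfs -chown clickhouse:hadoop /adqm_data_sources/test_1.txt
$ hdfs dfs -chmod 664 /adqm_data_sources/test_1.txt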

For example, run the following query (the hdfs_truncate_on_insert value was set to 1 in the above example for the HDFS table engine):

INSERT INTO FUNCTION hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32')
VALUES ('hdfs_func_test_string_1', 100);

In HDFS, check that the test_1.txt file has been updated:

$ hdfs dfs -cat /adqm_data_sources/test_1.txt
"hdfs_func_test_string_1",100

If the hdfs_truncate_on_insert parameter value is set to 0 and the hdfs_create_new_file_on_insert setting is enabled, a new file will be created when inserting data into HDFS through the hdfs function. For example, if the function inserts data into the file_name.txt file, it will create a file named file_name.1.txt to save new data or overwrite the file_name.1.txt file if it already exists.

Run the following query to export data to the test_1.txt file:

SET hdfs_truncate_on_insert = 0, hdfs_create_new_file_on_insert = 1;
INSERT INTO FUNCTION hdfs('hdfs://<namenode_ip>:8020/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32')
VALUES ('hdfs_func_test_string_2', 200);

As a result, new data will be written to the test_1.1.txt file:

$ hdfs dfs -cat /adqm_data_sources/test_1.1.txt
"hdfs_func_test_string_2",200

Wildcards in path

If an ADQM table should read data from multiple HDFS files (such a table is read-only), you can use the following wildcards to specify the path to multiple HDFS files in the URI parameter of the HDFS table engine or the hdfs/hdfsCluster table functions:

  • * — substitutes any number of any characters except /, including an empty string;

  • ? — substitutes any single character;

  • {first_string,second_string,third_one} — substitutes any of the strings 'first_string', 'second_string', 'third_one' (you can also use numbers in this pattern — for example, {1,3,5});

  • {n..m} — substitutes any number in the [n, m] range.

Wildcard characters can appear in multiple path components (for example, in a directory name and in a file name). Only files that exist in the file system and match the whole path pattern are processed. The list of files is determined during the SELECT operation, not during CREATE.

Example

  1. Create multiple files with the following URIs in HDFS:

    • hdfs://<namenode_ip>:8020/first_dir/file_1.txt

    • hdfs://<namenode_ip>:8020/first_dir/file_2.txt

    • hdfs://<namenode_ip>:8020/second_dir/file_3.txt

    • hdfs://<namenode_ip>:8020/second_dir/file_4.txt

    Data in all files should comply with the format and schema specified in the queries that access them from ADQM.

  2. You can import data from all the above files into ADQM with the hdfs function in one of the following ways:

    SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/file_{1..4}.txt', 'CSV', 'column1 String, column2 UInt32');
    SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/file_?.txt', 'CSV', 'column1 String, column2 UInt32');

    The following query reads data from all files in the first_dir and second_dir directories:

    SELECT * FROM hdfs('hdfs://<namenode_ip>:8020/{first,second}_dir/*', 'CSV', 'column1 String, column2 UInt32');
  3. If the list of files contains a number range with leading zeros in file names, you can use the general pattern {0n..0m}, or a separate pattern {n..m} (or the ? character) for each digit in a file name. For example, you can create a table with data from files named file_000.txt, file_001.txt, ..., file_999.txt as follows:

    CREATE TABLE hdfs_table_1 (column1 String, column2 UInt32)
    ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_{001..999}.txt', 'CSV');
    CREATE TABLE hdfs_table_2 (column1 String, column2 UInt32)
    ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_{0..9}{0..9}{0..9}.txt', 'CSV');
    CREATE TABLE hdfs_table_3 (column1 String, column2 UInt32)
    ENGINE=HDFS('hdfs://<namenode_ip>:8020/<dir_name>/file_???.txt', 'CSV');

HDFS high availability (HA)

If the high availability (HA) mode is enabled for HDFS on an ADH cluster, perform the following configuration:

  1. Copy the /etc/hadoop/conf/hdfs-site.xml file from an ADH host to the /etc/clickhouse-server/ folder on an ADQM host.
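
    For example, you can copy the file with scp (a sketch; <adh_host> is a placeholder for the ADH host name or IP address, and the command is run on the ADQM host with permissions sufficient to write to /etc/clickhouse-server/):

    $ scp <adh_host>:/etc/hadoop/conf/hdfs-site.xml /etc/clickhouse-server/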

  2. Add the following setting to the /etc/clickhouse-server/config.xml configuration file of ClickHouse:

    <hdfs>
        <libhdfs3_conf>/etc/clickhouse-server/hdfs-site.xml</libhdfs3_conf>
    </hdfs>
  3. Restart the ClickHouse server:

    $ sudo systemctl restart clickhouse-server

Now, when connecting to HDFS via the HDFS table engine or table functions, you can use the value of the dfs.nameservices parameter from the hdfs-site.xml file instead of <namenode_ip>:8020 in an HDFS file URI. For example, if the value of dfs.nameservices is adhcluster, the query that creates the hdfs_table table and the hdfs function call from the examples above can be modified as follows:

CREATE TABLE hdfs_table (name String, value UInt32) ENGINE=HDFS('hdfs://adhcluster/adqm_data_sources/test_1.txt', 'CSV');
SELECT * FROM hdfs('hdfs://adhcluster/adqm_data_sources/test_1.txt', 'CSV', 'column1 String, column2 UInt32');
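
The nameservice name is defined in hdfs-site.xml, for example (an illustrative excerpt; the actual value depends on your ADH cluster configuration):

<property>
    <name>dfs.nameservices</name>
    <value>adhcluster</value>
</property>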