Интеграция ADQM и S3

Для интеграции с хранилищем S3 в ClickHouse поддерживаются табличный движок S3 и специальные табличные функции, которые позволяют осуществлять чтение и запись данных S3 из ClickHouse.

Файлы S3 для тестовых примеров

 
В качестве хранилища S3 в примерах, приведенных в данной статье, используется бакет test-bucket в Yandex Object Storage. В бакет загружены следующие объекты (файлы) для тестирования интеграции ADQM и S3:

  • test_1.txt:

    "one",1
    "two",2
    "three",3
  • test_2.txt:

    "ten",10
    "twenty",20
    "thirty",30

Чтобы логически структурировать данные и сгруппировать файлы внутри бакета, используется префикс (каталог) adqm. Таким образом, обращаться к файлам из ADQM можно по следующим URL-адресам:

  • https://storage.yandexcloud.net/test-bucket/adqm/test_1.txt

  • https://storage.yandexcloud.net/test-bucket/adqm/test_2.txt

Параметры интеграции

Параметры движка S3 и табличных функций

В следующей таблице описаны параметры табличного движка и табличных функций для интеграции с S3 (базовый синтаксис запросов приводится в соответствующих разделах ниже).

<path>

URL бакета с путем к файлу. В режиме использования таблицы или функции для чтения можно указать путь к нескольким файлам с помощью символов подстановки

<access_key_id>, <secret_access_key>

Идентификатор ключа доступа и секретный ключ для аутентификации запросов к хранилищу S3. Ключ доступа не обязательно указывать с помощью этих параметров — можно указать учетные данные в конфигурационном файле или с помощью настройки use_environment_credentials включить возможность извлекать их из переменных окружения.

Если для бакета определен публичный доступ, вместо параметров ключа можно указать ключевое слово NOSIGN — в этом случае запросы не будут подписываться

<session_token>

Токен сессии для временного ключа доступа, который можно опционально указать вместе с <access_key_id> и <secret_access_key>

<format>

Формат файла. Полный список форматов, доступных в ADQM/ClickHouse для входных и выходных данных, можно посмотреть в статье Formats for input and output data документации ClickHouse

<compression>

Метод сжатия. Возможные значения:

  • none;

  • gzip или gz;

  • brotli или br;

  • xz или LZMA;

  • zstd или zst.

По умолчанию метод сжатия автоматически определяется по расширению файла

<headers>

Заголовки запроса S3 в формате headers('key1'='value1', 'key2'='value2', …​)

РЕКОМЕНДАЦИЯ
В реальных системах рекомендуется передавать параметры через именованные коллекции (named collections).

Дополнительные настройки

Параметры на уровне запроса/сессии

ADQM также поддерживает следующие дополнительные параметры интеграции с S3, которые можно настроить перед выполнением запроса или указать необходимые значения в конфигурационном файле на уровне пользователя:

SELECT name, default FROM system.settings WHERE startsWith(name, 's3_');
    ┌─name───────────────────────────────────────────────┬─default────┐
 1. │ s3_strict_upload_part_size                         │ 0          │
 2. │ s3_min_upload_part_size                            │ 16777216   │
 3. │ s3_max_upload_part_size                            │ 5368709120 │
 4. │ s3_upload_part_size_multiply_factor                │ 2          │
 5. │ s3_upload_part_size_multiply_parts_count_threshold │ 500        │
 6. │ s3_max_part_number                                 │ 10000      │
 7. │ s3_max_single_operation_copy_size                  │ 33554432   │
 8. │ s3_max_inflight_parts_for_one_file                 │ 20         │
 9. │ s3_max_single_part_upload_size                     │ 33554432   │
10. │ s3_max_single_read_retries                         │ 4          │
11. │ s3_max_unexpected_write_error_retries              │ 4          │
12. │ s3_max_redirects                                   │ 10         │
13. │ s3_max_connections                                 │ 1024       │
14. │ s3_max_get_rps                                     │ 0          │
15. │ s3_max_get_burst                                   │ 0          │
16. │ s3_max_put_rps                                     │ 0          │
17. │ s3_max_put_burst                                   │ 0          │
18. │ s3_list_object_keys_size                           │ 1000       │
19. │ s3_use_adaptive_timeouts                           │ 1          │
20. │ s3_truncate_on_insert                              │ 0          │
21. │ s3_create_new_file_on_insert                       │ 0          │
22. │ s3_skip_empty_files                                │ 0          │
23. │ s3_check_objects_after_upload                      │ 0          │
24. │ s3_allow_parallel_part_upload                      │ 1          │
25. │ s3_throw_on_zero_files_match                       │ 0          │
26. │ s3_ignore_file_doesnt_exist                        │ 0          │
27. │ s3_validate_request_settings                       │ 1          │
28. │ s3_disable_checksum                                │ 0          │
29. │ s3_retry_attempts                                  │ 100        │
30. │ s3_request_timeout_ms                              │ 30000      │
31. │ s3_connect_timeout_ms                              │ 1000       │
    └────────────────────────────────────────────────────┴────────────┘

Описание параметров можно посмотреть в статье Session Settings документации ClickHouse.

В конфигурационном файле (/etc/clickhouse-server/config.xml или в отдельном XML-файле директории /etc/clickhouse-server/config.d/) внутри тега <s3> можно настроить параметры для конкретной конечной точки (endpoint), которые будут использоваться в запросах, где префикс указанного URL точно совпадает с этой конечной точкой.

Конфигурация в общем виде:

<clickhouse>
    <s3>
        <endpoint_config>
            <endpoint>ENDPOINT_URL</endpoint>
            <access_key_id>ACCESS_KEY_ID</access_key_id>
            <secret_access_key>SECRET_ACCESS_KEY</secret_access_key>
            <!-- Other endpoint-related parameters -->
        </endpoint_config>
    </s3>
</clickhouse>

где:

  • endpoint_config — название конфигурации.

  • ENDPOINT_URL — URL точки приема запроса на стороне S3, включающий URL бакета и путь к каталогу, где хранятся данные (префикс). Обязательный параметр.

  • ACCESS_KEY_ID, SECRET_ACCESS_KEY — учетные данные для доступа к данной конечной точке. Эти параметры не являются обязательными — например, они не указываются, если бакет публичный или включена опция для считывания учетных данных из переменных окружения (см. следующий раздел).

Полный список настроек подключения к конечной точке S3 приведен в разделе Endpoint-based Settings документации ClickHouse.

Получение учетных данных для доступа к S3 из переменных окружения

Чтобы явно не указывать учетные данные доступа к бакету S3 в запросах или конфигурационных файлах, в ADQM можно включить опцию use_environment_credentials для считывания данных из переменных окружения AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN и метаданных Amazon EC2:

  • на уровне конечной точки S3:

    <clickhouse>
        <s3>
            <endpoint_config>
                <use_environment_credentials>true</use_environment_credentials>
                <!-- Other endpoint-related parameters -->
            </endpoint_config>
        </s3>
    </clickhouse>
  • на глобальном уровне:

    <clickhouse>
        <s3>
            <use_environment_credentials>true</use_environment_credentials>
        </s3>
    </clickhouse>

Табличный движок S3

Табличный движок S3 позволяет направлять запросы SELECT и INSERT к данным в хранилище S3.

Создание таблицы

Базовый синтаксис запроса для создания таблицы S3:

CREATE TABLE <table_name> (<column_name> <column_type>, ...)
ENGINE = S3('<path>' [, NOSIGN | '<access_key_id>', '<secret_access_key>' [, '<session_token>']]
            [, '<format>'] [, '<compression>'] [, <headers>])
[PARTITION BY <expr>]
[SETTINGS <setting_name> = <setting_value>, ...];

Описание параметров приведено выше.

Движок не поддерживает запросы ALTER и SELECT…​SAMPLE, а также индексирование.

Пример

Создание таблицы S3

  1. В консольном клиенте clickhouse-client создайте таблицу на базе движка S3 со структурой, соответствующей данным, которые будут импортироваться из S3:

    CREATE TABLE s3_table (name String, value UInt32)
    ENGINE=S3('https://storage.yandexcloud.net/test-bucket/adqm/test_1.txt',
              '<access_key_id>',
              '<secret_access_key>',
              'CSV');

    где <access_key_id> и <secret_access_key> — ключ доступа к бакету test-bucket.

  2. Запросите данные из таблицы:

    SELECT * FROM s3_table;
       ┌─name──┬─value─┐
    1. │ one   │     1 │
    2. │ two   │     2 │
    3. │ three │     3 │
       └───────┴───────┘

Вставка данных в таблицу S3

  1. Включите настройку s3_create_new_file_on_insert, чтобы при каждой вставке данных создавался новый файл в S3 (например, если таблица была создана на основе файла file_name.txt, новым файлам названия будут назначаться по шаблону — file_name.1.txt, file_name.2.txt и так далее):

    SET s3_create_new_file_on_insert = 1;
  2. Выполните вставку данных, повторив дважды запрос INSERT INTO:

    INSERT INTO s3_table VALUES('four', 4);
    INSERT INTO s3_table VALUES('five', 5);
  3. Проверьте содержимое бакета — в каталог adqm добавлено два новых файла test_1.1.txt и test_1.2.txt, которые содержат по одной строке ("four",4 и "five",5 соответственно).

    При последующем чтении таблицы можно убедиться, что в выборку будут включены данные из всех файлов:

    SELECT * FROM s3_table;
    ┌─name──┬─value─┐
    │ one   │     1 │
    │ two   │     2 │
    │ three │     3 │
    └───────┴───────┘
    ┌─name─┬─value─┐
    │ five │     5 │
    └──────┴───────┘
    ┌─name─┬─value─┐
    │ four │     4 │
    └──────┴───────┘
  4. Активируйте опцию s3_truncate_on_insert и выполните еще одну вставку данных в таблицу — в этом случае последний файл с данными будет перезаписан:

    SET s3_truncate_on_insert = 1;
    INSERT INTO s3_table VALUES('six', 6);
  5. Проверьте содержимое бакета. Список файлов в каталоге adqm не изменился, но файл test_1.2.txt был обновлен — теперь он содержит строку "six",6.

    Если настройка s3_truncate_on_insert включена, новые данные будут заменять текущее содержимое существующего файла в любом случае, независимо от значения s3_create_new_file_on_insert.

Табличные функции

ADQM также предоставляет две табличные функции для интеграции с S3:

  • s3 — создает таблицу для чтения/записи данных в S3.

    Синтаксис функции:

    s3('<path>' [, NOSIGN | '<access_key_id>', '<secret_access_key>' [, '<session_token>']]
       [, '<format>'] [, '<structure>'] [, '<compression>'] [, <headers>])

    где <structure> — структура таблицы в формате column_name1 data_type1, column_name2 data_type2, …​. Остальные параметры описаны выше.

  • s3Cluster — позволяет обрабатывать файлы S3 параллельно из нескольких узлов указанного кластера. На узле-инициаторе функция создает соединение со всеми узлами кластера, заменяет символы * в пути к файлу S3 и динамически отправляет каждый файл. На рабочем узле функция запрашивает у инициатора следующую задачу и обрабатывает ее. Это повторяется до тех пор, пока все задачи не будут завершены.

    Синтаксис функции:

    s3Cluster('<cluster_name>',
              '<path>' [, NOSIGN | '<access_key_id>', '<secret_access_key>' [, '<session_token>']]
              [, '<format>'] , '<structure>' [, '<compression>'] [, <headers>])

    где <cluster_name> — имя кластера, используемое для создания набора адресов и параметров подключения к удаленным и локальным серверам. Остальные параметры аналогичны параметрам функции s3, при этом указание структуры таблицы является обязательным.

Пример

Чтение данных из S3

Выполните следующий запрос, чтобы получить в ADQM данные из файла S3 test_2.txt с помощью функции s3:

SELECT * FROM s3('https://storage.yandexcloud.net/test-bucket/adqm/test_2.txt',
                 '<access_key_id>',
                 '<secret_access_key>',
                 'CSV',
                 'name String, value UInt32');
   ┌─name───┬─value─┐
1. │ ten    │    10 │
2. │ twenty │    20 │
3. │ thirty │    30 │
   └────────┴───────┘

Запись данных в S3

Если настройка s3_truncate_on_insert включена, при вставке данных через функцию s3 существующие данные в файле, на которую ссылается функция, будут заменяться новыми данными.

Например, выполните следующий запрос (значение s3_truncate_on_insert было установлено в 1 в приведенном выше примере для табличного движка S3):

INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/test-bucket/adqm/test_2.txt',
                        '<access_key_id>', '<secret_access_key>',
                        'CSV')
VALUES ('s3_func_test_string_1', 100);

Проверьте, что содержимое файла test_2.txt обновилось:

SELECT * FROM s3('https://storage.yandexcloud.net/test-bucket/adqm/test_2.txt',
                 '<access_key_id>',
                 '<secret_access_key>',
                 'CSV');
   ┌─c1────────────────────┬──c2─┐
1. │ s3_func_test_string_1 │ 100 │
   └───────────────────────┴─────┘

Если значение параметра s3_truncate_on_insert установлено в 0 и при этом активирована настройка s3_create_new_file_on_insert, при вставке данных в S3 через функцию s3, которая ссылается на файл file_name.txt, будет создан (или перезаписан, если уже существует) файл с именем file_name.1.txt.

Обновите настройки вставки данных и вызовите функцию s3 для записи данных в файл test_2.txt:

SET s3_truncate_on_insert = 0, s3_create_new_file_on_insert = 1;
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/test-bucket/adqm/test_2.txt',
                        '<access_key_id>', '<secret_access_key>',
                        'CSV')
VALUES ('s3_func_test_string_2', 200);

В результате данные запишутся в новый файл test_2.1.txt:

SELECT * FROM s3('https://storage.yandexcloud.net/test-bucket/adqm/test_2.1.txt',
                 '<access_key_id>',
                 '<secret_access_key>',
                 'CSV');
   ┌─c1────────────────────┬──c2─┐
1. │ s3_func_test_string_2 │ 200 │
   └───────────────────────┴─────┘

Использование именованных коллекций

Чтобы не передавать все параметры подключения к хранилищу S3 каждый раз при создании таблицы S3 или вызове табличной функции, можно указать их один раз в именованной коллекции (named collection) в виде списка пар ключ/значение. Кроме того, этот подход позволяет скрыть конфиденциальные учетные данные для интеграции с S3 от пользователей без прав доступа администратора.

Ключи параметров

Основные ключи в именованной коллекции для хранения настроек подключения к хранилищу S3 соответствуют именам описанных выше параметров: url, access_key_id, secret_access_key, session_token, format, compression (или compression_method), structure, use_environment_credentials. Дополнительно поддерживаются следующие параметры.

filename

Имя файла, которое добавляется к URL

no_sign_request

Указывает, нужно ли игнорировать учетные данные, чтобы запросы не подписывались (например, при обращении к публичным бакетам)

expiration_window_seconds

Период для проверки истечения срока действия учетных данных

max_connections

Максимальное количество подключений к серверу

max_single_read_retries

Максимальное количество попыток запроса во время одного чтения S3

min_upload_part_size

Минимальный размер части объекта для загрузки при выполнении multipart-загрузки в S3

upload_part_size_multiply_parts_count_threshold

Количество частей объекта, при загрузке которого в S3 значение min_upload_part_size умножается на upload_part_size_multiply_factor

upload_part_size_multiply_factor

Коэффициент, на который умножается значение min_upload_part_size каждый раз, когда количество частей объекта, равное значению upload_part_size_multiply_parts_count_threshold, загрузилось одной операцией записи в S3

max_single_part_upload_size

Максимальный размер объекта для загрузки с использованием singlepart-загрузки в S3

Пример

  1. Создайте именованную коллекцию с параметрами подключения к бакету S3:

    CREATE NAMED COLLECTION test_bucket_creds AS
    url = 'https://storage.yandexcloud.net/test-bucket/adqm/',
    access_key_id = '<access_key_id>',
    secret_access_key = '<secret_access_key>';
  2. Используйте именованную коллекцию при вызове табличной функции s3 (файл данных не определен в именованной коллекции, поэтому его необходимо указать отдельно в параметрах функции):

    SELECT * FROM s3(test_bucket_creds, filename = 'test_1.txt');
       ┌─c1────┬─c2─┐
    1. │ one   │  1 │
    2. │ two   │  2 │
    3. │ three │  3 │
       └───────┴────┘
  3. Создайте таблицу S3 с помощью именованной коллекции и считайте из нее данные:

    CREATE TABLE s3_table_nc (name String, value UInt32)
    ENGINE=S3(test_bucket_creds, filename = 'test_1.txt');
    SELECT * FROM s3_table_nc;
       ┌─name──┬─value─┐
    1. │ one   │     1 │
    2. │ two   │     2 │
    3. │ three │     3 │
       └───────┴───────┘

Символы подстановки в пути к файлам S3

Если таблица ADQM будет принимать данные из нескольких файлов S3 и использоваться только для чтения, путь к файлам S3 в параметре path табличного движка S3 или табличной функции s3/s3Cluster можно указать с использованием символов подстановки:

  • * — заменяет любое количество любых символов кроме разделителя пути /, включая отсутствие символов;

  • ** — заменяет любое количество любых символов, включая разделитель пути / и отсутствие символов (этот шаблон можно использовать для рекурсивного обхода каталога);

  • ? — заменяет любой одиночный символ;

  • {first_string,second_string,third_one} — заменяет любую из строк 'first_string', 'second_string', 'third_one' (в этом шаблоне также можно использовать числа — например, {1,3,5});

  • {n..m} — заменяет любое число в диапазоне [n, m]. Если список файлов содержит диапазон чисел с ведущими нулями, можно использовать общий шаблон {0n..0m} или отдельный шаблон {n..m} (либо знак ?) для каждой цифры в названии файла.

Символы подстановки могут содержаться в нескольких компонентах пути (например, в названии каталога и в названии файла). Обрабатываются только существующие файлы, пути и названия которых полностью соответствуют шаблону. Список файлов определяется во время выполнения операции SELECT (не CREATE).

Примеры

Импортировать в ADQM данные из приведенных выше файлов test_1.txt и test_2.txt можно одним из способов:

SELECT * FROM s3(test_bucket_creds, filename = 'test_{1,2}.txt');
SELECT * FROM s3(test_bucket_creds, filename = 'test_?.txt');

Получить данных из всех файлов в каталоге adqm можно с помощью следующего запроса:

SELECT * FROM s3(test_bucket_creds, filename = '*');

Партиционирование данных при записи

Если при создании таблицы S3 указать выражение PARTITION BY, то для каждого значения ключа партиционирования в хранилище S3 будет создаваться отдельный файл при вставке данных в таблицу. Например, может быть полезно разделять данные на отдельные файлы для их дальнейшего переноса в другую систему — так как наборы данных ClickHouse часто очень велики, а надежность сети иногда недостаточна, имеет смысл передавать наборы данных относительно небольшими частями.

Ключ партиционирования также можно указать при экспорте данных из ADQM в S3 через табличную функцию: INSERT INTO FUNCTION s3(…​) PARTITION BY <expr> VALUES …​.

ПРИМЕЧАНИЕ
  • Не следует применять слишком детализированное партиционирование данных (например, более детализированное, чем по месяцам).

  • Партиционирование данных не ускоряет выполнение запросов (в отличие от выражения ORDER BY). Например, не стоит разделять данные по идентификаторам или именам пользователей — вместо этого укажите столбец с идентификаторами или именами первым в выражении ORDER BY.

  • В настоящее время не поддерживается выборка данных напрямую из партиционированных таблиц S3 — данные могут быть получены из отдельных партиций с помощью табличной функции s3.

Ниже приведен пример деления вставляемых в таблицу S3 данных по значениям столбца, который имеет низкую кардинальность (небольшое количество уникальных значений).

  1. Создайте таблицу S3, указав столбец value в качестве ключа партиционирования и параметр {_partition_id} в имени файла:

    CREATE TABLE s3_table_partition (name String, value UInt32)
    ENGINE=S3('https://storage.yandexcloud.net/test-bucket/adqm/test_partition_{_partition_id}.txt',
              '<access_key_id>',
              '<secret_access_key>',
              'CSV')
    PARTITION BY value;
  2. Вставьте в таблицу данные:

    INSERT INTO s3_table_partition
    VALUES ('one_1', 1), ('two_1', 2), ('one_2', 1), ('two_2', 2), ('three_1', 3);
  3. В результате данные будут записаны в три файла, в названиях которых параметр {_partition_id} заменен на соответствующие значения ключа партиционирования: test_partition_1.txt, test_partition_2.txt и test_partition_3.txt. Параметр можно также указать в названии каталога (в префиксе).

  4. С помощью табличной функции s3 считайте данные из отдельных файлов, например:

    SELECT * FROM s3('https://storage.yandexcloud.net/test-bucket/adqm/test_partition_1.txt',
                     '<access_key_id>',
                     '<secret_access_key>',
                     'CSV');
       ┌─c1────┬─c2─┐
    1. │ one_1 │  1 │
    2. │ one_2 │  1 │
       └───────┴────┘
    SELECT * FROM s3('https://storage.yandexcloud.net/test-bucket/adqm/test_partition_2.txt',
                     '<access_key_id>',
                     '<secret_access_key>',
                     'CSV');
       ┌─c1────┬─c2─┐
    1. │ two_1 │  2 │
    2. │ two_2 │  2 │
       └───────┴────┘

    При этом, если попробовать получить данные напрямую из таблицы s3_table_partition, будет выведено сообщение об ошибке:

    Exception: Reading from a partitioned S3 storage is not implemented yet. (NOT_IMPLEMENTED)

Виртуальные столбцы

В таблицы на базе движка S3 и таблицы, возвращаемые табличными функциями s3 и s3Cluster, автоматически добавляются следующие виртуальные столбцы.

Название столбца Тип данных Описание

_path

LowCardinality(String)

Путь к файлу

_file

LowCardinality(String)

Имя файла

_size

Nullable(UInt64)

Размер файла в байтах

_time

Nullable(DateTime)

Время последнего изменения файла

_etag

LowCardinality(String)

ETag файла

Виртуальные столбцы можно использовать в запросах SELECT, чтобы получить информацию о файлах в бакете S3, к которым обращается таблица ADQM. Например, следующий запрос возвращает список всех .txt-файлов в каталоге adqm бакета test-bucket и вычисляет количество строк в каждом файле:

SELECT _file, count() AS count
FROM s3('https://storage.yandexcloud.net/test-bucket/adqm/*.txt', '<access_key_id>', '<secret_access_key>', 'CSV')
GROUP BY _file
ORDER BY _file;
   ┌─_file────────────────┬─count─┐
1. │ test_1.1.txt         │     1 │
2. │ test_1.2.txt         │     1 │
3. │ test_1.txt           │     3 │
4. │ test_2.1.txt         │     1 │
5. │ test_2.txt           │     1 │
6. │ test_partition_1.txt │     2 │
7. │ test_partition_2.txt │     2 │
8. │ test_partition_3.txt │     1 │
   └──────────────────────┴───────┘
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней