Improve query performance

This article describes tools to improve the performance of queries in ADQM.

Indexes

High performance of analytical queries in ADQM is usually achieved by indexing tables: correctly defined table indexes can significantly reduce the amount of data that has to be read from disk to execute a query.

For MergeTree tables, ADQM supports the following index types:

  • Primary index — provides quick searches for ranges of rows that potentially contain the desired values. A key to build a primary index can be compound, i.e. include multiple columns.

  • Data skipping indexes — store some aggregated information about the specified column for blocks of data and allow read queries to skip the blocks that cannot contain the requested values. The type of information the index aggregates and stores for a data block is defined by the index type: MinMax, Set, Bloom Filter, or Inverted.
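
Both kinds of indexes listed above can be declared directly in a table definition. Below is a minimal sketch (the table, column, and index names, as well as the index parameters, are illustrative):

CREATE TABLE table_with_indexes
(
    id UInt64,
    value UInt64,
    -- data skipping indexes for the value column
    INDEX value_minmax value TYPE minmax GRANULARITY 4,
    INDEX value_set value TYPE set(100) GRANULARITY 4
)
ENGINE = MergeTree
ORDER BY id; -- the sorting key also defines the primary index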

TIP
For detailed information on how ADQM indexes work and how to manage them, refer to the Indexes article.

Below are examples that demonstrate how a primary index or a data skipping index (for example, of the Set type) can affect the speed of reading data from a MergeTree table.

Primary index

  1. Create a table without a primary key (with one numeric column and one string column) and populate it with test data:

    CREATE TABLE table_no_primary_key (a Int64, b String) ENGINE = MergeTree ORDER BY tuple();
    INSERT INTO table_no_primary_key
    SELECT toInt64(randUniform(0, 1000)), ['a', 'b', 'c', 'd', 'e'][(rand() % 5) + 1]
    FROM numbers(100000000);
  2. Run a query that calculates the number of rows with the 555 value in the a column:

    SELECT count() FROM table_no_primary_key WHERE a = 555;

    The output shows that a full table scan was performed: ADQM processed each of the 100 million rows, which took about 0.5 seconds:

    ┌─count()─┐
    │  100162 │
    └─────────┘
    
    1 row in set. Elapsed: 0.512 sec. Processed 100.00 million rows, 800.00 MB (195.30 million rows/s., 1.56 GB/s.)
  3. To improve the performance of this query, define a similar table with the a column as the primary key:

    CREATE TABLE table_with_primary_key (a Int64, b String) ENGINE = MergeTree ORDER BY (a);

    Populate the new table with data from the first table:

    INSERT INTO table_with_primary_key SELECT * FROM table_no_primary_key;
  4. Repeat the query filtering data by the a column:

    SELECT count() FROM table_with_primary_key WHERE a = 555;

    In this case, ADQM processed 172 thousand rows instead of 100 million and completed the query much faster:

    ┌─count()─┐
    │  100162 │
    └─────────┘
    
    1 row in set. Elapsed: 0.004 sec. Processed 172.03 thousand rows, 1.38 MB (43.90 million rows/s., 351.17 MB/s.)

    If you apply the EXPLAIN clause with the indexes parameter to the query, you can see that only 21 granules out of 12209 were selected as possibly containing rows with the 555 value in the a column during the query execution:

    EXPLAIN indexes = 1 SELECT count() FROM table_with_primary_key WHERE a = 555;
    ┌─explain────────────────────────────────────────────────────┐
    │ Expression ((Projection + Before ORDER BY))                │
    │   Aggregating                                              │
    │     Expression (Before GROUP BY)                           │
    │       Filter (WHERE)                                       │
    │         ReadFromMergeTree (default.table_with_primary_key) │
    │         Indexes:                                           │
    │           PrimaryKey                                       │
    │             Keys:                                          │
    │               a                                            │
    │             Condition: (a in [555, 555])                   │
    │             Parts: 8/8                                     │
    │             Granules: 21/12209                             │
    └────────────────────────────────────────────────────────────┘

Compound key of a primary index

  1. Create a table with a compound primary key:

    CREATE TABLE table_compound_primary_key (a Int64, b String) ENGINE = MergeTree ORDER BY (a, b);

    Populate the table with data:

    INSERT INTO table_compound_primary_key
    SELECT toInt64(randUniform(0, 1000)), ['a', 'b', 'c', 'd', 'e'][(rand() % 5) + 1]
    FROM numbers(100000000);

    Note that table columns differ greatly in cardinality — the first column is filled with integers from the [0, 1000) range, while the second one contains only five possible string values.

  2. Check the performance of some queries filtering by the first column and by the second column of the primary key:

    Filter by the a column:

    SELECT count()
    FROM table_compound_primary_key
    WHERE a = 555;
    ┌─count()─┐
    │  100162 │
    └─────────┘

    1 row in set. Elapsed: 0.004 sec. Processed 155.65 thousand rows, 1.25 MB (38.72 million rows/s., 309.78 MB/s.)

    Filter by the b column:

    SELECT count()
    FROM table_compound_primary_key
    WHERE b = 'c';
    ┌──count()─┐
    │ 19997249 │
    └──────────┘

    1 row in set. Elapsed: 1.050 sec. Processed 44.17 million rows, 441.66 MB (42.07 million rows/s., 420.70 MB/s.)
  3. Swap the columns in the primary key so that they are ordered from low to high cardinality, as generally recommended:

    CREATE TABLE table_compound_primary_key1 (a Int64, b String) ENGINE = MergeTree ORDER BY (b, a);
    INSERT INTO table_compound_primary_key1 SELECT * FROM table_compound_primary_key;
  4. See how the performance of the same queries with filters on each of the key columns has changed. The query filtering by the b column now runs about twice as fast, while the execution time of the query filtering by the a column remains the same:

    Filter by the a column:

    SELECT count()
    FROM table_compound_primary_key1
    WHERE a = 555;
    ┌─count()─┐
    │  100162 │
    └─────────┘

    1 row in set. Elapsed: 0.004 sec. Processed 540.67 thousand rows, 4.33 MB (123.51 million rows/s., 988.06 MB/s.)

    Filter by the b column:

    SELECT count()
    FROM table_compound_primary_key1
    WHERE b = 'c';
    ┌──count()─┐
    │ 19997249 │
    └──────────┘

    1 row in set. Elapsed: 0.566 sec. Processed 20.01 million rows, 200.13 MB (35.38 million rows/s., 353.83 MB/s.)

Data skipping index (Set)

  1. Create a test table with 10000 rows in each granule and add 100 million data rows to it:

    CREATE TABLE table_skip_index (a UInt64, b UInt64)
    ENGINE MergeTree
    PRIMARY KEY a
    SETTINGS index_granularity=10000;
    INSERT INTO table_skip_index SELECT number, intDiv(number,5000) FROM numbers(100000000);
  2. When running a query with a filter by the b column, which is not a part of the primary key, all 100 million records are scanned:

    SELECT * FROM table_skip_index WHERE b=100 OR b=555;
    ┌──────a─┬───b─┐
    │ 500000 │ 100 │
    │ 500001 │ 100 │
    │ 500002 │ 100 │
    │    ... │ ... │
    └────────┴─────┘
    10000 rows in set. Elapsed: 0.042 sec. Processed 100.00 million rows, 800.12 MB (2.38 billion rows/s., 19.01 GB/s.)
  3. Add a data skipping index:

    ALTER TABLE table_skip_index ADD INDEX my_index b TYPE set(100) GRANULARITY 2;

    At this point, the skipping index will only be applied to newly inserted data. To index data that already exists in the table, run the following query:

    ALTER TABLE table_skip_index MATERIALIZE INDEX my_index;
  4. Re-execute the query that selects data with the same filter:

    SELECT * FROM table_skip_index WHERE b=100 OR b=555;

    Instead of processing 100 million rows (800 MB), ADQM reads and processes only 40 thousand rows (440 KB), that is, four granules of 10000 rows each.

    ┌──────a─┬───b─┐
    │ 500000 │ 100 │
    │ 500001 │ 100 │
    │ 500002 │ 100 │
    │    ... │ ... │
    └────────┴─────┘
    10000 rows in set. Elapsed: 0.018 sec. Processed 40.00 thousand rows, 440.00 KB (2.19 million rows/s., 24.06 MB/s.)

Strict data types

To improve query performance and minimize storage, it is important to analyze the data to be stored in a table and assign each column the most appropriate type. For example, if a string column has low cardinality (contains a limited set of repeated values), you can use the LowCardinality(String) type instead of String to optimize the column's storage.

Example

  1. Create a table with two columns of the Int64 and String types:

    CREATE TABLE table_types (a Int64, b String) ENGINE = MergeTree ORDER BY a;
  2. Insert 100 million rows of test data into the table (the first column stores random integers from the [0,1000) range, each row of the second column stores one of five predefined strings):

    INSERT INTO table_types
    SELECT toInt64(randUniform(0, 1000)), ['a', 'b', 'c', 'd', 'e'][(rand() % 5) + 1]
    FROM numbers(100000000);
  3. Check the size of uncompressed data:

    SELECT
        name,
        type,
        formatReadableSize(data_uncompressed_bytes) AS uncompressed_size
    FROM system.columns
    WHERE table = 'table_types';
    ┌─name─┬─type───┬─uncompressed_size─┐
    │ a    │ Int64  │ 759.91 MiB        │
    │ b    │ String │ 189.98 MiB        │
    └──────┴────────┴───────────────────┘
  4. Knowing the range of values in the a column, you can define a more appropriate type for integer data in the table schema, such as UInt16 instead of Int64 (see the value ranges that different integer types support in the UInt* article). The b column storing strings can also be optimized by changing the String type to LowCardinality(String). Create a table similar to the one created earlier, but with stricter column types:

    CREATE TABLE table_strict_types (a UInt16, b LowCardinality(String)) ENGINE = MergeTree ORDER BY a;
    INSERT INTO table_strict_types SELECT a, toLowCardinality(b) FROM table_types;
  5. Ensure that the size of columns has decreased:

    SELECT
        name,
        type,
        formatReadableSize(data_uncompressed_bytes) AS uncompressed_size
    FROM system.columns
    WHERE table = 'table_strict_types';
    ┌─name─┬─type───────────────────┬─uncompressed_size─┐
    │ a    │ UInt16                 │ 189.98 MiB        │
    │ b    │ LowCardinality(String) │ 95.32 MiB         │
    └──────┴────────────────────────┴───────────────────┘
  6. Compare how the same test query runs when selecting data from the initial table_types table and the table_strict_types table with optimal column types:

    SELECT count() FROM table_types WHERE b = 'a';
    ┌──count()─┐
    │ 20003609 │
    └──────────┘
    
    1 row in set. Elapsed: 5.330 sec. Processed 100.00 million rows, 1.00 GB (18.76 million rows/s., 187.61 MB/s.)
    SELECT count() FROM table_strict_types WHERE b = 'a';
    ┌──count()─┐
    │ 20003609 │
    └──────────┘
    
    1 row in set. Elapsed: 0.390 sec. Processed 100.00 million rows, 100.12 MB (256.40 million rows/s., 256.71 MB/s.)

    The output shows that both queries scan the same number of rows, but the query accessing the first table reads almost 10 times more data.

Compression and encoding

When a query runs, it needs to read data from disk: the more data it reads, the slower the query is. You can reduce the amount of data to be read by using compression and encoding. ADQM supports general-purpose compression codecs that balance disk space against CPU consumption, a set of specialized compression codecs for specific data types, and encryption codecs. For a complete list of available compression and encryption codecs, see the Column Compression Codecs section of the ClickHouse documentation.

By default, ADQM applies the compression method defined in the server configuration to columns. You can also specify the compression method for each individual column in the CREATE TABLE query using the CODEC clause:

CREATE TABLE <table_name>
(   <column_name1> <data_type1> CODEC(<compression_codec1>) [...],
    <column_name2> <data_type2> CODEC(<compression_codec2>) [...],
    ...)
ENGINE = MergeTree
...;

Codecs can be combined sequentially (for example, CODEC(Delta, Default)).
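
For example, a specialized codec can be chained with a general-purpose one. The following sketch is illustrative (the table and column names are not taken from the examples in this article):

CREATE TABLE table_codec_chain
(
    ts DateTime CODEC(Delta, ZSTD),    -- delta-encode timestamps, then compress the deltas with ZSTD
    value Float64 CODEC(Gorilla, LZ4)  -- Gorilla encoding suits slowly changing numeric metrics
)
ENGINE = MergeTree
ORDER BY ts;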

Example

  1. Create a test table with three columns (the first and second columns are compressed with the LZ4 and ZSTD algorithms respectively, the third one stores data without compression):

    CREATE TABLE table_sizes (a Int32 CODEC(LZ4), b Int32 CODEC(ZSTD), c Int32 CODEC(NONE))
    ENGINE = MergeTree
    ORDER BY a;
  2. Insert 100 million rows with the same value (1) in all columns:

    INSERT INTO table_sizes SELECT 1, 1, 1 FROM numbers(100000000);
  3. In the system.columns table, you can see the difference between columns in the size of stored data. The first two columns (compressed with different codecs) store 1.7 MB and 279 KB of data respectively, and the size of the uncompressed column is about 380 MB:

    SELECT
        name,
        formatReadableSize(data_uncompressed_bytes) AS uncompressed_size,
        formatReadableSize(data_compressed_bytes) AS compressed_size,
        round(data_uncompressed_bytes / data_compressed_bytes, 2) AS ratio
    FROM system.columns
    WHERE table = 'table_sizes';
    ┌─name─┬─uncompressed_size─┬─compressed_size─┬───ratio─┐
    │ a    │ 379.95 MiB        │ 1.71 MiB        │  222.15 │
    │ b    │ 379.95 MiB        │ 279.06 KiB      │ 1394.21 │
    │ c    │ 379.95 MiB        │ 380.10 MiB      │       1 │
    └──────┴───────────────────┴─────────────────┴─────────┘
  4. Run queries that calculate the sum of each column to see the difference in read speed between compressed and uncompressed columns:

    SELECT sum(a) FROM table_sizes;
    ┌────sum(a)─┐
    │ 100000000 │
    └───────────┘
    
    1 row in set. Elapsed: 0.197 sec. Processed 100.00 million rows, 400.00 MB (507.58 million rows/s., 2.03 GB/s.)
    SELECT sum(b) FROM table_sizes;
    ┌────sum(b)─┐
    │ 100000000 │
    └───────────┘
    
    1 row in set. Elapsed: 0.091 sec. Processed 100.00 million rows, 400.00 MB (1.09 billion rows/s., 4.38 GB/s.)
    SELECT sum(c) FROM table_sizes;
    ┌────sum(c)─┐
    │ 100000000 │
    └───────────┘
    
    1 row in set. Elapsed: 0.389 sec. Processed 100.00 million rows, 400.00 MB (257.00 million rows/s., 1.03 GB/s.)

    Each of the three queries scans 100 million rows, but scanning compressed columns is faster: 0.197 and 0.091 seconds versus 0.389 seconds for the uncompressed column.

Query cache

Another way to reduce the execution time of SELECT queries, as well as to reduce the load on the ADQM server and optimize resource consumption, is query caching. ADQM can compute a query just once (when it is executed for the first time), save the result to the query cache, and then read this result from the cache when the same query is executed again. If a query fails due to an error or user cancellation, no entry is added to the query cache.

Manage query caching

  1. Enable the query cache support in ADQM at the session level:

    SET allow_experimental_query_cache = 1;
  2. Set the use_query_cache setting value to 1 to cache the result of a specific query or all queries of the current session:

    • SELECT … SETTINGS use_query_cache = 1; — caching an individual query;

    • SET use_query_cache = 1; — caching queries at the session level.

    Subsequent executions of the same query (also with the use_query_cache = 1 parameter) will read the previously computed result from the cache and return it immediately.

    To control caching in more detail, you can also use the following parameters (both are set to 1 by default); an example of using them per query is shown after this list:

    • enable_writes_to_query_cache — specifies whether results of SELECT queries should be stored in the query cache;

    • enable_reads_from_query_cache — specifies whether results of SELECT queries should be retrieved from the query cache.
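
    For example, these parameters make it possible to separate writing to the cache from reading from it. The sketch below reuses the table_types table created earlier in this article and assumes that query cache support is already enabled:

    -- Write the result to the cache, but do not read from it (useful for warming up the cache)
    SELECT count() FROM table_types WHERE b = 'a'
    SETTINGS use_query_cache = 1, enable_reads_from_query_cache = 0;

    -- Read from the cache if an entry exists, but do not store new results
    SELECT count() FROM table_types WHERE b = 'a'
    SETTINGS use_query_cache = 1, enable_writes_to_query_cache = 0;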

NOTE

The use_query_cache setting and other options related to the query cache affect only standalone SELECT queries.

To clear the query cache, use the SYSTEM DROP QUERY CACHE command.

Configure query cache

To provide more accurate control over the query cache behavior, ADQM supports a set of configuration options.

Server-level settings

The size of the query cache (in bytes), the maximum number of cache entries, and the maximum size of individual cache entries (in bytes and in records) can be configured using the corresponding parameters in the query_cache section of the ADQM server’s configuration file (config.xml).

<query_cache>
    <max_size>1073741824</max_size>
    <max_entries>1024</max_entries>
    <max_entry_size>1048576</max_entry_size>
    <max_entry_rows>30000000</max_entry_rows>
</query_cache>

Settings per query or per session

  • To cache only the most frequent or time-consuming queries, you can configure the following settings (a combined usage example is shown after this list):

    • query_cache_min_query_runs — specifies how many times (minimum) a SELECT query must be executed before its result is cached;

    • query_cache_min_query_duration — specifies how long a query should run at least for its result to be cached.

  • Entries in the query cache expire after a certain period of time (60 seconds, by default). To change this period, use the query_cache_ttl parameter.

  • Entries in the query cache are not shared between users by default for security reasons. However, if necessary, you can mark cache entries as available to other users (i.e. as shared) by setting the query_cache_share_between_users parameter to 1.

  • Results of queries with non-deterministic functions (for example, rand(), now()) are not cached by default. To change this, use the query_cache_store_results_of_queries_with_nondeterministic_functions option.
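
As an illustration, several of these settings can be combined in a single query (assuming query cache support is enabled as described above). The values below are arbitrary; note that query_cache_min_query_duration is specified in milliseconds, while query_cache_ttl is specified in seconds:

-- Cache the result only after the query has run at least 5 times and only if it takes longer than 3 seconds;
-- keep the cache entry for 300 seconds
SELECT count() FROM table_types WHERE b = 'a'
SETTINGS use_query_cache = 1,
    query_cache_min_query_runs = 5,
    query_cache_min_query_duration = 3000,
    query_cache_ttl = 300;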

View information on query cache

You can get information about the content of the query cache from the system.query_cache system table.

SELECT * FROM system.query_cache;
┌─query───────────────────────────────────────────────────┬────────────key_hash─┬──────────expires_at─┬─stale─┬─shared─┬─result_size─┐
│ SELECT count() FROM table_types WHERE b = 'a' SETTINGS  │ 3145443392323842376 │ 2023-10-18 12:58:43 │     1 │      0 │         256 │
└─────────────────────────────────────────────────────────┴─────────────────────┴─────────────────────┴───────┴────────┴─────────────┘

In the example above, the cache entry is marked as stale because the time elapsed since the query result was cached exceeds the lifetime of the cache entry. This means that the next time the query is run, the cached result will not be used; instead, the cache entry will be updated.

The SETTINGS clause in the query field is shown only partially because all parameters related to the query cache are removed from the query text before it is used as a key for the query cache.

The number of query cache hits and misses since the server startup is counted as the QueryCacheHits and QueryCacheMisses events in the system.events system table. Both counters are updated only for SELECT queries that are executed with the use_query_cache = 1 setting.

SELECT * FROM system.events WHERE (event = 'QueryCacheHits' OR event = 'QueryCacheMisses');
┌─event────────────┬─value─┬─description────────────────────────────────────────────────────────────────────────────────────────────┐
│ QueryCacheHits   │     2 │ Number of times a query result has been found in the query cache (and query computation was avoided).  │
│ QueryCacheMisses │     4 │ Number of times a query result has not been found in the query cache (and required query computation). │
└──────────────────┴───────┴────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Example

  1. Run the SELECT query accessing the non-optimized table_types table created above:

    SELECT count() FROM table_types WHERE b = 'a';

    The query takes about 2.5 seconds to complete:

    ┌──count()─┐
    │ 19999789 │
    └──────────┘
    
    1 row in set. Elapsed: 2.584 sec. Processed 100.00 million rows, 1.00 GB (38.70 million rows/s., 387.00 MB/s.)
  2. Turn on the query cache feature and run the query again with the use_query_cache setting enabled:

    SET allow_experimental_query_cache = 1;
    SELECT count() FROM table_types WHERE b = 'a' SETTINGS use_query_cache = 1;
    ┌──count()─┐
    │ 19999789 │
    └──────────┘
    
    1 row in set. Elapsed: 2.924 sec. Processed 100.00 million rows, 1.00 GB (34.20 million rows/s., 341.98 MB/s.)

    As you can see, the query took about the same amount of time to complete (even a little longer), since the first run with the use_query_cache = 1 setting only writes the result to the cache. Subsequent executions of the same query (also with the use_query_cache = 1 setting) within the time interval specified by query_cache_ttl read the result from the cache and return it immediately:

    SELECT count() FROM table_types WHERE b = 'a' SETTINGS use_query_cache = 1;
    ┌──count()─┐
    │ 19999789 │
    └──────────┘
    
    1 row in set. Elapsed: 0.003 sec.

PREWHERE

PREWHERE is a way to optimize SELECT … WHERE queries. It applies a filter before the main read operation, so the amount of data read from the table is reduced and the query runs faster. In ADQM, the PREWHERE optimization is applied automatically (the optimize_move_to_prewhere setting is enabled by default): even if the PREWHERE clause is not explicitly specified in a query, a part of the filtering expressions is moved from WHERE to PREWHERE to be checked first.

You can explicitly specify the PREWHERE clause in a query to manually adjust the filtering optimization. First, the columns necessary for checking the PREWHERE expression are read from the queried table. Then the other columns needed to complete the rest of the query are read, but only in those blocks where the PREWHERE expression is true for at least some rows. If there are many blocks where the PREWHERE condition is false for all rows, and the PREWHERE expression uses fewer columns than other parts of the query, PREWHERE optimization can significantly reduce the amount of data read from disk for query execution.

You can use both the PREWHERE and WHERE clauses in the SELECT query simultaneously (PREWHERE is executed before WHERE in this case).

If a query has the FINAL modifier, the PREWHERE optimization is not always correct. It is enabled only if both the optimize_move_to_prewhere and optimize_move_to_prewhere_if_final settings are turned on.
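
For example, to keep the PREWHERE optimization for queries with the FINAL modifier, both settings can be enabled at the session level (a minimal sketch):

-- Apply the PREWHERE optimization to SELECT ... FINAL queries as well
SET optimize_move_to_prewhere = 1, optimize_move_to_prewhere_if_final = 1;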

Example

  1. Create a test table with three columns and add 10 million rows to it. The first column contains sequential integers, the second column contains random numbers from the [0, 1000) range, and the third column contains the 'test string' value in 100 rows, while the remaining rows store string representations of the corresponding integer identifiers from the first column:

    CREATE TABLE table_prewhere (id UInt64, int_value Int64, string_value String) ENGINE = MergeTree ORDER BY id;
    INSERT INTO table_prewhere SELECT
        number,
        toInt64(randUniform(0, 1000)),
        if(number between 11 and 110, 'test string', toString(number))
    FROM numbers(10000000);
  2. Run a query that filters data by columns that are not included in the primary key:

    SELECT count()
    FROM table_prewhere
    WHERE (int_value > 550) AND (string_value = 'test string');
    ┌─count()─┐
    │      43 │
    └─────────┘
    1 row in set. Elapsed: 0.638 sec. Processed 10.00 million rows, 238.89 MB (15.68 million rows/s., 374.58 MB/s.)
  3. If you enable tracing with the SET send_logs_level='debug' command before executing the query, you can see that ADQM has automatically moved the first filtering condition (int_value > 550) to PREWHERE:

    InterpreterSelectQuery: MergeTreeWhereOptimizer: condition "int_value > 550" moved to PREWHERE

    Thus, the query first selects a large set of rows based on the int_value > 550 condition and then filters out most of these rows using the string_value = 'test string' filter. To optimize the query, change the order in which the filter conditions are checked by passing the filter on the string_value column to PREWHERE explicitly:

    SELECT count()
    FROM table_prewhere
    PREWHERE string_value = 'test string'
    WHERE int_value > 550;

    Now the query first selects a small set of rows that meet the string_value = 'test string' condition, and only these rows are checked against the second filter (int_value > 550). As a result, less data is processed (158.89 MB instead of 238.89 MB) and the query runs faster:

    ┌─count()─┐
    │      43 │
    └─────────┘
    1 row in set. Elapsed: 0.249 sec. Processed 10.00 million rows, 158.89 MB (40.12 million rows/s., 637.45 MB/s.)

Optimize JOIN operations

JOIN operations in ADQM can be optimized in several ways to improve query performance. Here are some recommendations:

  • ADQM supports multiple join algorithms. Choose the algorithm that is optimal for your query based on the join type/strictness and the engine of the tables being joined. To set the join algorithm, use the join_algorithm parameter. You can specify one or multiple algorithms (one of the available algorithms is selected for each specific query based on its type/strictness and table engine) or allow the query planner to select and dynamically change the algorithm during query execution depending on resource availability. See the Choosing the Right Join Algorithm blog post by ClickHouse for a comparison of join algorithms and recommendations on how to choose the one that best fits your scenario.

  • Use appropriate join keys (columns used to match rows between two tables). Ideally, join keys should be indexed, and they should have low cardinality. The data type of a key can also affect performance (for example, using a string join key may be slower than using an integer key).

  • In a query with the JOIN clause, there is no optimization of the order in which the join operation is executed relative to other steps of the query. The join operation (lookup in the right table) is performed before WHERE filtering and before aggregation. To explicitly specify the order of calculations and improve query performance, it is recommended to apply filters as early as possible, for example inside join subqueries (see the sketch after this list).

  • Results of joins are not cached: each time a query with the same JOIN is executed, the join subquery runs again. To avoid this, you can use the special Join table engine, which is a prepared data structure for use in join operations.

  • For small amounts of data (up to ~200 GB in compressed form), it is recommended to have at least as much memory available as the volume of data.

  • Sometimes it is more efficient to use IN or dictionaries instead of JOIN. Joining with dictionaries is not recommended, since dictGet functions work more efficiently.

  • In some scenarios, it is more effective to denormalize data by combining multiple tables into one to completely eliminate the use of join operations and improve query performance.
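
For illustration, the sketch below combines two of these recommendations: filtering inside a join subquery and selecting a join algorithm per query. The orders and customers tables and their columns are hypothetical and are not part of the examples above:

-- Hypothetical tables: orders(customer_id UInt64, amount Decimal(18, 2), dt Date)
--                      customers(id UInt64, country String)
SELECT c.country, sum(o.amount) AS total
FROM
(
    -- apply the filter as early as possible, inside the join subquery
    SELECT customer_id, amount
    FROM orders
    WHERE dt >= '2023-01-01'
) AS o
INNER JOIN customers AS c ON o.customer_id = c.id
GROUP BY c.country
SETTINGS join_algorithm = 'parallel_hash'; -- one of the values accepted by the join_algorithm setting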
