Sharding is a database design principle in which parts of the same table are stored on different shards. A shard is a cluster node that can consist of one or more replicas. Replicas are servers that duplicate data within a shard. SELECT and INSERT queries can be sent to any replica of a shard; there is no dedicated master.

Sharding can be useful in the following cases:

  • The amount of data to be stored and the frequency of queries to a database increase.

  • A system requires more resources, but a cluster cannot be scaled vertically (by upgrading the hardware of existing servers) because of physical limits such as the number of cores per processor, the number of processors, or the amount of memory.

Sharding allows you to:

  • Overcome technical limitations. If a large data set does not fit on a single server, or the current storage infrastructure is running at hardware performance limits, you can scale a cluster horizontally — add new servers to the existing set and distribute data across multiple shards.

  • Improve fault tolerance. Sharding helps mitigate the impact of failures. If data is stored on a single server, the failure of that server can lead to loss of access to all data. When data is distributed evenly (for example, across five shards), the failure of one shard still leaves about 80% of the data available.

  • Increase the speed of executing queries. Queries compete with each other for the computing resources of cluster servers, which can slow down query processing. In a sharded cluster, queries to the same table can be executed in parallel on different shards, so competition for shared resources is reduced and query processing time decreases.

Distributed tables

For sharding, use tables based on the Distributed engine. A distributed table does not store data. Data is stored in local tables located on the servers of each shard. A distributed table only routes queries to these tables; in other words, it allows processing distributed queries on multiple servers.

Create a distributed table

A distributed table can be created anywhere, even on servers that do not belong to the shards that store tables with data. However, the standard approach is to place a distributed table on all shards that host tables with data.

An example of a query that creates a distributed table on all servers of the cluster:

CREATE TABLE distributed_table ON CLUSTER cluster_name AS table_name
ENGINE = Distributed(cluster_name, database, table_name[, sharding_key[, policy_name]]);

The Distributed table engine accepts the following parameters:

  • cluster name in a server’s configuration file;

  • name of a database for a table that the distributed table will access (the database name should be the same on all servers of the cluster);

  • name of a table that the distributed table will access (the table name should be the same on all servers of the cluster);

  • optionally, the sharding key used to decide how to distribute data across shards when executing INSERT queries;

  • optionally, the name of the storage policy used to store temporary files for asynchronous sends (the policy name can be specified only together with a sharding key).

Cluster configuration

The Distributed table engine requires a cluster to be defined in each server’s configuration file (config.xml). A cluster defined in the configuration file is a logical entity that groups servers. One server can participate in several logical clusters. This provides the flexibility to distribute data between servers.

A cluster can be configured as follows:


In this example, a distributed cluster (cluster_name) consists of two shards. Each shard contains two replicas. Each replica is described by two parameters: host (the remote server address) and port (the TCP port of the native protocol, tcp_port in the server configuration, usually 9000).
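The layout described above (two shards with two replicas each) can be sketched by generating the remote_servers section programmatically. This is only an illustration: the host names are hypothetical placeholders, the helper build_remote_servers is not part of ClickHouse, and the resulting fragment would still need to be placed inside the root element of the server's configuration file.

```python
import xml.etree.ElementTree as ET
from xml.dom import minidom

# Hypothetical host names; replace them with the real server addresses.
SHARDS = [
    ["host-1.example.internal", "host-2.example.internal"],  # replicas of shard 1
    ["host-3.example.internal", "host-4.example.internal"],  # replicas of shard 2
]


def build_remote_servers(cluster_name, shards, port=9000):
    """Build a <remote_servers> element with one <shard> per list of replicas."""
    root = ET.Element("remote_servers")
    cluster = ET.SubElement(root, cluster_name)
    for replica_hosts in shards:
        shard = ET.SubElement(cluster, "shard")
        for host in replica_hosts:
            replica = ET.SubElement(shard, "replica")
            ET.SubElement(replica, "host").text = host
            ET.SubElement(replica, "port").text = str(port)
    return root


config = build_remote_servers("cluster_name", SHARDS)
# Pretty-print the fragment so that it can be reviewed before use.
print(minidom.parseString(ET.tostring(config)).toprettyxml(indent="    "))
```

Optional shard parameters such as weight or internal_replication would be added as further child elements of each shard element.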

A cluster can include one shard (in this case, query processing is called remote, not distributed) or multiple shards. In each shard, you can specify one or more replicas. The number of replicas can differ from shard to shard within the cluster.

You can specify the following optional parameters for a shard:

  • weight (default is 1): the shard weight used when writing data. Data is distributed across shards in proportion to shard weights. For example, if there are two shards with weights 2 and 1, two thirds (2/3) of the inserted rows are sent to the first shard and one third (1/3) to the second shard.

  • internal_replication: defines how data is inserted into replicas:

    • true: data is written to the first available replica. Use this value when inserting data into a replicated table that manages data replication itself. Data written to one replica is copied to the other replicas of the shard asynchronously.

    • false (default): data is written to all replicas. In this case, the distributed table replicates data itself. This is less preferable than using replicated tables because the consistency of replicas is not checked (over time, they may contain slightly different data).
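The difference between the two internal_replication values can be illustrated with a small model. This is a simplified sketch of the behavior described above, not ClickHouse code; the function replicas_to_write, the replica names, and the availability check are hypothetical.

```python
def replicas_to_write(replicas, internal_replication, is_available):
    """Return the replicas of one shard that receive an inserted block.

    Simplified model of the two modes described in the text.
    """
    if internal_replication:
        # Write once; the replicated table copies the data to other replicas.
        for replica in replicas:
            if is_available(replica):
                return [replica]
        return []  # no replica of this shard is reachable
    # The distributed table itself writes the data to every replica.
    return list(replicas)


shard = ["replica_a", "replica_b"]
unavailable = {"replica_a"}


def is_up(replica):
    return replica not in unavailable


print(replicas_to_write(shard, True, is_up))   # ['replica_b']
print(replicas_to_write(shard, False, is_up))  # ['replica_a', 'replica_b']
```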

You can also configure the load balancing algorithm for replicas, that is, specify which replica a request should be sent to first. To do this, set the priority parameter of replicas in the cluster configuration (a lower value means a higher priority; the default is 1) or use the load_balancing setting.

Write data to a cluster

There are two ways to write data to a cluster.

Write data directly to shards

You can manage data distribution across servers yourself (decide manually which shard each portion of data goes to) and write data directly to each shard. In other words, you can send INSERT queries directly to the local tables that underlie the distributed table. This is the most flexible and efficient approach, because you can use a sharding scheme that meets the domain requirements and write data to different shards completely independently.
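As an illustration of client-side routing, the sketch below groups rows into per-shard batches before they are inserted into local tables. The domain rule (each region is pinned to one shard), the REGION_TO_SHARD mapping, and the row layout are all hypothetical; sending each batch to its shard is left out.

```python
from collections import defaultdict

# Hypothetical domain rule: each region is pinned to one shard (by index).
REGION_TO_SHARD = {"eu": 0, "us": 1, "asia": 2}


def group_rows_by_shard(rows, shard_of):
    """Split rows into batches keyed by the shard index chosen by shard_of."""
    batches = defaultdict(list)
    for row in rows:
        batches[shard_of(row)].append(row)
    return dict(batches)


rows = [
    {"user_id": 1, "region": "eu"},
    {"user_id": 2, "region": "us"},
    {"user_id": 3, "region": "eu"},
]
batches = group_rows_by_shard(rows, lambda row: REGION_TO_SHARD[row["region"]])

for shard_index, batch in sorted(batches.items()):
    # Each batch would be sent as one INSERT into the local table on that shard.
    print(shard_index, [row["user_id"] for row in batch])
```

Batching per shard keeps the number of INSERT queries low, which generally matters more than the routing rule itself.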

Write data to a distributed table

You can send INSERT queries to a distributed table. In this case, the table will distribute data to be inserted across servers itself. To do this, you need to set a sharding key when creating the distributed table (except if there is only one shard).

A sharding key can be specified as any expression that returns an integer. For example, it can be:

  • the UserID table column — to distribute data by user ID (data of one user will be located on one shard);

  • the toDayOfWeek function — to distribute data across shards depending on the day of the week of a certain date (for example, if a table has the saleDate field containing sale dates, and a cluster consists of 7 shards);

  • the rand() expression — to distribute data across shards randomly.

To select the shard to which a data row should be sent, the Distributed engine takes the remainder of dividing the sharding key value by the total weight of all shards. A row is sent to a shard if this remainder falls into the half-open interval [prev_weights, prev_weights + weight), where prev_weights is the total weight of the shards with lower numbers and weight is the weight of this shard. For example, suppose there are 2 shards: the first shard’s weight is 2 and the second one’s weight is 1. The total weight of these shards is 2 + 1 = 3, so the possible remainders from dividing the sharding key by the total weight are 0, 1, and 2. A data row is written to the first shard if the remainder is 0 or 1 (a value from the [0, 2) range), and to the second shard if the remainder is 2 (a value from the [2, 3) range).
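The selection rule above can be written down as a short function. This is a minimal sketch that assumes a non-negative integer sharding key value and zero-based shard indexes; pick_shard is a hypothetical name, not an engine function.

```python
from collections import Counter


def pick_shard(sharding_key_value, weights):
    """Return the zero-based index of the shard that receives the row."""
    total_weight = sum(weights)
    if total_weight <= 0:
        raise ValueError("at least one shard must have a positive weight")
    remainder = sharding_key_value % total_weight
    prev_weights = 0  # total weight of the shards with lower numbers
    for index, weight in enumerate(weights):
        # Half-open interval [prev_weights, prev_weights + weight).
        if prev_weights <= remainder < prev_weights + weight:
            return index
        prev_weights += weight
    raise AssertionError("unreachable when total_weight is positive")


weights = [2, 1]  # the first shard has weight 2, the second has weight 1
print([pick_shard(key, weights) for key in range(3)])  # [0, 0, 1]

# Over evenly spread keys, rows are split in proportion to the weights.
print(Counter(pick_shard(key, weights) for key in range(9)))  # Counter({0: 6, 1: 3})
```

With weights 2 and 1, six of every nine consecutive key values land on the first shard, which matches the 2/3 to 1/3 split implied by the weights.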

A simple remainder of dividing is not always appropriate for sharding. It is suitable for medium to large data sets (dozens of servers), but not for very large data sets (hundreds of servers or more). In the latter case, it is better to use a sharding scheme meeting the domain requirements rather than using the ability to write to distributed tables.

Read data from a distributed table

Unlike INSERT queries, SELECT queries are sent to all shards of a cluster regardless of how data is distributed across shards (the optimize_skip_unused_shards setting controls this behavior). On each shard, one of the available replicas is selected to read data. The query is processed on the remote servers, and partially aggregated results are returned to the server where the distributed table was queried. That server then merges the received data and sends the full result to the user.
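The idea of partial aggregation can be shown with a small model of an average grouped by key. This is a conceptual sketch only: the function names and sample rows are hypothetical, and the real engine exchanges its own internal aggregation states rather than Python tuples.

```python
def partial_aggregate(rows):
    """Runs on each shard: reduce local rows to a (sum, count) state per key."""
    state = {}
    for key, value in rows:
        total, count = state.get(key, (0, 0))
        state[key] = (total + value, count + 1)
    return state


def merge_states(states):
    """Runs on the initiating server: combine the states from all shards."""
    merged = {}
    for state in states:
        for key, (total, count) in state.items():
            merged_total, merged_count = merged.get(key, (0, 0))
            merged[key] = (merged_total + total, merged_count + count)
    return merged


def finalize_avg(merged):
    """Turn the merged (sum, count) states into final averages."""
    return {key: total / count for key, (total, count) in merged.items()}


shard_1_rows = [("a", 10), ("b", 4)]
shard_2_rows = [("a", 20), ("a", 30)]

states = [partial_aggregate(shard_1_rows), partial_aggregate(shard_2_rows)]
print(finalize_avg(merge_states(states)))  # {'a': 20.0, 'b': 4.0}
```

Shards return compact states instead of raw rows, so the amount of data sent over the network depends on the number of groups rather than on the number of rows.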


Auto resharding is not supported, but you can use the clickhouse-copier tool to redistribute data. This tool copies data from tables in one cluster (source cluster) to tables in another cluster (destination cluster). To use it, do the following:

  1. Create a configuration file for tasks to copy tables. In this file, define:

    • a source cluster and a destination cluster (in the remote_servers section);

    • tasks for sharding (in the tables section);

    • a new sharding key (in the sharding_key section).

  2. Create a ZooKeeper configuration file.

  3. Create a task in ZooKeeper.

  4. Run clickhouse-copier to copy data.

  5. Create a new distributed table and switch from the old cluster to the new one.
