Tkhemali Connector 1.X configuration

To send data from ADB to ClickHouse via Tkhemali Connector 1.X, you should first perform the following steps on the ADB cluster side:

  1. Create a writable external table. In the LOCATION clause, set the PXF protocol with the TKH profile and connection options for ClickHouse.

  2. If you are going to use staging tables (recommended):

    • Call the txn function. In the function arguments, use the INSERT query to add data to the external table.

    • In case of connection failures, drop staging tables.

  3. If you do not use staging tables, run the INSERT query against the external table directly.

To see how to send data from ADB to ADQM using the settings listed below, refer to Tkhemali Connector 1.X usage examples.

Create a writable external table

To create a writable external table, use the CREATE WRITABLE EXTERNAL TABLE command. The basic command syntax is listed below:

CREATE WRITABLE EXTERNAL TABLE <table_name> (
    { <column_name> <data_type> [, ...] | LIKE <other_table> }
)
LOCATION (
    'pxf://<clickhouse_table_name>?<pxf_profile>[&<option>=<value> [...]]'
)
FORMAT 'TEXT'
ENCODING 'UTF8';

where:

  • <table_name> — an external table name in ADB.

  • <column_name> — a column name.

  • <data_type> — a column data type.

  • <other_table> — a source table from which column names, column data types, and a data distribution policy will be copied to the new external table. Note that the column constraints and default values that are specified in the source table are not copied because they are not supported in external tables.

  • <clickhouse_table_name> — a target table name in ClickHouse that includes a database name. When using a staging layer, a staging table name should be specified in the option value by adding the ending_pattern value (see Arguments of the txn function) to the target table name. For example, if the table name in ClickHouse is default.t, then in the LOCATION clause you should specify default.t_tmp_$ (if the default ending_pattern value is used).

  • <pxf_profile> — a PXF profile name that you can define in two ways:

    { PROFILE=TKH | ACCESSOR=io.arenadata.tkh.pxf.TkhAccessor&RESOLVER=io.arenadata.tkh.pxf.TkhResolver }
  • <option> — options that define the connection details. The options that are available for Tkhemali Connector are listed in the External table options table.

  • <value> — option values.

NOTE
  • For the optimal performance, the data distribution policy in the external table should be similar to the policy in the source table from which data will be selected before sending it to ClickHouse. This will allow you to send data directly from ADB segments rather than redistribute it before sending. For this purpose, it is recommended to use the LIKE clause when creating an external table, or explicitly specify a similar distribution key in the DISTRIBUTED BY clause.

  • The full description of the CREATE EXTERNAL TABLE command syntax is available in the Greenplum documentation.

  • To alter an external table definition, use the ALTER EXTERNAL TABLE command. To delete an external table, use DROP EXTERNAL TABLE.

External table options
Name Type Description Default Required

url

TEXT

A comma-separated list of ClickHouse hosts where the target table is located. Requires the following format: <user>:<password>@<hostname>:<port> [, …​], where:

  • <user> — a user name in ClickHouse.

  • <password> — a user password in ClickHouse. If there is no password (for example, in the test environment), an empty string is specified.

  • <hostname> — every host name.

  • <port> — an HTTP port number to connect to the corresponding host.

 — 

Yes if distribution = LIST

distribution

TEXT

A strategy for load distribution between ClickHouse hosts.

Possible values:

  • LIST — hosts are selected randomly from the list that is defined in the url option.

  • GPREQUEST — hosts are selected based on the round-robin algorithm from the result of the gp_request query.

LIST

No

gp_url

TEXT

A URL of the ADB Master host that is used to execute the gp_request query

 — 

Yes if distribution == GPREQUEST

gp_user

TEXT

A user name in ADB that is used to execute the gp_request query

 — 

Yes if distribution == GPREQUEST

gp_password

TEXT

A user password in ADB that is used to execute the gp_request query

 — 

Yes if distribution == GPREQUEST

gp_request

TEXT

A query that is used to obtain a list of ClickHouse hosts. Every host from this list will be selected via the round-robin algorithm. The format of the query result should be identical to the abovementioned format of the url option

 — 

Yes if distribution == GPREQUEST

lines_batch_size

INT

The maximum number of rows in the data batch sent to ClickHouse by the connector.

Positive integers are allowed

100000

No

bytes_batch_size_mb

INT

The maximum batch size of data sent to ClickHouse by the connector (in MB). If the lines_batch_size option is also defined, the bytes_batch_size_mb value will be used.

Positive integers are allowed

 — 

No

send_compressed

BOOL

The flag that indicates whether to use compression on the connector side before sending data to ClickHouse.

Possible values:

  • true — use data compression.

  • false — do not use data compression.

false

No

send_delay

INT

A delay between connector requests to ClickHouse (in milliseconds).

Positive integers are allowed

300

No

insert_distributed_sync

BOOL

Enables or disables synchronous data load to distributed tables.

Possible values:

  • true — data is inserted synchronously. The INSERT query is considered successful after all data is added to all shards (at least to one replica per chard if internal_replication = true in ClickHouse).

  • false — data is inserted asynchronously.

true

No

Use the txn function

The txn function has the following definition:

FUNCTION txn(query TEXT, http_port INT DEFAULT 8123, debug BOOLEAN DEFAULT false, ending_pattern TEXT DEFAULT '_tmp_$')
RETURNS void

The txn function arguments are listed below.

Arguments of the txn function
Name Type Description Default Required

query

TEXT

The SQL query that is to be used to insert data into an external table. The $$ INSERT INTO...$$ form is recommended instead of quotes, since it allows using new lines and quotes inside of $$

 — 

Yes

http_port

INT

The ClickHouse HTTP port that is common to all nodes used in the query. Currently, it is not possible to get this value via SQL. Fill in the option if its value differs from 8123

8123

No

debug

BOOL

The flag that indicates whether to use logging for the txn function. Displays information about all internal operations: creation of staging tables, data validation, switching and deletion of data parts in ClickHouse

false

No

ending_pattern

TEXT

Defines the name format for staging tables — the intermediate layer that is created by Tkhemali Connector in ClickHouse before inserting data into the target table. The $ placeholder is replaced with the integer identifier that is automatically generated on the Master node of ADB to emulate transaction numbers in ClickHouse

'_tmp_$'

No

You can call the txn function using the SELECT query:

SELECT txn('INSERT INTO <external_table> VALUES(...);');
SELECT txn('INSERT INTO <external_table> SELECT ... FROM ...;');

where <external_table> — the external table name.

Drop staging tables

In case of unexpected failures, you may need to delete staging tables on all ClickHouse nodes. To do this, follow the steps on the ADB side:

  1. Find the unique identifier that was automatically generated on the ADB side when sending data to ClickHouse for use in staging table names:

    SELECT database || '.' || name AS tbl FROM sys_tables('<hostname>', <port>)
    WHERE database = '<database_name>' and name like '<table_name><postfix>%';

    where:

    • <hostname> — a name of one of the ClickHouse hosts where the target table is located.

    • <port> — an HTTP port number to connect to the corresponding <hostname> host.

    • <database_name> — a database name in ClickHouse.

    • <table_name> — a name of the ClickHouse target table to which you sent data from ADB.

    • <postfix> — the postfix that was added to the target table name in ClickHouse to generate the staging table name. Matches the value of the ending_pattern option without the $ placeholder (see Arguments of the txn function above).

    Example:

    SELECT database || '.' || name AS tbl FROM sys_tables('dev-adqm-01', 8123)
    where database = 'default' and name like 'test_distr_tmp_%';

    The query result is shown below. 59306 is the identifier that you need.

                 tbl
    ------------------------------
     default.test_distr_tmp_59306
    (1 row)
  2. Using the found identifier, delete staging tables on all nodes of the ClickHouse cluster via the drop_staging_tbl_based_on function:

    WITH ch(h, p) AS (values ('<hostname>', <port>) [, ...])
    SELECT drop_staging_tbl_based_on('<database_name>', '<table_name>', ch.h, ch.p, '<user>', '<password>', <debug>, '<postfix><id>') from ch;

    where:

    • <hostname> — a host name in ClickHouse. You should define all hosts in the comma-separated list. Every host is declared with the corresponding <port> value inside the round parentheses.

    • <port> — an HTTP port number to connect to the corresponding <hostname> host.

    • <database_name> — a database name in ClickHouse.

    • <table_name> — a name of the ClickHouse target table to which you sent data from ADB.

    • <user> — a user name in ClickHouse.

    • <password> — a user password in ClickHouse. If there is no password (for example, in the test environment), an empty string is specified.

    • <debug> — the flag that indicates whether to use the debugging mode. By default false.

    • <postfix> — the postfix that was added to the target table name in ClickHouse to generate the staging table name. Matches the value of the ending_pattern option without the $ placeholder (see Arguments of the txn function above).

    • id — the identifier that you found in step 1.

    Example:

    WITH ch(h, p) AS (values ('dev-adqm-01', 8123), ('dev-adqm-02', 8123), ('dev-adqm-03', 8123), ('dev-adqm-04', 8123))
    SELECT drop_staging_tbl_based_on('default', 'test_distr', ch.h, ch.p, 'default', '', false, '_tmp_59306') FROM ch;

    The result:

     drop_staging_tbl_based_on
    ---------------------------
    
    
    
    
    (4 rows)
Found a mistake? Seleсt text and press Ctrl+Enter to report it