Usage example

Before you start using ADQM Spark Connector, collect the following connection details:

  • ADQM host name;

  • port for interacting with ADQM;

  • user credentials for connection;

  • name of the database to connect to;

  • name of the table to be accessed.

Examples in this article use the following data for testing:

  • ADQM host —;

  • port — 9000;

  • ADQM user — default (no password);

  • database — default;

  • table to be read from ADQM — users:

    ┌─user_id─┬─name───┬─role──────┐
    │       1 │ john   │ admin     │
    │       2 │ mary   │ author    │
    │       3 │ andrew │ author    │
    │       4 │ harry  │ reviewer  │
    │       5 │ ann    │ view only │
    └─────────┴────────┴───────────┘

ADQM Spark Connector communicates with ADQM using a JDBC connection. The ClickHouse Native JDBC driver comes with the connector.

Run the commands from the examples below in spark-shell, the interactive Spark shell for Scala.
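
The connection details listed above map onto the connector options used in the examples of this article. The following is a minimal sketch of that mapping, not part of the connector: AdqmConnection and adqmOptions are hypothetical names introduced for illustration, adqm-host.example is a placeholder host, and the URL layout jdbc:clickhouse://<host>:<port> is an assumption.

```scala
// Hypothetical helper (not part of ADQM Spark Connector): collects the
// connection details in one place and turns them into connector options.
final case class AdqmConnection(
    host: String,     // ADQM host name
    port: Int,        // port for interacting with ADQM
    user: String,     // user name for the connection
    database: String) // database to connect to

def adqmOptions(conn: AdqmConnection, table: String): Map[String, String] =
  Map(
    // Assumed URL layout: jdbc:clickhouse://<host>:<port>
    "spark.adqm.url" -> s"jdbc:clickhouse://${conn.host}:${conn.port}",
    "spark.adqm.user" -> conn.user,
    "spark.adqm.dbschema" -> conn.database,
    "spark.adqm.dbtable" -> table)

// "adqm-host.example" is a placeholder; substitute the real host name.
val conn = AdqmConnection("adqm-host.example", 9000, "default", "default")
val usersOptions = adqmOptions(conn, "users")
```

Keeping the host and port in one value makes it easier to reuse the same connection details for several tables.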

Load data from ADQM to Spark

  1. Specify the connector settings for reading data from the ADQM default.users table:

    val options_read = Map(
        "spark.adqm.url" -> "jdbc:clickhouse://",
        "spark.adqm.user" -> "default",
        "spark.adqm.dbschema" -> "default",
        "spark.adqm.dbtable" -> "users")
  2. Create a DataFrame:

    val adqm_users = spark.read.format("adqm").options(options_read).load()

    Result of command execution:

    adqm_users: org.apache.spark.sql.DataFrame = [user_id: int, name: string ... 1 more field]
  3. Check the DataFrame’s content:

    adqm_users.show()

    +-------+------+---------+
    |user_id|  name|     role|
    +-------+------+---------+
    |      1|  john|    admin|
    |      2|  mary|   author|
    |      3|andrew|   author|
    |      4| harry| reviewer|
    |      5|   ann|view only|
    +-------+------+---------+

Run any SQL query on an ADQM node

  1. Configure additional settings for the Spark session and import the auxiliary functions of the connector:

    val spark1 = org.apache.spark.sql.SparkSession.
    import io.arenadata.spark.adqm.implicits._
  2. Create an arbitrary table in ADQM and fill it with test data:

    spark1.executeAdqmQueryOnShard("create table default.test_spark(id integer) ENGINE  = MergeTree() ORDER BY id;")
    spark1.executeAdqmQueryOnShard("insert into default.test_spark values(1);")
    spark1.executeAdqmQueryOnShard("insert into default.test_spark values(2);")
    spark1.executeAdqmQueryOnShard("insert into default.test_spark values(3);")
  3. Create a DataFrame based on the new ADQM table:

    val test = spark1.executeAdqmSelectQueryOnShard("select * from default.test_spark;")

    Result of command execution:

    test: org.apache.spark.sql.DataFrame = [id: int]
  4. Check the DataFrame’s content:

    test.show()

    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    +---+
  5. In ADQM (for example, in clickhouse-client), check that the table has been created and populated with data:

    SELECT * FROM test_spark;

    ┌─id─┐
    │  1 │
    │  2 │
    │  3 │
    └────┘
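
The session setup in step 1 of this example relies on a SparkSession builder. A minimal sketch of what such a setup might look like is given below. It assumes that the connector reads the same spark.adqm.* keys from the session configuration as it accepts in per-table options, which is not confirmed by this article; adqm-host.example is a placeholder host.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: the connector picks up spark.adqm.* keys from the session config.
// In spark-shell, getOrCreate() returns the already running session.
val spark1 = SparkSession.builder().
  config("spark.adqm.url", "jdbc:clickhouse://adqm-host.example:9000"). // placeholder host
  config("spark.adqm.user", "default").
  getOrCreate()

// Brings executeAdqmQueryOnShard and executeAdqmSelectQueryOnShard into scope.
import io.arenadata.spark.adqm.implicits._
```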

Write data from Spark to ADQM

  1. Specify the connector settings:

    val options_write = Map(
        "spark.adqm.url" -> "jdbc:clickhouse://",
        "spark.adqm.user" -> "default",
        "spark.adqm.dbschema" -> "default",
        "spark.adqm.dbtable" -> "test_spark_write",
        "spark.adqm.create.table.engine" -> "MergeTree")
  2. Run the following command to create a test_spark_write table in ADQM and write data from the DataFrame created in the previous example into it:

  3. In ADQM, check that the data has been added to the new table:

    SELECT * FROM test_spark_write;

    ┌─id─┐
    │  1 │
    │  2 │
    │  3 │
    └────┘
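
The write in step 2 of this example can be sketched with the standard Spark DataFrameWriter API. This is an assumption rather than the connector's documented command: it reuses the test DataFrame and the options_write map defined above, and it assumes that the connector accepts writes under the same "adqm" format name that is used for reading.

```scala
// Assumption: writes go through the "adqm" format, like reads.
// No save mode is set, so Spark applies its default, SaveMode.ErrorIfExists,
// and the write fails if the target table already exists.
test.write.format("adqm").options(options_write).save()
```

To append to an existing table or replace it, a save mode can be set with .mode(...) before .save(); which modes the connector supports is not stated in this article.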
