Usage example
Before you start using the ADQM Spark 3 Connector, find out the following information:
- ADQM host name;
- port for interacting with ADQM;
- user credentials for connection;
- name of the database to connect to;
- name of the table to be accessed.
Examples in this article use the following data for testing:
- ADQM host — 10.92.17.146;
- port — 9000;
- ADQM user — default (no password);
- database — default;
- table to be read from ADQM — users:

  ┌─user_id─┬─name───┬─role──────┐
  │       1 │ john   │ admin     │
  │       2 │ mary   │ author    │
  │       3 │ andrew │ author    │
  │       4 │ harry  │ reviewer  │
  │       5 │ ann    │ view only │
  └─────────┴────────┴───────────┘
ADQM Spark 3 Connector communicates with ADQM using a JDBC connection. The ClickHouse Native JDBC driver comes with the connector.
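As a minimal plain-Scala sketch, the connection URL used throughout this article can be assembled from the test host and port listed above (the variable names are illustrative):

```scala
// Assemble the ClickHouse JDBC URL from the test host and port used in this article
val adqmHost = "10.92.17.146"
val adqmPort = 9000
val adqmUrl  = s"jdbc:clickhouse://$adqmHost:$adqmPort"
// adqmUrl is the value passed to the spark.adqm.url option in the examples below
```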
The commands in the examples below should be executed in spark3-shell, the interactive shell for Scala.
Load data from ADQM to Spark 3
- Specify the connector settings for reading data from the ADQM default.users table:

  val options_read = Map(
    "spark.adqm.url"      -> "jdbc:clickhouse://10.92.17.146:9000",
    "spark.adqm.user"     -> "default",
    "spark.adqm.dbschema" -> "default",
    "spark.adqm.dbtable"  -> "users")
- Create a DataFrame:

  val adqm_users = spark.read.format("adqm").options(options_read).load()

  Result of command execution:

  adqm_users: org.apache.spark.sql.DataFrame = [user_id: int, name: string ... 1 more field]
- Check the DataFrame's content:

  adqm_users.show()

  +-------+------+---------+
  |user_id|  name|     role|
  +-------+------+---------+
  |      1|  john|    admin|
  |      2|  mary|   author|
  |      3|andrew|   author|
  |      4| harry| reviewer|
  |      5|   ann|view only|
  +-------+------+---------+
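Because the read and write examples in this article repeat the same connection settings, the option maps can be assembled from a shared base with the standard Scala `++` operator — a plain-Scala convenience sketch, not a connector requirement:

```scala
// Connection settings shared by read and write configurations (test values from this article)
val commonOptions = Map(
  "spark.adqm.url"      -> "jdbc:clickhouse://10.92.17.146:9000",
  "spark.adqm.user"     -> "default",
  "spark.adqm.dbschema" -> "default")

// Per-operation settings are layered on top with ++
val optionsRead  = commonOptions ++ Map("spark.adqm.dbtable" -> "users")
val optionsWrite = commonOptions ++ Map(
  "spark.adqm.dbtable"             -> "test_spark_write",
  "spark.adqm.create.table.engine" -> "MergeTree")
```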
Run any SQL query on an ADQM node
- Set additional settings for a Spark session and enable auxiliary functions:

  spark.stop
  val spark1 = org.apache.spark.sql.SparkSession.
    builder().
    master("local[*]").
    appName("spark_example").
    config("spark.adqm.url", "jdbc:clickhouse://10.92.17.146:9000").
    config("spark.adqm.user", "default").
    getOrCreate()
  import io.arenadata.spark.adqm.implicits._
- Create an arbitrary table in ADQM and fill it with test data:

  spark1.executeAdqmQueryOnShard("create table default.test_spark(id integer) ENGINE = MergeTree() ORDER BY id;")
  spark1.executeAdqmQueryOnShard("insert into default.test_spark values(1);")
  spark1.executeAdqmQueryOnShard("insert into default.test_spark values(2);")
  spark1.executeAdqmQueryOnShard("insert into default.test_spark values(3);")
- Create a DataFrame based on the new ADQM table:

  val test = spark1.executeAdqmSelectQueryOnShard("select * from default.test_spark;")

  Result of command execution:

  test: org.apache.spark.sql.DataFrame = [id: int]
- Check the DataFrame's content:

  test.show()

  +---+
  | id|
  +---+
  |  1|
  |  2|
  |  3|
  +---+
- Check in ADQM (for example, in the clickhouse-client console client) that the table has been created and populated with data:

  SELECT * FROM test_spark;

  ┌─id─┐
  │  1 │
  │  2 │
  │  3 │
  └────┘
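Each of the three INSERT statements above makes a separate call to ADQM; the same rows can be inserted with a single multi-row statement. A sketch of building such a statement in plain Scala (the resulting query text would be passed to executeAdqmQueryOnShard exactly as in the example above):

```scala
// Build one multi-row INSERT instead of issuing three single-row statements
val ids = Seq(1, 2, 3)
val insertSql =
  ids.map(id => s"($id)").mkString("insert into default.test_spark values ", ", ", ";")
// spark1.executeAdqmQueryOnShard(insertSql) would then run it in a single call
```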
Write data from Spark 3 to ADQM
- Specify the connector settings:

  val options_write = Map(
    "spark.adqm.url"                 -> "jdbc:clickhouse://10.92.17.146:9000",
    "spark.adqm.user"                -> "default",
    "spark.adqm.dbschema"            -> "default",
    "spark.adqm.dbtable"             -> "test_spark_write",
    "spark.adqm.create.table.engine" -> "MergeTree")
- Run the following command to create a test_spark_write table in ADQM and write data from the DataFrame created in the previous example into it:

  test.write.format("adqm").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
- Check in ADQM that data has been added to the new table:

  SELECT * FROM test_spark_write;

  ┌─id─┐
  │  1 │
  │  2 │
  │  3 │
  └────┘
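If several tables are written with the same connection settings, a small helper can produce the write options for each target table. The helper name and its MergeTree default are illustrative, not part of the connector API; the option keys and connection values are those used throughout this article:

```scala
// Illustrative helper: build write options for a given target table,
// defaulting the table engine to MergeTree as in the example above
def writeOptions(table: String, engine: String = "MergeTree"): Map[String, String] =
  Map(
    "spark.adqm.url"                 -> "jdbc:clickhouse://10.92.17.146:9000",
    "spark.adqm.user"                -> "default",
    "spark.adqm.dbschema"            -> "default",
    "spark.adqm.dbtable"             -> table,
    "spark.adqm.create.table.engine" -> engine)
```

It would be used in place of the inline map, e.g. test.write.format("adqm").options(writeOptions("test_spark_write")).mode(org.apache.spark.sql.SaveMode.Overwrite).save().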