Пример использования
Прежде чем приступить к использованию ADQM Spark 3 Connector, необходимо уточнить следующую информацию:
-
имя хоста ADQM;
-
порт для взаимодействия с ADQM;
-
реквизиты пользователя для подключения;
-
имя базы данных, к которой необходимо подключиться;
-
имя таблицы, к которой нужно получить доступ.
В примерах, приведенных в этой статье, используются следующие тестовые данные:
-
хост ADQM —
10.92.17.146
; -
порт —
9000
; -
пользователь ADQM —
default
(без пароля); -
база данных —
default
; -
таблица для чтения из ADQM —
users
:┌─user_id─┬─name───┬───role──┐ │ 1 │ john │ admin│ │ 2 │ mary │ author│ │ 3 │ andrew │ author│ │ 4 │ harry │ reviewer│ │ 5 │ ann │view only│ └─────────┴────────┴─────────┘
Для взаимодействия ADQM Spark 3 Connector и ADQM используется соединение JDBC. Драйвер ClickHouse Native JDBC поставляется вместе с коннектором.
Команды выполняются в интерактивной оболочке для Scala — spark3-shell.
Пример загрузки данных из ADQM в Spark 3
-
Укажите настройки коннектора для чтения данных из таблицы ADQM
default.users
:val options_read = Map( "spark.adqm.url" -> "jdbc:clickhouse://10.92.17.146:9000", "spark.adqm.user" -> "default", "spark.adqm.dbschema" -> "default", "spark.adqm.dbtable" -> "users")
-
Создайте DataFrame:
val adqm_users = spark.read.format("adqm").options(options_read).load()
Результат выполнения команды:
adqm_users: org.apache.spark.sql.DataFrame = [user_id: int, name: string ... 1 more field]
-
Проверьте содержимое DataFrame:
adqm_users.show()
+-------+------+---------+ |user_id| name| role| +-------+------+---------+ | 1| john| admin| | 2| mary| author| | 3|andrew| author| | 4| harry| reviewer| | 5| ann|view only| +-------+------+---------+
Пример выполнения произвольного запроса SQL на узле ADQM
-
Отредактируйте настройки Spark-сессии и включите вспомогательные функции:
spark.stop val spark1 = org.apache.spark.sql.SparkSession. builder(). master("local[*]"). appName("spark_example"). config("spark.adqm.url","jdbc:clickhouse://10.92.17.146:9000"). config("spark.adqm.user","default"). getOrCreate() import io.arenadata.spark.adqm.implicits._
-
Создайте произвольную таблицу в ADQM и заполните ее тестовыми данными:
spark1.executeAdqmQueryOnShard("create table default.test_spark(id integer) ENGINE = MergeTree() ORDER BY id;") spark1.executeAdqmQueryOnShard("insert into default.test_spark values(1);") spark1.executeAdqmQueryOnShard("insert into default.test_spark values(2);") spark1.executeAdqmQueryOnShard("insert into default.test_spark values(3);")
-
Создайте DataFrame на основе новой таблицы ADQM:
val test = spark1.executeAdqmSelectQueryOnShard("select * from default.test_spark;")
test: org.apache.spark.sql.DataFrame = [id: int]
-
Проверьте содержимое DataFrame:
test.show()
+---+ | id| +---+ | 1| | 2| | 3| +---+
-
Проверьте в ADQM (например, используя консольный клиент clickhouse-client), что таблица создалась и заполнилась данными:
SELECT * FROM test_spark;
┌─id─┐ │ 1 │ │ 2 │ │ 3 │ └────┘
Пример загрузки данных из Spark 3 в ADQM
-
Укажите настройки коннектора:
val options_write = Map( "spark.adqm.url" -> "jdbc:clickhouse://10.92.17.146:9000", "spark.adqm.user" -> "default", "spark.adqm.dbschema" -> "default", "spark.adqm.dbtable" -> "test_spark_write", "spark.adqm.create.table.engine" -> "MergeTree")
-
Выполните следующую команду, чтобы создать в ADQM таблицу
test_spark_write
и записать в нее данные из созданного в предыдущем примере DataFrame:test.write.format("adqm").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
-
Проверьте в ADQM, что в новую таблицу добавились данные:
SELECT * FROM test_spark_write;
┌─id─┐ │ 1 │ │ 2 │ │ 3 │ └────┘