Пример использования

Прежде чем приступить к использованию ADQM Spark 3 Connector, необходимо уточнить следующую информацию:

  • имя хоста ADQM;

  • порт для взаимодействия с ADQM;

  • реквизиты пользователя для подключения;

  • имя базы данных, к которой необходимо подключиться;

  • имя таблицы, к которой нужно получить доступ.

 
В примерах, приведенных в этой статье, используются следующие тестовые данные:

  • хост ADQM — 10.92.17.146;

  • порт — 9000;

  • пользователь ADQM — default (без пароля);

  • база данных — default;

  • таблица для чтения из ADQM — users:

    ┌─user_id─┬─name───┬───role──┐
    │       1 │ john   │    admin│
    │       2 │ mary   │   author│
    │       3 │ andrew │   author│
    │       4 │ harry  │ reviewer│
    │       5 │ ann    │view only│
    └─────────┴────────┴─────────┘

Для взаимодействия ADQM Spark 3 Connector и ADQM используется соединение JDBC. Драйвер ClickHouse Native JDBC поставляется вместе с коннектором.

Команды выполняются в интерактивной оболочке для Scala — spark3-shell.

Пример загрузки данных из ADQM в Spark 3

  1. Укажите настройки коннектора для чтения данных из таблицы ADQM default.users:

    val options_read = Map(
        "spark.adqm.url" -> "jdbc:clickhouse://10.92.17.146:9000",
        "spark.adqm.user" -> "default",
        "spark.adqm.dbschema" -> "default",
        "spark.adqm.dbtable" -> "users")
  2. Создайте DataFrame:

    val adqm_users = spark.read.format("adqm").options(options_read).load()

    Результат выполнения команды:

    adqm_users: org.apache.spark.sql.DataFrame = [user_id: int, name: string ... 1 more field]
  3. Проверьте содержимое DataFrame:

    adqm_users.show()
    +-------+------+---------+
    |user_id|  name|     role|
    +-------+------+---------+
    |      1|  john|    admin|
    |      2|  mary|   author|
    |      3|andrew|   author|
    |      4| harry| reviewer|
    |      5|   ann|view only|
    +-------+------+---------+

Пример выполнения произвольного запроса SQL на узле ADQM

  1. Отредактируйте настройки Spark-сессии и включите вспомогательные функции:

    spark.stop
    val spark1 = org.apache.spark.sql.SparkSession.
        builder().
        master("local[*]").
        appName("spark_example").
        config("spark.adqm.url","jdbc:clickhouse://10.92.17.146:9000").
        config("spark.adqm.user","default").
        getOrCreate()
    import io.arenadata.spark.adqm.implicits._
  2. Создайте произвольную таблицу в ADQM и заполните ее тестовыми данными:

    spark1.executeAdqmQueryOnShard("create table default.test_spark(id integer) ENGINE  = MergeTree() ORDER BY id;")
    spark1.executeAdqmQueryOnShard("insert into default.test_spark values(1);")
    spark1.executeAdqmQueryOnShard("insert into default.test_spark values(2);")
    spark1.executeAdqmQueryOnShard("insert into default.test_spark values(3);")
  3. Создайте DataFrame на основе новой таблицы ADQM:

    val test = spark1.executeAdqmSelectQueryOnShard("select * from default.test_spark;")
    test: org.apache.spark.sql.DataFrame = [id: int]
  4. Проверьте содержимое DataFrame:

    test.show()
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    +---+
  5. Проверьте в ADQM (например, используя консольный клиент clickhouse-client), что таблица создалась и заполнилась данными:

    SELECT * FROM test_spark;
    ┌─id─┐
    │  1 │
    │  2 │
    │  3 │
    └────┘

Пример загрузки данных из Spark 3 в ADQM

  1. Укажите настройки коннектора:

    val options_write = Map(
        "spark.adqm.url" -> "jdbc:clickhouse://10.92.17.146:9000",
        "spark.adqm.user" -> "default",
        "spark.adqm.dbschema" -> "default",
        "spark.adqm.dbtable" -> "test_spark_write",
        "spark.adqm.create.table.engine" -> "MergeTree")
  2. Выполните следующую команду, чтобы создать в ADQM таблицу test_spark_write и записать в нее данные из созданного в предыдущем примере DataFrame:

    test.write.format("adqm").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  3. Проверьте в ADQM, что в новую таблицу добавились данные:

    SELECT * FROM test_spark_write;
    ┌─id─┐
    │  1 │
    │  2 │
    │  3 │
    └────┘
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней