Примеры использования ADB Spark 3 Connector

Предварительные требования

В статье показаны примеры передачи данных между ADB и Spark 3 с использованием ADB Spark 3 Connector. При выполнении примеров соблюдены следующие условия:

  • Кластер ADB развернут согласно руководству Online-установка.

  • Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.1.2.1.b1.

  • Сервис Spark 3 установлен в кластере ADH.

  • IP-адрес мастер-хоста ADB — 10.92.40.38. Для входящих подключений PostgreSQL по умолчанию используется порт с номером 5432.

  • База данных с именем adb существует в кластере ADB.

  • Пользователь с именем adb_to_spark, привилегиями CREATEEXTTABLE и паролем 123 создан в БД кластера ADB. Для добавления пользователя можно использовать следующий запрос:

    CREATE ROLE adb_to_spark
    WITH  CREATEEXTTABLE(protocol='gpfdist',type='readable')
          CREATEEXTTABLE(protocol='gpfdist',type='writable')
          LOGIN
          PASSWORD '123';
  • Для пользователя adb_to_spark настроен доступ к базе данных adb с хостов кластера ADH. Для этой цели отредактирован файл pg_hba.conf следующим образом:

    host    all  adb_to_spark       0.0.0.0/0     md5

    Обратите внимание, что приведенная конфигурация предназначена для использования исключительно в тестовых целях. Вместо 0.0.0.0/0 можно перечислить IP-адреса хостов ADH-кластера, на которых установлен сервис Spark 3 (с подсетью).

    РЕКОМЕНДАЦИЯ

    Редактирование файла pg_hba.conf возможно в web-интерфейсе ADCM. Для этого заполните параметр Custom pg_hba section на вкладке Configuration сервиса ADB в кластере ADB. Чтобы применить изменения, нажмите Save и запустите сервисное действие Reconfigure.

  • Таблица author существует в базе данных adb. Для создания и заполнения таблицы предназначены следующие запросы:

    CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL)
    WITH (appendoptimized=true)
    DISTRIBUTED BY(id);
    INSERT INTO author(id, name) VALUES
    (1,'Virginia Woolf'),
    (2,'J.R.R. Tolkien'),
    (3,'Harper Lee'),
    (4,'J.D. Salinger'),
    (5,'George Orwell'),
    (6,'John Steinbeck'),
    (7,'Margaret Mitchell'),
    (8,'Alan Moore'),
    (9,'Jack Kerouac'),
    (10,'Ernest Hemingway');
  • Пользователь adb_to_spark имеет необходимые права на доступ к таблице author:

    GRANT SELECT, INSERT ON public.author TO adb_to_spark;

Для взаимодействия ADB Spark 3 Connector и ADB используется JDBC-соединение. Драйвер PostgreSQL JDBC driver поставляется вместе с коннектором.

Команды, приведенные в примерах ниже, выполняются в интерактивной оболочке для Scala — spark3-shell.

Чтение данных из ADB в Spark

  1. Откройте spark3-shell на хосте, где установлен Spark 3:

    $ sudo -u hdfs spark3-shell
  2. Укажите настройки коннектора для чтения данных из предварительно созданной и заполненной таблицы ADB author:

    val options = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "author"
       )

    Результат:

    val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  3. Создайте DataFrame:

    val adb_author  = spark.read.format("adb").options(options).load()

    Результат:

    val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
  4. Проверьте содержимое DataFrame:

    adb_author.show()

    Результат:

    +---+-----------------+
    | id|             name|
    +---+-----------------+
    |  4|    J.D. Salinger|
    |  8|       Alan Moore|
    |  5|    George Orwell|
    | 10| Ernest Hemingway|
    |  2|   J.R.R. Tolkien|
    |  1|   Virginia Woolf|
    |  7|Margaret Mitchell|
    |  6|   John Steinbeck|
    |  3|       Harper Lee|
    |  9|     Jack Kerouac|
    +---+-----------------+
  5. Получите схему выбранной таблицы:

    adb_author.printSchema()

    Результат:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
  6. Вычислите количество строк в таблице:

    adb_author.count()

    Результат:

    val res9: Long = 10

Выполнение произвольного запроса SQL в ADB

Для отправки произвольного SQL-запроса из Spark в ADB выполните следующие шаги:

  1. Установите дополнительные настройки Spark-сессии и включите вспомогательные функции:

    spark.stop
    val spark1 = org.apache.spark.sql.SparkSession.
       builder().
       master("local[*]").
       appName("spark_example").
       config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb").
       config("spark.adb.driver","org.postgresql.Driver").
       config("spark.adb.user","adb_to_spark").
       config("spark.adb.password","123").
       getOrCreate()
    import io.arenadata.spark.adb.implicits._

    Результат:

    val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
  2. Создайте произвольную таблицу test_spark в ADB и заполните ее тестовыми данными:

    spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
  3. Создайте DataFrame на основе новой таблицы ADB test_spark:

    val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")

    Результат:

    val res: org.apache.spark.sql.DataFrame = [id: int]
  4. Проверьте содержимое DataFrame:

    res.show()

    Результат:

    +---+
    | id|
    +---+
    |  2|
    |  3|
    |  1|
    +---+

Запись данных из Spark в ADB

  1. Установите настройки коннектора для записи данных в ADB:

    val options_write = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "test_spark_write")

    Результат:

    val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  2. Выполните следующую команду для создания таблицы test_spark_write в ADB и загрузки в нее данных из созданного в предыдущем примере DataFrame res:

    res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  3. Проверьте, что данные успешно загружены в ADB (например, используя psql):

    • Вывод структуры таблицы:

      \d+ public.test_spark_write
                         Table "public.test_spark_write"
       Column |  Type   | Modifiers | Storage | Stats target | Description
      --------+---------+-----------+---------+--------------+-------------
       id     | integer |           | plain   |              |
      Distributed by: (id)
    • Проверка содержимого таблицы:

      SELECT * FROM public.test_spark_write;
       id
      ----
        2
        1
        3
      (3 rows)
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней