Примеры использования ADB Spark 3 Connector

Предварительные требования

В статье показаны примеры передачи данных между ADB и Spark 3 с использованием ADB Spark 3 Connector. При выполнении примеров соблюдены следующие условия:

  • Кластер ADB развернут согласно руководству Online-установка.

  • Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.1.2.1.b1.

  • Сервис Spark 3 установлен в кластере ADH.

  • IP-адрес мастер-хоста ADB — 10.92.40.38. Для входящих подключений PostgreSQL по умолчанию используется порт с номером 5432.

  • База данных с именем adb существует в кластере ADB.

  • Пользователь с именем adb_to_spark, привилегиями CREATEEXTTABLE и паролем 123 создан в БД кластера ADB. Для добавления пользователя можно использовать следующий запрос:

    CREATE ROLE adb_to_spark
    WITH  CREATEEXTTABLE(protocol='gpfdist',type='readable')
          CREATEEXTTABLE(protocol='gpfdist',type='writable')
          LOGIN
          PASSWORD '123';
  • Для пользователя adb_to_spark настроен доступ к базе данных adb с хостов кластера ADH. Для этой цели отредактирован файл pg_hba.conf следующим образом:

    host    all  adb_to_spark       0.0.0.0/0     md5

    Обратите внимание, что приведенная конфигурация предназначена для использования исключительно в тестовых целях. Вместо 0.0.0.0/0 можно перечислить IP-адреса хостов ADH-кластера, на которых установлен сервис Spark 3 (с подсетью).

    РЕКОМЕНДАЦИЯ

    Редактирование файла pg_hba.conf возможно в web-интерфейсе ADCM. Для этого заполните параметр Custom pg_hba section на вкладке Configuration сервиса ADB в кластере ADB. Чтобы применить изменения, нажмите Save и запустите сервисное действие Reconfigure.

  • Таблица author существует в базе данных adb. Для создания и заполнения таблицы предназначены следующие запросы:

    CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL)
    WITH (appendoptimized=true)
    DISTRIBUTED BY(id);
    INSERT INTO author(id, name) VALUES
    (1,'Virginia Woolf'),
    (2,'J.R.R. Tolkien'),
    (3,'Harper Lee'),
    (4,'J.D. Salinger'),
    (5,'George Orwell'),
    (6,'John Steinbeck'),
    (7,'Margaret Mitchell'),
    (8,'Alan Moore'),
    (9,'Jack Kerouac'),
    (10,'Ernest Hemingway');
  • Пользователь adb_to_spark имеет необходимые права на доступ к таблице author:

    GRANT SELECT, INSERT ON public.author TO adb_to_spark;

Для взаимодействия ADB Spark 3 Connector и ADB используется JDBC-соединение. Драйвер PostgreSQL JDBC driver поставляется вместе с коннектором.

Примеры использования: Scala

Примеры кода на Scala, описанные далее, предназначены для запуска в интерактивной оболочке для Scala — spark3-shell.

Чтение данных из ADB в Spark

  1. Откройте spark3-shell на хосте, где установлен Spark 3:

    $ sudo -u hdfs spark3-shell
  2. Укажите настройки коннектора для чтения данных из предварительно созданной и заполненной таблицы ADB author:

    val options = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "author"
       )

    Результат:

    val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  3. Создайте DataFrame:

    val adb_author  = spark.read.format("adb").options(options).load()

    Результат:

    val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
  4. Проверьте содержимое DataFrame:

    adb_author.show()

    Результат:

    +---+-----------------+
    | id|             name|
    +---+-----------------+
    |  4|    J.D. Salinger|
    |  8|       Alan Moore|
    |  5|    George Orwell|
    | 10| Ernest Hemingway|
    |  2|   J.R.R. Tolkien|
    |  1|   Virginia Woolf|
    |  7|Margaret Mitchell|
    |  6|   John Steinbeck|
    |  3|       Harper Lee|
    |  9|     Jack Kerouac|
    +---+-----------------+
  5. Получите схему выбранной таблицы:

    adb_author.printSchema()

    Результат:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
  6. Вычислите количество строк в таблице:

    adb_author.count()

    Результат:

    val res9: Long = 10

Выполнение произвольного запроса SQL в ADB

Для отправки произвольного SQL-запроса из Spark в ADB выполните следующие шаги:

  1. Установите дополнительные настройки Spark-сессии и включите вспомогательные функции:

    spark.stop
    val spark1 = org.apache.spark.sql.SparkSession.
       builder().
       master("local[*]").
       appName("spark_example").
       config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb").
       config("spark.adb.driver","org.postgresql.Driver").
       config("spark.adb.user","adb_to_spark").
       config("spark.adb.password","123").
       getOrCreate()
    import io.arenadata.spark.adb.implicits._

    Результат:

    val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
  2. Создайте произвольную таблицу test_spark в ADB и заполните ее тестовыми данными:

    spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
  3. Создайте DataFrame на основе новой таблицы ADB test_spark:

    val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")

    Результат:

    val res: org.apache.spark.sql.DataFrame = [id: int]
  4. Проверьте содержимое DataFrame:

    res.show()

    Результат:

    +---+
    | id|
    +---+
    |  2|
    |  3|
    |  1|
    +---+

Запись данных из Spark в ADB

  1. Установите настройки коннектора для записи данных в ADB:

    val options_write = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "test_spark_write")

    Результат:

    val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  2. Выполните следующую команду для создания таблицы test_spark_write в ADB и загрузки в нее данных из созданного в предыдущем примере DataFrame res:

    res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  3. Проверьте, что данные успешно загружены в ADB (например, используя psql):

    • Вывод структуры таблицы:

      \d+ public.test_spark_write
                         Table "public.test_spark_write"
       Column |  Type   | Modifiers | Storage | Stats target | Description
      --------+---------+-----------+---------+--------------+-------------
       id     | integer |           | plain   |              |
      Distributed by: (id)
    • Проверка содержимого таблицы:

      SELECT * FROM public.test_spark_write;
       id
      ----
        2
        1
        3
      (3 rows)

Примеры использования: PySpark

Примеры кода на Python, приведенные ниже, предназначены для запуска в оболочке PySpark shell, которая доступна на хостах с установленным компонентом Spark3. Для корректной работы примеров из статьи оболочка PySpark должна запускаться пользователем hdfs/spark, чтобы обеспечить достаточные права для взаимодействия с удаленным ADB-кластером. Ниже показаны команды для запуска оболочки PySpark соответствующим образом.

  1. Создайте новый virtual environment, используя Python из /opt/pyspark3-python/:

    $ cd ~
    $ mkdir pyspark-demo
    $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
  2. Активируйте virtual environment:

    $ source pyspark-demo/venv/bin/activate
  3. Запустите оболочку PySpark в активированной virtual environment:

    $ sudo -u hdfs pyspark3

    Пример вывода показан ниже:

    (venv) [admin@ka-adh-1 ~]$ sudo -u hdfs pyspark3
    Python 3.10.4 (main, Jun  1 2024, 01:11:04) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.4.3
          /_/
    
    Using Python version 3.10.4 (main, Jun  1 2024 01:11:04)
    Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4142
    Spark context available as 'sc' (master = yarn, app id = application_1724163954275_0023).
    SparkSession available as 'spark'.

Чтение данных из ADB в Spark

  1. В оболочке PySpark укажите параметры коннектора для чтения данных из предварительно созданной и заполненной таблицы ADB author:

    opts_read = {
       "spark.adb.url": "jdbc:postgresql://10.92.40.38/adb",
       "spark.adb.user": "adb_to_spark",
       "spark.adb.password": "123",
       "spark.adb.dbschema": "public",
       "spark.adb.dbtable": "author"
    }
  2. Создайте DataFrame:

    adb_author = spark.read.format("adb") \
                .options(**opts_read) \
                .load()

    Результат:

    24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Starting...
    24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Start completed.
  3. Проверьте содержимое DataFrame:

    adb_author.show()

    Результат:

    +---+-----------------+
    | id|             name|
    +---+-----------------+
    |  4|    J.D. Salinger|
    |  8|       Alan Moore|
    |  5|    George Orwell|
    | 10| Ernest Hemingway|
    |  2|   J.R.R. Tolkien|
    |  1|   Virginia Woolf|
    |  7|Margaret Mitchell|
    |  6|   John Steinbeck|
    |  3|       Harper Lee|
    |  9|     Jack Kerouac|
    +---+-----------------+
  4. Получите схему выбранной таблицы:

    adb_author.printSchema()

    Результат:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
  5. Вычислите количество строк в таблице:

    adb_author.count()

    Результат:

    10

Запись данных из Spark в ADB

  1. Установите настройки коннектора для записи данных в ADB-таблицу test_pyspark_write:

    opts_write = {
       "spark.adb.url": "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user": "adb_to_spark",
       "spark.adb.password": "123",
       "spark.adb.dbschema": "public",
       "spark.adb.dbtable": "test_pyspark_write"
    }
  2. Создайте новый DataFrame и запишите его содержимое в тестовую таблицу ADB:

    data = [
        (1, "foo"),
        (2, "bar"),
        (3, "buzz")
    ]
    columns = ["id", "value"]
    test_df = spark.createDataFrame(data=data, schema=columns)
    
    test_df.write.format("adb") \
        .options(**opts_write) \
        .mode("overwrite") \
        .save()

    Вывод:

    24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Starting...
    24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Start completed.
    ...
    24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff is committing.
    24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff committed.
  3. Проверьте, что данные успешно загружены в ADB (например, используя psql):

    • Вывод структуры таблицы:

      \d+ public.test_pyspark_write
                       Table "public.test_pyspark_write"
       Column |  Type  | Modifiers | Storage  | Stats target | Description
      --------+--------+-----------+----------+--------------+-------------
       id     | bigint |           | plain    |              |
       value  | text   |           | extended |              |
      Distributed by: (id)
    • Проверка содержимого таблицы:

      SELECT * FROM public.test_pyspark_write;
       id | value
      ----+-------
        3 | buzz
        1 | foo
        2 | bar
      (3 rows)
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней