Примеры использования ADB Spark 3 Connector
Предварительные требования
В статье показаны примеры передачи данных между ADB и Spark 3 с использованием ADB Spark 3 Connector. При выполнении примеров соблюдены следующие условия:
-
Кластер ADB развернут согласно руководству Online-установка.
-
Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.1.2.1.b1.
-
Сервис Spark 3 установлен в кластере ADH.
-
IP-адрес мастер-хоста ADB —
10.92.40.38
. Для входящих подключений PostgreSQL по умолчанию используется порт с номером5432
. -
База данных с именем
adb
существует в кластере ADB. -
Пользователь с именем
adb_to_spark
, привилегиямиCREATEEXTTABLE
и паролем123
создан в БД кластера ADB. Для добавления пользователя можно использовать следующий запрос:CREATE ROLE adb_to_spark WITH CREATEEXTTABLE(protocol='gpfdist',type='readable') CREATEEXTTABLE(protocol='gpfdist',type='writable') LOGIN PASSWORD '123';
-
Для пользователя
adb_to_spark
настроен доступ к базе данныхadb
с хостов кластера ADH. Для этой цели отредактирован файл pg_hba.conf следующим образом:host all adb_to_spark 0.0.0.0/0 md5
Обратите внимание, что приведенная конфигурация предназначена для использования исключительно в тестовых целях. Вместо
0.0.0.0/0
можно перечислить IP-адреса хостов ADH-кластера, на которых установлен сервис Spark 3 (с подсетью).РЕКОМЕНДАЦИЯРедактирование файла pg_hba.conf возможно в web-интерфейсе ADCM. Для этого заполните параметр Custom pg_hba section на вкладке Configuration сервиса ADB в кластере ADB. Чтобы применить изменения, нажмите Save и запустите сервисное действие Reconfigure.
-
Таблица
author
существует в базе данныхadb
. Для создания и заполнения таблицы предназначены следующие запросы:CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL) WITH (appendoptimized=true) DISTRIBUTED BY(id);
INSERT INTO author(id, name) VALUES (1,'Virginia Woolf'), (2,'J.R.R. Tolkien'), (3,'Harper Lee'), (4,'J.D. Salinger'), (5,'George Orwell'), (6,'John Steinbeck'), (7,'Margaret Mitchell'), (8,'Alan Moore'), (9,'Jack Kerouac'), (10,'Ernest Hemingway');
-
Пользователь
adb_to_spark
имеет необходимые права на доступ к таблицеauthor
:GRANT SELECT, INSERT ON public.author TO adb_to_spark;
Для взаимодействия ADB Spark 3 Connector и ADB используется JDBC-соединение. Драйвер PostgreSQL JDBC driver поставляется вместе с коннектором.
Примеры использования: Scala
Примеры кода на Scala, описанные далее, предназначены для запуска в интерактивной оболочке для Scala — spark3-shell.
Чтение данных из ADB в Spark
-
Откройте spark3-shell на хосте, где установлен Spark 3:
$ sudo -u hdfs spark3-shell
-
Укажите настройки коннектора для чтения данных из предварительно созданной и заполненной таблицы ADB
author
:val options = Map( "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb", "spark.adb.user" -> "adb_to_spark", "spark.adb.password" -> "123", "spark.adb.dbschema" -> "public", "spark.adb.dbtable" -> "author" )
Результат:
val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
-
Создайте DataFrame:
val adb_author = spark.read.format("adb").options(options).load()
Результат:
val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
-
Проверьте содержимое DataFrame:
adb_author.show()
Результат:
+---+-----------------+ | id| name| +---+-----------------+ | 4| J.D. Salinger| | 8| Alan Moore| | 5| George Orwell| | 10| Ernest Hemingway| | 2| J.R.R. Tolkien| | 1| Virginia Woolf| | 7|Margaret Mitchell| | 6| John Steinbeck| | 3| Harper Lee| | 9| Jack Kerouac| +---+-----------------+
-
Получите схему выбранной таблицы:
adb_author.printSchema()
Результат:
root |-- id: integer (nullable = false) |-- name: string (nullable = false)
-
Вычислите количество строк в таблице:
adb_author.count()
Результат:
val res9: Long = 10
Выполнение произвольного запроса SQL в ADB
Для отправки произвольного SQL-запроса из Spark в ADB выполните следующие шаги:
-
Установите дополнительные настройки Spark-сессии и включите вспомогательные функции:
spark.stop val spark1 = org.apache.spark.sql.SparkSession. builder(). master("local[*]"). appName("spark_example"). config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb"). config("spark.adb.driver","org.postgresql.Driver"). config("spark.adb.user","adb_to_spark"). config("spark.adb.password","123"). getOrCreate() import io.arenadata.spark.adb.implicits._
Результат:
val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
-
Создайте произвольную таблицу
test_spark
в ADB и заполните ее тестовыми данными:spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);") spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);") spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);") spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
-
Создайте DataFrame на основе новой таблицы ADB
test_spark
:val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")
Результат:
val res: org.apache.spark.sql.DataFrame = [id: int]
-
Проверьте содержимое DataFrame:
res.show()
Результат:
+---+ | id| +---+ | 2| | 3| | 1| +---+
Запись данных из Spark в ADB
-
Установите настройки коннектора для записи данных в ADB:
val options_write = Map( "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb", "spark.adb.user" -> "adb_to_spark", "spark.adb.password" -> "123", "spark.adb.dbschema" -> "public", "spark.adb.dbtable" -> "test_spark_write")
Результат:
val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
-
Выполните следующую команду для создания таблицы
test_spark_write
в ADB и загрузки в нее данных из созданного в предыдущем примере DataFrameres
:res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
-
Проверьте, что данные успешно загружены в ADB (например, используя psql):
-
Вывод структуры таблицы:
\d+ public.test_spark_write
Table "public.test_spark_write" Column | Type | Modifiers | Storage | Stats target | Description --------+---------+-----------+---------+--------------+------------- id | integer | | plain | | Distributed by: (id)
-
Проверка содержимого таблицы:
SELECT * FROM public.test_spark_write;
id ---- 2 1 3 (3 rows)
-
Примеры использования: PySpark
Примеры кода на Python, приведенные ниже, предназначены для запуска в оболочке PySpark shell, которая доступна на хостах с установленным компонентом Spark3.
Для корректной работы примеров из статьи оболочка PySpark должна запускаться пользователем hdfs
/spark
, чтобы обеспечить достаточные права для взаимодействия с удаленным ADB-кластером.
Ниже показаны команды для запуска оболочки PySpark соответствующим образом.
-
Создайте новый virtual environment, используя Python из /opt/pyspark3-python/:
$ cd ~ $ mkdir pyspark-demo $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
-
Активируйте virtual environment:
$ source pyspark-demo/venv/bin/activate
-
Запустите оболочку PySpark в активированной virtual environment:
$ sudo -u hdfs pyspark3
Пример вывода показан ниже:
(venv) [admin@ka-adh-1 ~]$ sudo -u hdfs pyspark3 Python 3.10.4 (main, Jun 1 2024, 01:11:04) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux Type "help", "copyright", "credits" or "license" for more information. Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp ... Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.4.3 /_/ Using Python version 3.10.4 (main, Jun 1 2024 01:11:04) Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4142 Spark context available as 'sc' (master = yarn, app id = application_1724163954275_0023). SparkSession available as 'spark'.
Чтение данных из ADB в Spark
-
В оболочке PySpark укажите параметры коннектора для чтения данных из предварительно созданной и заполненной таблицы ADB
author
:opts_read = { "spark.adb.url": "jdbc:postgresql://10.92.40.38/adb", "spark.adb.user": "adb_to_spark", "spark.adb.password": "123", "spark.adb.dbschema": "public", "spark.adb.dbtable": "author" }
-
Создайте DataFrame:
adb_author = spark.read.format("adb") \ .options(**opts_read) \ .load()
Результат:
24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Starting... 24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Start completed.
-
Проверьте содержимое DataFrame:
adb_author.show()
Результат:
+---+-----------------+ | id| name| +---+-----------------+ | 4| J.D. Salinger| | 8| Alan Moore| | 5| George Orwell| | 10| Ernest Hemingway| | 2| J.R.R. Tolkien| | 1| Virginia Woolf| | 7|Margaret Mitchell| | 6| John Steinbeck| | 3| Harper Lee| | 9| Jack Kerouac| +---+-----------------+
-
Получите схему выбранной таблицы:
adb_author.printSchema()
Результат:
root |-- id: integer (nullable = false) |-- name: string (nullable = false)
-
Вычислите количество строк в таблице:
adb_author.count()
Результат:
10
Запись данных из Spark в ADB
-
Установите настройки коннектора для записи данных в ADB-таблицу
test_pyspark_write
:opts_write = { "spark.adb.url": "jdbc:postgresql://10.92.40.38:5432/adb", "spark.adb.user": "adb_to_spark", "spark.adb.password": "123", "spark.adb.dbschema": "public", "spark.adb.dbtable": "test_pyspark_write" }
-
Создайте новый DataFrame и запишите его содержимое в тестовую таблицу ADB:
data = [ (1, "foo"), (2, "bar"), (3, "buzz") ] columns = ["id", "value"] test_df = spark.createDataFrame(data=data, schema=columns) test_df.write.format("adb") \ .options(**opts_write) \ .mode("overwrite") \ .save()
Вывод:
24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Starting... 24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Start completed. ... 24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff is committing. 24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff committed.
-
Проверьте, что данные успешно загружены в ADB (например, используя psql):
-
Вывод структуры таблицы:
\d+ public.test_pyspark_write
Table "public.test_pyspark_write" Column | Type | Modifiers | Storage | Stats target | Description --------+--------+-----------+----------+--------------+------------- id | bigint | | plain | | value | text | | extended | | Distributed by: (id)
-
Проверка содержимого таблицы:
SELECT * FROM public.test_pyspark_write;
id | value ----+------- 3 | buzz 1 | foo 2 | bar (3 rows)
-