Примеры использования ADB Spark 3 Connector
Предварительные требования
В статье показаны примеры передачи данных между ADB и Spark 3 с использованием ADB Spark 3 Connector. При выполнении примеров соблюдены следующие условия:
-
Кластер ADB развернут согласно руководству Online-установка.
-
Кластер ADH развернут согласно руководству Online-установка. Минимальная версия ADH — 3.1.2.1.b1.
-
Сервис Spark 3 установлен в кластере ADH.
-
IP-адрес мастер-хоста ADB —
10.92.40.38
. Для входящих подключений PostgreSQL по умолчанию используется порт с номером5432
. -
База данных с именем
adb
существует в кластере ADB. -
Пользователь с именем
adb_to_spark
, привилегиямиCREATEEXTTABLE
и паролем123
создан в БД кластера ADB. Для добавления пользователя можно использовать следующий запрос:CREATE ROLE adb_to_spark WITH CREATEEXTTABLE(protocol='gpfdist',type='readable') CREATEEXTTABLE(protocol='gpfdist',type='writable') LOGIN PASSWORD '123';
-
Для пользователя
adb_to_spark
настроен доступ к базе данныхadb
с хостов кластера ADH. Для этой цели отредактирован файл pg_hba.conf следующим образом:host all adb_to_spark 0.0.0.0/0 md5
Обратите внимание, что приведенная конфигурация предназначена для использования исключительно в тестовых целях. Вместо
0.0.0.0/0
можно перечислить IP-адреса хостов ADH-кластера, на которых установлен сервис Spark 3 (с подсетью).РЕКОМЕНДАЦИЯРедактирование файла pg_hba.conf возможно в web-интерфейсе ADCM. Для этого заполните параметр Custom pg_hba section на вкладке Configuration сервиса ADB в кластере ADB. Чтобы применить изменения, нажмите Save и запустите сервисное действие Reconfigure.
-
Таблица
author
существует в базе данныхadb
. Для создания и заполнения таблицы предназначены следующие запросы:CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL) WITH (appendoptimized=true) DISTRIBUTED BY(id);
INSERT INTO author(id, name) VALUES (1,'Virginia Woolf'), (2,'J.R.R. Tolkien'), (3,'Harper Lee'), (4,'J.D. Salinger'), (5,'George Orwell'), (6,'John Steinbeck'), (7,'Margaret Mitchell'), (8,'Alan Moore'), (9,'Jack Kerouac'), (10,'Ernest Hemingway');
-
Пользователь
adb_to_spark
имеет необходимые права на доступ к таблицеauthor
:GRANT SELECT, INSERT ON public.author TO adb_to_spark;
Для взаимодействия ADB Spark 3 Connector и ADB используется JDBC-соединение. Драйвер PostgreSQL JDBC driver поставляется вместе с коннектором.
Команды, приведенные в примерах ниже, выполняются в интерактивной оболочке для Scala — spark3-shell.
Чтение данных из ADB в Spark
-
Откройте spark3-shell на хосте, где установлен Spark 3:
$ sudo -u hdfs spark3-shell
-
Укажите настройки коннектора для чтения данных из предварительно созданной и заполненной таблицы ADB
author
:val options = Map( "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb", "spark.adb.user" -> "adb_to_spark", "spark.adb.password" -> "123", "spark.adb.dbschema" -> "public", "spark.adb.dbtable" -> "author" )
Результат:
val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
-
Создайте DataFrame:
val adb_author = spark.read.format("adb").options(options).load()
Результат:
val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
-
Проверьте содержимое DataFrame:
adb_author.show()
Результат:
+---+-----------------+ | id| name| +---+-----------------+ | 4| J.D. Salinger| | 8| Alan Moore| | 5| George Orwell| | 10| Ernest Hemingway| | 2| J.R.R. Tolkien| | 1| Virginia Woolf| | 7|Margaret Mitchell| | 6| John Steinbeck| | 3| Harper Lee| | 9| Jack Kerouac| +---+-----------------+
-
Получите схему выбранной таблицы:
adb_author.printSchema()
Результат:
root |-- id: integer (nullable = false) |-- name: string (nullable = false)
-
Вычислите количество строк в таблице:
adb_author.count()
Результат:
val res9: Long = 10
Выполнение произвольного запроса SQL в ADB
Для отправки произвольного SQL-запроса из Spark в ADB выполните следующие шаги:
-
Установите дополнительные настройки Spark-сессии и включите вспомогательные функции:
spark.stop val spark1 = org.apache.spark.sql.SparkSession. builder(). master("local[*]"). appName("spark_example"). config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb"). config("spark.adb.driver","org.postgresql.Driver"). config("spark.adb.user","adb_to_spark"). config("spark.adb.password","123"). getOrCreate() import io.arenadata.spark.adb.implicits._
Результат:
val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
-
Создайте произвольную таблицу
test_spark
в ADB и заполните ее тестовыми данными:spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);") spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);") spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);") spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
-
Создайте DataFrame на основе новой таблицы ADB
test_spark
:val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")
Результат:
val res: org.apache.spark.sql.DataFrame = [id: int]
-
Проверьте содержимое DataFrame:
res.show()
Результат:
+---+ | id| +---+ | 2| | 3| | 1| +---+
Запись данных из Spark в ADB
-
Установите настройки коннектора для записи данных в ADB:
val options_write = Map( "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb", "spark.adb.user" -> "adb_to_spark", "spark.adb.password" -> "123", "spark.adb.dbschema" -> "public", "spark.adb.dbtable" -> "test_spark_write")
Результат:
val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
-
Выполните следующую команду для создания таблицы
test_spark_write
в ADB и загрузки в нее данных из созданного в предыдущем примере DataFrameres
:res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
-
Проверьте, что данные успешно загружены в ADB (например, используя psql):
-
Вывод структуры таблицы:
\d+ public.test_spark_write
Table "public.test_spark_write" Column | Type | Modifiers | Storage | Stats target | Description --------+---------+-----------+---------+--------------+------------- id | integer | | plain | | Distributed by: (id)
-
Проверка содержимого таблицы:
SELECT * FROM public.test_spark_write;
id ---- 2 1 3 (3 rows)
-