ADB Spark 3 Connector usage examples
This article describes how to transfer data between ADB and Spark 3 via ADB Spark 3 Connector.

Requirements

The examples below assume that the following prerequisites are met:
- The ADB cluster is installed according to the Online installation guide.

- The ADH cluster is installed according to the Online installation guide. The minimal ADH version is 3.1.2.1.b1.

- The Spark 3 service is added to the ADH cluster.

- The IP address of the ADB master host is 10.92.40.38. The default port number for incoming PostgreSQL connections is 5432.

- The adb database exists in the ADB cluster.

- The adb_to_spark user with the CREATEEXTTABLE privileges and the 123 password exists in the ADB cluster. To add a user, you can run the following query:

CREATE ROLE adb_to_spark
WITH CREATEEXTTABLE(protocol='gpfdist',type='readable')
     CREATEEXTTABLE(protocol='gpfdist',type='writable')
LOGIN PASSWORD '123';
- The adb_to_spark user can access the adb database from the ADH cluster. For this purpose, you can change the pg_hba.conf file as follows:

host all adb_to_spark 0.0.0.0/0 md5

Note that this is not a production-ready configuration; use it only for test purposes. Instead of 0.0.0.0/0, you can specify the IP addresses of the ADH cluster hosts where Spark 3 is installed (with a subnet mask), as in the example below.

TIP
You can modify the pg_hba.conf file via the ADCM web interface. To do this, fill in the Custom pg_hba section parameter on the Configuration tab of the ADB service in the ADB cluster. To apply the changes, click Save and run the Reconfigure action of the service.
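For example, assuming the ADH hosts running Spark 3 reside in the hypothetical 10.92.41.0/24 subnet, the pg_hba.conf entry could be narrowed down as follows (this variant also restricts access to the adb database only):

host adb adb_to_spark 10.92.41.0/24 md5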
- The author table exists in the adb database. To create and fill in the table, use the following queries:

CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL)
WITH (appendoptimized=true)
DISTRIBUTED BY(id);

INSERT INTO author(id, name) VALUES
(1,'Virginia Woolf'),
(2,'J.R.R. Tolkien'),
(3,'Harper Lee'),
(4,'J.D. Salinger'),
(5,'George Orwell'),
(6,'John Steinbeck'),
(7,'Margaret Mitchell'),
(8,'Alan Moore'),
(9,'Jack Kerouac'),
(10,'Ernest Hemingway');
- The adb_to_spark user has the necessary permissions on the author table:

GRANT SELECT, INSERT ON public.author TO adb_to_spark;
ADB Spark 3 Connector communicates with ADB via a JDBC connection. The PostgreSQL JDBC driver comes with the connector.
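In all examples below, this connection is configured via the spark.adb.url option, which accepts a standard PostgreSQL JDBC URL. Its generic form, with placeholders instead of the concrete master host, port, and database name from the requirements above, is:

jdbc:postgresql://<master_host>:<port>/<database>

If the port is omitted, the driver uses the default 5432.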
Usage examples: Scala
The Scala code examples below are intended for execution in spark3-shell, the interactive shell for Scala.
Load data from ADB to Spark
- Open spark3-shell on the host where Spark 3 is installed:

$ sudo -u hdfs spark3-shell
- Specify the connector settings for reading data from the ADB author table:

val options = Map(
  "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
  "spark.adb.user" -> "adb_to_spark",
  "spark.adb.password" -> "123",
  "spark.adb.dbschema" -> "public",
  "spark.adb.dbtable" -> "author"
)
The result:
val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
- Register a DataFrame:
val adb_author = spark.read.format("adb").options(options).load()
The result:
val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
- Check the DataFrame content:
adb_author.show()
The result:
+---+-----------------+
| id|             name|
+---+-----------------+
|  4|    J.D. Salinger|
|  8|       Alan Moore|
|  5|    George Orwell|
| 10| Ernest Hemingway|
|  2|   J.R.R. Tolkien|
|  1|   Virginia Woolf|
|  7|Margaret Mitchell|
|  6|   John Steinbeck|
|  3|       Harper Lee|
|  9|     Jack Kerouac|
+---+-----------------+
- Get the table schema:
adb_author.printSchema()
The result:
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)
- Calculate the number of table rows:
adb_author.count()
The result:
val res9: Long = 10
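The loaded DataFrame supports the usual Spark transformations as well. As a minimal sketch (plain DataFrame API, nothing connector-specific), the following keeps only the rows with id below 4:

val first_authors = adb_author.filter(adb_author("id") < 4)
first_authors.show()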
Run any SQL query in ADB
You can execute any SQL query through the connector as well. To do this, run the following commands:
- Set additional Spark session settings and enable auxiliary functions:

spark.stop

val spark1 = org.apache.spark.sql.SparkSession.
  builder().
  master("local[*]").
  appName("spark_example").
  config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb").
  config("spark.adb.driver","org.postgresql.Driver").
  config("spark.adb.user","adb_to_spark").
  config("spark.adb.password","123").
  getOrCreate()

import io.arenadata.spark.adb.implicits._
The result:
val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
- Create an arbitrary table (test_spark) in ADB and fill it with some test data:

spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);")
spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);")
spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);")
spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
- Create a DataFrame based on the new ADB table (test_spark):

val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")
The result:
val res: org.apache.spark.sql.DataFrame = [id: int]
- Check the DataFrame content:
res.show()
The result:
+---+
| id|
+---+
|  2|
|  3|
|  1|
+---+
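Arbitrary statements can also be used for cleanup. For example, once the test_spark table is no longer needed (note that the res DataFrame is reused in the next example, so do not drop the table before completing it), you can remove the table via the same helper:

spark1.executeAdbQueryOnMaster("DROP TABLE public.test_spark;")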
Write data from Spark to ADB
- Specify the connector settings for writing data to ADB:

val options_write = Map(
  "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
  "spark.adb.user" -> "adb_to_spark",
  "spark.adb.password" -> "123",
  "spark.adb.dbschema" -> "public",
  "spark.adb.dbtable" -> "test_spark_write"
)
The result:
val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
- Run the following command to create a test_spark_write table in ADB and write data from the res DataFrame created in the previous example into it:

res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
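SaveMode.Overwrite replaces the contents of the target table on each run. Spark also defines other save modes, such as Append; assuming the connector supports it, appending rows instead of overwriting would be a one-line change:

res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Append).save()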
- Check the data availability in ADB (for example, via psql). Table structure:

\d+ public.test_spark_write

                Table "public.test_spark_write"
 Column |  Type   | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
 id     | integer |           | plain   |              |
Distributed by: (id)

Table data:

SELECT * FROM public.test_spark_write;

 id
----
  2
  1
  3
(3 rows)
Usage examples: PySpark
The Python code examples below are intended for execution in the PySpark shell, which is available on the hosts where Spark 3 is installed. The shell process must be started under the hdfs/spark user to have sufficient permissions for interacting with a remote ADB cluster. The following commands show how to start the PySpark shell appropriately.
- Create a new virtual environment using Python from /opt/pyspark3-python/:

$ cd ~
$ mkdir pyspark-demo
$ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
- Activate the virtual environment:
$ source pyspark-demo/venv/bin/activate
- In the activated virtual environment, run the PySpark shell:
$ sudo -u hdfs pyspark3
You should see output similar to the following:
(venv) [admin@ka-adh-1 ~]$ sudo -u hdfs pyspark3
Python 3.10.4 (main, Jun 1 2024, 01:11:04)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.3
      /_/

Using Python version 3.10.4 (main, Jun 1 2024 01:11:04)
Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4142
Spark context available as 'sc' (master = yarn, app id = application_1724163954275_0023).
SparkSession available as 'spark'.
Load data from ADB to Spark
- In the PySpark shell, define a dict with connector settings for reading data from the ADB author table:

opts_read = {
    "spark.adb.url": "jdbc:postgresql://10.92.40.38/adb",
    "spark.adb.user": "adb_to_spark",
    "spark.adb.password": "123",
    "spark.adb.dbschema": "public",
    "spark.adb.dbtable": "author"
}
- Create a DataFrame:

adb_author = spark.read.format("adb") \
    .options(**opts_read) \
    .load()
The result:
24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Starting...
24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Start completed.
- Check the DataFrame content:
adb_author.show()
The result:
+---+-----------------+
| id|             name|
+---+-----------------+
|  4|    J.D. Salinger|
|  8|       Alan Moore|
|  5|    George Orwell|
| 10| Ernest Hemingway|
|  2|   J.R.R. Tolkien|
|  1|   Virginia Woolf|
|  7|Margaret Mitchell|
|  6|   John Steinbeck|
|  3|       Harper Lee|
|  9|     Jack Kerouac|
+---+-----------------+
- Get the table schema:
adb_author.printSchema()
The result:
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)
- Get the number of table rows:
adb_author.count()
The output:
10
Write data from Spark to ADB
- In the PySpark shell, define a dict with connector settings for writing data to the ADB test_pyspark_write table:

opts_write = {
    "spark.adb.url": "jdbc:postgresql://10.92.40.38:5432/adb",
    "spark.adb.user": "adb_to_spark",
    "spark.adb.password": "123",
    "spark.adb.dbschema": "public",
    "spark.adb.dbtable": "test_pyspark_write"
}
- Create a sample DataFrame and write its content to the test ADB table:

data = [
    (1, "foo"),
    (2, "bar"),
    (3, "buzz")
]
columns = ["id", "value"]
test_df = spark.createDataFrame(data=data, schema=columns)

test_df.write.format("adb") \
    .options(**opts_write) \
    .mode("overwrite") \
    .save()
The output:
24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Starting...
24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Start completed.
...
24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff is committing.
24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff committed.
- Check the data availability in ADB (for example, via psql). Table structure:

\d+ public.test_pyspark_write

                Table "public.test_pyspark_write"
 Column |  Type  | Modifiers | Storage  | Stats target | Description
--------+--------+-----------+----------+--------------+-------------
 id     | bigint |           | plain    |              |
 value  | text   |           | extended |              |
Distributed by: (id)

Table data:

SELECT * FROM public.test_pyspark_write;

 id | value
----+-------
  3 | buzz
  1 | foo
  2 | bar
(3 rows)
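After you finish the tests, you can drop the tables created in this article via psql (the exact list depends on which examples you ran):

DROP TABLE public.test_spark_write;
DROP TABLE public.test_pyspark_write;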