ADB Spark 3 Connector usage examples

Requirements

This article describes how to transfer data between ADB and Spark 3 via ADB Spark 3 Connector. The examples assume that the following prerequisites are met:

  • The ADB cluster is installed according to the Online installation guide.

  • The ADH cluster is installed according to the Online installation guide. The minimal ADH version is 3.1.2.1.b1.

  • The Spark 3 service is added to the ADH cluster.

  • The IP address of the ADB master host is 10.92.40.38. The default port number for incoming PostgreSQL connections is 5432.

  • The adb database exists in the ADB cluster.

  • The adb_to_spark user with the CREATEEXTTABLE privileges and the password 123 exists in the ADB cluster. To add the user, you can run the following query:

    CREATE ROLE adb_to_spark
    WITH  CREATEEXTTABLE(protocol='gpfdist',type='readable')
          CREATEEXTTABLE(protocol='gpfdist',type='writable')
          LOGIN
          PASSWORD '123';
  • The adb_to_spark user can access the adb database from the ADH cluster. For this purpose, you can change the pg_hba.conf file as follows:

    host    all  adb_to_spark       0.0.0.0/0     md5

    Note that this configuration is not production-ready. Use it only for testing. Instead of 0.0.0.0/0, you can specify the IP addresses of the ADH cluster hosts where Spark 3 is installed (with a subnet mask).

    TIP

    You can modify the pg_hba.conf file via the ADCM web interface. To do this, fill in the Custom pg_hba section parameter on the Configuration tab of the ADB service in the ADB cluster. To apply changes, click Save and run the service Reconfigure action.

  • The author table exists in the adb database. To create and populate the table, use the following queries:

    CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL)
    WITH (appendoptimized=true)
    DISTRIBUTED BY(id);
    INSERT INTO author(id, name) VALUES
    (1,'Virginia Woolf'),
    (2,'J.R.R. Tolkien'),
    (3,'Harper Lee'),
    (4,'J.D. Salinger'),
    (5,'George Orwell'),
    (6,'John Steinbeck'),
    (7,'Margaret Mitchell'),
    (8,'Alan Moore'),
    (9,'Jack Kerouac'),
    (10,'Ernest Hemingway');
  • The adb_to_spark user has necessary permissions on the author table:

    GRANT SELECT, INSERT ON public.author TO adb_to_spark;
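
For the pg_hba.conf prerequisite above, entries restricted to specific Spark 3 hosts can be generated rather than typed by hand. The following is a minimal Python sketch. The function name pg_hba_entries and the host addresses in the usage note are hypothetical and are not part of ADB or ADCM. Only the field order (type, database, user, address, method) follows the example entry shown above.

```python
import ipaddress


def pg_hba_entries(user, hosts, database="all", method="md5"):
    """Build one pg_hba.conf 'host' line per address or network.

    hosts: iterable of strings such as "10.92.40.41" or "10.92.40.0/24".
    A bare IP address is turned into a single-host network (/32 for IPv4).
    """
    lines = []
    for host in hosts:
        # strict=False accepts an address with host bits set, e.g. "10.92.40.41/24",
        # and normalizes it to the enclosing network.
        network = ipaddress.ip_network(host, strict=False)
        lines.append(
            f"host    {database}  {user}  {network.with_prefixlen}  {method}"
        )
    return lines
```

For example, pg_hba_entries("adb_to_spark", ["10.92.40.41", "10.92.40.42"]) returns two lines, each with a /32 mask, that could replace the 0.0.0.0/0 entry in a test setup.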

ADB Spark 3 Connector communicates with ADB via a JDBC connection. The PostgreSQL JDBC driver comes with the connector.
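
The JDBC URL passed to the connector in the examples of this article follows the PostgreSQL driver's jdbc:postgresql://host:port/database form. As a small illustration, the sketch below composes that URL and the three connection-related options from separate values. The helper names adb_jdbc_url and adb_connection_options are hypothetical. The option keys are the ones used in this article.

```python
def adb_jdbc_url(host, database, port=5432):
    """Compose a PostgreSQL JDBC URL: jdbc:postgresql://host:port/database."""
    if not host or not database:
        raise ValueError("host and database must be non-empty")
    return f"jdbc:postgresql://{host}:{int(port)}/{database}"


def adb_connection_options(host, database, user, password, port=5432):
    """Return the connection-related connector options as a dict."""
    return {
        "spark.adb.url": adb_jdbc_url(host, database, port),
        "spark.adb.user": user,
        "spark.adb.password": password,
    }
```

With the values from the prerequisites, adb_jdbc_url("10.92.40.38", "adb") gives jdbc:postgresql://10.92.40.38:5432/adb.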

Usage examples: Scala

The Scala code examples below are intended to be run in spark3-shell, the interactive Scala shell.

Load data from ADB to Spark

  1. Open spark3-shell on the host where Spark 3 is installed:

    $ sudo -u hdfs spark3-shell
  2. Specify the connector settings for reading data from the ADB author table:

    val options = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "author"
       )

    The result:

    val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  3. Register a DataFrame:

    val adb_author  = spark.read.format("adb").options(options).load()

    The result:

    val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
  4. Check the DataFrame content:

    adb_author.show()

    The result:

    +---+-----------------+
    | id|             name|
    +---+-----------------+
    |  4|    J.D. Salinger|
    |  8|       Alan Moore|
    |  5|    George Orwell|
    | 10| Ernest Hemingway|
    |  2|   J.R.R. Tolkien|
    |  1|   Virginia Woolf|
    |  7|Margaret Mitchell|
    |  6|   John Steinbeck|
    |  3|       Harper Lee|
    |  9|     Jack Kerouac|
    +---+-----------------+
  5. Get the table schema:

    adb_author.printSchema()

    The result:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
  6. Count the rows in the table:

    adb_author.count()

    The result:

    val res9: Long = 10

Run any SQL query in ADB

You can execute any SQL query through the connector as well. To do this, run the following commands:

  1. Set additional Spark session settings and enable auxiliary functions:

    spark.stop
    val spark1 = org.apache.spark.sql.SparkSession.
       builder().
       master("local[*]").
       appName("spark_example").
       config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb").
       config("spark.adb.driver","org.postgresql.Driver").
       config("spark.adb.user","adb_to_spark").
       config("spark.adb.password","123").
       getOrCreate()
    import io.arenadata.spark.adb.implicits._

    The result:

    val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
  2. Create an arbitrary table (test_spark) in ADB and fill it with some test data:

    spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
  3. Create a DataFrame based on the new ADB table (test_spark):

    val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")

    The result:

    val res: org.apache.spark.sql.DataFrame = [id: int]
  4. Check the DataFrame content:

    res.show()

    The result:

    +---+
    | id|
    +---+
    |  2|
    |  3|
    |  1|
    +---+

Write data from Spark to ADB

  1. Specify the connector settings for writing data to ADB:

    val options_write = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "test_spark_write")

    The result:

    val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  2. Run the following command to create a test_spark_write table in ADB and write data from the res DataFrame created in the previous example into it:

    res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  3. Check that the data is available in ADB (for example, via psql):

    • Table structure:

      \d+ public.test_spark_write
                         Table "public.test_spark_write"
       Column |  Type   | Modifiers | Storage | Stats target | Description
      --------+---------+-----------+---------+--------------+-------------
       id     | integer |           | plain   |              |
      Distributed by: (id)
    • Table data:

      SELECT * FROM public.test_spark_write;
       id
      ----
        2
        1
        3
      (3 rows)

Usage examples: PySpark

The Python code examples below are intended to be run in the PySpark shell, which is available on the hosts where Spark 3 is installed. The shell process must be started as the hdfs or spark user to have sufficient permissions for interacting with a remote ADB cluster. The following commands show how to start the PySpark shell:

  1. Create a new virtual environment using Python from /opt/pyspark3-python/:

    $ cd ~
    $ mkdir pyspark-demo
    $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
  2. Activate the virtual environment:

    $ source pyspark-demo/venv/bin/activate
  3. In the activated virtual environment, run the PySpark shell:

    $ sudo -u hdfs pyspark3

    The output should look similar to the following:

    (venv) [admin@ka-adh-1 ~]$ sudo -u hdfs pyspark3
    Python 3.10.4 (main, Jun  1 2024, 01:11:04) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.4.3
          /_/
    
    Using Python version 3.10.4 (main, Jun  1 2024 01:11:04)
    Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4142
    Spark context available as 'sc' (master = yarn, app id = application_1724163954275_0023).
    SparkSession available as 'spark'.
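
Because the examples expect the shell to run as hdfs or spark, it may be useful to confirm the effective OS user from inside the shell before continuing. The sketch below is one way to do that on Linux with the standard library only. The helper names and the ALLOWED_USERS tuple are assumptions made for illustration and are not part of PySpark or the connector.

```python
import os
import pwd

# Users named in the text above; adjust for your environment.
ALLOWED_USERS = ("hdfs", "spark")


def effective_user():
    """Return the name of the effective OS user of the current process."""
    return pwd.getpwuid(os.geteuid()).pw_name


def is_allowed_user(user=None, allowed=ALLOWED_USERS):
    """Check a user name (by default, the effective user) against the allowed list."""
    name = effective_user() if user is None else user
    return name in allowed
```

In a shell started with sudo -u hdfs, is_allowed_user() is expected to return True.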

Load data from ADB to Spark

  1. In the PySpark shell, define a dict with connector settings for reading data from the ADB author table:

    opts_read = {
       "spark.adb.url": "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user": "adb_to_spark",
       "spark.adb.password": "123",
       "spark.adb.dbschema": "public",
       "spark.adb.dbtable": "author"
    }
  2. Create a DataFrame:

    adb_author = spark.read.format("adb") \
                .options(**opts_read) \
                .load()

    The result:

    24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Starting...
    24/08/28 19:58:59 INFO HikariDataSource: HikariPool-1 - Start completed.
  3. Check the DataFrame content:

    adb_author.show()

    The result:

    +---+-----------------+
    | id|             name|
    +---+-----------------+
    |  4|    J.D. Salinger|
    |  8|       Alan Moore|
    |  5|    George Orwell|
    | 10| Ernest Hemingway|
    |  2|   J.R.R. Tolkien|
    |  1|   Virginia Woolf|
    |  7|Margaret Mitchell|
    |  6|   John Steinbeck|
    |  3|       Harper Lee|
    |  9|     Jack Kerouac|
    +---+-----------------+
  4. Get the table schema:

    adb_author.printSchema()

    The result:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
  5. Get the number of rows in the table:

    adb_author.count()

    The output:

    10
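
The rows printed by show() in step 3 are not ordered by id: Spark does not guarantee row order unless the DataFrame is sorted explicitly. When a stable order matters (for example, when comparing results), the load can be followed by orderBy. The sketch below wraps both calls. load_sorted is a hypothetical helper name, and the Spark session is passed in as an argument so that the function itself imports nothing.

```python
def load_sorted(spark, options, *sort_columns):
    """Load an ADB table through the connector and sort it by the given columns.

    spark:        an active SparkSession (the 'spark' object in the PySpark shell)
    options:      connector settings, e.g. the opts_read dict from step 1
    sort_columns: one or more column names to sort by
    """
    if not sort_columns:
        raise ValueError("at least one sort column is required")
    df = spark.read.format("adb").options(**options).load()
    return df.orderBy(*sort_columns)
```

In the shell, load_sorted(spark, opts_read, "id").show() would print the author rows in ascending id order.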

Write data from Spark to ADB

  1. In the PySpark shell, define a dict with connector settings for writing data to the ADB test_pyspark_write table:

    opts_write = {
       "spark.adb.url": "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user": "adb_to_spark",
       "spark.adb.password": "123",
       "spark.adb.dbschema": "public",
       "spark.adb.dbtable": "test_pyspark_write"
    }
  2. Create a sample DataFrame and write its content to the test ADB table:

    data = [
        (1, "foo"),
        (2, "bar"),
        (3, "buzz")
    ]
    columns = ["id", "value"]
    test_df = spark.createDataFrame(data=data, schema=columns)
    
    test_df.write.format("adb") \
        .options(**opts_write) \
        .mode("overwrite") \
        .save()

    The output:

    24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Starting...
    24/08/28 17:34:29 INFO HikariDataSource: HikariPool-3 - Start completed.
    ...
    24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff is committing.
    24/08/28 17:34:31 INFO OverwriteByExpressionExec: Data source write support io.arenadata.spark.adb.spark3.writer.AdbBatchWrite@26b15eff committed.
  3. Check that the data is available in ADB (for example, via psql):

    • Table structure:

      \d+ public.test_pyspark_write
                       Table "public.test_pyspark_write"
       Column |  Type  | Modifiers | Storage  | Stats target | Description
      --------+--------+-----------+----------+--------------+-------------
       id     | bigint |           | plain    |              |
       value  | text   |           | extended |              |
      Distributed by: (id)
    • Table data:

      SELECT * FROM public.test_pyspark_write;
       id | value
      ----+-------
        3 | buzz
        1 | foo
        2 | bar
      (3 rows)
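
In the table structure above, id is created as bigint because PySpark infers Python int values as LongType when only column names are given. If an integer column is preferred, the column types can be stated explicitly with a DDL-style schema string. The sketch below assumes that the connector maps Spark's IntegerType to integer, as the Scala write example suggests. ddl_schema and write_typed are hypothetical helper names.

```python
def ddl_schema(columns):
    """Turn [(name, spark_sql_type), ...] into a DDL schema string.

    Example: [("id", "INT"), ("value", "STRING")] -> "id INT, value STRING"
    """
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)


def write_typed(spark, rows, columns, options, mode="overwrite"):
    """Create a DataFrame with explicit column types and write it to ADB.

    spark:   an active SparkSession (the 'spark' object in the PySpark shell)
    rows:    list of tuples, as in the 'data' list above
    columns: list of (name, spark_sql_type) pairs
    options: connector settings, e.g. the opts_write dict from step 1
    """
    df = spark.createDataFrame(rows, schema=ddl_schema(columns))
    df.write.format("adb").options(**options).mode(mode).save()
    return df
```

For the sample data, write_typed(spark, data, [("id", "INT"), ("value", "STRING")], opts_write) would replace the createDataFrame and write calls in step 2.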