PySpark

PySpark is a Python wrapper for Spark that allows invocation of native Spark methods from the Python code. The core of PySpark is the Py4J library that allows a Python interpreter to manipulate Spark objects in a running JVM.

This section describes the use of PySpark via spark3-submit and PySpark shell.

Cluster requirements

The examples demonstrated in this section should run in the following environment:

  • ADH cluster nodes are CentOS 7-powered machines. For other OS types, the commands/paths may slightly differ.

  • ADH cluster 3.1.2.1.b1 or later is installed. The cluster has the Spark3 service installed.

    During the cluster installation, ADCM also installs a Python3 interpreter to /opt/pyspark3-python/ on the hosts with the Spark3 Client component. This Python is used in the scenarios below, however, you can use a custom Python 3.10+ instance to run PySpark.

  • A separate HDFS directory for loading test data is created and the user who runs PySpark is the owner of this directory.

    $ sudo -u hdfs hdfs dfs -mkdir /user/admin
    $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin

Use PySpark and spark3-submit

The following scenario shows how to submit a Spark job in Python using spark3-submit in the cluster mode.

  1. Set the PYSPARK_PYTHON environment variable on all cluster hosts to allow Spark to run the specific Python executable.

    For this, in ADCM, go to Clusters → <your_cluster_name> → Services → Spark3 → Primary configuration and add the line export PYSPARK_PYTHON=/opt/pyspark3-python/bin/python3 to the spark-env.sh field (the field appears after selecting Show advanced). Then, save the configuration and restart the Spark3 service.

  2. Create a test script named test.py.

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('spark-yarn')
    sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()
    
    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data) (1)
    df.write.csv("hdfs:///user/admin/tmp/test") (2)
    quit()
    1 Creates a sample DataFrame.
    2 Writes the DataFrame contents to HDFS as a CSV file.
  3. Run spark3-submit and provide test.py as an argument. In this scenario, spark3-submit must run under the admin user.

    $ /bin/spark3-submit \
        --deploy-mode cluster \ (1)
        --master yarn \ (2)
        test.py
    1 Runs Spark in the cluster mode.
    2 The master URL to run Spark on a YARN cluster. The cluster location is resolved based on the HADOOP_CONF_DIR or YARN_CONF_DIR environment variables.
  4. Verify HDFS writes:

    $ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test/

    Observe the DataFrame parts persisted in HDFS:

    Found 3 items
    -rw-r--r--   3 admin admin          0 2023-07-10 12:51 /user/admin/tmp/test/_SUCCESS
    -rw-r--r--   3 admin admin         17 2023-07-10 12:51 /user/admin/tmp/test/part-00000-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
    -rw-r--r--   3 admin admin         25 2023-07-10 12:51 /user/admin/tmp/test/part-00001-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv

Use PySpark shell

PySpark shell allows you to run Spark jobs in the interactive mode. The following example shows how to install PySpark shell and run a Spark job in the local mode.

  1. Create a new virtual environment using Python from /opt/pyspark3-python/:

    $ cd ~
    $ mkdir pyspark-demo
    $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
  2. Activate the virtual environment:

    $ source pyspark-demo/venv/bin/activate
  1. Run PySpark:

    $ pyspark3

    You should see a similar output:

    Python 3.10.4 (main, Sep  7 2023, 08:17:33) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/09/28 08:37:05 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
          /_/
    
    Using Python version 3.10.4 (main, Sep  7 2023 08:17:33)
    Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4040
    Spark context available as 'sc' (master = yarn, app id = application_1695647776253_0020).
    SparkSession available as 'spark'.
  2. Submit the following Python code directly to the PySpark shell. This Spark job creates a sample DataFrame and writes it to HDFS through a NameNode endpoint.

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('spark-yarn')
    sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()
    
    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data)
    df.write.csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn") (1)
    quit()
    1 <active-nn-host> must point to an active NameNode. To identify the active NameNode, use the command below:
    $ sudo -u hdfs hdfs haadmin -getAllServiceState

    A sample output:

    ka-adh-1.ru-central1.internal:8020                 standby
    ka-adh-3.ru-central1.internal:8020                 active
  3. Verify HDFS writes:

    $ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test-nn/

    Observe the DataFrame persisted in HDFS:

    -rw-r--r--   3 admin hadoop          9 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00001-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
    -rw-r--r--   3 admin hadoop          8 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00002-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
    -rw-r--r--   3 admin hadoop         17 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00003-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv

Steps for a secured cluster

If you have enabled security for your cluster, that is:

  • installed Apache Ranger within ADPS;

  • kerberized both ADPS and ADH;

  • joined all Hadoop nodes into a domain and SSSD is used;

  • activated the HDFS/YARN plugins;

  • configured Ranger policies for HDFS/YARN plugins.

For such a cluster, the steps on using PySpark are identical, the main requirements are:

  1. Your user/domain group has access to specific HDFS paths, defined in Ranger ACL.

  2. You have to get a Kerberos ticket using kinit as shown below:

    $ kinit myuser@EXAMPLE_REALM.COM
Found a mistake? Seleсt text and press Ctrl+Enter to report it