PySpark
PySpark is a Python wrapper for Spark that allows invocation of native Spark methods from the Python code. The core of PySpark is the Py4J library that allows a Python interpreter to manipulate Spark objects in a running JVM.
This section describes the use of PySpark via spark3-submit
and PySpark shell.
Cluster requirements
The examples demonstrated in this section should run in the following environment:
-
ADH cluster nodes are CentOS 7-powered machines. For other OS types, the commands/paths may slightly differ.
-
ADH cluster 3.1.2.1.b1 or later is installed. The cluster has the Spark3 service installed.
During the cluster installation, ADCM also installs a Python3 interpreter to /opt/pyspark3-python/ on the hosts with the Spark3 Client component. This Python is used in the scenarios below, however, you can use a custom Python 3.10+ instance to run PySpark.
-
A separate HDFS directory for loading test data is created and the user who runs PySpark is the owner of this directory.
$ sudo -u hdfs hdfs dfs -mkdir /user/admin $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
Use PySpark and spark3-submit
The following scenario shows how to submit a Spark job in Python using spark3-submit
in the cluster mode.
-
Set the
PYSPARK_PYTHON
environment variable on all cluster hosts to allow Spark to run the specific Python executable.For this, in ADCM, go to Clusters → <your_cluster_name> → Services → Spark3 → Primary configuration and add the line
export PYSPARK_PYTHON=/opt/pyspark3-python/bin/python3
to the spark-env.sh field (the field appears after selecting Show advanced). Then, save the configuration and restart the Spark3 service. -
Create a test script named test.py.
from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession conf = SparkConf() conf.setMaster('yarn') conf.setAppName('spark-yarn') sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate() data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)] df = sparkSession.createDataFrame(data) (1) df.write.csv("hdfs:///user/admin/tmp/test") (2) quit()
1 Creates a sample DataFrame. 2 Writes the DataFrame contents to HDFS as a CSV file. -
Run
spark3-submit
and provide test.py as an argument. In this scenario,spark3-submit
must run under theadmin
user.$ /bin/spark3-submit \ --deploy-mode cluster \ (1) --master yarn \ (2) test.py
1 Runs Spark in the cluster mode. 2 The master URL to run Spark on a YARN cluster. The cluster location is resolved based on the HADOOP_CONF_DIR
orYARN_CONF_DIR
environment variables. -
Verify HDFS writes:
$ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test/
Observe the DataFrame parts persisted in HDFS:
Found 3 items -rw-r--r-- 3 admin admin 0 2023-07-10 12:51 /user/admin/tmp/test/_SUCCESS -rw-r--r-- 3 admin admin 17 2023-07-10 12:51 /user/admin/tmp/test/part-00000-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv -rw-r--r-- 3 admin admin 25 2023-07-10 12:51 /user/admin/tmp/test/part-00001-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
Use PySpark shell
PySpark shell allows you to run Spark jobs in the interactive mode. The following example shows how to install PySpark shell and run a Spark job in the local mode.
-
Create a new virtual environment using Python from /opt/pyspark3-python/:
$ cd ~ $ mkdir pyspark-demo $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
-
Activate the virtual environment:
$ source pyspark-demo/venv/bin/activate
-
Run PySpark:
$ pyspark3
You should see a similar output:
Python 3.10.4 (main, Sep 7 2023, 08:17:33) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux Type "help", "copyright", "credits" or "license" for more information. Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/09/28 08:37:05 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.3.2 /_/ Using Python version 3.10.4 (main, Sep 7 2023 08:17:33) Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4040 Spark context available as 'sc' (master = yarn, app id = application_1695647776253_0020). SparkSession available as 'spark'.
-
Submit the following Python code directly to the PySpark shell. This Spark job creates a sample DataFrame and writes it to HDFS through a NameNode endpoint.
from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession conf = SparkConf() conf.setMaster('yarn') conf.setAppName('spark-yarn') sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate() data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)] df = sparkSession.createDataFrame(data) df.write.csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn") (1) quit()
1 <active-nn-host>
must point to an active NameNode. To identify the active NameNode, use the command below:$ sudo -u hdfs hdfs haadmin -getAllServiceState
A sample output:
ka-adh-1.ru-central1.internal:8020 standby ka-adh-3.ru-central1.internal:8020 active
-
Verify HDFS writes:
$ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test-nn/
Observe the DataFrame persisted in HDFS:
-rw-r--r-- 3 admin hadoop 9 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00001-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv -rw-r--r-- 3 admin hadoop 8 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00002-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv -rw-r--r-- 3 admin hadoop 17 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00003-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
Steps for a secured cluster
If you have enabled security for your cluster, that is:
-
installed Apache Ranger within ADPS;
-
kerberized both ADPS and ADH;
-
joined all Hadoop nodes into a domain and SSSD is used;
-
activated the HDFS/YARN plugins;
-
configured Ranger policies for HDFS/YARN plugins.
For such a cluster, the steps on using PySpark are identical, the main requirements are:
-
Your user/domain group has access to specific HDFS paths, defined in Ranger ACL.
-
You have to get a Kerberos ticket using
kinit
as shown below:$ kinit myuser@EXAMPLE_REALM.COM