PySpark
PySpark is a Python wrapper for Spark that allows invocation of native Spark methods from Python code. At the core of PySpark is the Py4J library, which allows a Python interpreter to manipulate Spark objects in a running JVM.
This section describes the use of PySpark via spark3-submit and the PySpark shell. All the examples refer to the Spark3 service; however, the steps also apply to the Spark service.
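To make this wrapper relationship concrete, the sketch below peeks at the Py4J handles that PySpark keeps under the hood. Note that _jvm and _jsc are private attributes shown here purely for illustration (they are not a public API), and the local[1] master and the py4j-demo application name are arbitrary choices for this sketch.

from pyspark.sql import SparkSession

# Start a small local session just to look at the Py4J plumbing
spark = SparkSession.builder.master("local[1]").appName("py4j-demo").getOrCreate()
sc = spark.sparkContext

# _jvm and _jsc are Py4J proxies to objects living in the JVM;
# each call below is forwarded from Python to the JVM over Py4J
print(sc._jvm.java.lang.System.getProperty("java.version"))  # a plain JVM call
print(sc._jsc.sc().version())                                # the underlying Scala SparkContext

spark.stop()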
Cluster requirements
The examples demonstrated in this section assume the following environment:
- ADH cluster nodes are CentOS 7-powered machines. For other OS types, the commands and paths may differ slightly.
- An ADH cluster of version 3.1.2.1.b1 or later is installed, and the cluster has the Spark3 service. During the cluster installation, ADCM also installs a Python 3 interpreter to /opt/pyspark3-python/ on the hosts with the Spark3 Client component. This Python is used in the scenarios below; however, you can use a custom Python 3.10+ instance to run PySpark.
- A separate HDFS directory for loading test data is created, and the user who runs PySpark is the owner of this directory:

$ sudo -u hdfs hdfs dfs -mkdir /user/admin
$ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
Use PySpark and spark3-submit
The following scenario shows how to submit a Spark job written in Python using spark3-submit in cluster mode.
- Set the PYSPARK_PYTHON environment variable on all cluster hosts to make Spark use the specific Python executable. For this, in ADCM, go to Clusters → <your_cluster_name> → Services → Spark3 → Primary configuration and add the line

export PYSPARK_PYTHON=/opt/pyspark3-python/bin/python3

to the spark-env.sh field (the field appears after selecting Show advanced). Then, save the configuration and restart the Spark3 service. A sketch that checks which interpreter the executors actually use is given after these steps.

- Create a test script named test.py:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('spark-yarn')

sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()

data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
df = sparkSession.createDataFrame(data) (1)
df.write.csv("hdfs:///user/admin/tmp/test") (2)
quit()
1 Creates a sample DataFrame.
2 Writes the DataFrame contents to HDFS as a CSV file.

- Run spark3-submit and provide test.py as an argument. In this scenario, spark3-submit must run under the admin user.

$ /bin/spark3-submit \
    --deploy-mode cluster \ (1)
    --master yarn \ (2)
    test.py
1 Runs Spark in cluster mode.
2 The master URL to run Spark on a YARN cluster. The cluster location is resolved based on the HADOOP_CONF_DIR or YARN_CONF_DIR environment variables.

- Verify the HDFS writes:

$ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test/
Observe the DataFrame parts persisted in HDFS:
Found 3 items
-rw-r--r--   3 admin admin          0 2023-07-10 12:51 /user/admin/tmp/test/_SUCCESS
-rw-r--r--   3 admin admin         17 2023-07-10 12:51 /user/admin/tmp/test/part-00000-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
-rw-r--r--   3 admin admin         25 2023-07-10 12:51 /user/admin/tmp/test/part-00001-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
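To double-check the result from PySpark itself, you can read the written CSV files back and also confirm which Python interpreter the executors picked up. The snippet below is a minimal sketch that can be submitted the same way as test.py (in cluster mode, the printed output appears in the YARN application logs of the driver); the spark-yarn-check application name and the name/value column names are illustrative choices, not part of the original example.

from pyspark.sql import SparkSession

# Reuse the same YARN setup as test.py; the application name is arbitrary
spark = SparkSession.builder.master("yarn").appName("spark-yarn-check").getOrCreate()

# The CSV files were written without a header, so assign column names explicitly
df = spark.read.csv("hdfs:///user/admin/tmp/test").toDF("name", "value")
df.show()

# Check which Python interpreter the executors actually run;
# the result should match the PYSPARK_PYTHON value set in spark-env.sh
interpreters = (spark.sparkContext
                .range(4)
                .map(lambda _: __import__("sys").executable)
                .distinct()
                .collect())
print(interpreters)

spark.stop()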
Use PySpark shell
The PySpark shell allows you to run Spark jobs interactively. The following example shows how to prepare a Python environment for the PySpark shell and run a Spark job from it.
- Create a new virtual environment using Python from /opt/pyspark3-python/:

$ cd ~
$ mkdir pyspark-demo
$ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
- Activate the virtual environment:

$ source pyspark-demo/venv/bin/activate
- Run PySpark:

$ pyspark3
You should see a similar output:
Python 3.10.4 (main, Sep  7 2023, 08:17:33)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/28 08:37:05 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.10.4 (main, Sep 7 2023 08:17:33)
Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4040
Spark context available as 'sc' (master = yarn, app id = application_1695647776253_0020).
SparkSession available as 'spark'.
- Submit the following Python code directly to the PySpark shell. This Spark job creates a sample DataFrame and writes it to HDFS through a NameNode endpoint.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('spark-yarn')

sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()

data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
df = sparkSession.createDataFrame(data)
df.write.csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn") (1)
quit()
1 <active-nn-host> must point to the active NameNode. To identify the active NameNode, use the command below:

$ sudo -u hdfs hdfs haadmin -getAllServiceState
A sample output:
ka-adh-1.ru-central1.internal:8020                 standby
ka-adh-3.ru-central1.internal:8020                 active
- Verify the HDFS writes:

$ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test-nn/
Observe the DataFrame persisted in HDFS:
-rw-r--r--   3 admin hadoop          9 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00001-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
-rw-r--r--   3 admin hadoop          8 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00002-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
-rw-r--r--   3 admin hadoop         17 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00003-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
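The CSV files above carry no header, so the column names are lost on write. As a variation on this example, the sketch below can be pasted into the same PySpark shell session: it writes the same data with explicit column names and a header, then reads it back. The name/value column names, the header option, and the test-nn-named directory are illustrative choices, not part of the original example.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-yarn-named").getOrCreate()

data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]

# Passing column names makes the resulting files self-describing
df = spark.createDataFrame(data, ["name", "value"])

# header=True writes the column names into each CSV part file
df.write.option("header", True).csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn-named")

# Reading the data back with the same option restores the column names
spark.read.option("header", True) \
    .csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn-named") \
    .show()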
Steps for a secured cluster
If you have enabled security for your cluster, that is:

- installed Apache Ranger within ADPS;
- kerberized both ADPS and ADH;
- joined all Hadoop nodes to a domain with SSSD in use;
- activated the HDFS/YARN plugins;
- configured Ranger policies for the HDFS/YARN plugins.
For such a cluster, the steps for using PySpark are identical; the main requirements are as follows:
- Your user/domain group must have access to the specific HDFS paths, as defined in the Ranger ACLs.
- You have to obtain a Kerberos ticket using kinit, as shown below:

$ kinit myuser@EXAMPLE_REALM.COM
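Once a ticket is obtained, the spark3-submit and PySpark shell scenarios above work unchanged. As a quick sanity check, the sketch below (run from the PySpark shell) prints the user Spark operates as and reads one of the test paths; the kerberos-check application name is an illustrative choice, and the read only succeeds if the corresponding Ranger policy allows it.

from pyspark.sql import SparkSession

# On a kerberized cluster, the ticket obtained with kinit is picked up
# automatically by the Spark and Hadoop client libraries.
spark = SparkSession.builder.appName("kerberos-check").getOrCreate()

# The reported Spark user should correspond to the authenticated principal
print(spark.sparkContext.sparkUser())

# Reading a protected path succeeds only if the Ranger ACLs allow it;
# this is the test directory created earlier in this section.
spark.read.csv("hdfs:///user/admin/tmp/test").show()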