PySpark
PySpark — это Python-обертка над Spark, которая позволяет вызывать нативные методы Spark из Python-кода. Ядром PySpark является библиотека Py4J, которая позволяет интерпретатору Python получить доступ к объектам Spark в JVM.
В данной статье показаны примеры использования PySpark через spark3-submit
и PySpark shell.
В примерах задействован сервис Spark3, однако все шаги применимы и для сервиса Spark.
Требования к кластеру
Примеры, описанные в данной статье, предназначены для выполнения в кластере со следующими характеристиками:
-
Узлы кластера — CentOS 7 машины. Для других OS команды/пути могут отличаться.
-
Установлен кластер ADH 3.1.2.1.b1 или более поздней версии. В кластере установлен сервис Spark3.
В процессе установки ADH-кластера ADCM также устанавливает интерпретатор Python3 в директорию /opt/pyspark3-python/ на тех хостах, где расположен компонент Spark3 Client. Данный интерпретатор Python используется для всех примеров, описанных в разделе, однако возможно использование другого интерпретатора Python версии 3.10+.
-
В HDFS создана директория для тестовой загрузки данных. Пользователь, от имени которого запускается PySpark, должен быть назначен владельцем (owner) этой директории.
$ sudo -u hdfs hdfs dfs -mkdir /user/admin $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
Использование PySpark и spark3-submit
Ниже показан запуск Spark-задачи на Python с использованием spark3-submit
в режиме кластера.
-
На всех узлах кластера установите переменную окружения
PYSPARK_PYTHON
, которая позволяет Spark вызывать нужный Python-интерпретатор. Для этого в ADCM перейдите на страницу Clusters → <имя_кластера> → Services → Spark3 → Primary configuration и в поле spark-env.sh добавьте строкуexport PYSPARK_PYTHON=/opt/pyspark3-python/bin/python3
(поле становится видимым после выбора опции Show advanced). Затем сохраните настройки и перезапустите сервис Spark3. -
Создайте тестовый файл test.py.
from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession conf = SparkConf() conf.setMaster('yarn') conf.setAppName('spark-yarn') sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate() data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)] df = sparkSession.createDataFrame(data) (1) df.write.csv("hdfs:///user/admin/tmp/test") (2) quit()
1 Создание DataFrame из коллекции. 2 Запись содержимого DataFrame в HDFS в виде CSV-файла. -
Запустите
spark3-submit
, передавая test.py в качестве аргумента. В данном примереspark3-submit
необходимо запускать от пользователяadmin
.$ /bin/spark3-submit \ --deploy-mode cluster \ (1) --master yarn \ (2) test.py
1 Запуск Spark в режиме кластера. 2 Master URL для запуска Spark в YARN-кластере. Адрес кластера определяется на основе переменных окружения HADOOP_CONF_DIR
илиYARN_CONF_DIR
. -
Проверьте запись данных в HDFS:
$ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test/
Части (parts) DataFrame сохранены в HDFS:
Found 3 items -rw-r--r-- 3 admin admin 0 2023-07-10 12:51 /user/admin/tmp/test/_SUCCESS -rw-r--r-- 3 admin admin 17 2023-07-10 12:51 /user/admin/tmp/test/part-00000-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv -rw-r--r-- 3 admin admin 25 2023-07-10 12:51 /user/admin/tmp/test/part-00001-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
Использование PySpark shell
PySpark shell позволяет запускать Spark-задачи в интерактивном режиме. Ниже показана установка PySpark shell и запуск задачи Spark в локальном режиме.
-
Создайте новый virtual environment, используя Python из /opt/pyspark3-python/:
$ cd ~ $ mkdir pyspark-demo $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
-
Активируйте virtual environment:
$ source pyspark-demo/venv/bin/activate
-
Запустите PySpark:
$ pyspark3
Вывод имеет следующий вид:
Python 3.10.4 (main, Sep 7 2023, 08:17:33) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux Type "help", "copyright", "credits" or "license" for more information. Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/09/28 08:37:05 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.3.2 /_/ Using Python version 3.10.4 (main, Sep 7 2023 08:17:33) Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4040 Spark context available as 'sc' (master = yarn, app id = application_1695647776253_0020). SparkSession available as 'spark'.
-
Выполните следующий код непосредственно в PySpark shell. В данном примере создается Spark DataFrame с последующей записью в HDFS через NameNode.
from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession conf = SparkConf() conf.setMaster('yarn') conf.setAppName('spark-yarn') sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate() data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)] df = sparkSession.createDataFrame(data) df.write.csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn") (1) quit()
1 <active-nn-host>
должен указывать на активную NameNode. Чтобы определить активную NameNode, используйте команду:$ sudo -u hdfs hdfs haadmin -getAllServiceState
Пример вывода:
ka-adh-1.ru-central1.internal:8020 standby ka-adh-3.ru-central1.internal:8020 active
-
Проверьте успешную запись данных в HDFS:
$ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test-nn/
Данные сохранены в HDFS:
-rw-r--r-- 3 admin hadoop 9 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00001-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv -rw-r--r-- 3 admin hadoop 8 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00002-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv -rw-r--r-- 3 admin hadoop 17 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00003-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
Действия в защищенном кластере
Если в кластере настроена безопасность, а именно:
-
установлен Apache Ranger в ADPS;
-
керберизованы ADPS и ADH;
-
ноды кластера объединены в домен и используют SSSD;
-
активированы плагины HDFS/YARN;
-
настроены политики Ranger для HDFS/YARN-плагинов.
В таком кластере процедура использования PySpark полностью идентична вышеописанной, главными требованиями остаются:
-
Группа пользователя/домен должны иметь доступ к HDFS-директориям, определенным в списке управления доступом Ranger ACL.
-
Перед началом работы необходимо получить тикет Kerberos, используя
kinit
как показано ниже:$ kinit myuser@EXAMPLE_REALM.COM