PySpark

PySpark — это Python-обертка над Spark, которая позволяет вызывать нативные методы Spark из Python-кода. Ядром PySpark является библиотека Py4J, которая позволяет интерпретатору Python получить доступ к объектам Spark в JVM.

В данной статье показаны примеры использования PySpark через spark3-submit и PySpark shell. В примерах задействован сервис Spark3, однако все шаги применимы и для сервиса Spark.

Требования к кластеру

Примеры, описанные в данной статье, предназначены для выполнения в кластере со следующими характеристиками:

  • Узлы кластера — CentOS 7 машины. Для других OS команды/пути могут отличаться.

  • Установлен кластер ADH 3.1.2.1.b1 или более поздней версии. В кластере установлен сервис Spark3.

    В процессе установки ADH-кластера ADCM также устанавливает интерпретатор Python3 в директорию /opt/pyspark3-python/ на тех хостах, где расположен компонент Spark3 Client. Данный интерпретатор Python используется для всех примеров, описанных в разделе, однако возможно использование другого интерпретатора Python версии 3.10+.

  • В HDFS создана директория для тестовой загрузки данных. Пользователь, от имени которого запускается PySpark, должен быть назначен владельцем (owner) этой директории.

    $ sudo -u hdfs hdfs dfs -mkdir /user/admin
    $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin

Использование PySpark и spark3-submit

Ниже показан запуск Spark-задачи на Python с использованием spark3-submit в режиме кластера.

  1. На всех узлах кластера установите переменную окружения PYSPARK_PYTHON, которая позволяет Spark вызывать нужный Python-интерпретатор. Для этого в ADCM перейдите на страницу Clusters → <имя_кластера> → Services → Spark3 → Primary configuration и в поле spark-env.sh добавьте строку export PYSPARK_PYTHON=/opt/pyspark3-python/bin/python3 (поле становится видимым после выбора опции Show advanced). Затем сохраните настройки и перезапустите сервис Spark3.

  2. Создайте тестовый файл test.py.

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('spark-yarn')
    sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()
    
    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data) (1)
    df.write.csv("hdfs:///user/admin/tmp/test") (2)
    quit()
    1 Создание DataFrame из коллекции.
    2 Запись содержимого DataFrame в HDFS в виде CSV-файла.
  3. Запустите spark3-submit, передавая test.py в качестве аргумента. В данном примере spark3-submit необходимо запускать от пользователя admin.

    $ /bin/spark3-submit \
        --deploy-mode cluster \ (1)
        --master yarn \ (2)
        test.py
    1 Запуск Spark в режиме кластера.
    2 Master URL для запуска Spark в YARN-кластере. Адрес кластера определяется на основе переменных окружения HADOOP_CONF_DIR или YARN_CONF_DIR.
  4. Проверьте запись данных в HDFS:

    $ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test/

    Части (parts) DataFrame сохранены в HDFS:

    Found 3 items
    -rw-r--r--   3 admin admin          0 2023-07-10 12:51 /user/admin/tmp/test/_SUCCESS
    -rw-r--r--   3 admin admin         17 2023-07-10 12:51 /user/admin/tmp/test/part-00000-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
    -rw-r--r--   3 admin admin         25 2023-07-10 12:51 /user/admin/tmp/test/part-00001-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv

Использование PySpark shell

PySpark shell позволяет запускать Spark-задачи в интерактивном режиме. Ниже показана установка PySpark shell и запуск задачи Spark в локальном режиме.

  1. Создайте новый virtual environment, используя Python из /opt/pyspark3-python/:

    $ cd ~
    $ mkdir pyspark-demo
    $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
  2. Активируйте virtual environment:

    $ source pyspark-demo/venv/bin/activate
  3. Запустите PySpark:

    $ pyspark3

    Вывод имеет следующий вид:

    Python 3.10.4 (main, Sep  7 2023, 08:17:33) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/09/28 08:37:05 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
          /_/
    
    Using Python version 3.10.4 (main, Sep  7 2023 08:17:33)
    Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4040
    Spark context available as 'sc' (master = yarn, app id = application_1695647776253_0020).
    SparkSession available as 'spark'.
  4. Выполните следующий код непосредственно в PySpark shell. В данном примере создается Spark DataFrame с последующей записью в HDFS через NameNode.

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('spark-yarn')
    sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()
    
    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data)
    df.write.csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn") (1)
    quit()
    1 <active-nn-host> должен указывать на активную NameNode. Чтобы определить активную NameNode, используйте команду:
    $ sudo -u hdfs hdfs haadmin -getAllServiceState

    Пример вывода:

    ka-adh-1.ru-central1.internal:8020                 standby
    ka-adh-3.ru-central1.internal:8020                 active
  5. Проверьте успешную запись данных в HDFS:

    $ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test-nn/

    Данные сохранены в HDFS:

    -rw-r--r--   3 admin hadoop          9 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00001-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
    -rw-r--r--   3 admin hadoop          8 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00002-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
    -rw-r--r--   3 admin hadoop         17 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00003-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv

Действия в защищенном кластере

Если в кластере настроена безопасность, а именно:

  • установлен Apache Ranger в ADPS;

  • керберизованы ADPS и ADH;

  • ноды кластера объединены в домен и используют SSSD;

  • активированы плагины HDFS/YARN;

  • настроены политики Ranger для HDFS/YARN-плагинов.

В таком кластере процедура использования PySpark полностью идентична вышеописанной, главными требованиями остаются:

  1. Группа пользователя/домен должны иметь доступ к HDFS-директориям, определенным в списке управления доступом Ranger ACL.

  2. Перед началом работы необходимо получить тикет Kerberos, используя kinit как показано ниже:

    $ kinit myuser@EXAMPLE_REALM.COM
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней