Использование Spark Connect
В данной статье показаны примеры удаленного взаимодействия с кластером Spark3 через Spark Connect.
Компонент Spark3 Connect
Чтобы использовать Spark Connect, в вашем ADH-кластере должен быть установлен соответствующий компонент (Spark3 Connect) сервиса Spark3. Этот компонент можно добавить с помощью действий сервиса Spark3 в ADCM. После добавления в кластер компонент готов к работе из коробки и не требует настройки.
Использование Spark Connect в приложениях Spark
Код клиентского приложения Spark, в котором используется Spark Connect, почти не отличается от обычного приложения Spark.
Главное отличие — использование удаленной Spark-сессии (объект SparkSession), которая отвечает за соединение с удаленным сервером Spark Connect.
При использовании удаленной сессии код приложения Spark выполняется в YARN-кластере, где установлен сервер Spark Connect.
После создания удаленной SparkSession
взаимодействие с объектом сессии ведется таким же образом, как если бы это была обычная сессия Spark.
Все коммуникации с удаленным сервером Spark Connect выполняются автоматически с помощью клиентской библиотеки Spark Connect.
Основные способы создания удаленной сессии показаны ниже.
Использование spark3-submit
При использовании /bin/spark3-submit для запуска клиентских Spark-приложений создание удаленной сессии в коде приложения выглядит следующим образом:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('spark-yarn')
sparkSession = SparkSession
.builder
.appName("spark-yarn")
.config(conf=conf)
.remote("sc://<sc_host>:<sc_port>") (1)
.getOrCreate()
data = [('First', 1), ('Second', 2), ('Third', 3)]
df = sparkSession.createDataFrame(data)
df.write.csv("hdfs:///user/admin/tmp/test") (2)
quit()
1 | Создание удаленной сессии Spark.
<sc_host>:<sc_port> — это gRPC-эндпоинт компонента Spark3 Connect.
Актуальный адрес эндпоинта доступен в ADCM на странице Clusters → <your_cluster_name> → Services → Spark3 → Info. |
2 | Запись содержимого DataFrame в HDFS. Все вычисления и операции записи выполняются в кластере Spark3, к которому относится сервер Spark Connect. |
После запуска такого приложения в выводе содержится строка, сигнализирующая о соединении с удаленным сервером Spark Connect.
Client connected to the Spark Connect server at ka-adh-1.ru-central1.internal:15002
ПРИМЕЧАНИЕ
В приложении Spark одновременно может существовать только 1 объект SparkSession .
Вы можете удалить существующую сессию с помощью метода SparkSession.stop() и создать новый объект удаленной сессии как показано в примере выше.
|
Использование PySpark shell
При использовании PySpark shell (/usr/bin/pyspark3) объект SparkSession
создается автоматически при запуске.
... SparkSession available as 'spark'.
Для того чтобы сессия автоматически создавалась удаленной, используйте флаг --remote
при запуске /usr/bin/pyspark3.
Например:
./bin/pyspark --remote "sc://<sc_host>:<sc_port>"
Где <sc_host>:<sc_port>
— это gRPC-эндпоинт компонента Spark3 Connect.
Актуальный адрес эндпоинта доступен в ADCM на странице Clusters → <your_cluster_name> → Services → Spark3 → Info.