Использование Spark Connect

В данной статье показаны примеры удаленного взаимодействия с кластером Spark3 через Spark Connect.

Компонент Spark3 Connect

Чтобы использовать Spark Connect, в вашем ADH-кластере должен быть установлен соответствующий компонент (Spark3 Connect) сервиса Spark3. Этот компонент можно добавить с помощью действий сервиса Spark3 в ADCM. После добавления в кластер компонент готов к работе из коробки и не требует настройки.

Использование Spark Connect в приложениях Spark

Код клиентского приложения Spark, в котором используется Spark Connect, почти не отличается от обычного приложения Spark. Главное отличие — использование удаленной Spark-сессии (объект SparkSession), которая отвечает за соединение с удаленным сервером Spark Connect. При использовании удаленной сессии код приложения Spark выполняется в YARN-кластере, где установлен сервер Spark Connect. После создания удаленной SparkSession взаимодействие с объектом сессии ведется таким же образом, как если бы это была обычная сессия Spark. Все коммуникации с удаленным сервером Spark Connect выполняются автоматически с помощью клиентской библиотеки Spark Connect.

Основные способы создания удаленной сессии показаны ниже.

Использование spark3-submit

При использовании /bin/spark3-submit для запуска клиентских Spark-приложений создание удаленной сессии в коде приложения выглядит следующим образом:

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('spark-yarn')
sparkSession = SparkSession
    .builder
    .appName("spark-yarn")
    .config(conf=conf)
    .remote("sc://<sc_host>:<sc_port>") (1)
    .getOrCreate()

data = [('First', 1), ('Second', 2), ('Third', 3)]
df = sparkSession.createDataFrame(data)
df.write.csv("hdfs:///user/admin/tmp/test") (2)
quit()
1 Создание удаленной сессии Spark. <sc_host>:<sc_port> — это gRPC-эндпоинт компонента Spark3 Connect. Актуальный адрес эндпоинта доступен в ADCM на странице Clusters → <your_cluster_name> → Services → Spark3 → Info.
2 Запись содержимого DataFrame в HDFS. Все вычисления и операции записи выполняются в кластере Spark3, к которому относится сервер Spark Connect.

После запуска такого приложения в выводе содержится строка, сигнализирующая о соединении с удаленным сервером Spark Connect.

Client connected to the Spark Connect server at ka-adh-1.ru-central1.internal:15002
ПРИМЕЧАНИЕ
В приложении Spark одновременно может существовать только 1 объект SparkSession. Вы можете удалить существующую сессию с помощью метода SparkSession.stop() и создать новый объект удаленной сессии как показано в примере выше.

Использование PySpark shell

При использовании PySpark shell (/usr/bin/pyspark3) объект SparkSession создается автоматически при запуске.

...
SparkSession available as 'spark'.

Для того чтобы сессия автоматически создавалась удаленной, используйте флаг --remote при запуске /usr/bin/pyspark3. Например:

./bin/pyspark --remote "sc://<sc_host>:<sc_port>"

Где <sc_host>:<sc_port> — это gRPC-эндпоинт компонента Spark3 Connect. Актуальный адрес эндпоинта доступен в ADCM на странице Clusters → <your_cluster_name> → Services → Spark3 → Info.

Аутентификация

Spark Connect не предоставляет встроенных средств аутентификации, а gRPC-канал между клиентскими приложениями и сервером Spark Connect является незащищенным. При необходимости одним из способов защитить данный канал связи является использование gRPC-шлюзов.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней