spark-shell

Spark shell предоставляет среду для использования функционала Spark. Доступны множество различных команд, которые можно использовать для обработки данных в интерактивной оболочке.

Основной концепцией Apache Spark является устойчивый распределенный набор данных (Resilient Distributed Datasheet, RDD). Это неизменяемая распределенная коллекция данных, разделенная по машинам в кластере.

Функции преобразования

Функции преобразования (transformation functions) — это операции над RDD, такие как filter(), map() или union(), которые порождают новый RDD.

Функция Описание

map(function)

Возвращает новый RDD, применяя функцию к элементу

filter(function)

Возвращает новый dataset из тех исходных элементов, для которых функция возвращает значение true

filterByRange(lower, upper)

Возвращает RDD с элементами в указанном диапазоне

flatMap(function)

Действует подобно map(), однако возвращает последовательность вместо значения

reduceByKey(function,[num Tasks])

Агрегирует значения ключа с помощью функции

groupByKey([num Tasks])

Преобразует (K, V) в (K, <iterable V>)

distinct([num Tasks])

Устраняет дубликаты из RDD

mapPartitions(function)

Запускается отдельно на каждом разделе RDD

mapPartitionsWithIndex(function)

Применяет функцию к разделам, отмеченным числовым значением

sample(withReplacement, fraction, seed)

Используется для выборки части данных с использованием заданного случайного числа

union(anotherRDD)

Возвращает новый RDD, содержащий все элементы исходного и anotherRDD RDD

intersection(anotherRDD)

Возвращает новый RDD, содержащий пересечение элементов в исходном и anotherRDD RDD

cartesian()

Возвращает декартово произведение всех пар элементов

subtract(anotherRDD)

Создает новый RDD, удаляя элементы anotherRDD из исходного RDD

join(RDD,[numTasks])

Объединяет два элемента набора данных с общими аргументами. При объединении (A,B) и (A,C) создает новый RDD (A,(B,C))

cogroup(RDD,[numTasks])

Преобразует (A,B) в (A, <iterable B>)

Функции действия

Функции действия (Action functions) — это операции, которые запускают вычисления, такие как count(), first(), take(n) или collect().

Функция Описание

count()

Возвращает количество элементов в RDD

collect()

Возвращает количество элементов в RDD в виде массива

reduce(function)

Агрегирует элементы данных в RDD, принимая два аргумента и возвращая один

take(n)

Выбирает первые n элементов из RDD

foreach(function)

Выполняет функцию для каждого элемента данных в RDD

first()

Получает первый элемент данных RDD

saveastextfile(path)

Записывает содержимое RDD в один или несколько текстовых файлов в локальной файловой системе

takeordered(n, [ordering])

Возвращает первые n элементов RDD, используя либо естественный порядок, либо кастомный компаратор

Уровни персистентного хранения

Уровни персистентного хранения (Persistence storage levels) определяют, где и каким образом сохранять или кешировать RDD, DataFrame или Dataset.

Уровень Описание

MEMORY_ONLY

Значение по умолчанию. Сохраняет RDD в памяти кластера в виде десериализованного Java-объекта

MEMORY_AND_DISK

Сохраняет в виде десериализованного Java-объекта. Если RDD не помещается в памяти кластера, он сохраняет разделы (partitions) на диске и считывает их

MEMORY_ONLY_SER

Сохраняет RDD в виде десериализованного Java-объекта. Эта операция более ЦПУ-затратна

MEMORY_ONLY_DISK_SER

Эта опция аналогична MEMORY_ONLY_SER, однако сохраняет данные на диск, если памяти недостаточно

DISC_ONLY

Сохраняет RDD только на диск

MEMORY_ONLY_2, MEMORY_AND_DISK_2 и так далее

Эта опция аналогична другим уровням, за исключением того, что разделы реплицируются на 2 slave-узла

Функции персистентности

Функции персистентности Spark (persistence functions) позволяют сохранять (или кешировать) dataset в памяти между операциями. При сохранении RDD каждый узел кластера сохраняет разделы RDD в памяти и использует их при выполнении операций над dataset (или производными этого dataset). Это увеличивает скорость выполнения последующих операций. Подобное кеширование является ключевым инструментом для итерационных алгоритмов и быстрого интерактивного взаимодействия.

Функция Описание

cache()

Используется, чтобы избежать ненужных повторных вычислений. Эквивалентно persist(MEMORY_ONLY)

persist(<storage_level>)

Сохраняет RDD с заданным уровнем хранения

unpersist()

Помечает RDD как неперсистентный (non-persistent) и удаляет блок из памяти и с диска

checkpoint()

Сохраняет файл в директории checkpoint, так что все ссылки на его родительский RDD будут удалены

Примеры

Чтобы запустить Spark shell, выполните команду:

$ ./bin/spark-shell

Ниже приведены примеры операций над RDD на языке Scala.

Чтение файла из локальной файловой системы:

val data = sc.textFile("dataset1.txt")

Создание RDD посредством распараллеливания:

val num = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
val Dataset = sc.parallelize(num)

Подсчет всех элементов в RDD:

Dataset.count()

Запись выходных/обработанных данных в текстовый файл:

counts.saveAsTextFile("output")

Фильтрация RDD:

val DFData = data.filter(line => line.contains("one"))
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней