spark-shell
Spark shell предоставляет среду для использования функционала Spark. Доступны множество различных команд, которые можно использовать для обработки данных в интерактивной оболочке.
Основной концепцией Apache Spark является устойчивый распределенный набор данных (Resilient Distributed Datasheet, RDD). Это неизменяемая распределенная коллекция данных, разделенная по машинам в кластере.
Функции преобразования
Функции преобразования (transformation functions) — это операции над RDD, такие как filter()
, map()
или union()
, которые порождают новый RDD.
Функция | Описание |
---|---|
map(function) |
Возвращает новый RDD, применяя функцию к элементу |
filter(function) |
Возвращает новый dataset из тех исходных элементов, для которых функция возвращает значение |
filterByRange(lower, upper) |
Возвращает RDD с элементами в указанном диапазоне |
flatMap(function) |
Действует подобно |
reduceByKey(function,[num Tasks]) |
Агрегирует значения ключа с помощью функции |
groupByKey([num Tasks]) |
Преобразует (K, V) в (K, <iterable V>) |
distinct([num Tasks]) |
Устраняет дубликаты из RDD |
mapPartitions(function) |
Запускается отдельно на каждом разделе RDD |
mapPartitionsWithIndex(function) |
Применяет функцию к разделам, отмеченным числовым значением |
sample(withReplacement, fraction, seed) |
Используется для выборки части данных с использованием заданного случайного числа |
union(anotherRDD) |
Возвращает новый RDD, содержащий все элементы исходного и |
intersection(anotherRDD) |
Возвращает новый RDD, содержащий пересечение элементов в исходном и |
cartesian() |
Возвращает декартово произведение всех пар элементов |
subtract(anotherRDD) |
Создает новый RDD, удаляя элементы |
join(RDD,[numTasks]) |
Объединяет два элемента набора данных с общими аргументами. При объединении (A,B) и (A,C) создает новый RDD (A,(B,C)) |
cogroup(RDD,[numTasks]) |
Преобразует (A,B) в (A, <iterable B>) |
Функции действия
Функции действия (Action functions) — это операции, которые запускают вычисления, такие как count()
, first()
, take(n)
или collect()
.
Функция | Описание |
---|---|
count() |
Возвращает количество элементов в RDD |
collect() |
Возвращает количество элементов в RDD в виде массива |
reduce(function) |
Агрегирует элементы данных в RDD, принимая два аргумента и возвращая один |
take(n) |
Выбирает первые |
foreach(function) |
Выполняет функцию для каждого элемента данных в RDD |
first() |
Получает первый элемент данных RDD |
saveastextfile(path) |
Записывает содержимое RDD в один или несколько текстовых файлов в локальной файловой системе |
takeordered(n, [ordering]) |
Возвращает первые |
Уровни персистентного хранения
Уровни персистентного хранения (Persistence storage levels) определяют, где и каким образом сохранять или кешировать RDD, DataFrame или Dataset.
Уровень | Описание |
---|---|
MEMORY_ONLY |
Значение по умолчанию. Сохраняет RDD в памяти кластера в виде десериализованного Java-объекта |
MEMORY_AND_DISK |
Сохраняет в виде десериализованного Java-объекта. Если RDD не помещается в памяти кластера, он сохраняет разделы (partitions) на диске и считывает их |
MEMORY_ONLY_SER |
Сохраняет RDD в виде десериализованного Java-объекта. Эта операция более ЦПУ-затратна |
MEMORY_ONLY_DISK_SER |
Эта опция аналогична |
DISC_ONLY |
Сохраняет RDD только на диск |
MEMORY_ONLY_2, MEMORY_AND_DISK_2 и так далее |
Эта опция аналогична другим уровням, за исключением того, что разделы реплицируются на 2 slave-узла |
Функции персистентности
Функции персистентности Spark (persistence functions) позволяют сохранять (или кешировать) dataset в памяти между операциями. При сохранении RDD каждый узел кластера сохраняет разделы RDD в памяти и использует их при выполнении операций над dataset (или производными этого dataset). Это увеличивает скорость выполнения последующих операций. Подобное кеширование является ключевым инструментом для итерационных алгоритмов и быстрого интерактивного взаимодействия.
Функция | Описание |
---|---|
cache() |
Используется, чтобы избежать ненужных повторных вычислений.
Эквивалентно |
persist(<storage_level>) |
Сохраняет RDD с заданным уровнем хранения |
unpersist() |
Помечает RDD как неперсистентный (non-persistent) и удаляет блок из памяти и с диска |
checkpoint() |
Сохраняет файл в директории checkpoint, так что все ссылки на его родительский RDD будут удалены |
Примеры
Чтобы запустить Spark shell, выполните команду:
$ ./bin/spark-shell
Ниже приведены примеры операций над RDD на языке Scala.
Чтение файла из локальной файловой системы:
val data = sc.textFile("dataset1.txt")
Создание RDD посредством распараллеливания:
val num = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
val Dataset = sc.parallelize(num)
Подсчет всех элементов в RDD:
Dataset.count()
Запись выходных/обработанных данных в текстовый файл:
counts.saveAsTextFile("output")
Фильтрация RDD:
val DFData = data.filter(line => line.contains("one"))