Пример запуска задач c RDD

Spark resilient distributed dataset (RDD) — это отказоустойчивая коллекция, элементы которой распределены по узлам кластера и могут обрабатываться параллельно.

В данной статье показаны базовые примеры использования RDD на языке Scala, которые можно запускать в spark-shell. Также Spark поддерживает другие языки, например Java, Python и R.

Операции с RDD

Создание

Существует два способа создания нового RDD, примеры представлены ниже.

  • Используя метод parallelize()

  • Из внешнего источника данных, например, HDFS

Для создания RDD из коллекции можно использовать метод sparkContext.parallelize() в коде программы драйвера.

val dataSeq = Seq(("foo", 0), ("bar", 1), ("buzz", 2))
val rdd=sc.parallelize(dataSeq)

val dataArr = Array(0, 1, 2, 3)
val rdd1=sc.parallelize(dataArr)

val emptyRDD = sc.parallelize(Seq.empty[String])

Данные для RDD можно получить из внешнего хранилища, например, HDFS, HBase или любого другого источника данных, реализующего Hadoop InputFormat. Ниже показан способ чтения данных из HDFS.

val rdd2 = sc.textFile("hdfs://adh/user/admin/testload/file_test.csv")

После создания объекта RDD вы можете выполнять основные операции Spark над RDD.

Модификация

Все операции RDD делятся на два типа:

  • Трансформации (transformations), которые возвращают новый RDD. Например, map(), flatMap(), union() и так далее.

  • Действия (actions), которые возвращают некое значение программе-драйверу после выполнения вычислений над RDD. Например, collect(), count() и так далее.

Все операции с RDD, включая создание и модификацию, являются "ленивыми" (lazy), то есть они не выполняются сразу. Вычисления производятся только тогда, когда какой-то результат должен быть возвращен программе-драйверу, например, вызов rdd.take(10).

Полная справочная информация о поддерживаемых операциях доступна на странице Spark RDD API.

Просмотр содержимого

Для просмотра содержимого RDD используйте метод rdd.take(x), который возвращает первые x элементов RDD.

ПРИМЕЧАНИЕ
Используйте с осторожностью конструкции типа rdd.collect().foreach(println), так как это может привести к нехватке памяти на машине-драйвере Spark, поскольку collect() загружает данные RDD со всех узлов кластера на одну машину.

Пример

Следующий пример демонстрирует использование основных операций с RDD. Чтобы пример отработал без ошибок, соответствующий файл с данными (test_lorem.txt) должен быть доступен в HDFS. Создание тестового файла показано ниже.

  1. В локальной файловой системе создайте файл test_lorem.txt:

    echo "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus." > test_lorem.txt
  2. Создайте директорию в HDFS, установите права доступа и скопируйте тестовый файл в HDFS:

    $ sudo -u hdfs hdfs dfs -mkdir /user/admin
    $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
    $ sudo -u hdfs hdfs dfs -put /home/admin/spark-demo/test_lorem.txt /user/admin/test_lorem.txt

    Необходимо убедиться, что пользователь hdfs имеет доступ к файлу test_lorem.txt.

Когда тестовые данные загружены в HDFS, вы можете выполнить следующий Scala-код непосредственно в spark-shell (/bin/spark-shell).

val rdd = sc.textFile("hdfs://adh/user/admin/test_lorem.txt")
println("Created RDD:")
rdd.take(10)

val resultRDD = rdd.flatMap(line => line.split(" ")) (1)
      .map(word => (word,1)) (2)
      .reduceByKey(_+_) (3)
      .sortBy(_._2, false) (4)

resultRDD.take(100).foreach(println)

resultRDD.persist() (5)
println("Writing RDD to a file: /user/admin/lorem_demo_result")
resultRDD.saveAsTextFile("hdfs://adh/user/admin/lorem_demo_result") (6)
1 Разбиение каждой строки текста исходного файла на отдельные слова.
2 Создание кортежа (<word>,1) для каждого слова.
3 Редуцирование коллекции таким образом, чтобы остались лишь уникальные слова с количеством повторений.
4 Сортировка слов по количеству повторений.
5 Кеширование RDD в памяти. В этот момент выполняется синхронизация RDD по всем узлам кластера.
6 Запись RDD в HDFS. В HDFS создается директория с частями (parts) данных RDD.

 

В результате выполнения примера в консоль выводится следующий результат.

Created RDD:
val res0: Array[String] = Array(Lorem ipsum dolor sit amet, ...)

val resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at sortBy at <console>:4

(Aenean,3)
(vulputate,2)
(pede,2)
(nec,,2)
(Donec,2)
(justo,,2)
(montes,,1)
...

Writing RDD to a file: /user/admin/lorem_demo_result

Запись RDD в HDFS в примере выше можно проверить с помощью команды:

$ sudo -u hdfs hdfs dfs -ls /user/admin/lorem_demo_result

Пример вывода:

Found 3 items
-rw-r--r--   3 admin admin          0 2023-07-25 20:55 /user/admin/lorem_demo_result/_SUCCESS
-rw-r--r--   3 admin admin         64 2023-07-25 20:55 /user/admin/lorem_demo_result/part-00000
-rw-r--r--   3 admin admin        753 2023-07-25 20:55 /user/admin/lorem_demo_result/part-00001
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней