Example of launching jobs with an RDD
A Spark resilient distributed dataset (RDD) is a fault-tolerant collection of elements distributed across the nodes of a cluster that can be operated on in parallel.
This section provides basic RDD usage examples in Scala that can be run in spark-shell. Spark also supports other programming languages, such as Java, Python, and R.
RDD operations
Create
There are two ways to create a new RDD; see the examples below.
To create a new RDD from a collection, you can use sparkContext.parallelize() in the driver program code.
val dataSeq = Seq(("foo", 0), ("bar", 1), ("buzz", 2))
val rdd = sc.parallelize(dataSeq)                  // RDD from a sequence of tuples
val dataArr = Array(0, 1, 2, 3)
val rdd1 = sc.parallelize(dataArr)                 // RDD from an array
val emptyRDD = sc.parallelize(Seq.empty[String])   // empty RDD
You can also create an RDD from external storage like HDFS, HBase, or any other data source that implements Hadoop InputFormat. An example of reading data from HDFS is shown below.
val rdd2 = sc.textFile("hdfs://adh/user/admin/file_test.csv")
Once an RDD object is created, you can perform the main Spark operations on it.
Modify
All RDD operations are divided into two types (an example follows the list below):
- Transformations that return a new, modified RDD. For example, map(), flatMap(), union(), etc.
- Actions that return a value to the driver program after running a computation on the RDD. For example, collect(), count(), etc.
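As a hedged illustration (assuming a spark-shell session where the sc context is available; the RDD names used here are arbitrary), the sketch below combines the transformations and actions listed above:
val src = sc.parallelize(Seq(1, 2, 3, 4, 5))
val doubled = src.map(_ * 2)            // transformation: returns a new RDD, nothing is computed yet
val combined = doubled.union(src)       // transformation: merges two RDDs into a new one
println(combined.count())               // action: runs the computation and returns a value to the driver
combined.collect().foreach(println)     // action: fetches the entire RDD contents to the driver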
All RDD operations, including creation and modification, are lazy, i.e. they are not computed immediately. The computation is done only when a result must be returned to the driver program, for example by rdd.take(10).
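A minimal sketch of this lazy behavior (again assuming spark-shell and the sc context):
val lines = sc.parallelize(Seq("lazy", "rdd", "example"))   // only a description of the RDD, no computation yet
val upper = lines.map(_.toUpperCase)                        // still lazy: map() is only recorded
upper.take(10)                                              // take() is an action, so the computation runs here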
For a complete reference on supported operations, see Spark RDD API.
View contents
To view the contents of an RDD, use rdd.take(x), which returns the first x elements of the RDD.
NOTE
Be careful when using constructs like rdd.collect().foreach(println), as this may cause the Spark driver machine to run out of memory, since collect() fetches the entire RDD from all the cluster nodes to a single machine.
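As a safer alternative, the hedged sketch below (using the rdd created from HDFS earlier, or any other RDD) prints only a bounded number of elements:
rdd.take(5).foreach(println)                            // fetches at most 5 elements to the driver
rdd.map(line => line.length).take(5).foreach(println)   // the same bounded approach works for derived RDDs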
Example
The snippet below summarizes basic RDD operations. To run the example without errors, first create the test file (test_lorem.txt) in HDFS:
- On your local file system, create the test_lorem.txt file with dummy contents:
echo "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus." > test_lorem.txt
- Create a test directory in HDFS, provide access permissions, and copy the test file to HDFS:
$ sudo -u hdfs hdfs dfs -mkdir /user/admin
$ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
$ sudo -u hdfs hdfs dfs -put /home/admin/spark-demo/test_lorem.txt /user/admin/test_lorem.txt
Please ensure that the hdfs user has access to the test_lorem.txt file.
Once the test data is loaded into HDFS, you can run the following Scala code in spark-shell (/bin/spark-shell).
val rdd = sc.textFile("hdfs://adh/user/admin/test_lorem.txt")
println("Created RDD:")
rdd.take(10)
val resultRDD = rdd.flatMap(line => line.split(" ")) (1)
.map(word => (word,1)) (2)
.reduceByKey(_+_) (3)
.sortBy(_._2, false) (4)
resultRDD.take(100).foreach(println)
resultRDD.persist() (5)
println("Writing RDD to a file: /user/admin/lorem_demo_result")
resultRDD.saveAsTextFile("hdfs://adh/user/admin/lorem_demo_result") (6)
1 | For each line of the original text file, split the string into words.
2 | For each word, create a tuple (<word>, 1).
3 | Sum up repeated words, leaving only unique words with their number of occurrences.
4 | Sort words by occurrence count in descending order.
5 | Cache the RDD in memory. This allows subsequent actions to reuse the already computed RDD instead of recomputing it.
6 | Store the RDD contents in HDFS. This creates an HDFS directory with parts of the RDD data.
Running the example above outputs the following:
Created RDD:
val res0: Array[String] = Array(Lorem ipsum dolor sit amet, ...)
val resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at sortBy at <console>:4
(Aenean,3)
(vulputate,2)
(pede,2)
(nec,,2)
(Donec,2)
(justo,,2)
(montes,,1)
...
Writing RDD to a file: /user/admin/lorem_demo_result
After running the example, check that the RDD contents have been written to HDFS:
$ sudo -u hdfs hdfs dfs -ls /user/admin/lorem_demo_result
The output should be similar to the following:
Found 3 items
-rw-r--r--   3 admin admin     0 2023-07-25 20:55 /user/admin/lorem_demo_result/_SUCCESS
-rw-r--r--   3 admin admin    64 2023-07-25 20:55 /user/admin/lorem_demo_result/part-00000
-rw-r--r--   3 admin admin   753 2023-07-25 20:55 /user/admin/lorem_demo_result/part-00001
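You can also read the saved results back in spark-shell to double-check the contents; this is a hedged sketch that assumes the same HDFS path as above:
val savedResults = sc.textFile("hdfs://adh/user/admin/lorem_demo_result")   // reads all part files in the directory
savedResults.take(5).foreach(println)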