spark-shell

Spark shell provides an environment for users to use the Spark functions. It has a lot of different commands that can be used to process data in the interactive shell.

The core concept in Apache Spark is RDD (Resilient Distributed Datasheet). It is an immutable distributed collection of data that is partitioned across machines in a cluster.

Transformation functions

A transformation function is an operation on an RDD such as filter(), map() or union() that yields another RDD.

Function Description

map(function)

Returns a new RDD by applying a function on element

filter(function)

Returns a new dataset formed by selecting those source elements for which the function returns true

filterByRange(lower, upper)

Returns an RDD with elements in the specified range

flatMap(function)

Similar to the map() function but returns a sequence instead of a value

reduceByKey(function,[num Tasks])

It is used to aggregate values of a key using a function

groupByKey([num Tasks])

Used to convert(K,V) to (K, <iterable V>)

distinct([num Tasks])

Used to eliminate duplicates from RDD

mapPartitions(function)

Runs separately on each partition of RDD

mapPartitionsWithIndex(function)

Provides function with an integer value representing the index of the partition

sample(withReplacement, fraction, seed)

Samples a fraction of data using a given random number generating seeds

union(anotherRDD)

Returns a new RDD containing all elements from the source and anotherRDD RDDs

intersection(anotherRDD)

Returns a new RDD that contains an intersection of elements in the source and anotherRDD datasets

cartesian()

Returns the Cartesian product of all pair of elements

subtract(anotherRDD)

Creates new RDD by removing the content of one RDD using another RDD

join(RDD,[numTasks])

Joins two elements of the dataset with common arguments. When invoked on (A,B) and (A,C), it creates a new RDD (A,(B,C))

cogroup(RDD,[numTasks])

Converts (A,B) to (A, <iterable B>)

Action functions

An action function is an operation that triggers a computation such as count(), first(), take(n) or collect().

Function Description

count()

Gets the number of data elements in the RDD

collect()

Gets all the data elements in the RDD as an array

reduce(function)

Aggregates data elements into the RDD by taking two arguments and returning one

take(n)

Fetches the first n elements of the RDD

foreach(function)

Executes function for each data element in the RDD

first()

Retrieves the first data element of the RDD

saveastextfile(path)

Writes the content of RDD to a text file or set of text files to the local system

takeordered(n, [ordering])

Returns the first n elements of RDD using either the natural ordering or a custom comparator

Persistence storage levels

A storage level specifies how and where to persist or cache an RDD, DataFrame, or Dataset.

Level Description

MEMORY_ONLY(default level)

Stores the RDD in available cluster memory as a deserialized Java object

MEMORY_AND_DISK

Stores the RDD as a deserialized Java object. If the RDD does not fit in the cluster memory, it stores the partitions on the disk and reads them

MEMORY_ONLY_SER

Stores the RDD as a serialized Java object. This operation is more CPU intensive

MEMORY_ONLY_DISK_SER

This option is the same as MEMORY_ONLY_SER but stores the data on disk when the memory is not sufficient

DISC_ONLY

Stores RDD only on the disk

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

This option is the same as other levels except that the partitions are replicated on 2 slave nodes

Persistence functions

Spark’s persistence functions allow to persist (or cache) a dataset in memory across operations. When you persist an RDD, each node stores RDD partitions in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future operations on the dataset to be much faster. Caching is a key tool for iterative algorithms and fast interactive use.

Function Description

cache()

Used to avoid unnecessary re-computation. This is same as persist(MEMORY_ONLY)

persist(<storage_level>)

Persists the RDD with the given storage level

unpersist()

Marks the RDD as non-persistent and removes the block from memory and disk

checkpoint()

Saves a file inside the checkpoint directory. All the references of its parent RDD will be removed

Examples

To start the Spark shell, run the following:

$ ./bin/spark-shell

Below you can find sample RDD operations in Scala.

Reading a file from the local file system:

val data = sc.textFile("dataset1.txt")

Creating an RDD through parallelizing:

val num = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
val Dataset = sc.parallelize(num)

Counting items in RDD:

Dataset.count()

Saving output/processed data into the text file:

counts.saveAsTextFile("output")

Filtering an RDD:

val DFData = data.filter(line => line.contains("one"))
Found a mistake? Seleсt text and press Ctrl+Enter to report it