Scheduler jobs

When you run Spark in the cluster mode, there are two scopes where scheduling and resource allocation is applicable:

  • Scheduling across a cluster. This assumes scheduling of separate Spark applications that run on different cluster nodes.

  • Scheduling within an application. This assumes scheduling of parallel Spark jobs like collect, save, etc., within one Spark application.

Schedule across a cluster

To run Spark in the cluster mode, a cluster manager like YARN can be used to schedule Spark jobs and allocate resources for them on each cluster host.

Static resource allocation

One way to control resource allocation with YARN is by specifying fixed resource amount when running spark-submit (hence called static resource allocation). Below is an example of running Spark on YARN with special flags that specify the amount of allocated resources.

$ ./bin/spark-submit --class <app-class-name> \
    --master yarn \ (1)
    --deploy-mode cluster \ (2)
    --driver-memory 4g \ (3)
    --num-executors (4)
    --executor-memory 2g \ (5)
    --executor-cores 1 \ (6)
    ...
1 Defines the cluster manager to use.
2 Instructs Spark to run the driver program on worker nodes.
3 The maximum memory to allocate for the driver program.
4 The number of executors to allocate on the cluster.
5 The maximum memory to allocate per one executor.
6 The number of CPU cores to allocate per one executor.

Dynamic resource allocation

Another approach to control the allocation of cluster resources is the Spark’s built-in dynamic resource allocation feature. With this feature enabled, Spark can dynamically allocate and deallocate executors based on the application workload. To enable the feature for Spark/Spark3 ADH services, use the Switch dynamic resource allocation action in ADCM UI.

Use an external shuffle service

When dynamic resource allocation is enabled, you might want to switch to an external shuffle service that provides better reliability and resource consumption. Spark shuffle service (SS) is responsible for moving blocks of data between Spark executors or between executors and the driver program during the shuffle phase. By default, Spark uses the SS implementation for YARN that runs on every NodeManager of the YARN cluster. The default SS implementation is available at /usr/lib/spark/yarn/spark-{version}-yarn-shuffle.jar. However, this SS implementation has a drawback — it removes all the shuffle data persisted on disk if an executor dies.

For maximum reliability and speed, you might want to use an external SS implementation, and below are the steps on wiring your SS to the Spark cluster:

  1. Add the JAR with your SS implementation to the classpath, e.g /usr/lib/spark/*. When creating your own SS implementation, you can refer to /usr/lib/spark/yarn/spark-{version}-yarn-shuffle.jar as an example.

  2. In ADCM, go to Clusters → <YOUR_CLUSTER> → Services → YARN → Primary configuration, and specify your SS class name in the yarn.nodemanager.aux-services.spark_shuffle.class field.

  3. Increase each NodeManager’s heap size by setting YARN_HEAPSIZE (1000 by default) in etc/hadoop/yarn-env.sh to avoid GC issues during the shuffle phase.

  4. Restart YARN.

Alternatively, you can provide SS configuration in the spark-shuffle-site.xml file and put this file to the SS classpath. For more details on SS configuration properties, see Spark documentation.

Schedule within application

Inside a Spark application (hereinafter an "application" refers to a single SparkContext instance) scheduling and resource allocation is also applicable as multiple parallel Spark jobs can run simultaneously if they are submitted from separate threads. In this section a "job" means a Spark action like reduce, save, collect, etc., as well as any related tasks required to evaluate that action.

FIFO and Fair sharing modes

By default, Spark jobs run in the FIFO order. Once submitted, each job gets split into stages (for example, map/reduce phases), and the first job to start gets the priority on all available resources while its stages have tasks to run. Then, the second job gets the priority, and so on. If a job at the head of the queue does not need all the available resources, the pending jobs can start right away, but if the jobs at the head of the queue are compute-intensive, the queued jobs may be delayed significantly.

Apart from FIFO, Spark can also run jobs in the fair sharing mode. In this mode, Spark assigns tasks between jobs in a round-robin fashion, so that all jobs get roughly equal shares of cluster resources. This means that short jobs submitted while a long job is still running can start receiving resources right away without waiting for the long job to complete, thus still showing good response time. Fair sharing is disabled by default and can be enabled by setting spark.scheduler.mode=FAIR on a SparkContext instance as shown below:

val conf = new SparkConf()
          .setMaster(...)
          .setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Fair scheduler pools

Spark allows you to group jobs into pools and set various scheduling parameters (e.g. weight) for each pool. This can be useful to create high-priority pools for more important jobs.

By default, all submitted jobs go to the default pool. You can force jobs to go to a specific pool by adding the spark.scheduler.pool local property to your SparkContext. This must be done in the same thread where jobs are submitted, for example:

sc.setLocalProperty("spark.scheduler.pool", "mPool")

By default, each pool gets an equal share of the cluster, and inside each pool, jobs run in the FIFO order. For example, if you create one pool per user, then each user will get an equal share of cluster resources, and the queries from each user will run one-by-one.

You can tune the scheduler pools behavior using the configuration properties below:

  • schedulingMode. Defines whether jobs within a pool should run either in the FIFO order or share the pool’s resources fairly. The value should be either FIFO (default) or FAIR.

  • weight. Defines the pool’s priority relative to other pools. For example, if you set a pool’s weight to 2, this pool gets 2x more resources than other active pools. Using high weight values (for example, 1000), you can implement the priority for several pools. The default weight is 1.

  • minShare. Apart from weight, each pool can be given minimum shares as a number of CPU cores. The fair scheduler always attempts to meet all active pools' minimum shares before redistributing extra resources according to the weights. Therefore, the minShare property can be another way to ensure that a pool can always get up to a certain number of resources (e.g. 10 cores) quickly without giving it a high priority for the rest of the cluster. By default, each pool’s minShare is 0.

The above properties should be defined in the fairscheduler.xml file similar to /etc/spark3/conf.dist/fairscheduler.xml.template as shown below.

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

Then, you should either put your fairscheduler.xml on the classpath, or set the SparkConf spark.scheduler.allocation.file property that points to your fairscheduler.xml, for example:

conf.set("spark.scheduler.allocation.file", "file:///path/to/local-file")
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/hdfs-file")

For more details on fair scheduler pool properties, see Spark documentation.

Found a mistake? Seleсt text and press Ctrl+Enter to report it