Spark performance tuning
This section describes Spark performance optimization techniques and provides relevant Spark configuration properties used to fine-tune the optimization features.
NOTE
Throughout this section, "Spark" refers to the Spark3 ADH service; some of the described features are not fully supported by the Spark ADH service.
Spark configuration properties mentioned in this section can be set at run time using spark.conf.set("key", "value") or via the Spark SQL SET key=value command.
For details on configuration parameters used for performance tuning, see the Spark documentation.
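As a quick sketch, both approaches are shown below; the property values are illustrative, not recommendations.

// Programmatically, via the runtime configuration of an existing SparkSession:
spark.conf.set("spark.sql.shuffle.partitions", "400")
// Or with a Spark SQL SET command:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=20971520")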
Cache data in memory
Spark SQL can cache tables in memory in a columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache().
After invoking these methods, Spark scans only the required columns and automatically manages compression to minimize memory usage and GC load.
To remove a table from memory, use spark.catalog.uncacheTable("tableName") or dataFrame.unpersist().
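A minimal caching sketch is shown below; the table name tableName is a placeholder for a table registered in the session catalog.

spark.catalog.cacheTable("tableName")      // cache the table in the in-memory columnar format
val df = spark.table("tableName")
df.count()                                 // the first action materializes the cache
spark.catalog.uncacheTable("tableName")    // release the memory when the data is no longer needed
df.unpersist()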
Property | Description | Default value
---|---|---
spark.sql.inMemoryColumnarStorage.compressed | If set to true, Spark SQL automatically selects a compression codec for each column based on statistics of the data | true
spark.sql.inMemoryColumnarStorage.batchSize | Controls the batch size for columnar caching. A larger batch size can improve memory utilization and compression at the cost of a higher risk of OOM errors | 10000
spark.sql.files.maxPartitionBytes | The maximum number of bytes to pack into a single partition when reading files. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON | 134217728 (128 MB)
spark.sql.files.openCostInBytes | Affects how many partitions the input data is read into. The estimated "cost" to open a file, measured by the number of bytes that could be scanned at a time. This setting is useful when putting multiple files into a partition. It is better to over-estimate the cost; in that case the partitions with small files will be faster than partitions with bigger files (which are scheduled first). This property is effective only when using file-based data sources such as Parquet, ORC, and JSON | 4194304 (4 MB)
spark.sql.files.minPartitionNum | The suggested (but not guaranteed) minimum number of split file partitions. If not set, the default value is taken from spark.default.parallelism | Value of spark.default.parallelism
spark.sql.broadcastTimeout | The timeout in seconds to complete a broadcast operation during broadcast JOINs | 300
spark.sql.autoBroadcastJoinThreshold | The maximum size (in bytes) for a table to be broadcast to all worker nodes during a JOIN. Setting this value to -1 disables broadcasting | 10485760 (10 MB)
spark.sql.shuffle.partitions | Sets the number of partitions to use when shuffling data for JOINs or aggregations | 200
spark.sql.sources.parallelPartitionDiscovery.threshold | Sets a threshold to enable parallel listing of job input paths. If the number of input paths is larger than this threshold, Spark lists the files using a distributed job. Otherwise, it falls back to sequential listing. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON | 32
spark.sql.sources.parallelPartitionDiscovery.parallelism | Sets the maximum listing parallelism for job input paths. If the number of input paths is larger than this value, it is throttled down to this value. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON | 10000
Use JOIN strategy hints
Spark SQL JOINs are transformations that involve massive data shuffling over the network; designed carelessly, they can cause serious performance issues.
Using JOIN strategy hints, you can instruct Spark to follow a specific JOIN strategy for a given table.
You can greatly improve JOIN performance by hinting the strategy that best fits a particular JOIN case.
An example of joining two tables using the BROADCAST hint is shown below.
val joined_table = spark.table("src").join(spark.table("records").hint("broadcast"), "key")
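The same hint can also be expressed in Spark SQL; the sketch below assumes the src and records tables from the example above.

val joinedSql = spark.sql("SELECT /*+ BROADCAST(records) */ * FROM src JOIN records ON src.key = records.key")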
If several hints are added for a table, Spark applies them with the following priority:

- BROADCAST
- MERGE
- SHUFFLE_HASH
- SHUFFLE_REPLICATE_NL
NOTE
There is no guarantee that Spark will actually use the hinted strategy, since the strategy may not support all JOIN types.
Use partitioning hints
In Spark SQL, partitioning hints allow you to suggest a specific partitioning strategy. The following partitioning hints are available:

- COALESCE. Controls the number of output files. Accepts a partition number as a parameter.
- REPARTITION. Repartitions to the specified number of partitions. Accepts a partition number, columns, or both/neither of them as parameters.
- REPARTITION_BY_RANGE. Repartitions by range. Accepts column names and an optional partition number.
- REBALANCE. Re-balances the query results so that the partitions are of a reasonable size, without significant skew.

Usage examples of these hints are shown below.
SELECT /*+ COALESCE(5) */ * FROM t
SELECT /*+ REPARTITION(5) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(5, c) */ * FROM t
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(5, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(5) */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t
SELECT /*+ REBALANCE(5, c) */ * FROM t
Adaptive query execution
Adaptive Query Execution (AQE) is an optimization technique that relies on run-time statistics to choose the most efficient query execution plan.
AQE is enabled by default and can be disabled using the spark.sql.adaptive.enabled property.
The major AQE features are described below.
Coalesce post-shuffle partitions
This AQE feature coalesces the post-shuffle partitions based on the output statistics, so you do not need to manually set a shuffle partition number that would fit your dataset.
Instead, Spark will pick a suitable shuffle partition number at run time (assuming you have set a large enough initial number of shuffle partitions via the spark.sql.adaptive.coalescePartitions.initialPartitionNum
property).
This feature is activated by setting both spark.sql.adaptive.enabled=true
and spark.sql.adaptive.coalescePartitions.enabled=true
.
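For example, the following sketch enables the feature with an intentionally large initial partition number; the value is illustrative and should match your dataset and cluster.

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")   // illustrative value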
Property | Description | Default value
---|---|---
spark.sql.adaptive.coalescePartitions.enabled | If set to true (and spark.sql.adaptive.enabled=true), Spark coalesces contiguous shuffle partitions according to the target size (specified by spark.sql.adaptive.advisoryPartitionSizeInBytes) to avoid too many small tasks | true
spark.sql.adaptive.coalescePartitions.parallelismFirst | If set to true, Spark ignores the target size specified by spark.sql.adaptive.advisoryPartitionSizeInBytes when coalescing contiguous shuffle partitions and instead adaptively calculates the target size according to the default parallelism of the cluster. The calculated size is usually smaller than the configured target size; this maximizes parallelism and avoids performance degradation due to idle resources | true
spark.sql.adaptive.coalescePartitions.minPartitionSize | The minimum size of shuffle partitions after coalescing. This value can be at most 20% of spark.sql.adaptive.advisoryPartitionSizeInBytes | 1 MB
spark.sql.adaptive.coalescePartitions.initialPartitionNum | The initial number of shuffle partitions before coalescing. If not set, the value is equal to spark.sql.shuffle.partitions | —
spark.sql.adaptive.advisoryPartitionSizeInBytes | The advisory size in bytes for shuffle partitions during adaptive optimization (when spark.sql.adaptive.enabled=true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions | 64 MB
Convert sort-merge JOIN to broadcast JOIN
AQE automatically converts a sort-merge JOIN to a broadcast hash JOIN when the run-time statistics of either JOIN side are smaller than the adaptive broadcast hash JOIN threshold (defined by spark.sql.adaptive.autoBroadcastJoinThreshold).
This is not as efficient as planning a broadcast hash JOIN in the first place, but it is still better than completing the sort-merge JOIN, since it saves sorting both JOIN sides and allows reading shuffle files locally to reduce network traffic (assuming spark.sql.adaptive.localShuffleReader.enabled=true).
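As a sketch, the configuration below raises the adaptive broadcast threshold to 50 MB (an illustrative value) and keeps the local shuffle reader enabled:

spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "52428800")   // 50 MB, illustrative
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")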
Property | Description | Default value
---|---|---
spark.sql.adaptive.autoBroadcastJoinThreshold | Sets the maximum size in bytes for a table to be broadcast to all worker nodes during a JOIN. Setting this value to -1 disables broadcasting. If not set, the value of spark.sql.autoBroadcastJoinThreshold is used | —
Convert sort-merge JOIN to shuffled hash JOIN
AQE automatically converts sort-merge JOINs to shuffled hash JOINs when all post-shuffle partitions are smaller than the threshold defined by spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold.
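For instance, the following illustrative setting allows local hash maps of up to 64 MB per partition, letting AQE prefer the shuffled hash JOIN when all partitions are small enough:

spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "67108864")   // 64 MB, illustrative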
Property | Description | Default value
---|---|---
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | Sets the maximum size in bytes per partition that is allowed to build a local hash map. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and all partition sizes are not larger than this value, JOIN selection prefers the shuffled hash JOIN over the sort-merge JOIN regardless of spark.sql.join.preferSortMergeJoin | 0
Optimize skew JOIN
Data skew often leads to severe degradation of JOIN operations.
This AQE feature dynamically handles skew in sort-merge JOINs by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks.
The feature is activated when both spark.sql.adaptive.enabled=true and spark.sql.adaptive.skewJoin.enabled=true are set.
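The sketch below enables skew JOIN handling with the skew thresholds spelled out explicitly; both values match the defaults and are shown only for illustration.

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")                       // illustrative
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")         // illustrative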
Property | Description | Default value
---|---|---
spark.sql.adaptive.skewJoin.enabled | If set to true (and spark.sql.adaptive.enabled=true), Spark dynamically handles skew in sort-merge JOINs by splitting (and replicating if needed) skewed partitions | true
spark.sql.adaptive.skewJoin.skewedPartitionFactor | A partition is considered skewed if its size (in bytes) is larger than this factor multiplied by the median partition size, and the partition is also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | A partition is considered skewed if its size (in bytes) is larger than this threshold, and the partition is also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the median partition size. Ideally, this value should be larger than spark.sql.adaptive.advisoryPartitionSizeInBytes | 256 MB