
Spark performance tuning

This section describes Spark performance optimization techniques and provides relevant Spark configuration properties used to fine-tune the optimization features.

NOTE
Throughout this section, "Spark" refers to the Spark3 ADH service; some of the described features are not fully supported by the Spark ADH service.

Spark configuration properties mentioned in this section can be set using spark.conf.set("property", "value") on a SparkSession or via Spark SQL SET key=value commands. For details on configuration parameters used for performance tuning, see Spark documentation.
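
As a minimal sketch, the two approaches could look as follows in Scala. The local SparkSession, the application name, and the chosen values are assumptions made purely for illustration; in spark-shell, the spark variable already exists.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("conf-example") // hypothetical application name
  .getOrCreate()

// Option 1: set a runtime SQL property through the session configuration.
spark.conf.set("spark.sql.shuffle.partitions", "400")

// Option 2: set a property with a Spark SQL SET command.
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=20971520")

// Read the values back to confirm that they have been applied.
println(spark.conf.get("spark.sql.shuffle.partitions"))
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
```

Both options change the configuration of the current session only.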

Cache data in memory

Spark SQL can cache tables in memory in a columnar format when you call spark.catalog.cacheTable("tableName") or dataFrame.cache(). After these methods are invoked, Spark scans only the required columns and automatically tunes compression to minimize memory usage and GC load. To remove a table from memory, use spark.catalog.uncacheTable("tableName") or dataFrame.unpersist().
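
The caching calls above can be sketched as follows. The sample data, the people view name, and the local session are hypothetical and serve only to make the example self-contained.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cache-example") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data registered as a temporary view named "people".
val people = Seq((1, "Ann", 34), (2, "Bob", 45)).toDF("id", "name", "age")
people.createOrReplaceTempView("people")

// Mark the table for caching; the data is materialized lazily by the first action.
spark.catalog.cacheTable("people")
spark.table("people").select("name").count()

// Check the cache status of the table.
println(spark.catalog.isCached("people"))

// Release the cached data once it is no longer needed.
spark.catalog.uncacheTable("people")
println(spark.catalog.isCached("people"))
```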

Property Description Default value

spark.sql.inMemoryColumnarStorage.compressed

If set to true, Spark automatically selects a compression codec for each column based on data statistics

true

spark.sql.inMemoryColumnarStorage.batchSize

Controls the batch size for columnar caching. A larger batch size can improve memory utilization and compression at the cost of a higher risk of out-of-memory (OOM) errors

10000

spark.sql.files.maxPartitionBytes

The maximum number of bytes to pack into a single partition when reading files. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON

134217728 (128 MB)

spark.sql.files.openCostInBytes

Affects how many partitions the input data is read into. The estimated cost of opening a file, measured as the number of bytes that could be scanned in the same time. This setting is used when packing multiple files into a partition. It is better to overestimate the cost: partitions with small files will then be processed faster than partitions with bigger files (which are scheduled first). This property is effective only when using file-based data sources such as Parquet, ORC, and JSON

4194304 (4 MB)

spark.sql.files.minPartitionNum

The suggested (but not guaranteed) minimum number of split file partitions. If not set, the default value is taken from spark.default.parallelism. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON

Value of spark.default.parallelism

spark.sql.broadcastTimeout

The timeout in seconds to complete a broadcast operation during broadcast JOINs

300

spark.sql.autoBroadcastJoinThreshold

The maximum size (in bytes) for a table to be broadcast to all worker nodes during a JOIN. Setting this value to -1 disables the broadcasting

10485760 (10 MB)

spark.sql.shuffle.partitions

Sets the number of partitions to use when shuffling data for joins/aggregations

200

spark.sql.sources.parallelPartitionDiscovery.threshold

Sets a threshold to enable parallel listing for job input paths. If the number of input paths is larger than this threshold, Spark will list the files by using a Spark distributed job. Otherwise, it will fall back to sequential listing. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON

32

spark.sql.sources.parallelPartitionDiscovery.parallelism

Sets the maximum listing parallelism for job input paths. If the number of input paths is larger than this value, it will be throttled down to this value. This property is effective only when using file-based data sources such as Parquet, ORC, and JSON

10000
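
To illustrate how spark.sql.files.maxPartitionBytes, spark.sql.files.openCostInBytes, and spark.sql.files.minPartitionNum interact, below is a simplified model of how Spark appears to choose the maximum split size for file-based sources. The function name, its signature, and the value of 8 for the minimum partition number are assumptions; this is an approximation for building intuition, not Spark's actual code.

```scala
// Simplified, hypothetical model of split sizing for file-based sources.
def maxSplitBytes(
    fileSizes: Seq[Long],
    maxPartitionBytes: Long = 134217728L, // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long = 4194304L,     // spark.sql.files.openCostInBytes
    minPartitionNum: Int = 8              // assumed spark.sql.files.minPartitionNum
): Long = {
  // Every file is "charged" its own size plus the estimated cost of opening it.
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  // Spread the total evenly over the suggested minimum number of partitions.
  val bytesPerPartition = totalBytes / minPartitionNum
  // Never go below the open cost and never above the configured maximum.
  math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerPartition))
}

// 100 files of 1 MB each: the split size ends up well below the 128 MB maximum.
val smallFiles = Seq.fill(100)(1048576L)
println(maxSplitBytes(smallFiles)) // 65536000

// A single 10 GB file: the split size is capped by maxPartitionBytes.
println(maxSplitBytes(Seq(10L * 1024 * 1024 * 1024))) // 134217728
```

Under this model, a higher open cost makes a set of small files look larger, so the per-partition byte budget grows and each partition fills up after fewer files.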

Use JOIN strategy hints

Spark SQL JOINs are transformations that can involve massive data shuffling over the network, so they may perform poorly when not designed with care. Using JOIN strategy hints, you can instruct Spark to follow a specific JOIN strategy for a given table. Hinting a strategy that best fits a particular JOIN case can greatly improve JOIN performance. An example of joining two tables using the BROADCAST hint is below.

val joined_table = spark.table("src").join(spark.table("records").hint("broadcast"), "key")

If several hints are added for a table, Spark applies hints with the following priority:

  1. BROADCAST

  2. MERGE

  3. SHUFFLE_HASH

  4. SHUFFLE_REPLICATE_NL

NOTE
There is no guarantee that Spark will actually use the hinted strategy, because a given strategy may not support every JOIN type.
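
The other strategy hints can be requested in the same way, either through the Dataset API or in Spark SQL. Below is a minimal sketch; the sample tables, column names, and the local session are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("join-hints-example") // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Hypothetical sample tables.
Seq((1, "a"), (2, "b")).toDF("key", "value").createOrReplaceTempView("src")
Seq((1, "x"), (3, "y")).toDF("key", "payload").createOrReplaceTempView("records")

// Dataset API: request a sort-merge JOIN for the "records" side.
val mergeJoined = spark.table("src").join(spark.table("records").hint("merge"), "key")

// Spark SQL: the hint is written as a comment right after SELECT.
val hashJoined = spark.sql(
  "SELECT /*+ SHUFFLE_HASH(r) */ s.key, s.value, r.payload " +
    "FROM src s JOIN records r ON s.key = r.key")

// Print the physical plans to check which strategy Spark actually selected.
mergeJoined.explain()
hashJoined.explain()
```

Inspecting the output of explain() is a simple way to verify whether the hinted strategy has been applied.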

Use partitioning hints

In Spark SQL, partitioning hints allow you to suggest a specific partitioning strategy. The following partitioning hints are available:

  • COALESCE. Reduces the number of partitions (and therefore output files) to the specified number. Accepts a partition number as a parameter.

  • REPARTITION. Repartitions the data to the specified number of partitions using the specified partitioning expressions. Accepts a partition number, columns, both, or neither as parameters.

  • REPARTITION_BY_RANGE. Repartitions the data by ranges of the specified column values. Accepts column names and an optional partition number.

  • REBALANCE. Rebalances the query result partitions so that each partition is of a reasonable size, without significant skew. This hint is ignored if AQE is disabled.

The hints usage examples are below.

SELECT /*+ COALESCE(5) */ * FROM t
SELECT /*+ REPARTITION(5) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(5, c) */ * FROM t
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(5, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(5) */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t
SELECT /*+ REBALANCE(5, c) */ * FROM t
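
For comparison, below is a sketch showing a SQL hint next to the Dataset API methods that have a similar effect (coalesce, repartition, and repartitionByRange). The table t with a single column c is generated here for illustration, and the local session is an assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("partitioning-hints-example") // hypothetical application name
  .getOrCreate()

// Hypothetical table t with a single column c.
spark.range(0, 1000).toDF("c").createOrReplaceTempView("t")

// SQL hint and Dataset API calls with a similar effect.
val hinted = spark.sql("SELECT /*+ REPARTITION(5) */ * FROM t")
val viaApi = spark.table("t").repartition(5)
val byRange = spark.table("t").repartitionByRange(5, col("c"))
val coalesced = spark.table("t").coalesce(1)

// Compare the resulting numbers of partitions.
println(hinted.rdd.getNumPartitions)
println(viaApi.rdd.getNumPartitions)
println(byRange.rdd.getNumPartitions)
println(coalesced.rdd.getNumPartitions)
```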

Adaptive query execution

Adaptive Query Execution (AQE) is an optimization technique that relies on runtime statistics to choose the most efficient query execution plan. AQE is enabled by default and can be disabled by setting the spark.sql.adaptive.enabled property to false. The major AQE features are described below.

Coalesce post-shuffle partitions

This AQE feature coalesces the post-shuffle partitions based on the output statistics, so you do not need to manually set a shuffle partition number that would fit your dataset. Instead, Spark will pick a suitable shuffle partition number at run time (assuming you have set a large enough initial number of shuffle partitions via the spark.sql.adaptive.coalescePartitions.initialPartitionNum property).

This feature is activated by setting both spark.sql.adaptive.enabled=true and spark.sql.adaptive.coalescePartitions.enabled=true.

Property Description Default value

spark.sql.adaptive.coalescePartitions.enabled

If set to true and spark.sql.adaptive.enabled=true, Spark will coalesce contiguous shuffle partitions according to the target size (defined by spark.sql.adaptive.advisoryPartitionSizeInBytes) to avoid too many small tasks

true

spark.sql.adaptive.coalescePartitions.parallelismFirst

If set to true, Spark ignores the target size specified by spark.sql.adaptive.advisoryPartitionSizeInBytes (defaults to 64 MB) when coalescing contiguous shuffle partitions, and respects only the minimum partition size specified by spark.sql.adaptive.coalescePartitions.minPartitionSize (defaults to 1 MB) to maximize parallelism. This helps avoid performance regressions when AQE is enabled. It is recommended to set this property to false so that Spark respects the target size specified by spark.sql.adaptive.advisoryPartitionSizeInBytes

true

spark.sql.adaptive.coalescePartitions.minPartitionSize

The minimum size of shuffle partitions after coalescing. This value can be at most 20% of spark.sql.adaptive.advisoryPartitionSizeInBytes. This parameter is useful when the target size is ignored during partition coalescing, which is the default behavior

1 MB

spark.sql.adaptive.coalescePartitions.initialPartitionNum

The initial number of shuffle partitions before coalescing. If not set, the value is equal to spark.sql.shuffle.partitions. This property only has an effect if spark.sql.adaptive.enabled=true and spark.sql.adaptive.coalescePartitions.enabled=true

 — 

spark.sql.adaptive.advisoryPartitionSizeInBytes

The advisory size in bytes for shuffle partitions during adaptive optimization (when spark.sql.adaptive.enabled=true). This property has an effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions

64 MB
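
The properties from the table above might be combined as in the sketch below. The specific values (an initial partition number of 1000 and a 64 MB advisory size), the sample aggregation, and the local session are assumptions chosen for illustration, not recommendations.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-coalesce-example") // hypothetical application name
  .getOrCreate()

// Enable AQE and the coalescing of post-shuffle partitions.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Start with a deliberately large number of shuffle partitions and let AQE shrink it.
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")

// Respect the advisory target size instead of maximizing parallelism.
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

// A small aggregation that requires a shuffle.
val counts = spark.range(0, 100000)
  .groupBy((col("id") % 10).as("bucket"))
  .count()

// The number of partitions after the shuffle is chosen by AQE at run time.
println(counts.rdd.getNumPartitions)
```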

Convert sort-merge JOIN to broadcast JOIN

AQE automatically converts a sort-merge JOIN to a broadcast hash JOIN when the runtime statistics show that either JOIN side is smaller than the adaptive broadcast hash JOIN threshold (defined by spark.sql.adaptive.autoBroadcastJoinThreshold). This is not as efficient as planning a broadcast hash JOIN from the start, but it is still more efficient than a sort-merge JOIN, because Spark skips sorting both JOIN sides and can read shuffle files locally to reduce network traffic (assuming spark.sql.adaptive.localShuffleReader.enabled=true).

Property Description Default value

spark.sql.adaptive.autoBroadcastJoinThreshold

Sets the maximum size in bytes for a table to be broadcast to all worker nodes during a JOIN. Setting this value to -1 disables the broadcasting. The default value is taken from spark.sql.autoBroadcastJoinThreshold. Note that this property is used only by AQE

 — 

Convert sort-merge JOIN to shuffled hash JOIN

AQE automatically converts sort-merge JOINs to shuffled hash JOINs when all post-shuffle partitions are smaller than the threshold defined by spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold.

Property Description Default value

spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

Sets the maximum size in bytes per partition that is allowed for building a local hash map. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and no partition exceeds this value, Spark prefers the shuffled hash JOIN over the sort-merge JOIN regardless of the value of spark.sql.join.preferSortMergeJoin

0
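
The condition from the property description can be expressed as a small predicate. The function and parameter names below are hypothetical; the sketch only restates the documented rule and is not Spark's internal code.

```scala
// Hypothetical helper restating the documented rule.
def prefersShuffledHashJoin(
    partitionSizes: Seq[Long],    // sizes of post-shuffle partitions, in bytes
    localMapThreshold: Long,      // spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
    advisoryPartitionSize: Long   // spark.sql.adaptive.advisoryPartitionSizeInBytes
): Boolean =
  localMapThreshold >= advisoryPartitionSize &&
    partitionSizes.forall(_ <= localMapThreshold)

val mb = 1024L * 1024L
val partitions = Seq(10 * mb, 100 * mb)

// With the default threshold of 0, the conversion never happens.
println(prefersShuffledHashJoin(partitions, 0L, 64 * mb))        // false

// A 128 MB threshold covers both partitions and is not smaller than the advisory size.
println(prefersShuffledHashJoin(partitions, 128 * mb, 64 * mb))  // true
```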

Optimize skew JOIN

Data skew often leads to severe performance degradation of JOIN operations. This AQE feature dynamically handles skew in sort-merge JOINs by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. The feature is activated when both spark.sql.adaptive.enabled=true and spark.sql.adaptive.skewJoin.enabled=true are set.

Property Description Default value

spark.sql.adaptive.skewJoin.enabled

If set to true and spark.sql.adaptive.enabled=true, Spark dynamically handles the skew in a sort-merge JOIN by splitting (and replicating if needed) skewed partitions

true

spark.sql.adaptive.skewJoin.skewedPartitionFactor

A partition is considered skewed if its size (in bytes) is larger than this factor multiplied by the median partition size, and the partition is also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

A partition is considered skewed if its size (in bytes) is larger than this threshold and the partition is also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the median partition size. This property should be larger than spark.sql.adaptive.advisoryPartitionSizeInBytes

256 MB
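
The two skew conditions from the table above must hold simultaneously. The sketch below restates that rule with the default values; the function names and the simple median calculation are assumptions and may differ from Spark's internal implementation.

```scala
// Hypothetical median helper: the middle element, or the mean of the two middle elements.
def median(sizes: Seq[Long]): Long = {
  val sorted = sizes.sorted
  val n = sorted.length
  if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2 else sorted(n / 2)
}

// A partition is skewed only if it exceeds BOTH the relative and the absolute limit.
def isSkewed(
    size: Long,
    medianSize: Long,
    factor: Double = 5.0,                  // spark.sql.adaptive.skewJoin.skewedPartitionFactor
    thresholdBytes: Long = 256L * 1024 * 1024 // ...skewJoin.skewedPartitionThresholdInBytes
): Boolean =
  size > medianSize * factor && size > thresholdBytes

val mb = 1024L * 1024L

// Median is 120 MB, so the relative limit is 600 MB; only the 2000 MB partition is skewed.
val large = Seq(100 * mb, 110 * mb, 120 * mb, 130 * mb, 2000 * mb)
println(large.map(s => isSkewed(s, median(large))))

// Median is 1 MB; 100 MB exceeds 5 x median but not the 256 MB threshold, so nothing is skewed.
val small = Seq(1 * mb, 1 * mb, 1 * mb, 1 * mb, 100 * mb)
println(small.map(s => isSkewed(s, median(small))))
```

The second case shows why the absolute threshold exists: it prevents Spark from splitting partitions that are large only relative to very small neighbors.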
