Overwrite non-partitioned Hive tables

This article describes the specifics of writing data to non-partitioned Hive tables using the Spark3 service.

Default overwrite flow

By default, when Spark writes data to a non-partitioned Hive table in overwrite mode, data loss may occur under certain conditions and configurations. Sample Spark code that writes a DataFrame to a table in overwrite mode is shown below.

df.write \
  .mode("overwrite") \
  .format("orc") \
  .insertInto(table_name)

The following Spark parameters can be used to ensure pushdowns and optimize operations:

spark.sql.hive.convertMetastoreOrc = true
spark.sql.hive.convertMetastoreParquet = true
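
For example, assuming an active SparkSession object named spark, both parameters can also be set at runtime:

# Assumes an existing SparkSession object named `spark`
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")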

If spark.sql.hive.convertMetastoreOrc is set to true, an overwrite operation may lead to intermittent data loss due to the nature of the default commit protocol. The algorithm of the default commit protocol is shown in the diagram below.

Default commit flow
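
The essence of the flow is that the destination directory is cleared before the new output is fully committed, so a failure in between leaves the table empty. The following minimal Python sketch is an illustration only, not Spark's actual code; it models this data loss window and contrasts it with the staged approach described in the next section:

import os
import shutil
import tempfile

# Model of the default flow: old data is removed before the new output
# is fully in place
def default_overwrite(table_dir, write_new_files):
    shutil.rmtree(table_dir)       # old data is already gone at this point
    os.makedirs(table_dir)
    write_new_files(table_dir)     # a failure here loses the table contents

# Model of the staged flow (OverwriteOnCommitProtocol): write to a temporary
# location first and replace the table only after the write succeeds
def staged_overwrite(table_dir, write_new_files):
    staging = tempfile.mkdtemp()
    write_new_files(staging)       # a failure here leaves the table intact
    shutil.rmtree(table_dir)
    shutil.move(staging, table_dir)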

OverwriteOnCommitProtocol

To avoid this behavior and force Spark to perform an overwrite only after the DataFrame is successfully flushed to a temporary location, a special implementation of the commit protocol (OverwriteOnCommitProtocol) must be used. There are several ways to enable this protocol, which are described below.

  • Redefine globally

  • Redefine on job launch

  • Set explicitly when creating SparkSession

Using ADCM, specify the following property in Custom spark-defaults.conf (Clusters → <ADHclusterName> → Services → Spark3 → Primary configuration) and restart the Spark3 service:

spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.OverwriteOnCommitProtocol

This makes Spark use OverwriteOnCommitProtocol by default for all jobs.
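
After the restart, the effective value can be verified from any PySpark session:

# Assumes an existing SparkSession object named `spark`
print(spark.conf.get("spark.sql.sources.commitProtocolClass", "<not set>"))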

To enable the protocol for a single job, use the startup parameter when running spark-submit:

$ spark-submit \
    --conf spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.OverwriteOnCommitProtocol \
    ..

You can specify the protocol when constructing a SparkSession object in your PySpark code:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.OverwriteOnCommitProtocol") \
    .config("spark.sql.hive.convertMetastoreOrc", "true") \
    .config("spark.sql.hive.convertMetastoreParquet", "true") \
    .enableHiveSupport() \
    .getOrCreate()

Prerequisites and recommendations

When using OverwriteOnCommitProtocol, consider the following:

  • Ensure that spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version is set to 1 (the default value). A quick runtime check is shown after this list.

  • Using the protocol with S3 object storage is not recommended due to the slow performance of file operations on object storage.
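
A minimal way to check the effective committer algorithm version from PySpark is shown below. This is a sketch: it assumes an active SparkSession named spark and relies on the internal _jsc handle.

# Read the Hadoop configuration of the active session; "1" is the default
# value when the property is not set explicitly
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print(hadoop_conf.get("mapreduce.fileoutputcommitter.algorithm.version", "1"))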
