Details
-
Improvement
-
Status: In Progress
-
Major
-
Resolution: Unresolved
-
3.1.0
-
None
-
None
Description
When determining whether to do a broadcast join, Spark estimates the size of the smaller table as follows:
- if totalSize is defined and greater than 0, use it.
- else, if rawDataSize is defined and greater than 0, use it
- else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
Therefore, Spark prefers totalSize over rawDataSize.
Unfortunately, totalSize is often quite a bit smaller than the actual table size, since it represents the size of the table's files on disk. Parquet and Orc files, for example, are encoded and compressed. This can result in the JVM throwing an OutOfMemoryError while Spark is loading the table into a HashedRelation, or when Spark actually attempts to broadcast the data.
On the other hand, rawDataSize represents the uncompressed size of the dataset, according to Hive documentation. This seems like a pretty good number to use in preference to totalSize. However, due to HIVE-20079, this value is simply #columns * #rows. Once that bug is fixed, it may be a superior statistic, at least for managed tables.
In the meantime, we could apply a configurable "fudge factor" to totalSize, at least for types of files that are encoded and compressed. Hive has the setting hive.stats.deserialization.factor, which defaults to 1.0, and is described as follows:
in the absence of uncompressed/raw data size, total file size will be used for statistics annotation. But the file may be compressed, encoded and serialized which may be lesser in size than the actual uncompressed/raw data size. This factor will be multiplied to file size to estimate the raw data size.
Also, I propose a configuration setting to allow the user to completely ignore rawDataSize, since that value is broken (due to HIVE-20079). When that configuration setting is set to true, Spark would instead estimate the table as follows:
- if totalSize is defined and greater than 0, use totalSize*fudgeFactor.
- else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
Caveat: This mitigates the issue only for Hive tables. It does not help much when the user is reading files using spark.read.parquet, unless we apply the same fudge factor there.
Attachments
Issue Links
- is related to
-
SPARK-30712 Estimate sizeInBytes from file metadata for parquet files
- Open
- relates to
-
SPARK-30713 Respect mapOutputSize in memory in adaptive execution
- Open
-
SPARK-35833 The Statistics size of PARQUET table is not estimated correctly
- Open
-
SPARK-40038 spark.sql.files.maxPartitionBytes does not observe on-disk compression
- Open
- links to