Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 2.4.0
- Fix Version/s: None
Description
If a scan operator is followed by a projection or filter, and those operators contain any non-deterministic expressions, then scan column pruning optimizations are skipped entirely, harming query performance.
Here's an example of the problem:
import org.apache.spark.sql.functions._

val df = spark.createDataset(Seq(
  (1, 2, 3, 4, 5),
  (1, 2, 3, 4, 5)
))
val tmpPath = java.nio.file.Files.createTempDirectory("column-pruning-bug").toString()
df.write.parquet(tmpPath)
val fromParquet = spark.read.parquet(tmpPath)
If all expressions are deterministic then, as expected, column pruning is pushed into the scan:
fromParquet.select("_1").explain

== Physical Plan ==
*(1) FileScan parquet [_1#68] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug7865798834978814548], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int>
However, if we add a non-deterministic filter then no column pruning is performed (even though pruning would be safe!):
fromParquet.select("_1").filter(rand() =!= 0).explain

== Physical Plan ==
*(1) Project [_1#127]
+- *(1) Filter NOT (rand(-1515289268025792238) = 0.0)
   +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>
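As an aside (not part of the original report), the filter above is treated as non-deterministic because the Catalyst expression behind rand() reports deterministic = false; with Spark on the classpath this can be checked directly via Column.expr:

```scala
import org.apache.spark.sql.functions.rand

// Column.expr exposes the underlying Catalyst expression; Rand reports
// deterministic = false, which is what blocks the PhysicalOperation match.
val isDeterministic = rand().expr.deterministic
println(isDeterministic)  // expected: false
```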
Similarly, a non-deterministic expression in a second projection can end up being collapsed such that it prevents column pruning:
fromParquet.select("_1").select($"_1", rand()).explain

== Physical Plan ==
*(1) Project [_1#127, rand(1267140591146561563) AS rand(1267140591146561563)#141]
+- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>
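One possible workaround (a sketch, not discussed in the original report) is to force the pruning manually: read the file with an explicit, narrower schema so the scan itself only materializes the needed column, rather than relying on the skipped pruning rule. This reuses tmpPath from the setup above:

```scala
import org.apache.spark.sql.functions.rand

// Hypothetical workaround: request only the needed column up front via a
// DDL-formatted schema string, so the FileScan is narrow from the start.
val pruned = spark.read
  .schema("_1 INT")
  .parquet(tmpPath)

pruned.filter(rand() =!= 0).explain()
// The FileScan's ReadSchema should now be struct<_1:int>.
```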
I believe that this is caused by SPARK-10316: the Parquet column pruning code relies on the PhysicalOperation unapply method to extract projections and filters, and this helper purposely fails to match if any projection or filter is non-deterministic.
It looks like this conservative behavior may have originally been added to avoid pushdown or re-ordering of non-deterministic filter expressions. However, in this case I feel that it's too conservative: even though we can't push down non-deterministic filters, we should still be able to perform column pruning.
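The matching behavior described above can be sketched in plain Scala (no Spark dependency; the names here are illustrative, not Spark's actual API): the extractor collects projections and filters above a scan, but bails out entirely as soon as any expression is non-deterministic, so downstream pruning never gets a chance to run.

```scala
// Illustrative expression / plan model, not Spark's real classes.
case class Expr(name: String, deterministic: Boolean = true)

sealed trait Plan
case class Scan(columns: Seq[String]) extends Plan
case class Project(exprs: Seq[Expr], child: Plan) extends Plan
case class Filter(cond: Expr, child: Plan) extends Plan

object PhysicalOperationLike {
  // Collects projections and filters above a scan, but refuses to match
  // (returns None) if any expression on the way down is non-deterministic --
  // which is why column pruning is skipped in that case.
  def unapply(plan: Plan): Option[(Seq[Expr], Seq[Expr], Scan)] = plan match {
    case s: Scan =>
      Some((Nil, Nil, s))
    case Project(exprs, child) if exprs.forall(_.deterministic) =>
      unapply(child).map { case (_, filters, s) => (exprs, filters, s) }
    case Filter(cond, child) if cond.deterministic =>
      unapply(child).map { case (projects, filters, s) => (projects, cond +: filters, s) }
    case _ =>
      None
  }
}

val scan = Scan(Seq("_1", "_2", "_3", "_4", "_5"))
val deterministicPlan = Project(Seq(Expr("_1")), scan)
val nonDeterministicPlan = Filter(Expr("rand() != 0", deterministic = false), deterministicPlan)

assert(PhysicalOperationLike.unapply(deterministicPlan).isDefined)    // matches: pruning possible
assert(PhysicalOperationLike.unapply(nonDeterministicPlan).isEmpty)   // no match: pruning skipped
```

A less conservative extractor could keep matching through non-deterministic nodes while merely refusing to push the non-deterministic filters down, which is essentially the fix being argued for.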
/cc cloud_fan and marmbrus (it looks like you discussed collapsing of non-deterministic projects in the SPARK-10316 PR, which is related to why the third example above did not prune).
Issue Links
- blocks
  - SPARK-27761 Make UDF nondeterministic by default (Open)
- duplicates
  - SPARK-14172 Hive table partition predicate not passed down correctly (Resolved)
  - SPARK-21520 Improvement a special case for non-deterministic projects in optimizer (Resolved)
- is caused by
  - SPARK-10316 respect non-deterministic expressions in PhysicalOperation (Resolved)