SPARK-27969

Non-deterministic expressions in filters or projects can unnecessarily prevent all scan-time column pruning, harming performance


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None

      Description

      If a scan operator is followed by a projection or filter containing any non-deterministic expression, then scan column pruning optimizations are skipped entirely, harming query performance.

      Here's an example of the problem:

      import org.apache.spark.sql.functions._
      import spark.implicits._ // for the tuple encoder and the $-column syntax

      // Write a small five-column dataset to Parquet, then read it back:
      val df = spark.createDataset(Seq(
        (1, 2, 3, 4, 5),
        (1, 2, 3, 4, 5)
      ))
      val tmpPath = java.nio.file.Files.createTempDirectory("column-pruning-bug").toString()
      df.write.parquet(tmpPath)
      val fromParquet = spark.read.parquet(tmpPath)

      If all expressions are deterministic then, as expected, column pruning is pushed into the scan:

      fromParquet.select("_1").explain
      
      == Physical Plan ==
      *(1) FileScan parquet [_1#68] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug7865798834978814548], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int>

      However, if we add a non-deterministic filter then no column pruning is performed (even though pruning would be safe!):

      fromParquet.select("_1").filter(rand() =!= 0).explain
      
      == Physical Plan ==
      *(1) Project [_1#127]
      +- *(1) Filter NOT (rand(-1515289268025792238) = 0.0)
         +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>
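
      For reference, a pruned-but-safe plan for this query would keep the non-deterministic Filter above the scan while still narrowing the ReadSchema (hand-written for illustration, not actual Spark output):

      == Physical Plan ==
      *(1) Project [_1#127]
      +- *(1) Filter NOT (rand(-1515289268025792238) = 0.0)
         +- *(1) FileScan parquet [_1#127] Batched: true, ..., ReadSchema: struct<_1:int>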

      Similarly, a non-deterministic expression in a second projection can be collapsed into the projection below it, which also prevents column pruning:

      fromParquet.select("_1").select($"_1", rand()).explain
      
      == Physical Plan ==
      *(1) Project [_1#127, rand(1267140591146561563) AS rand(1267140591146561563)#141]
      +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>
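
      As a possible workaround (a sketch, not tested against this exact repro), the scan can be narrowed up front by supplying an explicit read schema, so the unused columns are never read regardless of what the optimizer does:

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

      // Workaround sketch: restrict the read schema to the needed column so the
      // scan itself only reads _1, leaving nothing for the optimizer to prune.
      val prunedSchema = StructType(Seq(StructField("_1", IntegerType)))
      spark.read.schema(prunedSchema)
        .parquet(tmpPath) // tmpPath from the repro above
        .filter(rand() =!= 0)
        .explain // the FileScan's ReadSchema should now be struct<_1:int>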
      

      I believe that this is caused by SPARK-10316: the Parquet column pruning code relies on the PhysicalOperation unapply method to extract projects and filters, and that helper purposely fails to match if any projection or filter is non-deterministic.
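
      To make that concrete, here is a minimal self-contained model of the extractor's behavior (toy classes of my own, not Spark's actual API): a single non-deterministic Project or Filter stops the traversal before it reaches the scan, so the file-source strategy never gets a relation to prune.

      // Toy model, not Spark's real classes: the collector only descends through
      // Projects and Filters whose expressions are all deterministic.
      sealed trait Plan
      case class Scan(table: String) extends Plan
      case class Project(columns: Seq[String], deterministic: Boolean, child: Plan) extends Plan
      case class Filter(deterministic: Boolean, child: Plan) extends Plan

      def findScanForPruning(plan: Plan): Option[Scan] = plan match {
        case s: Scan                 => Some(s)
        case Project(_, true, child) => findScanForPruning(child)
        case Filter(true, child)     => findScanForPruning(child)
        case _                       => None // non-deterministic node: give up
      }

      // A deterministic chain reaches the scan, so pruning is possible:
      findScanForPruning(Project(Seq("_1"), true, Scan("t")))                // Some(Scan(t))
      // One non-deterministic filter hides the scan, so no pruning happens:
      findScanForPruning(Filter(false, Project(Seq("_1"), true, Scan("t")))) // None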

      It looks like this conservative behavior may originally have been added to avoid pushdown / re-ordering of non-deterministic filter expressions. In this case, however, I feel it's too conservative: even though we can't push down non-deterministic filters, we should still be able to perform column pruning.

      /cc Wenchen Fan and Michael Armbrust (it looks like you discussed collapsing of non-deterministic projects in the SPARK-10316 PR, which is related to why the third example above did not prune).


      People

      • Assignee: Unassigned
      • Reporter: Josh Rosen (joshrosen)
      • Votes: 0
      • Watchers: 8
