Spark / SPARK-27969

Non-deterministic expressions in filters or projects can unnecessarily prevent all scan-time column pruning, harming performance


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: Optimizer, SQL

    Description

      If a scan operator is followed by a projection or filter and those operators contain any non-deterministic expressions, scan-time column pruning is skipped entirely, which harms query performance.

      Here's an example of the problem:

      import org.apache.spark.sql.functions._
      import spark.implicits._ // provides the Encoder for the tuple Dataset outside of spark-shell

      // Write a small five-column dataset to Parquet, then read it back:
      val df = spark.createDataset(Seq(
        (1, 2, 3, 4, 5),
        (1, 2, 3, 4, 5)
      ))
      val tmpPath = java.nio.file.Files.createTempDirectory("column-pruning-bug").toString()
      df.write.parquet(tmpPath)
      val fromParquet = spark.read.parquet(tmpPath)

      If all expressions are deterministic then, as expected, column pruning is pushed into the scan:

      fromParquet.select("_1").explain
      
      == Physical Plan ==
      *(1) FileScan parquet [_1#68] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug7865798834978814548], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int>

      However, if we add a non-deterministic filter then no column pruning is performed (even though pruning would be safe!):

      fromParquet.select("_1").filter(rand() =!= 0).explain
      
      == Physical Plan ==
      *(1) Project [_1#127]
      +- *(1) Filter NOT (rand(-1515289268025792238) = 0.0)
         +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>
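
      For contrast (an illustrative check that is not part of the original repro): with an equivalent deterministic filter, both column pruning and predicate pushdown should kick in, which is what makes the non-deterministic case look needlessly pessimistic:

      fromParquet.select("_1").filter($"_1" =!= 0).explain
      // Expected: the FileScan reads only _1 (ReadSchema: struct<_1:int>) and
      // carries the predicate in PushedFilters.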

      Similarly, a non-deterministic expression in a second projection can be collapsed into the first projection, which also prevents column pruning:

      fromParquet.select("_1").select($"_1", rand()).explain
      
      == Physical Plan ==
      *(1) Project [_1#127, rand(1267140591146561563) AS rand(1267140591146561563)#141]
      +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>
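
      The collapse can be seen directly (a diagnostic step added here for illustration, not part of the original report) by printing the optimized logical plan: after CollapseProject runs, the inner deterministic Project [_1] has been merged into the outer non-deterministic Project [_1, rand()] that sits directly above the relation:

      // Inspect the optimized logical plan; pruning never happens because the
      // only Project left above the scan is non-deterministic.
      println(fromParquet.select("_1").select($"_1", rand())
        .queryExecution.optimizedPlan.numberedTreeString)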
      

      I believe that this is caused by SPARK-10316: the Parquet column pruning code relies on the PhysicalOperation unapply method to extract projects and filters, and that helper purposely fails to match if any projection or filter is non-deterministic.
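
      Roughly, the extractor behaves like the following simplified sketch (illustrative only, not the actual Spark source; the real PhysicalOperation in org.apache.spark.sql.catalyst.planning also substitutes aliases and handles more node types):

      import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
      import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

      // Sketch of a PhysicalOperation-style extractor: collection of projects
      // and filters stops at the first non-deterministic node, so the scan
      // planner never learns which columns the query actually needs.
      object SketchPhysicalOperation {
        def unapply(plan: LogicalPlan): Option[(Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan)] =
          Some(collect(plan))

        private def collect(plan: LogicalPlan): (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan) =
          plan match {
            case Project(fields, child) if fields.forall(_.deterministic) =>
              val (_, filters, leaf) = collect(child)
              (Some(fields), filters, leaf)
            case Filter(condition, child) if condition.deterministic =>
              val (projects, filters, leaf) = collect(child)
              (projects, condition +: filters, leaf)
            // A non-deterministic Project or Filter falls through to here and
            // is treated as the leaf, hiding the relation (and any pruning
            // opportunity) beneath it.
            case other => (None, Nil, other)
          }
      }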

      It looks like this conservative behavior was originally added to avoid pushdown / re-ordering of non-deterministic filter expressions. However, in this case I feel that it's too conservative: even though we can't push down non-deterministic filters, we should still be able to perform column pruning.
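
      As a user-side stopgap (a sketch, assuming the reader controls how the data is loaded; this does not fix the optimizer), the narrow schema can be forced onto the reader itself, so that pruning no longer depends on the optimizer seeing through the non-deterministic operator:

      // Force the read schema at the source; Parquet supports reading a
      // subset of a file's columns, so only _1 is materialized.
      val prunedByHand = spark.read.schema("_1 INT").parquet(tmpPath)
      prunedByHand.filter(rand() =!= 0).explain
      // The FileScan should now report ReadSchema: struct<_1:int> even with
      // the non-deterministic filter on top.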

      /cc Wenchen Fan and Michael Armbrust (it looks like you discussed collapsing of non-deterministic projects in the SPARK-10316 PR, which is related to why the third example above did not prune).

          People

            Assignee: Unassigned
            Reporter: Josh Rosen (joshrosen)
            Votes: 0
            Watchers: 8
