Spark / SPARK-27213

Unexpected results when filter is used after distinct


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.2, 2.4.0
    • Fix Version/s: None
    • Component/s: PySpark

    Description

      The following code gives unexpected output due to the filter not getting pushed down by the Catalyst optimizer.

      df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
      df.show(5)
      df.filter("y_n='y'").select('x','y','z').distinct().show()
      df.select('x','y','z').distinct().filter("y_n='y'").show()
      
      Output of df.show(5):

      x   y    z     y_n
      a   123  12.3  n
      a   123  12.3  y
      a   123  12.4  y

      Output of the first query (filter before distinct — correct):

      x   y    z
      a   123  12.3
      a   123  12.4

      Output of the second query (filter after distinct — wrong):

      x   y    z
      a   123  12.4

      Ideally, the second statement should result in an error, since the column used in the filter is not present in the preceding select. Instead, the Catalyst optimizer resolves the filter by taking first() of column y_n within each distinct group and then applying the filter to that value.
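      The wrong single-row result can be reproduced with a minimal pure-Python sketch of what the optimized plan effectively does (this is an illustration of the semantics, not Spark code): deduplicate on (x, y, z) keeping the first y_n seen per group, then filter on that kept value.

```python
# Plain-Python sketch of the optimized plan's semantics:
# aggregate on (x, y, z) keeping first(y_n) per group, then filter on y_n.
rows = [('a', '123', '12.3', 'n'),
        ('a', '123', '12.3', 'y'),
        ('a', '123', '12.4', 'y')]

first_y_n = {}  # (x, y, z) -> first y_n encountered, mirroring first(y_n#77, false)
for x, y, z, y_n in rows:
    first_y_n.setdefault((x, y, z), y_n)

# Filter applied *after* the dedup: group ('a', '123', '12.3') kept y_n='n'
# from its first row, so the whole group is dropped.
result = [key for key, y_n in first_y_n.items() if y_n == 'y']
print(result)  # [('a', '123', '12.4')] — matches the one-row output above
```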

      Even if the filter had been pushed down below the deduplication, the result would have been accurate.
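      To illustrate that claim, here is the same plain-Python sketch with the filter applied before the deduplication (again an illustration of the semantics, not Spark code):

```python
rows = [('a', '123', '12.3', 'n'),
        ('a', '123', '12.3', 'y'),
        ('a', '123', '12.4', 'y')]

# Filter first, then deduplicate on (x, y, z): both the 12.3 and 12.4
# groups survive, matching the accurate two-row output of the first query.
filtered = [(x, y, z) for x, y, z, y_n in rows if y_n == 'y']
result = list(dict.fromkeys(filtered))  # order-preserving distinct
print(result)  # [('a', '123', '12.3'), ('a', '123', '12.4')]
```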

      df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
      df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
      df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
      
      Output

       
      == Parsed Logical Plan ==
      Deduplicate [x#74, y#75, z#76]
      +- AnalysisBarrier
            +- Project [x#74, y#75, z#76]
               +- Filter (y_n#77 = y)
                  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Analyzed Logical Plan ==
      x: string, y: string, z: string
      Deduplicate [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76]
         +- Filter (y_n#77 = y)
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Optimized Logical Plan ==
      Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76]
         +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Physical Plan ==
      *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
      +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
         +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
            +- *(1) Project [x#74, y#75, z#76]
               +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
                  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
       
      ------------------------------------------------------------------------------------------------------------------- 
       
      == Parsed Logical Plan ==
      'Filter ('y_n = y)
      +- AnalysisBarrier
            +- Deduplicate [x#74, y#75, z#76]
               +- Project [x#74, y#75, z#76]
                  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Analyzed Logical Plan ==
      x: string, y: string, z: string
      Project [x#74, y#75, z#76]
      +- Filter (y_n#77 = y)
         +- Deduplicate [x#74, y#75, z#76]
            +- Project [x#74, y#75, z#76, y_n#77]
               +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Optimized Logical Plan ==
      Project [x#74, y#75, z#76]
      +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
         +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Physical Plan ==
      *(3) Project [x#74, y#75, z#76]
      +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
         +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
            +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
                  +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
                     +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
                        +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
        

      The second query, i.e. df.select('x','y','z').distinct().filter("y_n='y'"), should result in an error rather than return wrong output.


          People

            Assignee: Unassigned
            Reporter: Rinaz Belhaj (rinazbelhaj)
            Holden Karau
            Votes: 0
            Watchers: 8
