Spark / SPARK-27213

Unexpected results when filter is used after distinct

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.2, 2.4.0
    • Fix Version/s: None
    • Component/s: PySpark

      Description

      The following code produces unexpected output because the Catalyst optimizer does not push the filter down below the distinct.

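      # Assumes an active SparkSession named `spark` (e.g. the pyspark shell).
      df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
      df.show(5)
      # Query 1: filter first, then select and distinct.
      df.filter("y_n='y'").select('x','y','z').distinct().show()
      # Query 2: select and distinct first, then filter on the dropped column y_n.
      df.select('x','y','z').distinct().filter("y_n='y'").show()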
      
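      Output

      +---+---+----+---+
      |  x|  y|   z|y_n|
      +---+---+----+---+
      |  a|123|12.3|  n|
      |  a|123|12.3|  y|
      |  a|123|12.4|  y|
      +---+---+----+---+

      +---+---+----+
      |  x|  y|   z|
      +---+---+----+
      |  a|123|12.3|
      |  a|123|12.4|
      +---+---+----+

      +---+---+----+
      |  x|  y|   z|
      +---+---+----+
      |  a|123|12.4|
      +---+---+----+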

      Ideally, the second query (distinct followed by filter) should raise an error, because the column used in the filter, y_n, is not present in the preceding select. Instead, the analyzed plan adds y_n back into the projection, and the optimizer applies first() to y_n within each (x, y, z) group before applying the filter.

      Alternatively, had the filter been pushed down below the distinct, the result would have been correct.
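To make the mechanism concrete, here is a minimal plain-Python sketch (no Spark required) of what the two plans appear to compute on the sample rows. The function names are hypothetical, and first() is modeled as "first row seen per group". That matches the output reported here, but it is an assumption: in Spark, first() depends on row order and is not guaranteed to be deterministic.

```python
# Sample rows from the report: (x, y, z, y_n)
ROWS = [
    ("a", "123", "12.3", "n"),
    ("a", "123", "12.3", "y"),
    ("a", "123", "12.4", "y"),
]


def filter_then_distinct(rows):
    """Model of df.filter("y_n='y'").select('x','y','z').distinct()."""
    kept = [(x, y, z) for x, y, z, y_n in rows if y_n == "y"]
    # dict.fromkeys removes duplicates while preserving first-seen order
    return list(dict.fromkeys(kept))


def distinct_then_filter_as_planned(rows):
    """Model of the optimized plan reported for the second query:
    Aggregate by (x, y, z) with first(y_n), then Filter y_n = 'y'."""
    first_y_n = {}
    for x, y, z, y_n in rows:
        # setdefault keeps the first y_n seen for each (x, y, z) group
        first_y_n.setdefault((x, y, z), y_n)
    return [key for key, y_n in first_y_n.items() if y_n == "y"]


result_filter_first = filter_then_distinct(ROWS)
result_distinct_first = distinct_then_filter_as_planned(ROWS)
print(result_filter_first)
print(result_distinct_first)
```

In this model the group (a, 123, 12.3) keeps y_n = 'n' as its first value, so the filter drops the whole group, which is consistent with the single-row result shown in the report.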

      df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
      df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
      df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
      
      Output

       
      == Parsed Logical Plan ==
      Deduplicate [x#74, y#75, z#76]
      +- AnalysisBarrier
         +- Project [x#74, y#75, z#76]
            +- Filter (y_n#77 = y)
               +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Analyzed Logical Plan ==
      x: string, y: string, z: string
      Deduplicate [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76]
         +- Filter (y_n#77 = y)
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Optimized Logical Plan ==
      Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76]
         +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Physical Plan ==
      *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
      +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
         +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
            +- *(1) Project [x#74, y#75, z#76]
               +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
                  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
       
      ------------------------------------------------------------------------------------------------------------------- 
       
      == Parsed Logical Plan ==
      'Filter ('y_n = y)
      +- AnalysisBarrier
         +- Deduplicate [x#74, y#75, z#76]
            +- Project [x#74, y#75, z#76]
               +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Analyzed Logical Plan ==
      x: string, y: string, z: string
      Project [x#74, y#75, z#76]
      +- Filter (y_n#77 = y)
         +- Deduplicate [x#74, y#75, z#76]
            +- Project [x#74, y#75, z#76, y_n#77]
               +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Optimized Logical Plan ==
      Project [x#74, y#75, z#76]
      +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
         +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Physical Plan ==
      *(3) Project [x#74, y#75, z#76]
      +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
         +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
            +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
                  +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
                     +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
                        +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
        

      The second query, i.e. "df.select('x','y','z').distinct().filter("y_n='y'")", should raise an error rather than return a wrong result.
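Until the engine rejects such a query, a caller could guard against it explicitly. The sketch below is one hypothetical way to do so: `filter_on_existing_column` is an illustrative name, not part of PySpark, and it relies only on the `columns` attribute and `filter` method of a DataFrame.

```python
def filter_on_existing_column(df, column, condition):
    """Apply df.filter(condition) only if `column` is in df's current schema.

    `df` is expected to be a PySpark DataFrame; only its `columns` attribute
    and `filter` method are used. The caller names `column` explicitly,
    because this sketch does not parse `condition`.
    """
    if column not in df.columns:
        raise ValueError(
            "column %r is not in the DataFrame schema %r" % (column, df.columns)
        )
    return df.filter(condition)


# Usage with the DataFrame from the report (assumes an active `spark` session):
#   deduped = df.select('x', 'y', 'z').distinct()
#   filter_on_existing_column(deduped, 'y_n', "y_n='y'")  # raises ValueError
```

The check uses the schema of the DataFrame being filtered, so a column dropped by an earlier select is reported to the caller instead of being silently resolved from an ancestor plan.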

              People

              • Assignee: Unassigned
              • Reporter: Rinaz Belhaj (rinazbelhaj)
              • Shepherd: Holden Karau
              • Votes: 0
              • Watchers: 5
