Spark / SPARK-24399

Reused Exchange is used where it should not be


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Versions: 2.3.0, 2.3.1
    • Component: SQL

    Description

      Reused Exchange produces a wrong result. Here is the code to reproduce the issue:

       
      // Assumes a SparkSession named `spark`, as in spark-shell; its implicits provide toDF and $"..."
      import spark.implicits._
      import org.apache.spark.sql.functions.{sum, lit}
      import org.apache.spark.sql.expressions.Window

      // Columns: (id, pFilter, secondFilter, metricToAgg)
      val row1 = (1, 3, 4, 50)
      val row2 = (2, 2, 2, 250)
      val row3 = (3, 2, 4, 250)
      val row4 = (4, 3, 1, 350)
      val data = Seq(row1, row2, row3, row4)

      // cache() makes both union branches read the data through an InMemoryTableScan
      val df = data.toDF("id", "pFilter", "secondFilter", "metricToAgg").cache()

      // The window function forces an Exchange (shuffle by id) into each branch
      val w = Window.partitionBy($"id")

      // First branch: filter on pFilter
      val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
        .filter($"activity_sum" > 50)
        .filter($"pFilter".isin(2, 3))
        .agg(sum($"metricToAgg"))
        .withColumn("t", lit("first_union_part"))

      // Second branch: identical except that it filters on secondFilter
      val secondUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
        .filter($"activity_sum" > 50)
        .filter($"secondFilter".isin(2, 3))
        .agg(sum($"metricToAgg"))
        .withColumn("t", lit("second_union_part"))

      val finalDF = firstUnionPart.union(secondUnionPart)
      finalDF.show()

      +----------------+-----------------+
      |sum(metricToAgg)|                t|
      +----------------+-----------------+
      |             850| first_union_part|
      |             850|second_union_part|
      +----------------+-----------------+
      

       

      The second row is wrong: it should be 250 instead of 850, as you can see by showing both union parts separately:

      firstUnionPart.show() 
      +----------------+----------------+ 
      |sum(metricToAgg)|               t| 
      +----------------+----------------+ 
      |             850|first_union_part| 
      +----------------+----------------+
      
      secondUnionPart.show()
      +----------------+-----------------+
      |sum(metricToAgg)|                t|
      +----------------+-----------------+
      |             250|second_union_part|
      +----------------+-----------------+
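      As a cross-check that does not involve Spark at all, the same logic can be replayed on plain Scala collections. This is a minimal sketch: ExpectedSums is a hypothetical name, and the window sum per id is modeled as a grouped sum (every id occurs once in this data, so activity_sum simply equals metricToAgg).

```scala
// Plain Scala collections, no Spark: recompute what each union part should return.
object ExpectedSums {
  // (id, pFilter, secondFilter, metricToAgg), the same rows as in the reproduction
  val data: Seq[(Int, Int, Int, Int)] =
    Seq((1, 3, 4, 50), (2, 2, 2, 250), (3, 2, 4, 250), (4, 3, 1, 350))

  // Models sum("metricToAgg").over(Window.partitionBy("id")): total metric per id
  val activitySum: Map[Int, Int] =
    data.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._4).sum }

  // Rows surviving .filter($"activity_sum" > 50)
  val active: Seq[(Int, Int, Int, Int)] = data.filter(r => activitySum(r._1) > 50)

  // firstUnionPart: pFilter in (2, 3), then sum of metricToAgg
  val first: Int = active.filter(r => Set(2, 3).contains(r._2)).map(_._4).sum

  // secondUnionPart: secondFilter in (2, 3), then sum of metricToAgg
  val second: Int = active.filter(r => Set(2, 3).contains(r._3)).map(_._4).sum
}
```

      ExpectedSums.first evaluates to 850 and ExpectedSums.second to 250, matching the two separate show() outputs rather than the union result.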

       

      The ReusedExchange replaced part of the query plan in the second branch of the union with the plan from the first branch, as the output of explain() shows.

      I did some inspection, and it appears that both sub-plans have the same canonicalized plan, which is why the ReusedExchange takes place. I don't think they should, though: according to the notes in the source code, only plans that evaluate to the same result may have the same canonicalized plan. The two sub-plans in this query can produce different results, because the second union part filters on a different column than the first.
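      The consequence described above can be illustrated with a toy model of exchange reuse. This is a sketch under the assumption that reuse is decided purely by equality of canonical forms; Exchange, Compute, Reused and reuse are hypothetical names, not Spark classes, and canonical forms are plain strings here.

```scala
// Toy model only, not Spark's exchange-reuse implementation.
object ReuseSketch {
  // Hypothetical stand-in for an exchange: the branch it belongs to and the
  // canonical form of the subtree below it.
  final case class Exchange(branch: String, canonical: String)

  sealed trait Node
  // The exchange is executed normally.
  final case class Compute(e: Exchange) extends Node
  // The exchange is replaced by a reference to an earlier exchange's output.
  final case class Reused(branch: String, from: Exchange) extends Node

  // Keep the first exchange seen for each canonical form; every later exchange
  // with an equal canonical form reads the first one's output instead.
  def reuse(exchanges: Seq[Exchange]): Seq[Node] = {
    val seen = scala.collection.mutable.Map.empty[String, Exchange]
    exchanges.map { e =>
      seen.get(e.canonical) match {
        case Some(first) => Reused(e.branch, first)
        case None =>
          seen(e.canonical) = e
          Compute(e)
      }
    }
  }
}
```

      If the "first_union_part" and "second_union_part" exchanges end up with the same canonical string, the second becomes a Reused node pointing at the first, so the second union part silently returns the first part's 850. The pass itself is not wrong; it is only as correct as the canonical forms it compares.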

       

      An interesting thing happens when we rename the second column from "pFilter" to "kFilter": the query then works fine and produces the correct result, as you can see here:

      // Same reproduction as above; the only change is that "pFilter" is renamed to "kFilter".
      // Assumes a SparkSession named `spark`, as in spark-shell.
      import spark.implicits._
      import org.apache.spark.sql.functions.{sum, lit}
      import org.apache.spark.sql.expressions.Window

      val row1 = (1, 3, 4, 50)
      val row2 = (2, 2, 2, 250)
      val row3 = (3, 2, 4, 250)
      val row4 = (4, 3, 1, 350)
      val data = Seq(row1, row2, row3, row4)

      val df = data.toDF("id", "kFilter", "secondFilter", "metricToAgg").cache()

      val w = Window.partitionBy($"id")

      val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
        .filter($"activity_sum" > 50)
        .filter($"kFilter".isin(2, 3))
        .agg(sum($"metricToAgg"))
        .withColumn("t", lit("first_union_part"))

      val secondUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
        .filter($"activity_sum" > 50)
        .filter($"secondFilter".isin(2, 3))
        .agg(sum($"metricToAgg"))
        .withColumn("t", lit("second_union_part"))

      val finalDF = firstUnionPart.union(secondUnionPart)

      finalDF.show()
      
      +----------------+-----------------+
      |sum(metricToAgg)|                t|
      +----------------+-----------------+
      |             850| first_union_part|
      |             250|second_union_part|
      +----------------+-----------------+

       

      The result is now correct, and the only thing we changed is the name of one column. The ReusedExchange does not happen here, and I checked that the canonicalized plans now really differ.

       

      The key points to reproduce this bug are:

      1. Use union (or another operator with multiple branches)
      2. Use cache() so that the plan contains an InMemoryTableScan
      3. Use an operator that forces an Exchange in the plan (in this case a window function call)
      4. Use column names with a specific alphabetical order (renaming "pFilter" to "kFilter" makes the bug disappear)
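      The fourth point is the puzzling one. Purely as an illustration, and not as a claim about how Spark canonicalizes plans, the toy model below shows one hypothetical mechanism that would produce exactly this pattern: a canonical form that refers to the filtered column by its position in a per-branch, name-sorted column list. Branch, canonicalBySchema and canonicalBySortedScan are made-up names, and the column lists are assumptions about what each branch reads.

```scala
// Toy model only: NOT how Spark canonicalizes plans. It shows one hypothetical way
// a positional canonical form can make two different filters look identical.
object CanonicalSketch {
  // Hypothetical description of one union branch: the cached columns it reads
  // and the column its IN (2, 3) filter is applied to.
  final case class Branch(scannedColumns: Seq[String], filterColumn: String)

  // Lossless variant: refer to the filtered column by its position in the full
  // table schema, which is the same list for every branch.
  def canonicalBySchema(schema: Seq[String], b: Branch): String =
    s"Filter(#${schema.indexOf(b.filterColumn)} IN (2, 3))"

  // Hypothetical lossy variant: refer to the column by its position in this
  // branch's own scanned columns sorted by name. Different columns can land at
  // the same position, so their canonical forms collide.
  def canonicalBySortedScan(b: Branch): String =
    s"Filter(#${b.scannedColumns.sorted.indexOf(b.filterColumn)} IN (2, 3))"

  val schemaP = Seq("id", "pFilter", "secondFilter", "metricToAgg")
  val firstP  = Branch(Seq("id", "pFilter", "metricToAgg"), "pFilter")
  val firstK  = Branch(Seq("id", "kFilter", "metricToAgg"), "kFilter")
  val second  = Branch(Seq("id", "secondFilter", "metricToAgg"), "secondFilter")
}
```

      With the lossy variant, sorting gives [id, metricToAgg, pFilter] and [id, metricToAgg, secondFilter], so both filters map to "Filter(#2 IN (2, 3))" and would be reused; with "kFilter" the sorted list is [id, kFilter, metricToAgg], the filter maps to #1, and the forms differ. The lossless variant keeps pFilter (#1) and secondFilter (#2) apart regardless of names.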

       

       


          People

            Assignee: Unassigned
            Reporter: David Vrba
