Spark / SPARK-32753

Deduplicating and repartitioning the same column create duplicate rows with AQE


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.2, 3.1.0
    • Component/s: SQL
    • Labels:

      Description

      To reproduce:

      // v1 holds each id twice: two identical ranges unioned together
      spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
      // deduplicate (group by) and repartition (distribute by) on the same column
      val df = spark.sql("select id from v1 group by id distribute by id")
      println(df.collect().mkString(","))
      println(df.queryExecution.executedPlan)
      
      // With AQE
      [4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
      AdaptiveSparkPlan(isFinalPlan=true)
      +- CustomShuffleReader local
         +- ShuffleQueryStage 0
            +- Exchange hashpartitioning(id#183L, 10), true
               +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
                  +- Union
                     :- *(1) Range (0, 10, step=1, splits=2)
                     +- *(2) Range (0, 10, step=1, splits=2)
      
      // Without AQE
      [4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
      *(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Exchange hashpartitioning(id#206L, 10), true
         +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)
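      The difference is visible in the two plans: without AQE, the Exchange is followed by a second HashAggregate that merges duplicates arriving from different map tasks, while the AQE plan ends with a local shuffle read (CustomShuffleReader local) and no merge aggregate on top. A minimal sketch of why that plan shape duplicates rows (plain Scala, not Spark internals; the names and partition counts are illustrative):

      ```scala
      // Toy model of the two physical plans above -- not Spark code.
      // Assumes two map tasks, each already deduplicated by the partial
      // HashAggregate, feeding a 10-partition hash exchange.
      object DedupModel {
        val numPartitions = 10

        // Per-map-task partial HashAggregate: dedup within a single task.
        def partialDedup(rows: Seq[Long]): Seq[Long] = rows.distinct

        // Two map tasks each emit ids 0..9 (the two unioned ranges).
        val mapOutputs: Seq[Seq[Long]] =
          Seq(partialDedup(0L until 10L), partialDedup(0L until 10L))

        // Regular exchange: rows with the same id land in the same reducer,
        // where the final HashAggregate merges them away.
        def hashPartitionThenMerge(outs: Seq[Seq[Long]]): Seq[Long] =
          outs.flatten
            .groupBy(id => (id % numPartitions).toInt) // reducer assignment
            .values
            .flatMap(_.distinct)                       // final merge aggregate
            .toSeq

        // Local shuffle read with no merge aggregate on top: each reducer
        // reads one map task's output as-is, so duplicates that live in
        // different map tasks never meet.
        def localReadNoMerge(outs: Seq[Seq[Long]]): Seq[Long] = outs.flatten

        def main(args: Array[String]): Unit = {
          println(hashPartitionThenMerge(mapOutputs).length) // 10 distinct ids
          println(localReadNoMerge(mapOutputs).length)       // 20 rows, as observed
        }
      }
      ```

      Under this model the local read alone is harmless for `distribute by`; it is the combination with the dropped merge aggregate that lets cross-task duplicates through, matching the 20-row output above.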

            People

            • Assignee:
              Manu Zhang (mauzhang)
            • Reporter:
              Manu Zhang (mauzhang)
            • Votes: 0
            • Watchers: 4
