Spark / SPARK-32753

Deduplicating and repartitioning the same column create duplicate rows with AQE


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.2, 3.1.0
    • Component/s: SQL

    Description

      To reproduce:

      spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
      val df = spark.sql("select id from v1 group by id distribute by id") 
      println(df.collect().toArray.mkString(","))
      println(df.queryExecution.executedPlan)
      
      // With AQE
      [4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
      AdaptiveSparkPlan(isFinalPlan=true)
      +- CustomShuffleReader local
         +- ShuffleQueryStage 0
            +- Exchange hashpartitioning(id#183L, 10), true
               +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
                  +- Union
                     :- *(1) Range (0, 10, step=1, splits=2)
                     +- *(2) Range (0, 10, step=1, splits=2)
      
      // Without AQE
      [4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
      *(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Exchange hashpartitioning(id#206L, 10), true
         +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)
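
      Note the difference between the two plans: with AQE, the final HashAggregate above the exchange is gone and the shuffle is read through a `CustomShuffleReader local`, so partially deduplicated rows from different map tasks are never merged. Not part of the original report, but a hedged workaround sketch: since the duplicates coincide with the local shuffle reader node, disabling it (via `spark.sql.adaptive.localShuffleReader.enabled`, assumed available in Spark 3.0.x) should restore a regular hash-partitioned shuffle read while keeping AQE on:

      // Workaround sketch (assumes Spark 3.0.x): keep AQE enabled but turn off
      // the local shuffle reader so the repartition-by-id shuffle is read
      // hash-partitioned and equal ids land in the same partition.
      spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "false")

      spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
      val df = spark.sql("select id from v1 group by id distribute by id")
      // If the local reader is the culprit, this collects 10 distinct ids
      // rather than the 20 rows shown above.
      println(df.collect().toArray.mkString(","))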

          People

            Assignee: Manu Zhang (mauzhang)
            Reporter: Manu Zhang (mauzhang)
            Votes: 0
            Watchers: 4
