Spark / SPARK-24556

ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.3.2
    • 2.4.0
    • SQL
    • None

    Description

      Currently, ReusedExchange rewrites its output partitioning when the child's partitioning is HashPartitioning, but not when it is RangePartitioning. The stale attribute references can sometimes introduce an extra shuffle, for example:

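      // Run in spark-shell; a standalone application needs `import spark.implicits._`.
      val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
      val df1 = df.as("t1")
      val df2 = df.as("t2")
      // Both join inputs sort the same data on j, so the second Exchange is reused.
      val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
      // Sorting the cached result on t2.j should not need another shuffle.
      t.cache().orderBy($"t2.j").explain()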
      

      Before fix:

      == Physical Plan ==
      *(1) Sort [j#14 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
         +- InMemoryTableScan [i#5, j#6, i#13, j#14]
               +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                     +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                        :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                        :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                        :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                        :        +- LocalTableScan [i#5, j#6]
                        +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                           +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
      

      A better plan would avoid the extra "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)" on top of the cached relation, for example:

      == Physical Plan ==
      *(1) Sort [j#14 ASC NULLS FIRST], true, 0
      +- InMemoryTableScan [i#5, j#6, i#13, j#14]
            +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                  +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                     :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                     :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                     :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                     :        +- LocalTableScan [i#5, j#6]
                     +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                        +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
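
      The attribute rewrite this issue asks for can be sketched with a toy model. This is a minimal illustration, not Spark's actual code: Attr, HashPartitioning, RangePartitioning, UnknownPartitioning and rewritePartitioning below are hypothetical stand-ins defined only for this sketch. The idea is that the reused node maps each child attribute (e.g. j#6) to its own output attribute at the same position (e.g. j#14), and applies that mapping to range partitioning as well as hash partitioning.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's classes.
object ReusedExchangeSketch {
  // An attribute reference such as j#6: a column name plus a unique expression id.
  final case class Attr(name: String, id: Int) {
    override def toString: String = s"$name#$id"
  }

  sealed trait Partitioning
  final case class HashPartitioning(keys: Seq[Attr], numPartitions: Int) extends Partitioning
  final case class RangePartitioning(ordering: Seq[Attr], numPartitions: Int) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  // Re-express the child's partitioning in terms of the reused node's own output.
  def rewritePartitioning(
      childOutput: Seq[Attr],
      reusedOutput: Seq[Attr],
      childPartitioning: Partitioning): Partitioning = {
    // Child and reused outputs line up position by position.
    val mapping: Map[Attr, Attr] = childOutput.zip(reusedOutput).toMap
    def update(a: Attr): Attr = mapping.getOrElse(a, a)

    childPartitioning match {
      case h: HashPartitioning  => h.copy(keys = h.keys.map(update))
      // The case this issue is about: range partitioning gets the same rewrite.
      case r: RangePartitioning => r.copy(ordering = r.ordering.map(update))
      case other                => other
    }
  }
}
```

      With child output [i#5, j#6] and reused output [i#13, j#14], a range partitioning on j#6 becomes a range partitioning on j#14 in this model, which is the form a later sort on j#14 would need in order to recognise the data as already partitioned.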
      

          People

            yucai yucai