  1. Spark
  2. SPARK-18245 Improving support for bucketed table
  3. SPARK-18067

SortMergeJoin adds shuffle if join predicates have non partitioned columns

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 1.6.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      Basic setup

      scala> case class Data1(key: String, value1: Int)
      scala> case class Data2(key: String, value2: Int)
      
      scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
          .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
      scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
          .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
      

      Join on key

      scala> partition1.join(partition2, "key").explain
      == Physical Plan ==
      Project [key#0,value1#1,value2#13]
      +- SortMergeJoin [key#0], [key#12]
         :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
         +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
      

      The plan contains no Exchange operator, so we get an efficient join with no shuffle.

      But if we add an equality filter on the value columns, the planner folds it into the join keys and we end up with a shuffle on both sides.

      scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
      == Physical Plan ==
      Project [key#0,value1#1,value2#13]
      +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
         :- Sort [value1#1 ASC,key#0 ASC], false, 0
         :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
         :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
         +- Sort [value2#13 ASC,key#12 ASC], false, 0
            +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
               +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
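
      The extra exchange above looks logically unnecessary: rows that agree on (value, key) also agree on key, so data partitioned by key alone already has every matching pair in the same partition. The sketch below illustrates this with plain Scala collections. It is a simplified model, not Spark's implementation; Row, partitionOf and the other names are hypothetical.

```scala
// Simplified model, not Spark's implementation: rows are assigned to
// partitions by hashing the join key only.
object CoLocationSketch {
  final case class Row(key: String, value: Int)

  // Hypothetical partitioner: non-negative hash of the key modulo numPartitions.
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Group rows by the partition their key hashes to.
  def partitionByKey(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[Row]] =
    rows.groupBy(r => partitionOf(r.key, numPartitions))

  // Join partition by partition on (key, value); no row changes partition.
  def localJoin(
      left: Map[Int, Seq[Row]],
      right: Map[Int, Seq[Row]]): Seq[(String, Int, Int)] =
    for {
      (p, ls) <- left.toSeq
      l <- ls
      r <- right.getOrElse(p, Seq.empty)
      if l.key == r.key && l.value == r.value
    } yield (l.key, l.value, r.value)

  // Reference result: the same join over the unpartitioned data.
  def globalJoin(left: Seq[Row], right: Seq[Row]): Seq[(String, Int, Int)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && l.value == r.value
    } yield (l.key, l.value, r.value)
}
```

      In this model the partition-local join returns the same rows as the global one, which is why a planner could in principle reuse the existing partitioning on key here.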
      

      We can avoid the shuffle if we use a filter that can't be pushed into the join keys.

      scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
      == Physical Plan ==
      Project [key#0,value1#1,value2#13]
      +- Filter (value1#1 >= value2#13)
         +- SortMergeJoin [key#0], [key#12]
            :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
            +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
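
      Building on the plan above, one possible workaround is to express the equality as a pair of inequalities. This is only a sketch for spark-shell: it assumes partition1 and partition2 from the basic setup, and it assumes the planner lifts only equality predicates into the join keys, which may not hold on other Spark versions. For non-null values the two inequalities together are equivalent to value1 === value2.

```scala
// spark-shell sketch; assumes partition1 and partition2 from the basic setup.
// Assumption: only equality predicates are lifted into the join keys, so a
// pair of inequalities should stay in a Filter above the join.
val joined = partition1.join(partition2, "key")
  .filter($"value1" >= $"value2" && $"value1" <= $"value2")

// Inspect the physical plan for a TungstenExchange before relying on this.
joined.explain
```

      This relies on planner behaviour rather than a documented option, so the plan should be checked each time.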
      

      What's the best way to avoid the filter pushdown here?

            People

              Assignee: Unassigned
              Reporter: Paul Jones (pjoneswork)
              Votes: 1
              Watchers: 14
