Details
- Type: Sub-task
- Status: Resolved
- Priority: Minor
- Resolution: Incomplete
- Affects Version/s: 1.6.1
- Fix Version/s: None
Description
Basic setup
scala> case class Data1(key: String, value1: Int)
scala> case class Data2(key: String, value2: Int)
scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
         .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
         .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
Join on key
scala> partition1.join(partition2, "key").explain
== Physical Plan ==
Project [key#0,value1#1,value2#13]
+- SortMergeJoin [key#0], [key#12]
   :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
   +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
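The plan has no Exchange operator, so we get a very efficient join with no shuffle.
But if we add an equality filter on the value columns, it is pulled into the join keys, the join gets less efficient, and we end up with a shuffle (a TungstenExchange on each side).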
scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
== Physical Plan ==
Project [key#0,value1#1,value2#13]
+- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
   :- Sort [value1#1 ASC,key#0 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
   :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
   +- Sort [value2#13 ASC,key#12 ASC], false, 0
      +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
         +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
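To make the difference between the two plans concrete, here is a toy model in plain Scala (no Spark needed). It compares placing rows by a hash of key alone, as in the cached data, with placing them by a hash of (value, key), as in the hashpartitioning(value1#1,key#0,200) step. The `partitionOf` function is a hypothetical stand-in and is not Spark's real hash function; the row count is scaled down. The sketch only illustrates two things: the two schemes generally put the same row in different partitions, and any pair of rows that matches on (value, key) also matches on key, so such pairs are already together under key-only partitioning.

```scala
// Toy model only: `partitionOf` is a hypothetical stand-in, NOT Spark's real hash function.
object PartitioningSketch {
  val numPartitions = 200 // same count as in hashpartitioning(..., 200)

  // Hash the given column values and map them to a partition id in [0, numPartitions).
  def partitionOf(cols: Seq[Any]): Int =
    java.lang.Math.floorMod(cols.hashCode, numPartitions)

  // (key, value) rows shaped like Data1 / Data2 in the setup, scaled down to 1000 rows.
  val left: Seq[(String, Int)]  = (1 to 1000).map(x => (s"$x", x))
  val right: Seq[(String, Int)] = (1 to 1000).map(x => (s"$x", x))

  // Number of rows whose partition id changes when the hash covers
  // (value, key) instead of key alone.
  def movedRows: Int =
    left.count { case (k, v) => partitionOf(Seq(k)) != partitionOf(Seq(v, k)) }

  // Every pair that matches on (value, key) also matches on key, so under
  // key-only partitioning both rows of the pair sit in the same partition.
  def matchesCoLocatedByKey: Boolean =
    (for {
      (lk, lv) <- left
      (rk, rv) <- right
      if lk == rk && lv == rv
    } yield partitionOf(Seq(lk)) == partitionOf(Seq(rk))).forall(identity)

  def main(args: Array[String]): Unit = {
    println(s"rows placed differently under the two schemes: $movedRows of ${left.size}")
    println(s"matching rows co-located under key-only partitioning: $matchesCoLocatedByKey")
  }
}
```

Under these assumptions the existing layout already keeps matching rows together, which seems to be the kind of case the related issue on partial hash distribution is about.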
And we can avoid the shuffle if we use a filter that can't be pushed into the join keys.
scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
== Physical Plan ==
Project [key#0,value1#1,value2#13]
+- Filter (value1#1 >= value2#13)
   +- SortMergeJoin [key#0], [key#12]
      :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
      +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
What's the best way to keep the equality filter from being pushed into the join keys here?
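One possible direction, building on the `>=` observation above, is to phrase the equality so that it no longer looks like a plain `===` between the two sides. The sketch below is untested on 1.6.1 and is written for spark-shell, where `sc` and the SQL implicits are assumed to be provided by the shell. It shows two variants: a pair of inequalities, and a UDF (the name `sameValue` is made up for illustration). Whether either one actually keeps the plan free of an Exchange is an assumption that has to be checked with `explain`; the UDF variant may also cost more per row.

```scala
// Untested sketch for spark-shell on Spark 1.6.x; `sc` and the `$"..."` / toDF
// implicits are assumed to come from the shell, as in the setup above.
import org.apache.spark.sql.functions.{col, udf}

case class Data1(key: String, value1: Int)
case class Data2(key: String, value2: Int)

// Same layout as the basic setup: repartitioned and sorted by key, then cached.
val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x)).toDF.
  repartition(col("key")).sortWithinPartitions(col("key")).cache
val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x)).toDF.
  repartition(col("key")).sortWithinPartitions(col("key")).cache

val joined = partition1.join(partition2, "key")

// Variant A: equality written as two inequalities (equivalent for non-null ints).
val viaInequalities =
  joined.filter($"value1" >= $"value2" && $"value1" <= $"value2")

// Variant B: hide the comparison inside a UDF (hypothetical helper `sameValue`).
val sameValue = udf((a: Int, b: Int) => a == b)
val viaUdf = joined.filter(sameValue($"value1", $"value2"))

// Inspect both plans and look for TungstenExchange before relying on either variant.
viaInequalities.explain
viaUdf.explain
```

Both variants return the same rows as the `===` filter for this data; only the physical plan may differ.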
Issue Links
- is related to: SPARK-32806 SortMergeJoin with partial hash distribution can be optimized to remove shuffle (In Progress)
- links to