Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Incomplete
-
2.3.1
-
None
-
Spark 2.3.1
Hadoop 2.7.3
Description
The build side chosen for the broadcast hash join (BroadcastHashJoinExec.buildSide) is not the one expected.
Pre-requisites:
Cost-based optimization is enabled: spark.sql.cbo.enabled=true and spark.sql.cbo.joinReorder.enabled=true.
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
Steps:
Scenario 1:
spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='600','totalSize'='80000000000')")
spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='60000000', 'totalSize'='800')")
val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan
val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
println(buildSide)
Result 1:
scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#0L], [c1#1L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#0L)
:  +- HiveTableScan [c1#0L], HiveTableRelation `default`.`small3`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *(1) Filter isnotnull(c1#1L)
      +- HiveTableScan [c1#1L], HiveTableRelation `default`.`big3`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L]
scala> val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight
scala> println(buildSide)
BuildRight
Scenario 2:
spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='600','totalSize'='80')")
spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='60000000', 'totalSize'='800')")
val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan
val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
println(buildSide)
Result 2:
scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#4L], [c1#5L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#4L)
:  +- HiveTableScan [c1#4L], HiveTableRelation `default`.`small4`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *(1) Filter isnotnull(c1#5L)
      +- HiveTableScan [c1#5L], HiveTableRelation `default`.`big4`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L]
scala> val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight