Details
Description
Issue 1: Broadcast join changes to sort merge join after the spark-beeline session restarts.
Precondition: The JDBC/Thrift Server is continuously running.
Steps:
0: jdbc:hive2://10.18.18.214:23040/default> use x1;
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> create table cv (a int, b string) stored as parquet;
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> create table c (a int, b string) stored as parquet;
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> insert into table c values (1,'a');
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> insert into table cv values (1,'a');
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> select * from c , cv where c.a = cv.a;
+----+----+----+----+--+
| a  | b  | a  | b  |
+----+----+----+----+--+
| 1  | a  | 1  | a  |
+----+----+----+----+--+
Before restarting the session (spark-beeline):
explain select * from c , cv where c.a = cv.a;
== Physical Plan ==
*(2) BroadcastHashJoin [a#3284], [a#3286], Inner, BuildRight
:- *(2) Project [a#3284, b#3285]
:  +- *(2) Filter isnotnull(a#3284)
:     +- *(2) FileScan parquet x1.c[a#3284,b#3285] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
   +- *(1) Project [a#3286, b#3287]
      +- *(1) Filter isnotnull(a#3286)
         +- *(1) FileScan parquet x1.cv[a#3286,b#3287] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
After the session restarts (spark-beeline):
explain select * from c , cv where c.a = cv.a;
== Physical Plan ==
*(5) SortMergeJoin [a#3312], [a#3314], Inner
:- *(2) Sort [a#3312 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(a#3312, 200)
:     +- *(1) Project [a#3312, b#3313]
:        +- *(1) Filter isnotnull(a#3312)
:           +- *(1) FileScan parquet x1.c[a#3312,b#3313] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
+- *(4) Sort [a#3314 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(a#3314, 200)
      +- *(3) Project [a#3314, b#3315]
         +- *(3) Filter isnotnull(a#3314)
            +- *(3) FileScan parquet x1.cv[a#3314,b#3315] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
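One way to narrow this down (a diagnostic sketch, assuming the plan change is driven by the size estimate crossing spark.sql.autoBroadcastJoinThreshold after the restart; table names are taken from the repro above) is to compare the session's threshold with the size statistics the optimizer sees for each table, before and after restarting the session:

```sql
-- Show the broadcast threshold in effect for this session (default 10 MB).
SET spark.sql.autoBroadcastJoinThreshold;

-- Inspect the catalog statistics for each table; the "Statistics" field
-- in the output shows the sizeInBytes estimate used for join selection.
DESC EXTENDED x1.c;
DESC EXTENDED x1.cv;

-- Recompute statistics, then re-check whether the plan reverts to broadcast.
ANALYZE TABLE x1.c COMPUTE STATISTICS;
ANALYZE TABLE x1.cv COMPUTE STATISTICS;
explain select * from c, cv where c.a = cv.a;
```

If the sizeInBytes reported after the restart differs from the value before it, the regression is in how the statistics are (re)loaded for the new session rather than in join planning itself.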
Note: The JDBC server keeps running across the session restart, i.e. the application itself is not restarted and the driver remains the same.
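Until the root cause is identified, one possible workaround (a sketch, not a fix) is to force the broadcast explicitly with a join hint, which sidesteps the optimizer's size estimate:

```sql
-- The BROADCAST (a.k.a. MAPJOIN) hint forces a BroadcastHashJoin with cv
-- as the build side, regardless of spark.sql.autoBroadcastJoinThreshold.
explain select /*+ BROADCAST(cv) */ * from c, cv where c.a = cv.a;
```

This only masks the symptom for individual queries; the underlying statistics behavior across session restarts still needs to be fixed.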