SPARK-25403

Broadcast join changes to sort merge join after spark-beeline session restarts


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.1, 2.3.1
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels: None
    • Environment: Spark 2.3.1, Hadoop 2.7.2

    Description

      Issue 1: Broadcast join changes to sort merge join after the spark-beeline session restarts.

      Precondition: The JDBC/Thrift Server is continuously running.

      Steps:

       

      0: jdbc:hive2://10.18.18.214:23040/default> use x1;
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.18.18.214:23040/default> create table cv (a int, b string) stored as parquet;
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.18.18.214:23040/default> create table c (a int, b string) stored as parquet;
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.18.18.214:23040/default> insert into table c values (1,'a');
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.18.18.214:23040/default> insert into table cv values (1,'a');
      +---------+--+
      | Result |
      +---------+--+
      +---------+--+
      0: jdbc:hive2://10.18.18.214:23040/default> select * from c , cv where c.a = cv.a;
      +----+----+----+----+--+
      | a  | b  | a  | b  |
      +----+----+----+----+--+
      | 1  | a  | 1  | a  |
      +----+----+----+----+--+
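      For cross-checking the repro, the planner's join choice can be compared against the broadcast threshold and the catalog statistics of the build side. A minimal diagnostic sketch (run in the same beeline session; x1 and cv are the database and table created in the steps above; the Statistics row is only present when size statistics have been recorded):

      SET spark.sql.autoBroadcastJoinThreshold;  -- prints the current value; the default is 10485760 (10 MB)
      DESC EXTENDED x1.cv;                       -- check the Statistics row for the estimated size in bytes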
      
      

       

      Before restarting the session (spark-beeline):
      

      explain select * from c , cv where c.a = cv.a;

      == Physical Plan ==
      *(2) BroadcastHashJoin [a#3284], [a#3286], Inner, BuildRight
      :- *(2) Project [a#3284, b#3285]
      :  +- *(2) Filter isnotnull(a#3284)
      :     +- *(2) FileScan parquet x1.c[a#3284,b#3285] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
         +- *(1) Project [a#3286, b#3287]
            +- *(1) Filter isnotnull(a#3286)
               +- *(1) FileScan parquet x1.cv[a#3286,b#3287] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
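      The BroadcastHashJoin above means the planner estimated cv below spark.sql.autoBroadcastJoinThreshold. As a sanity check (not part of the original report), the sort-merge plan can be reproduced without a restart by disabling the threshold, which ties the regression to the size estimate rather than to anything in the query itself:

      SET spark.sql.autoBroadcastJoinThreshold=-1;        -- disable broadcast joins entirely
      EXPLAIN SELECT * FROM c, cv WHERE c.a = cv.a;       -- now plans a SortMergeJoin even in the original session
      SET spark.sql.autoBroadcastJoinThreshold=10485760;  -- restore the default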
      After restarting the session (spark-beeline):
      

       explain select * from c , cv where c.a = cv.a;

      == Physical Plan ==
      *(5) SortMergeJoin [a#3312], [a#3314], Inner
      :- *(2) Sort [a#3312 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(a#3312, 200)
      :     +- *(1) Project [a#3312, b#3313]
      :        +- *(1) Filter isnotnull(a#3312)
      :           +- *(1) FileScan parquet x1.c[a#3312,b#3313] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
      +- *(4) Sort [a#3314 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#3314, 200)
            +- *(3) Project [a#3314, b#3315]
               +- *(3) Filter isnotnull(a#3314)
                  +- *(3) FileScan parquet x1.cv[a#3314,b#3315] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>

      Note: The JDBC server is continuously running at the time of the session restart, i.e. the application itself is not restarting and the driver remains the same.
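      A possible workaround until the fix is picked up (an assumption, not part of the original report): recompute the table statistics in the new session so the planner again sees a size below the threshold, or force the build side with a broadcast hint (available since Spark 2.2):

      ANALYZE TABLE x1.cv COMPUTE STATISTICS;        -- refresh the size/row-count statistics in the catalog
      EXPLAIN SELECT * FROM c, cv WHERE c.a = cv.a;  -- should plan a BroadcastHashJoin again

      -- alternatively, force the broadcast explicitly:
      EXPLAIN SELECT /*+ BROADCAST(cv) */ * FROM c, cv WHERE c.a = cv.a;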

      People

          Assignee: Yuming Wang (yumwang)
          Reporter: Ayush Anubhava (Ayush007)