Spark / SPARK-26576

Broadcast hint not applied to partitioned table


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.2, 2.3.2, 2.4.0
    • Fix Version/s: 2.4.1, 3.0.0
    • Component/s: SQL
    • Labels: None

    Description

      The broadcast hint is not applied to a partitioned Parquet table. Below, "SortMergeJoin" is chosen incorrectly and "ResolvedHint(broadcast)" is removed from the Optimized Logical Plan.

      scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS parquet")
      
      scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      
      scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
      
      == Parsed Logical Plan ==
      'Join UsingJoin(Inner,List(dateint))
      :- SubqueryAlias `jzhuge`.`parquet_with_part`
      :  +- Relation[val#28,dateint#29] parquet
      +- ResolvedHint (broadcast)
         +- SubqueryAlias `jzhuge`.`parquet_with_part`
            +- Relation[val#32,dateint#33] parquet
      
      == Analyzed Logical Plan ==
      dateint: int, val: string, val: string
      Project [dateint#29, val#28, val#32]
      +- Join Inner, (dateint#29 = dateint#33)
         :- SubqueryAlias `jzhuge`.`parquet_with_part`
         :  +- Relation[val#28,dateint#29] parquet
         +- ResolvedHint (broadcast)
            +- SubqueryAlias `jzhuge`.`parquet_with_part`
               +- Relation[val#32,dateint#33] parquet
      
      == Optimized Logical Plan ==
      Project [dateint#29, val#28, val#32]
      +- Join Inner, (dateint#29 = dateint#33)
         :- Project [val#28, dateint#29]
         :  +- Filter isnotnull(dateint#29)
         :     +- Relation[val#28,dateint#29] parquet
         +- Project [val#32, dateint#33]
            +- Filter isnotnull(dateint#33)
               +- Relation[val#32,dateint#33] parquet
      
      == Physical Plan ==
      *(5) Project [dateint#29, val#28, val#32]
      +- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
         :- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
         :     +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
         +- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
            +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
      
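The hint loss above can be pinpointed programmatically. A diagnostic sketch (assuming only the public `Dataset.queryExecution` API, which exposes the analyzed and optimized logical plans) that confirms the hint survives analysis but is dropped during optimization on affected versions:

```scala
// Diagnostic sketch: check at which phase the broadcast hint disappears.
// Uses the repro's table name; requires an active SparkSession.
import org.apache.spark.sql.functions.broadcast

val df = spark.table("jzhuge.parquet_with_part")
val qe = df.join(broadcast(df), "dateint").queryExecution

// Hint is present after analysis...
println(qe.analyzed.toString.contains("ResolvedHint"))
// ...but gone after optimization on affected versions.
println(qe.optimizedPlan.toString.contains("ResolvedHint"))
```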

      The broadcast hint is applied to a Parquet table without partitions. Below, "BroadcastHashJoin" is chosen as expected.

      scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint INT) STORED AS parquet")
      
      scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      
      scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
      
      == Parsed Logical Plan ==
      'Join UsingJoin(Inner,List(dateint))
      :- SubqueryAlias `jzhuge`.`parquet_no_part`
      :  +- Relation[val#44,dateint#45] parquet
      +- ResolvedHint (broadcast)
         +- SubqueryAlias `jzhuge`.`parquet_no_part`
            +- Relation[val#50,dateint#51] parquet
      
      == Analyzed Logical Plan ==
      dateint: int, val: string, val: string
      Project [dateint#45, val#44, val#50]
      +- Join Inner, (dateint#45 = dateint#51)
         :- SubqueryAlias `jzhuge`.`parquet_no_part`
         :  +- Relation[val#44,dateint#45] parquet
         +- ResolvedHint (broadcast)
            +- SubqueryAlias `jzhuge`.`parquet_no_part`
               +- Relation[val#50,dateint#51] parquet
      
      == Optimized Logical Plan ==
      Project [dateint#45, val#44, val#50]
      +- Join Inner, (dateint#45 = dateint#51)
         :- Filter isnotnull(dateint#45)
         :  +- Relation[val#44,dateint#45] parquet
         +- ResolvedHint (broadcast)
            +- Filter isnotnull(dateint#51)
               +- Relation[val#50,dateint#51] parquet
      
      == Physical Plan ==
      *(2) Project [dateint#45, val#44, val#50]
      +- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
         :- *(2) Project [val#44, dateint#45]
         :  +- *(2) Filter isnotnull(dateint#45)
         :     +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
            +- *(1) Project [val#50, dateint#51]
               +- *(1) Filter isnotnull(dateint#51)
                  +- *(1) FileScan parquet jzhuge.parquet_no_part[val#50,dateint#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
      

      A similar issue was observed with a partitioned ORC table; SequenceFile tables are fine.
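To compare formats quickly, the two repros above can be folded into one helper. A sketch (the helper name and the join-strategy string match are assumptions; table names are from the repro) that reports which join strategy the planner picked:

```scala
// Sketch: does the planner honor broadcast() for a given table?
// Searches the physical plan string for BroadcastHashJoin.
import org.apache.spark.sql.functions.broadcast

def usesBroadcast(tableName: String): Boolean = {
  val df = spark.table(tableName)
  df.join(broadcast(df), "dateint")
    .queryExecution.executedPlan.toString
    .contains("BroadcastHashJoin")
}
```

On affected versions this should return false for `jzhuge.parquet_with_part` and true for `jzhuge.parquet_no_part`.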

People

    Assignee: John Zhuge
    Reporter: John Zhuge