Spark / SPARK-26576

Broadcast hint not applied to partitioned table


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.2, 2.3.2, 2.4.0
    • Fix Version/s: 2.4.1, 3.0.0
    • Component/s: SQL
    • Labels: None

    Description

      The broadcast hint is not applied to a partitioned Parquet table. In the plans below, "SortMergeJoin" is chosen incorrectly, and "ResolvedHint (broadcast)" is dropped from the optimized logical plan.

      scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS parquet")
      
      scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      
      scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
      
      == Parsed Logical Plan ==
      'Join UsingJoin(Inner,List(dateint))
      :- SubqueryAlias `jzhuge`.`parquet_with_part`
      :  +- Relation[val#28,dateint#29] parquet
      +- ResolvedHint (broadcast)
         +- SubqueryAlias `jzhuge`.`parquet_with_part`
            +- Relation[val#32,dateint#33] parquet
      
      == Analyzed Logical Plan ==
      dateint: int, val: string, val: string
      Project [dateint#29, val#28, val#32]
      +- Join Inner, (dateint#29 = dateint#33)
         :- SubqueryAlias `jzhuge`.`parquet_with_part`
         :  +- Relation[val#28,dateint#29] parquet
         +- ResolvedHint (broadcast)
            +- SubqueryAlias `jzhuge`.`parquet_with_part`
               +- Relation[val#32,dateint#33] parquet
      
      == Optimized Logical Plan ==
      Project [dateint#29, val#28, val#32]
      +- Join Inner, (dateint#29 = dateint#33)
         :- Project [val#28, dateint#29]
         :  +- Filter isnotnull(dateint#29)
         :     +- Relation[val#28,dateint#29] parquet
         +- Project [val#32, dateint#33]
            +- Filter isnotnull(dateint#33)
               +- Relation[val#32,dateint#33] parquet
      
      == Physical Plan ==
      *(5) Project [dateint#29, val#28, val#32]
      +- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
         :- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
         :     +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
         +- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
            +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
      

      The broadcast hint is applied to a non-partitioned Parquet table. In the plans below, "BroadcastHashJoin" is chosen as expected.

      scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint INT) STORED AS parquet")
      
      scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      
      scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
      
      == Parsed Logical Plan ==
      'Join UsingJoin(Inner,List(dateint))
      :- SubqueryAlias `jzhuge`.`parquet_no_part`
      :  +- Relation[val#44,dateint#45] parquet
      +- ResolvedHint (broadcast)
         +- SubqueryAlias `jzhuge`.`parquet_no_part`
            +- Relation[val#50,dateint#51] parquet
      
      == Analyzed Logical Plan ==
      dateint: int, val: string, val: string
      Project [dateint#45, val#44, val#50]
      +- Join Inner, (dateint#45 = dateint#51)
         :- SubqueryAlias `jzhuge`.`parquet_no_part`
         :  +- Relation[val#44,dateint#45] parquet
         +- ResolvedHint (broadcast)
            +- SubqueryAlias `jzhuge`.`parquet_no_part`
               +- Relation[val#50,dateint#51] parquet
      
      == Optimized Logical Plan ==
      Project [dateint#45, val#44, val#50]
      +- Join Inner, (dateint#45 = dateint#51)
         :- Filter isnotnull(dateint#45)
         :  +- Relation[val#44,dateint#45] parquet
         +- ResolvedHint (broadcast)
            +- Filter isnotnull(dateint#51)
               +- Relation[val#50,dateint#51] parquet
      
      == Physical Plan ==
      *(2) Project [dateint#45, val#44, val#50]
      +- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
         :- *(2) Project [val#44, dateint#45]
         :  +- *(2) Filter isnotnull(dateint#45)
         :     +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
            +- *(1) Project [val#50, dateint#51]
               +- *(1) Filter isnotnull(dateint#51)
                  +- *(1) FileScan parquet jzhuge.parquet_no_part[val#50,dateint#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
      

      A similar issue was observed with a partitioned ORC table; SequenceFile tables are not affected.
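      To reproduce the check programmatically, one can look for ResolvedHint nodes in the optimized logical plan rather than reading explain output by hand. This is a minimal sketch, assuming a running spark-shell session and the partitioned table created above; the `hintSurvives` name is illustrative, not part of any Spark API.

      ```scala
      import org.apache.spark.sql.functions.broadcast
      import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint

      val df = spark.table("jzhuge.parquet_with_part")
      val joined = df.join(broadcast(df), "dateint")

      // Collect any surviving ResolvedHint nodes from the optimized plan.
      // On affected versions this is empty for partitioned Parquet/ORC tables,
      // so the planner falls back to SortMergeJoin.
      val hintSurvives = joined.queryExecution.optimizedPlan
        .collect { case h: ResolvedHint => h }
        .nonEmpty

      println(s"broadcast hint survives optimization: $hintSurvives")
      ```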

      Attachments

        Issue Links

        Activity


          People

            Assignee: John Zhuge (jzhuge)
            Reporter: John Zhuge (jzhuge)
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved:
