Description
The broadcast hint is not applied to a partitioned Parquet table. In the plan below, "SortMergeJoin" is chosen incorrectly, and the "ResolvedHint (broadcast)" node is dropped from the Optimized Logical Plan.
scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS parquet")
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(dateint))
:- SubqueryAlias `jzhuge`.`parquet_with_part`
:  +- Relation[val#28,dateint#29] parquet
+- ResolvedHint (broadcast)
   +- SubqueryAlias `jzhuge`.`parquet_with_part`
      +- Relation[val#32,dateint#33] parquet

== Analyzed Logical Plan ==
dateint: int, val: string, val: string
Project [dateint#29, val#28, val#32]
+- Join Inner, (dateint#29 = dateint#33)
   :- SubqueryAlias `jzhuge`.`parquet_with_part`
   :  +- Relation[val#28,dateint#29] parquet
   +- ResolvedHint (broadcast)
      +- SubqueryAlias `jzhuge`.`parquet_with_part`
         +- Relation[val#32,dateint#33] parquet

== Optimized Logical Plan ==
Project [dateint#29, val#28, val#32]
+- Join Inner, (dateint#29 = dateint#33)
   :- Project [val#28, dateint#29]
   :  +- Filter isnotnull(dateint#29)
   :     +- Relation[val#28,dateint#29] parquet
   +- Project [val#32, dateint#33]
      +- Filter isnotnull(dateint#33)
         +- Relation[val#32,dateint#33] parquet

== Physical Plan ==
*(5) Project [dateint#29, val#28, val#32]
+- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
   :- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
   :  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
   :     +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
   +- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
      +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
The broadcast hint is applied to a Parquet table without partitions. In the plan below, "BroadcastHashJoin" is chosen as expected.
scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint INT) STORED AS parquet")
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => df.join(broadcast(df), "dateint").explain(true))

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(dateint))
:- SubqueryAlias `jzhuge`.`parquet_no_part`
:  +- Relation[val#44,dateint#45] parquet
+- ResolvedHint (broadcast)
   +- SubqueryAlias `jzhuge`.`parquet_no_part`
      +- Relation[val#50,dateint#51] parquet

== Analyzed Logical Plan ==
dateint: int, val: string, val: string
Project [dateint#45, val#44, val#50]
+- Join Inner, (dateint#45 = dateint#51)
   :- SubqueryAlias `jzhuge`.`parquet_no_part`
   :  +- Relation[val#44,dateint#45] parquet
   +- ResolvedHint (broadcast)
      +- SubqueryAlias `jzhuge`.`parquet_no_part`
         +- Relation[val#50,dateint#51] parquet

== Optimized Logical Plan ==
Project [dateint#45, val#44, val#50]
+- Join Inner, (dateint#45 = dateint#51)
   :- Filter isnotnull(dateint#45)
   :  +- Relation[val#44,dateint#45] parquet
   +- ResolvedHint (broadcast)
      +- Filter isnotnull(dateint#51)
         +- Relation[val#50,dateint#51] parquet

== Physical Plan ==
*(2) Project [dateint#45, val#44, val#50]
+- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
   :- *(2) Project [val#44, dateint#45]
   :  +- *(2) Filter isnotnull(dateint#45)
   :     +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
      +- *(1) Project [val#50, dateint#51]
         +- *(1) Filter isnotnull(dateint#51)
            +- *(1) FileScan parquet jzhuge.parquet_no_part[val#50,dateint#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
A similar issue was observed with a partitioned ORC table. SequenceFile tables are fine.
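The ORC case can presumably be reproduced with the same session as above, swapping the storage format; the table name jzhuge.orc_with_part below is illustrative, not from the original report:

```scala
// Hypothetical repro for the partitioned ORC case, mirroring the Parquet one above.
scala> spark.sql("CREATE TABLE jzhuge.orc_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS orc")
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
scala> Seq(spark.table("jzhuge.orc_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
// Expectation: the physical plan again falls back to SortMergeJoin
// and drops the ResolvedHint (broadcast) node in the Optimized Logical Plan.
```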
Issue Links
- is duplicated by SPARK-26599 "BroardCast hint can not work with PruneFileSourcePartitions" (Closed)