Description: For a parquet datasource table, the estimated sizeInBytes of the query is reported in EB (8.0 EB). This hurts performance, because join queries on such tables are always planned as Sort Merge Join, even when the tables are small.
Precondition: spark.sql.statistics.fallBackToHdfs = true
Steps:
0: jdbc:hive2://10.xx:23040/default> create table t1110 (a int, b string) using parquet PARTITIONED BY (b);
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (2,'b');
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.1xx:23040/default> insert into t1110 values (1,'a');
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.xx.xx:23040/default> select * from t1110;
+----+----+--+
| a  | b  |
+----+----+--+
| 1  | a  |
| 2  | b  |
+----+----+--+
The cost of the query shows sizeInBytes in EB:
explain cost select * from t1110;
== Optimized Logical Plan ==
Relation[a#23,b#24] parquet, Statistics(sizeInBytes=8.0 EB, hints=none)

== Physical Plan ==
*(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
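8.0 EB is what Long.MaxValue (2^63 - 1 bytes) looks like when rendered in binary units, and Long.MaxValue is the documented default of spark.sql.defaultSizeInBytes, the fallback Spark uses when it cannot determine a relation's size. That suggests, but does not prove, that the scan fell back to this default instead of using the on-disk size. The Python sketch below only checks the arithmetic; bytes_to_string and its "at least 2 of a unit" rule are a hypothetical approximation of Spark's formatting, not Spark code.

```python
# Hypothetical helper: renders a byte count with binary units
# (KB = 2**10, ..., EB = 2**60). The ">= 2 of the unit" rule is an
# assumption modeled on Spark's formatting, not a copy of Spark's code.
UNITS = [("EB", 2**60), ("PB", 2**50), ("TB", 2**40),
         ("GB", 2**30), ("MB", 2**20), ("KB", 2**10)]


def bytes_to_string(size: int) -> str:
    # Pick the largest unit the size reaches at least twice.
    for unit, factor in UNITS:
        if size >= 2 * factor:
            return f"{size / factor:.1f} {unit}"
    return f"{size:.1f} B"


# Largest signed 64-bit value, i.e. Java/Scala Long.MaxValue.
LONG_MAX = 2**63 - 1

print(bytes_to_string(LONG_MAX))       # 8.0 EB
print(bytes_to_string(10 * 2**20))     # 10.0 MB
```

So a two-row table reporting 8.0 EB is consistent with an "unknown size" placeholder rather than a real measurement.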
As a result, a join query is planned as a Sort Merge Join:
0: jdbc:hive2://10.xx.xx:23040/default> create table t110 (a int, b string) using parquet PARTITIONED BY (b);
+---------+--+
| Result  |
+---------+--+
+---------+--+
0: jdbc:hive2://10.xx.xx:23040/default> insert into t110 values (1,'a');
+---------+--+
| Result  |
+---------+--+
+---------+--+
explain select * from t1110 t1 join t110 t2 on t1.a=t2.a;
== Physical Plan ==
*(5) SortMergeJoin [a#23], [a#55], Inner
:- *(2) Sort [a#23 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(a#23, 200)
:     +- *(1) Project [a#23, b#24]
:        +- *(1) Filter isnotnull(a#23)
:           +- *(1) FileScan parquet open.t1110[a#23,b#24] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t1110], PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
+- *(4) Sort [a#55 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(a#55, 200)
      +- *(3) Project [a#55, b#56]
         +- *(3) Filter isnotnull(a#55)
            +- *(3) FileScan parquet open.t110[a#55,b#56] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hacluster/user/sparkhive/warehouse/open.db/t110], PartitionCount: 1, PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int>
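Why an EB-sized estimate rules out a broadcast can be sketched as a simplified size check: one side of an equi-join is a broadcast candidate only if its estimated size is non-negative and at most spark.sql.autoBroadcastJoinThreshold (10 MB by default; -1 disables broadcasting). The Python below is a hypothetical model (choose_join is not a Spark API) that deliberately ignores join type, hints and shuffled hash join.

```python
# Hypothetical, heavily simplified model of the size check that decides
# between a broadcast hash join and a sort merge join for an equi-join.
# Real Spark planning also considers join type, hints and other strategies.
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024  # default of spark.sql.autoBroadcastJoinThreshold

# Long.MaxValue, assumed here to be the fallback estimate for an unknown size.
LONG_MAX = 2**63 - 1


def can_broadcast(size_in_bytes: int, threshold: int) -> bool:
    # A negative threshold (e.g. -1) disables broadcasting entirely.
    return 0 <= size_in_bytes <= threshold


def choose_join(left_size: int, right_size: int,
                threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> str:
    if can_broadcast(left_size, threshold) or can_broadcast(right_size, threshold):
        return "BroadcastHashJoin"
    return "SortMergeJoin"


print(choose_join(LONG_MAX, LONG_MAX))  # SortMergeJoin
print(choose_join(LONG_MAX, 2048))      # BroadcastHashJoin
```

With both sides estimated at Long.MaxValue the model yields SortMergeJoin, matching the plan above; if either side had a realistic size estimate (the 2048 bytes here is a made-up figure), it would yield BroadcastHashJoin.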