Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 2.3.0
- Fix Version/s: None
Description
I'm using Spark 2.2 and doing a POC of Spark's bucketing. I've created a bucketed table; here's the output of desc formatted my_bucketed_tbl:
----------------------------------------------------
col_name           | data_type            | comment |
----------------------------------------------------
bundle             | string               | null    |
ifa                | string               | null    |
date_              | date                 | null    |
hour               | int                  | null    |
                   |                      |         |
Database           | default              |         |
Table              | my_bucketed_tbl      |         |
Owner              | zeppelin             |         |
Created            | Thu Dec 21 13:43:... |         |
Last Access        | Thu Jan 01 00:00:... |         |
Type               | EXTERNAL             |         |
Provider           | orc                  |         |
Num Buckets        | 16                   |         |
Bucket Columns     | [`ifa`]              |         |
Sort Columns       | [`ifa`]              |         |
Table Properties   | [transient_lastDd... |         |
Location           | hdfs:/user/hive/w... |         |
Serde Library      | org.apache.hadoop... |         |
InputFormat        | org.apache.hadoop... |         |
OutputFormat       | org.apache.hadoop... |         |
Storage Properties | [serialization.fo... |         |
----------------------------------------------------
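For context, the table was created roughly like this (a sketch; the source path and session setup are hypothetical, only the bucketing spec — 16 buckets, bucketed and sorted by ifa, ORC, external — is taken from the desc output above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketing-poc")
  .enableHiveSupport()
  .getOrCreate()

// Hypothetical source data; the write spec mirrors the desc output:
// 16 buckets, bucketed and sorted by ifa, ORC format, external location.
spark.read.orc("/tmp/source_data")
  .write
  .format("orc")
  .bucketBy(16, "ifa")
  .sortBy("ifa")
  .option("path", "hdfs:/user/hive/warehouse/my_bucketed_tbl")
  .saveAsTable("my_bucketed_tbl")
```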
When I run an explain on a group-by query, I can see that the exchange (shuffle) phase is avoided:
sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
+- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
   +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
      +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
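The presence of the shuffle can also be checked programmatically rather than by reading the plan text by eye (a sketch, assuming the same Spark session and table as above):

```scala
// Inspect the physical plan for an Exchange (shuffle) operator.
// Because the table is bucketed and sorted by ifa, the max aggregation
// should be able to run without one.
val plan = spark.sql("select ifa, max(bundle) from my_bucketed_tbl group by ifa")
  .queryExecution.executedPlan
val needsShuffle = plan.toString.contains("Exchange")
println(s"shuffle required: $needsShuffle")  // expected: false for max
```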
But when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table, i.e. the exchange phase is not avoided:
sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
+- Exchange hashpartitioning(ifa#1010, 200)
   +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
      +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>