Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
0.7.0
-
None
-
None
Description
The following query has a subquery that groups on two columns and an outer query that groups on one of those columns. With `planner.slice_target` = 1 to force exchanges, it produces an incorrect result:
alter session set `planner.slice_target` = 1;

select count(*) from
  (select l_partkey from
    (select l_partkey, l_suppkey from cp.`tpch/lineitem.parquet` group by l_partkey, l_suppkey)
  group by l_partkey);

+------------+
|   EXPR$0   |
+------------+
| 6227       |
+------------+
1 row selected (1.522 seconds)
Correct result (from Postgres):
 count
-------
  2000
(1 row)
The cause appears to be related to distribution trait propagation. Here's the EXPLAIN plan:
+------------+------------+
|    text    |    json    |
+------------+------------+
| 00-00    Screen
00-01      StreamAgg(group=[{}], EXPR$0=[$SUM0($0)])
00-02        UnionExchange
01-01          StreamAgg(group=[{}], EXPR$0=[COUNT()])
01-02            Project($f0=[0])
01-03              HashAgg(group=[{0}])
01-04                Project(l_partkey=[$0])
01-05                  HashAgg(group=[{0, 1}])
01-06                    HashToRandomExchange(dist0=[[$0]], dist1=[[$1]])
02-01                      HashAgg(group=[{0, 1}])
02-02                        Project(l_partkey=[$1], l_suppkey=[$0])
02-03                          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tpch/lineitem.parquet]], selectionRoot=/tpch/lineitem.parquet, numFiles=1, columns=[`l_partkey`, `l_suppkey`]]])
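Note that the HashToRandomExchange operator (01-06) distributes on two columns, l_partkey and l_suppkey, in order to perform the two-phase aggregation. These are the group-by columns of the inner query. However, no re-distribution is done before the outer query's HashAgg (01-03). The plan assumes that the data is already hash-distributed on l_partkey alone, which is not correct: rows that share an l_partkey but differ in l_suppkey can land in different fragments, so the same l_partkey can be grouped, and then counted, more than once.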
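The overcount can be reproduced outside Drill with a small simulation. The sketch below is only an illustration under stated assumptions: the toy rows, the four partitions, and the `hash_partition` helper with its multiply-by-31 hash are all hypothetical and are not Drill's actual data, parallelism, or hash function. It shows that grouping on l_partkey locally after an exchange on (l_partkey, l_suppkey) counts the same key once per partition it landed in, while re-distributing on l_partkey first gives the expected count.

```python
def hash_partition(rows, key, num_partitions):
    """Route each row to a partition by hashing the given key columns.

    The hash (multiply by 31 and add) is a hypothetical stand-in,
    not the function Drill uses.
    """
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        h = 0
        for col in key:
            h = h * 31 + row[col]
        partitions[h % num_partitions].append(row)
    return partitions


def local_group_by(partition, cols):
    """Distinct values of the given columns within a single partition."""
    return {tuple(row[c] for c in cols) for row in partition}


# Toy (l_partkey, l_suppkey) rows: 5 part keys, 4 supplier keys each.
rows = [(p, s) for p in range(1, 6) for s in range(1, 5)]

# Inner query: exchange on both group-by columns, then aggregate per partition.
exchanged = hash_partition(rows, key=(0, 1), num_partitions=4)
inner = [sorted(local_group_by(part, (0, 1))) for part in exchanged]

# Plan from this report: outer GROUP BY l_partkey runs per partition with
# no second exchange, then the per-partition counts are summed.
buggy_count = sum(len(local_group_by(part, (0,))) for part in inner)

# Expected plan: re-distribute on l_partkey alone before the outer aggregate.
all_inner_rows = [row for part in inner for row in part]
redistributed = hash_partition(all_inner_rows, key=(0,), num_partitions=4)
correct_count = sum(len(local_group_by(part, (0,))) for part in redistributed)

print("without re-distribution:", buggy_count)   # 20
print("with re-distribution:", correct_count)    # 5
```

In this toy setup every l_partkey ends up in all four partitions after the two-column exchange, so it is counted four times. That is the same kind of inflation as the 6227 versus 2000 discrepancy above.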