Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
1.7.0
None
Description
When one input of a Union-All is a LIMIT 1 (or any limit that is small relative to the slice_target) and that input reads Parquet files, Drill applies an optimization in which only a single Parquet file is read: based on the rowcount statistics in the Parquet files, it determines that reading 1 file is sufficient. As a consequence, the max width for that major fragment is set to 1, because only 1 minor fragment is needed to read 1 row group.
The net effect is that the width of 1 is applied to the whole major fragment, which consists of the Union-All and its inputs. This is sub-optimal because it prevents parallelization of the other input and of the Union-All operator itself.
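The width propagation described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not Drill's planner code: the `Operator` class, the `fragment_width` function, and the `CLUSTER_LIMIT` value are all hypothetical, and the model assumes that a major fragment's width is bounded by the most restrictive operator it contains.

```python
from dataclasses import dataclass


@dataclass
class Operator:
    name: str
    max_width: int  # upper bound on minor fragments this operator can use


def fragment_width(operators, cluster_limit):
    """Toy model: the tightest bound among the operators (and the cluster) wins."""
    return min([cluster_limit] + [op.max_width for op in operators])


CLUSTER_LIMIT = 8   # hypothetical cap on parallelism
UNBOUNDED = 10**9   # stand-in for "no operator-specific limit"

union_all = Operator("UnionAll", UNBOUNDED)
hash_join = Operator("HashJoin", UNBOUNDED)
limit_scan = Operator("Scan (1 Parquet file under LIMIT 1)", 1)

# All three operators share one major fragment, as in the reported plan.
shared = fragment_width([union_all, hash_join, limit_scan], CLUSTER_LIMIT)

# Hypothetical alternative: the LIMIT branch sits in its own major fragment.
join_side = fragment_width([union_all, hash_join], CLUSTER_LIMIT)
limit_side = fragment_width([limit_scan], CLUSTER_LIMIT)

print(shared, join_side, limit_side)
```

In this model the single-file scan pulls the shared fragment down to a width of 1, while the same Union-All and HashJoin could use the full hypothetical limit if the LIMIT branch did not share their fragment.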
Here's an example query and plan that illustrates the issue:
alter session set `planner.slice_target` = 1;

explain plan for
(select c.c_nationkey, c.c_custkey, c.c_name
 from dfs.`/Users/asinha/data/tpchmulti/customer` c
 inner join dfs.`/Users/asinha/data/tpchmulti/nation` n
 on c.c_nationkey = n.n_nationkey)
union all
(select c_nationkey, c_custkey, c_name
 from dfs.`/Users/asinha/data/tpchmulti/customer` c
 limit 1)

+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
00-02        Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
00-03          UnionAll(all=[true])
00-05            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
00-07              HashJoin(condition=[=($0, $3)], joinType=[inner])
00-10                Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
00-13                  HashToRandomExchange(dist0=[[$0]])
01-01                    UnorderedMuxExchange
03-01                      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
03-02                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/customer]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
00-09                Project(n_nationkey=[$0])
00-12                  HashToRandomExchange(dist0=[[$0]])
02-01                    UnorderedMuxExchange
04-01                      Project(n_nationkey=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
04-02                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/nation]], selectionRoot=file:/Users/asinha/data/tpchmulti/nation, numFiles=1, usedMetadataFile=false, columns=[`n_nationkey`]]])
00-04            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
00-06              SelectionVectorRemover
00-08                Limit(fetch=[1])
00-11                  Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/Users/asinha/data/tpchmulti/customer/01.parquet]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
Note that the Union-All and the HashJoin are both part of major fragment 0, which runs as a single minor fragment, even though they could have been parallelized. This hurts performance on larger data sets.
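To check which operators share a major fragment, the two-digit prefix of each operator id in the plan text can be grouped: in `00-07`, the `00` is the major fragment and the `07` is the operator within it. The snippet below is a small sketch of that grouping. The helper name `operators_by_fragment` is hypothetical, and the `PLAN` string is an abbreviated excerpt of the plan in this report, with the scan details shortened to `[...]`.

```python
import re
from collections import defaultdict

# Abbreviated excerpt of the plan from this report (scan arguments shortened).
PLAN = """\
00-00 Screen
00-03 UnionAll(all=[true])
00-07 HashJoin(condition=[=($0, $3)], joinType=[inner])
00-13 HashToRandomExchange(dist0=[[$0]])
01-01 UnorderedMuxExchange
03-02 Scan(groupscan=[ParquetGroupScan [...]])
00-08 Limit(fetch=[1])
00-11 Scan(groupscan=[ParquetGroupScan [...]])
"""

# Matches "<major fragment>-<operator id> <operator name>" at the start of a line.
OP_LINE = re.compile(r"^(\d{2})-(\d{2})\s+(\w+)")


def operators_by_fragment(plan_text):
    """Group operator names by the major fragment prefix of their id."""
    groups = defaultdict(list)
    for line in plan_text.splitlines():
        match = OP_LINE.match(line.strip())
        if match:
            groups[match.group(1)].append(match.group(3))
    return dict(groups)


groups = operators_by_fragment(PLAN)
for fragment, names in sorted(groups.items()):
    print(fragment, names)
```

For this excerpt, UnionAll, HashJoin, and Limit all land under fragment `00`, which matches the observation above.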