Apache Drill
DRILL-4833

Union-All with a small cardinality input on one side does not get parallelized




      When a Union-All has an input that is a LIMIT 1 (or some small value relative to the slice_target), and that input is reading Parquet files, Drill applies an optimization where only a single Parquet file is read (based on the row-count statistics in the Parquet file, the planner determines that reading 1 file is sufficient). This also means that the max width for that major fragment is set to 1, because only 1 minor fragment is needed to read 1 row group.

      The net effect is that a width of 1 is applied to the major fragment consisting of the union-all and its inputs. This is sub-optimal because it prevents parallelization of both the other input and the union-all operator itself.
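      The width behavior described above can be sketched roughly as follows (an illustrative Python model, not Drill's actual planner code; `fragment_width` and its parameters are hypothetical):

```python
def fragment_width(rowcount: int, slice_target: int, max_width: int) -> int:
    """Illustrative model: a major fragment's parallelization width is
    roughly the number of slices needed so each slice processes about
    slice_target rows, clamped to [1, max_width]."""
    width = (rowcount + slice_target - 1) // slice_target  # ceil division
    return max(1, min(width, max_width))

# The LIMIT-1 input is estimated at ~1 row, so its width is pinned to 1.
small_side = fragment_width(rowcount=1, slice_target=1, max_width=8)

# The join side would happily parallelize up to the cap.
large_side = fragment_width(rowcount=150_000, slice_target=1, max_width=8)
```

      Because the union-all shares a major fragment with the LIMIT-1 input, the whole fragment inherits the small side's width of 1 rather than the larger width the join side could support.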

      Here's an example query and plan that illustrates the issue:

      alter session set `planner.slice_target` = 1;
      explain plan for 
      (select c.c_nationkey, c.c_custkey, c.c_name
      from dfs.`/Users/asinha/data/tpchmulti/customer` c
      inner join
      dfs.`/Users/asinha/data/tpchmulti/nation`  n
      on c.c_nationkey = n.n_nationkey)
      union all
      (select c_nationkey, c_custkey, c_name
      from dfs.`/Users/asinha/data/tpchmulti/customer` c limit 1)
      | text | json |
      00-00    Screen
      00-01      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
      00-02        Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
      00-03          UnionAll(all=[true])
      00-05            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
      00-07              HashJoin(condition=[=($0, $3)], joinType=[inner])
      00-10                Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
      00-13                  HashToRandomExchange(dist0=[[$0]])
      01-01                    UnorderedMuxExchange
      03-01                      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
      03-02                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/customer]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
      00-09                Project(n_nationkey=[$0])
      00-12                  HashToRandomExchange(dist0=[[$0]])
      02-01                    UnorderedMuxExchange
      04-01                      Project(n_nationkey=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
      04-02                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/nation]], selectionRoot=file:/Users/asinha/data/tpchmulti/nation, numFiles=1, usedMetadataFile=false, columns=[`n_nationkey`]]])
      00-04            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
      00-06              SelectionVectorRemover
      00-08                Limit(fetch=[1])
      00-11                  Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/Users/asinha/data/tpchmulti/customer/01.parquet]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])

      Note that Union-all and HashJoin are part of fragment 0 (single minor fragment) even though they could have been parallelized. This clearly affects performance for larger data sets.
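      The fragment-sharing effect can be sketched like this (illustrative Python; `Fragment` and `merged_width` are hypothetical names, not Drill internals). Operators placed in a single major fragment run at one width, bounded by the most restrictive member; placing an exchange below the union-all would let the LIMIT-1 input run as its own fragment so it no longer caps the rest:

```python
from dataclasses import dataclass

@dataclass
class Fragment:
    name: str
    max_width: int  # cap imposed by the narrowest operator it contains

def merged_width(parts: list) -> int:
    # All operators in one major fragment share a single width,
    # bounded by the most restrictive member (illustrative model).
    return min(p.max_width for p in parts)

# Union-all, hash join, and the LIMIT-1 scan all land in fragment 0:
shared = merged_width([Fragment("union-all", 8),
                       Fragment("hash-join", 8),
                       Fragment("limit-1-scan", 1)])

# With an exchange below the union-all, the LIMIT-1 side becomes its
# own fragment and no longer serializes the join side:
decoupled = merged_width([Fragment("union-all", 8),
                          Fragment("hash-join", 8)])
```

      In the plan above this corresponds to the hash join and union-all being stuck at a single minor fragment (width 1) purely because they share fragment 0 with the LIMIT-1 scan.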




            amansinha100 Aman Sinha
            Robert Hou