Uploaded image for project: 'Apache Drill'
  1. Apache Drill
  2. DRILL-6158

Create a mux operator for union exchange to enable two phase merging instead of foreman merging all the batches.




      Consider the following simple query

      select zz1,zz2,a11 from dfs.tmp.viewtmp limit 100000 offset 10000000

      The following plan is generated for this query

      00-00    Screen : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {1.06048844E8 rows, 5.54015404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 787
      00-01      Project(zz1=[$0], zz2=[$1], a11=[$2]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {1.05038844E8 rows, 5.53005404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 786
      00-02        SelectionVectorRemover : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {1.05038844E8 rows, 5.53005404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 785
      00-03          Limit(offset=[10000000], fetch=[100000]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {9.4938844E7 rows, 5.42905404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 784
      00-04            UnionExchange : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {8.4838844E7 rows, 5.02505404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 783
      01-01              SelectionVectorRemover : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {7.4738844E7 rows, 4.21705404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 782
      01-02                Limit(fetch=[10100000]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {6.4638844E7 rows, 4.11605404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 781
      01-03                  Project(zz1=[$0], zz2=[$2], a11=[$1]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 2.3306983E7, cumulative cost = {5.4538844E7 rows, 3.71205404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 780
      01-04                    HashJoin(condition=[=($0, $2)], joinType=[left]) : rowType = RecordType(ANY ZZ1, ANY A, ANY ZZ2): rowcount = 2.3306983E7, cumulative cost = {5.4538844E7 rows, 3.71205404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 779
      01-06                      Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/tmp/csvd1, numFiles=3, columns=[`ZZ1`, `A`], files=[maprfs:/tmp/csvd1/D1111aamulti11random2.csv, maprfs:/tmp/csvd1/D1111aamulti11random21.csv, maprfs:/tmp/csvd1/D1111aamulti11random211.csv]]]) : rowType = RecordType(ANY ZZ1, ANY A): rowcount = 2.3306983E7, cumulative cost = {2.3306983E7 rows, 4.6613966E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 776
      01-05                      BroadcastExchange : rowType = RecordType(ANY ZZ2): rowcount = 2641626.0, cumulative cost = {5283252.0 rows, 2.3774634E7 cpu, 0.0 io, 3.2460300288E10 network, 0.0 memory}, id = 778
      02-01                        Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/tmp/csvd2, numFiles=1, columns=[`ZZ2`], files=[maprfs:/tmp/csvd2/D222random2.csv]]]) : rowType = RecordType(ANY ZZ2): rowcount = 2641626.0, cumulative cost = {2641626.0 rows, 2641626.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 777

      In case of many minor fragments and huge cluster all the minor fragments feeding into unionExchange will be merged only at the foreman. Eventhough unionExchange is not a bottleneck interms of cpu but it creates huge memory pressure in terms of memory.

      It is observed that due to this mostly on a large cluster with many minor fragments it runs out of memory.

      In this scenario it is always better to locally merge the minor fragments pertaining to a DRILLBIT and send the single stream to the foreman. This divides the memory consumption to all the drillbits and then reduces the memory pressure at the foreman.



        Issue Links



              hanu.ncr Hanumath Rao Maduri
              hanu.ncr Hanumath Rao Maduri
              0 Vote for this issue
              2 Start watching this issue

