Details

      Description

      Selecting count(dir0), count(dir1), etc. against a Parquet directory incorrectly returns a count of 0.

      select count(dir0) from `min_max_dir`;
      +---------+
      | EXPR$0  |
      +---------+
      | 0       |
      +---------+

      select count(dir1) from `min_max_dir`;
      +---------+
      | EXPR$0  |
      +---------+
      | 0       |
      +---------+

      If I put both dir0 and dir1 in the same select, it returns the expected result:

      select count(dir0), count(dir1) from `min_max_dir`;
      +---------+---------+
      | EXPR$0  | EXPR$1  |
      +---------+---------+
      | 600     | 600     |
      +---------+---------+

      Here is the physical plan for the count(dir0) query:

      00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, cumulative cost = {22.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1346
      00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1345
      00-02        Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1344
      00-03          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@3da85d3b[columns = null, isStarQuery = false, isSkipQuery = false]]) : rowType = RecordType(BIGINT count): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1343
      

      Here is part of the explain plan for the count(dir0) and count(dir1) in the same select:

      00-00    Screen : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1206.0 rows, 15606.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1623
      00-01      Project(EXPR$0=[$0], EXPR$1=[$1]) : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1622
      00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[COUNT($1)]) : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1621
      00-03          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1999/Apr/voter20.parquet/0_0_0.parquet], ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1999/MAR/voter15.parquet/0_0_0.parquet], ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1985/jan/voter5.parquet/0_0_0.parquet], ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1985/apr/voter60.parquet/0_0_0.parquet],..., ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/2014/jul/voter35.parquet/0_0_0.parquet]], selectionRoot=maprfs:/drill/testdata/min_max_dir, numFiles=16, usedMetadataFile=false, columns=[`dir0`, `dir1`]]]) : rowType = RecordType(ANY dir0, ANY dir1): rowcount = 600.0, cumulative cost = {600.0 rows, 1200.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1620
      

      Notice that in the first case, "org.apache.drill.exec.store.pojo.PojoRecordReader" is used.


          Activity

          Rahul Challapalli added a comment -

          Krystal Can you confirm whether this is a regression from 1.6?

          Jinfeng Ni added a comment - edited

          I ran the query on 1.4.0 and saw the same problem. I have not checked earlier versions, but it's likely this problem has been there for a long time.

          This bug also happens on the 1.0.0 release.

          Khurram Faraaz added a comment -

          Let me know and I can share the data; this is from 1.9.0.

          0: jdbc:drill:schema=dfs.tmp> select count(dir0) from `DRILL_4589`;
          +---------+
          | EXPR$0  |
          +---------+
          | 0       |
          +---------+
          1 row selected (17.973 seconds)
          0: jdbc:drill:schema=dfs.tmp> explain plan for select count(dir0) from `DRILL_4589`;
          +------+------+
          | text | json |
          +------+------+
          | 00-00    Screen
          00-01      Project(EXPR$0=[$0])
          00-02        Project(EXPR$0=[$0])
          00-03          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@2e50797e[columns = null, isStarQuery = false, isSkipQuery = false]])
          
          0: jdbc:drill:schema=dfs.tmp> select count(dir1) from `DRILL_4589`;
          +---------+
          | EXPR$0  |
          +---------+
          | 0       |
          +---------+
          1 row selected (11.24 seconds)
          0: jdbc:drill:schema=dfs.tmp> explain plan for select count(dir1) from `DRILL_4589`;
          +------+------+
          | text | json |
          +------+------+
          | 00-00    Screen
          00-01      Project(EXPR$0=[$0])
          00-02        Project(EXPR$0=[$0])
          00-03          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@60017daf[columns = null, isStarQuery = false, isSkipQuery = false]])
          

          Querying both dir0 and dir1 in the same query returns correct results.

          0: jdbc:drill:schema=dfs.tmp> select count(dir0), count(dir1) from `DRILL_4589`;
          +-----------+-----------+
          |  EXPR$0   |  EXPR$1   |
          +-----------+-----------+
          | 30148545  | 30144920  |
          +-----------+-----------+
          1 row selected (69.032 seconds)
          
          Arina Ielchiieva added a comment -

          Looks like the problem is in the ConvertCountToDirectScan rule, where we check the column value count. oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName)) returns 0 if the column does not exist, but it also returns 0 if the column contains only null values. dir0, like any other file system partition or implicit column, is not present in the columnValueCounts map.
          It is a good idea to convert to a direct scan when oldGrpScan.getColumnValueCount returns 0, since the count will be 0 anyway and we won't have to spend time reading all the table files.
          We could return -1 when the column is not found and read all the table files instead. That would work fine for file system partition and implicit columns, but if the column really doesn't exist we would read all the table files in vain.
          Unfortunately, we can't tell whether a column is a file system partition or implicit column inside ConvertCountToDirectScan, since we don't have access to the session OptionManager, where the current file system partition and implicit column names are stored (they can be changed at runtime). In ParquetGroupScan we do have access to an OptionManager via formatPlugin.getContext().getOptionManager(), but that is the system option manager and it doesn't hold session options (the partition and implicit column names can be changed at the session level).
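          The ambiguity described above can be sketched with a toy model (the class and map below are hypothetical, not Drill's actual internals): a per-scan map of column value counts simply has no entry for partition columns such as dir0, so a naive lookup conflates "column absent" with "column contains only nulls".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the ambiguity: the columnValueCounts
// map kept per scan cannot distinguish a column that is absent from the
// map (e.g. dir0, a directory/implicit column) from a column present
// with a count of 0 (a column containing only nulls).
public class ColumnCountAmbiguity {
  static final Map<String, Long> columnValueCounts = new HashMap<>();
  static {
    columnValueCounts.put("name", 600L);     // regular column with statistics
    columnValueCounts.put("all_nulls", 0L);  // column containing only nulls
    // dir0 is intentionally never added: partition columns have no entry
  }

  // Mirrors the behavior described for getColumnValueCount(): an unknown
  // column and an all-null column both come back as 0.
  static long getColumnValueCount(String column) {
    Long count = columnValueCounts.get(column);
    return count == null ? 0L : count;
  }

  public static void main(String[] args) {
    System.out.println(getColumnValueCount("name"));      // 600
    System.out.println(getColumnValueCount("all_nulls")); // 0
    // Indistinguishable from the all-null case, so the planner wrongly
    // rewrites count(dir0) into the constant 0.
    System.out.println(getColumnValueCount("dir0"));      // 0
  }
}
```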

          Arina Ielchiieva added a comment -

          From the discussion with Jinfeng Ni:

          Returning -1 for implicit columns would solve this problem, but it would regress for "select count(nonExistCol)". Basically, there are 4 cases for column count statistics:
          1) the column exists and the metadata has the statistics: return the correct stat;
          2) the column exists but there is no metadata: return -1;
          3) the column does not exist: count(nonExistCol) = 0, so return 0;
          4) implicit columns: the parquet metadata does not have such a column, but the column does exist. We currently return an incorrect 0; we should return -1.

          The ideal solution is to differentiate case 3 from case 4. If we cannot find an ideal solution, we will have no choice but to consider something else.
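          The four cases above can be sketched as a small decision table (the enum and method names are illustrative only; in Drill the -1 sentinel role is played by GroupScan.NO_COLUMN_STATS):

```java
// Hypothetical sketch of the four statistics cases. Only a truly missing
// column may be short-circuited to 0; implicit columns (case 4) should
// return the "no stats" sentinel instead of today's incorrect 0.
public class ColumnStatsCases {
  static final long NO_COLUMN_STATS = -1L;

  enum ColumnKind { EXISTS_WITH_STATS, EXISTS_NO_STATS, MISSING, IMPLICIT }

  static long columnValueCount(ColumnKind kind, long stat) {
    switch (kind) {
      case EXISTS_WITH_STATS: return stat;             // case 1: exact stat
      case EXISTS_NO_STATS:   return NO_COLUMN_STATS;  // case 2: -1, read files
      case MISSING:           return 0L;               // case 3: count = 0
      case IMPLICIT:          return NO_COLUMN_STATS;  // case 4: the proposed fix
      default: throw new AssertionError(kind);
    }
  }

  public static void main(String[] args) {
    System.out.println(columnValueCount(ColumnKind.EXISTS_WITH_STATS, 600L)); // 600
    System.out.println(columnValueCount(ColumnKind.IMPLICIT, 0L));            // -1
  }
}
```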

          Arina Ielchiieva added a comment - edited

          The fix will cover the following aspects:

          1. ConvertCountToDirectScan will be able to distinguish between implicit / directory columns and non-existent columns (relates to this Jira, DRILL-4735).
          To achieve this, the `Agg_on_scan` and `Agg_on_proj_on_scan` rules will take a new constructor parameter, OptimizerRulesContext, similar to the prune scan rules. It will help determine whether a column is an implicit / directory column. OptimizerRulesContext has access to session options through PlannerSettings, which is crucial for determining the current implicit / directory column names. After we receive the list of columns the rule would be applied to, we check whether that list contains implicit or directory columns. If it does, we won't apply the rule.

          2. The ConvertCountToDirectScan rule will become applicable to 2 or more COUNT aggregates (relates to DRILL-1691).
          When column statistics are available, we use PojoRecordReader to return the value. PojoRecordReader requires an exact model. Here we need a reader that allows a dynamic model (one where we don't know in advance how many columns there will be). For this purpose DynamicPojoRecordReader will be used. Instead of an exact model it will accept a dynamic model represented by List<LinkedHashMap<String, Object>> records, a list of maps where key -> column name and value -> column value. Common logic between PojoRecordReader and DynamicPojoRecordReader will be extracted into an abstract parent class.
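          A minimal sketch of such a dynamic model (the helper below is hypothetical, not the actual DynamicPojoRecordReader API): each record is a LinkedHashMap that preserves column order, so a single reader can carry any number of COUNT columns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical sketch: instead of a fixed POJO class per result shape,
// each record is an insertion-ordered map of column name -> value.
public class DynamicModelSketch {
  // Builds the single result record for a multi-count query; the count$N
  // naming follows the field names used later in the proposal.
  static LinkedHashMap<String, Object> countRecord(long... counts) {
    LinkedHashMap<String, Object> record = new LinkedHashMap<>();
    for (int i = 0; i < counts.length; i++) {
      record.put("count$" + i, counts[i]);
    }
    return record;
  }

  public static void main(String[] args) {
    // count(dir0) = 600, count(dir1) = 600, as in the example above
    List<LinkedHashMap<String, Object>> records = new ArrayList<>();
    records.add(countRecord(600L, 600L));

    System.out.println(records.get(0).keySet());       // [count$0, count$1]
    System.out.println(records.get(0).get("count$1")); // 600
  }
}
```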

          3. Currently, when ConvertCountToDirectScan is applied, the plan shows the following:

          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@60017daf[columns = null, isStarQuery = false, isSkipQuery = false]])
          

          The user has no idea that column statistics were used to calculate the result, whether partition pruning took place, etc. (relates to DRILL-5357 and DRILL-3407).
          Currently we use DirectGroupScan to hold the record reader. To include more information we'll extend DirectGroupScan into MetadataDirectGroupScan, which will contain information about the files read, if any. The PojoRecordReader and DynamicPojoRecordReader toString methods will also be overridden to show meaningful information to the user.
          Example:

          Scan(groupscan=[usedMetadata = true, files = [/tpch/nation.parquet], DynamicPojoRecordReader{records=[{count0=25, count1=25, count2=25}]}])
          
          Jinfeng Ni added a comment -

          Arina Ielchiieva, the proposal looks good to me. It's great that you went beyond the scope of this JIRA and are going to fix a couple of related issues. Thanks.

          Arina Ielchiieva added a comment - edited

          During implementation some details have changed:
          1. It turned out that we can access session options directly in ConvertCountToDirectScan using PrelUtil.getPlannerSettings(call.getPlanner()), so there is no need to pass OptimizerRulesContext to ConvertCountToDirectScan. We will skip applying the rule if a directory column is present in the selection; for implicit columns, on the contrary, we'll set the count result to the total record count, since they are based on files and there is no data without a file. Some refactoring was also done in ConvertCountToDirectScan: the counts collection logic was encapsulated in a helper class, CountsCollector.

          2. We still introduced the DynamicPojoRecordReader class, but it now accepts two parameters: the schema, represented by LinkedHashMap<String, Class<?>>, and the records themselves, represented by List<List<T>>. We require the caller to pass the schema to cover the case where there are no records to read but the schema is still needed to proceed. If all records are of the same type, T may be set to that type; if the records contain different types, T should be set to Object.
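          A minimal sketch of this two-parameter shape (an illustrative stand-in, not the actual Drill class): the schema is passed explicitly so type information survives even when the record list is empty.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical stand-in for the revised reader constructor: explicit
// schema (column name -> Java type, in column order) plus row values.
public class DynamicPojoSketch<T> {
  final LinkedHashMap<String, Class<?>> schema;
  final List<List<T>> records;

  DynamicPojoSketch(LinkedHashMap<String, Class<?>> schema, List<List<T>> records) {
    this.schema = schema;
    this.records = records;
  }

  public static void main(String[] args) {
    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
    schema.put("count$0", long.class);
    schema.put("count$1", long.class);

    // All counts share one type, so T can be Long; mixed types would use Object.
    DynamicPojoSketch<Long> reader =
        new DynamicPojoSketch<>(schema, Arrays.asList(Arrays.asList(600L, 600L)));

    // Schema is available even when records is empty.
    System.out.println(reader.schema.keySet()); // [count$0, count$1]
    System.out.println(reader.records.get(0));  // [600, 600]
  }
}
```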

          3. The MetadataDirectGroupScan string representation now also includes the number of files:

          [usedMetadata = true, files = [/tpch/nation.parquet], numFiles = 1]
          
          ASF GitHub Bot added a comment -

          GitHub user arina-ielchiieva opened a pull request:

          https://github.com/apache/drill/pull/882

          DRILL-4735: ConvertCountToDirectScan rule enhancements

          1. The ConvertCountToDirectScan rule is now applicable to 2 or more COUNT aggregates.
          To achieve this, DynamicPojoRecordReader was added, which accepts any number of columns,
          in contrast to PojoRecordReader, which depends on class fields.
          An AbstractPojoRecordReader class was added to factor out common logic for the two readers.

          2. ConvertCountToDirectScan now distinguishes between missing, directory, and implicit columns.
          For missing columns the count is set to 0; for implicit columns, to the total record count,
          since implicit columns are based on files and there is no data without a file.
          If a directory column is encountered, the rule is not applied.
          A CountsCollector class was introduced to encapsulate the counts collection logic.

          3. A MetadataDirectGroupScan class was introduced to indicate to the user when metadata was used
          during the calculation and which files it covered.

          Details in Jira DRILL-4735(https://issues.apache.org/jira/browse/DRILL-4735).

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/arina-ielchiieva/drill DRILL-4735

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/drill/pull/882.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #882


          commit 7d75460132f41f2f075d9d279bf71e9e43b060b8
          Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
          Date: 2017-07-20T16:26:44Z

          DRILL-4735: ConvertCountToDirectScan rule enhancements

          1. ConvertCountToDirectScan rule will be applicable for 2 or more COUNT aggregates.
          To achieve this DynamicPojoRecordReader was added which accepts any number of columns,
          on the contrary with PojoRecordReader which depends on class fields.
          AbstractPojoRecordReader class was added to factor out common logic for these two readers.

          2. ConvertCountToDirectScan will distinguish between missing, directory and implicit columns.
          For missing columns count will be set 0, for implicit to the total records count
          since implicit columns are based on files and there is no data without a file.
          If directory column will be encountered, rule won't be applied.
          CountsCollector class was introduced to encapsulate counts collection logic.

          3. MetadataDirectGroupScan class was introduced to indicate to the user when metadata was used
          during calculation and for which files it was applied.


          ASF GitHub Bot added a comment -

          Github user jinfengni commented on a diff in the pull request:

          https://github.com/apache/drill/pull/882#discussion_r131446392

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java ---
@@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);
-    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
-    final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
+    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
+    final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;

     final GroupScan oldGrpScan = scan.getGroupScan();
     final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());

-    // Only apply the rule when :
+    // Only apply the rule when:
     // 1) scan knows the exact row count in getSize() call,
     // 2) No GroupBY key,
-    // 3) only one agg function (Check if it's count below).
-    // 4) No distinct agg call.
+    // 3) No distinct agg call.
     if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
         && agg.getGroupCount() == 0
-        && agg.getAggCallList().size() == 1
         && !agg.containsDistinctCall())) {
       return;
     }

-    AggregateCall aggCall = agg.getAggCallList().get(0);
-
-    if (aggCall.getAggregation().getName().equals("COUNT") ) {
-
-      long cnt = 0;
-      // count ==> empty arg ==> rowCount
-      // count(Not-null-input) ==> rowCount
-      if (aggCall.getArgList().isEmpty() ||
-          (aggCall.getArgList().size() == 1 &&
-          !agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
-        cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
-      } else if (aggCall.getArgList().size() == 1) {
-        // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
-        int index = aggCall.getArgList().get(0);
-
-        if (proj != null) {
-          // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
-          // For instance,
-          // Agg - count($0)
-          //  \
-          //  Proj - Exp={$1}
-          //    \
-          //   Scan (col1, col2).
-          // return count of "col2" in Scan's metadata, if found.
-          if (proj.getProjects().get(index) instanceof RexInputRef) {
-            index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
-          } else {
-            return; // do not apply for all other cases.
-          }
-        }
+    final CountsCollector countsCollector = new CountsCollector(settings);
+    // if counts were not collected, rule won't be applied
+    if (!countsCollector.collect(agg, scan, project)) {
+      return;
+    }

-        String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
+    final RelDataType scanRowType = constructDataType(agg);

-        cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
-        if (cnt == GroupScan.NO_COLUMN_STATS) {
-          // if column stats are not available don't apply this rule
-          return;
-        }
-      } else {
-        return; // do nothing.
-      }
+    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
+        buildSchema(scanRowType.getFieldNames()),
+        Collections.singletonList(countsCollector.getCounts()));

-      RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
+    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
+    final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);

-      final ScanPrel newScan = ScanPrel.create(scan,
-          scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
-          scanRowType);
+    final ScanPrel newScan = ScanPrel.create(scan,
+        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
+        scanRowType);

-      List<RexNode> exprs = Lists.newArrayList();
-      exprs.add(RexInputRef.of(0, scanRowType));
+    final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
+        .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());

-      final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
-          .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
+    call.transformTo(newProject);
+  }

-      call.transformTo(newProj);
+  /**
+   * For each aggregate call creates field with "count$" prefix and bigint type.
+   * Constructs record type for created fields.
+   *
+   * @param aggregateRel aggregate relation expression
+   * @return record type
+   */
+  private RelDataType constructDataType(DrillAggregateRel aggregateRel) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) {
+      RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i, aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
+      fields.add(field);
+    }
+    return new RelRecordType(fields);
+  }

   /**
-   * Class to represent the count aggregate result.
+   * Builds schema based on given field names.
+   * Type for each schema is set to long.class.
+   *
+   * @param fieldNames field names
+   * @return schema
    */
-  public static class CountQueryResult {
-    public long count;
+  private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) {
+    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
+    for (String fieldName ... (remainder collapsed by JIRA's macro rendering)

-    public CountQueryResult(long cnt) {
-      this.count = cnt;
+  /**
+   * For each field creates row expression.
+   *
+   * @param rowType row type
+   * @return list of row expressions
+   */
+  private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
+    List<RexNode> expressions = new ArrayList<>();
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      expressions.add(RexInputRef.of(i, rowType));
+    }
+    return expressions;
+  }

-  private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) {
-    List<RelDataTypeField> fields = Lists.newArrayList();
-    fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT)));
+  /**
+   * Helper class to collect counts based on metadata information.
+   * For example, for parquet files it can be obtained from parquet footer (total row count)
+   * or from parquet metadata files (column counts).
+   */
+  private class CountsCollector {
+
+    private final PlannerSettings settings;
+    private final Set<String> implicitColumnsNames;
+    private final List<SchemaPath> columns;
+    private final List<Long> counts;
+
+    CountsCollector(PlannerSettings settings) {
+      this.settings = settings;
+      this.implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
+      this.counts = new ArrayList<>();
+      this.columns = new ArrayList<>();
+    }

-    return new RelRecordType(fields);
-  }
+    /**
+     * Collects counts for each aggregation call.
            + * Will fail to collect counts if:
            + * <ol>
            + * <li>was not able to determine count for at least one aggregation call</li>
            + * <li>count if used for file system partition column</li>
            + * </ol>
            + *
            + * @param agg aggregate relational expression
            + * @param scan scan relational expression
            + * @param project project relational expression
            + *
            + * @return true if counts were collected, false otherwise
            + */
            + boolean collect(DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) { + return calculateCounts(agg, scan, project) && !containsPartitionColumns(); + }

            +
            + /**
            + * @return list of counts
            + */
            + List<Long> getCounts() {
            + return counts;

              • End diff –

          Minor comment: the API of this class seems to assume the user will call collect() and, only if it returns true, then call getCounts(). What happens if the user calls getCounts() even when collect() returned false? It seems the user would get partial information. getCounts() should probably return an empty collection when the requirements are not met.
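          The contract the reviewer is asking for can be sketched in plain Java (no Drill dependencies; the class name, fields, and the stand-in for calculateCounts()/containsPartitionColumns() are all hypothetical illustrations, not the actual PR code): getCounts() never exposes partial state and returns an empty list unless collect() completed successfully.

          ```java
          import java.util.ArrayList;
          import java.util.Collections;
          import java.util.List;

          // Hypothetical sketch of the suggested contract: getCounts() is safe to
          // call in any order and never leaks partial results.
          class SafeCountsCollector {
            private final List<Long> counts = new ArrayList<>();
            private boolean collected = false;

            // Stand-in for calculateCounts(...) && !containsPartitionColumns();
            // here it simply succeeds when some metadata counts were found.
            boolean collect(List<Long> metadataCounts) {
              counts.clear();
              counts.addAll(metadataCounts);
              collected = !counts.isEmpty();
              return collected;
            }

            // Empty collection instead of partial information when requirements
            // were not met.
            List<Long> getCounts() {
              return collected ? Collections.unmodifiableList(counts) : Collections.emptyList();
            }
          }

          public class SafeCountsCollectorSketch {
            public static void main(String[] args) {
              SafeCountsCollector ok = new SafeCountsCollector();
              System.out.println(ok.collect(List.of(600L, 600L)));
              System.out.println(ok.getCounts());

              SafeCountsCollector failed = new SafeCountsCollector();
              System.out.println(failed.collect(List.of()));
              System.out.println(failed.getCounts());
            }
          }
          ```

          With this shape a caller that forgets to check collect()'s return value still gets a well-defined (empty) result rather than a half-populated list.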

          githubbot ASF GitHub Bot added a comment -

          Github user jinfengni commented on a diff in the pull request:

          https://github.com/apache/drill/pull/882#discussion_r131447047

          — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java —
          @@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
             @Override
             public void onMatch(RelOptRuleCall call) {
               final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);
          -    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
          -    final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
          +    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
          +    final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;

               final GroupScan oldGrpScan = scan.getGroupScan();
               final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());

          -    // Only apply the rule when :
          +    // Only apply the rule when:
               // 1) scan knows the exact row count in getSize() call,
               // 2) No GroupBY key,
          -    // 3) only one agg function (Check if it's count below).
          -    // 4) No distinct agg call.
          +    // 3) No distinct agg call.
               if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
                   && agg.getGroupCount() == 0
          -        && agg.getAggCallList().size() == 1
                   && !agg.containsDistinctCall())) {
                 return;
               }

          -    AggregateCall aggCall = agg.getAggCallList().get(0);
          -
          -    if (aggCall.getAggregation().getName().equals("COUNT")) {
          -
          -      long cnt = 0;
          -      // count ==> empty arg ==> rowCount
          -      // count(Not-null-input) ==> rowCount
          -      if (aggCall.getArgList().isEmpty() ||
          -          (aggCall.getArgList().size() == 1 &&
          -           !agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
          -        cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
          -      } else if (aggCall.getArgList().size() == 1) {
          -        // count(columnName) ==> Agg ( Scan ) ==> columnValueCount
          -        int index = aggCall.getArgList().get(0);
          -
          -        if (proj != null) {
          -          // project in the middle of Agg and Scan: only when input of AggCall is a RexInputRef in Project,
          -          // we find the index of Scan's field.
          -          // For instance,
          -          // Agg - count($0)
          -          //  \
          -          //   Proj - Exp={$1}
          -          //    \
          -          //     Scan (col1, col2).
          -          // return count of "col2" in Scan's metadata, if found.
          -          if (proj.getProjects().get(index) instanceof RexInputRef) {
          -            index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
          -          } else {
          -            return; // do not apply for all other cases.
          -          }
          -        }
          +    final CountsCollector countsCollector = new CountsCollector(settings);
          +    // if counts were not collected, rule won't be applied
          +    if (!countsCollector.collect(agg, scan, project)) {
          +      return;
          +    }

          -        String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
          +    final RelDataType scanRowType = constructDataType(agg);

          -        cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
          -        if (cnt == GroupScan.NO_COLUMN_STATS) {
          -          // if column stats are not available don't apply this rule
          -          return;
          -        }
          -      } else {
          -        return; // do nothing.
          -      }

          +    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
          +        buildSchema(scanRowType.getFieldNames()),
          +        Collections.singletonList(countsCollector.getCounts()));

          -      RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
          +    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
          +    final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);

          -      final ScanPrel newScan = ScanPrel.create(scan,
          -          scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
          -          scanRowType);
          +    final ScanPrel newScan = ScanPrel.create(scan,
          +        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
          +        scanRowType);

          -      List<RexNode> exprs = Lists.newArrayList();
          -      exprs.add(RexInputRef.of(0, scanRowType));
          +    final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
          +        .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());

          -      final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
          -          .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
          +    call.transformTo(newProject);
          +  }

          -      call.transformTo(newProj);

          +  /**
          +   * For each aggregate call creates field with "count$" prefix and bigint type.
          +   * Constructs record type for created fields.
          +   *
          +   * @param aggregateRel aggregate relation expression
          +   * @return record type
          +   */
          +  private RelDataType constructDataType(DrillAggregateRel aggregateRel) {
          +    List<RelDataTypeField> fields = new ArrayList<>();
          +    for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) {
          +      RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i,
          +          aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
          +      fields.add(field);
          +    }
          +    return new RelRecordType(fields);
          +  }

             /**
          -   * Class to represent the count aggregate result.
          +   * Builds schema based on given field names.
          +   * Type for each field is set to long.class.
          +   *
          +   * @param fieldNames field names
          +   * @return schema
              */
          -  public static class CountQueryResult {
          -    public long count;
          +  private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) {
          +    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
          +    for (String fieldName …

          -    public CountQueryResult(long cnt) {
          -      this.count = cnt;

          +  /**
          +   * For each field creates row expression.
          +   *
          +   * @param rowType row type
          +   * @return list of row expressions
          +   */
          +  private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
          +    List<RexNode> expressions = new ArrayList<>();
          +    for (int i = 0; i < rowType.getFieldCount(); i++) {
          +      expressions.add(RexInputRef.of(i, rowType));
          +    }
          +    return expressions;
          +  }

          -  private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) {
          -    List<RelDataTypeField> fields = Lists.newArrayList();
          -    fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT)));

          +  /**
          +   * Helper class to collect counts based on metadata information.
          +   * For example, for parquet files it can be obtained from parquet footer (total row count)
          +   * or from parquet metadata files (column counts).
          +   */
          +  private class CountsCollector {
          +
          +    private final PlannerSettings settings;
          +    private final Set<String> implicitColumnsNames;
          +    private final List<SchemaPath> columns;
          +    private final List<Long> counts;
          +
          +    CountsCollector(PlannerSettings settings) {
          +      this.settings = settings;
          +      this.implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
          +      this.counts = new ArrayList<>();
          +      this.columns = new ArrayList<>();
          +    }

          -    return new RelRecordType(fields);
          -  }

          +    /**
          +     * Collects counts for each aggregation call.
          +     * Will fail to collect counts if:
          +     * <ol>
          +     * <li>was not able to determine count for at least one aggregation call</li>
          +     * <li>count is used for a file system partition column</li>
          +     * </ol>
          +     *
          +     * @param agg aggregate relational expression
          +     * @param scan scan relational expression
          +     * @param project project relational expression
          +     * @return true if counts were collected, false otherwise
          +     */
          +    boolean collect(DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
          +      return calculateCounts(agg, scan, project) && !containsPartitionColumns();
          +    }

          +    /**
          +     * @return list of counts
          +     */
          +    List<Long> getCounts() {

          End diff –

          Instead of returning a list of Long, can we return either a map from SchemaPath to count, or a list of (SchemaPath, count) pairs? Returning just the list of counts makes it hard to figure out which count corresponds to which aggregate function.
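          The keyed alternative the reviewer proposes can be sketched in plain Java (Drill's SchemaPath is replaced with a String stand-in, and the column names and counts below are illustrative, not taken from the PR): an insertion-ordered map preserves the aggregate-call order while still recording which column each count belongs to.

          ```java
          import java.util.LinkedHashMap;
          import java.util.Map;

          // Hypothetical sketch: key each collected count by the column it was
          // gathered for, instead of returning a bare List<Long>.
          public class KeyedCountsSketch {

            // LinkedHashMap keeps insertion order, so iteration order still matches
            // the order of the aggregate calls (like the "count$0", "count$1", ...
            // fields built by constructDataType() in the diff above).
            static Map<String, Long> collectCounts() {
              Map<String, Long> counts = new LinkedHashMap<>();
              counts.put("dir0", 600L); // e.g. count(dir0) taken from metadata
              counts.put("dir1", 600L); // e.g. count(dir1) taken from metadata
              return counts;
            }

            public static void main(String[] args) {
              // Each count is now unambiguously tied to its column.
              collectCounts().forEach((column, count) ->
                  System.out.println(column + " -> " + count));
            }
          }
          ```

          A caller can still recover the plain list via `counts.values()` when only the ordered values are needed, so the keyed form loses nothing over the bare list.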

          githubbot ASF GitHub Bot added a comment -

          Github user jinfengni commented on a diff in the pull request:

          https://github.com/apache/drill/pull/882#discussion_r131445381

          — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java —
          @@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
             @Override
             public void onMatch(RelOptRuleCall call) {
               final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);
          -    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
          -    final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
          +    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
          +    final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;

               final GroupScan oldGrpScan = scan.getGroupScan();
               final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());

          -    // Only apply the rule when :
          +    // Only apply the rule when:
               // 1) scan knows the exact row count in getSize() call,
               // 2) No GroupBY key,
          -    // 3) only one agg function (Check if it's count below).
          -    // 4) No distinct agg call.
          +    // 3) No distinct agg call.
               if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
                   && agg.getGroupCount() == 0
          -        && agg.getAggCallList().size() == 1
                   && !agg.containsDistinctCall())) {
                 return;
               }

          -    AggregateCall aggCall = agg.getAggCallList().get(0);
          -
          -    if (aggCall.getAggregation().getName().equals("COUNT")) {
          -
          -      long cnt = 0;
          -      // count ==> empty arg ==> rowCount
          -      // count(Not-null-input) ==> rowCount
          -      if (aggCall.getArgList().isEmpty() ||
          -          (aggCall.getArgList().size() == 1 &&
          -           !agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
          -        cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
          -      } else if (aggCall.getArgList().size() == 1) {
          -        // count(columnName) ==> Agg ( Scan ) ==> columnValueCount
          -        int index = aggCall.getArgList().get(0);
          -
          -        if (proj != null) {
          -          // project in the middle of Agg and Scan: only when input of AggCall is a RexInputRef in Project,
          -          // we find the index of Scan's field.
          -          // For instance,
          -          // Agg - count($0)
          -          //  \
          -          //   Proj - Exp={$1}
          -          //    \
          -          //     Scan (col1, col2).
          -          // return count of "col2" in Scan's metadata, if found.
          -          if (proj.getProjects().get(index) instanceof RexInputRef) {
          -            index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
          -          } else {
          -            return; // do not apply for all other cases.
          -          }
          -        }
          +    final CountsCollector countsCollector = new CountsCollector(settings);
          +    // if counts were not collected, rule won't be applied
          +    if (!countsCollector.collect(agg, scan, project)) {
          +      return;
          +    }

          -        String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
          +    final RelDataType scanRowType = constructDataType(agg);

          -        cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
          -        if (cnt == GroupScan.NO_COLUMN_STATS) {
          -          // if column stats are not available don't apply this rule
          -          return;
          -        }
          -      } else {
          -        return; // do nothing.
          -      }

          +    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
          +        buildSchema(scanRowType.getFieldNames()),
          +        Collections.singletonList(countsCollector.getCounts()));

          -      RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
          +    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
          +    final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);

          -      final ScanPrel newScan = ScanPrel.create(scan,
          -          scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
          -          scanRowType);
          +    final ScanPrel newScan = ScanPrel.create(scan,
          +        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
          +        scanRowType);

          -      List<RexNode> exprs = Lists.newArrayList();
          -      exprs.add(RexInputRef.of(0, scanRowType));
          +    final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
          +        .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());
          • final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
          • .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
            + call.transformTo(newProject);
            + }
          • call.transformTo(newProj);
            + /**
            + * For each aggregate call creates field with "count$" prefix and bigint type.
            + * Constructs record type for created fields.
            + *
            + * @param aggregateRel aggregate relation expression
            + * @return record type
            + */
            + private RelDataType constructDataType(DrillAggregateRel aggregateRel) {
            + List<RelDataTypeField> fields = new ArrayList<>();
            + for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) {
            + RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i, aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
              • End diff –

The field name in RelDataTypeField ("count$0", "count$1") is not very informative. I think we could either get the field name from the aggregate operator, or use "count$colname" to indicate which column's count it represents.
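A minimal sketch of the suggested naming scheme in plain Java. The helper `buildFieldNames` and its argument convention are purely illustrative (not part of the patch or Drill's API): each COUNT call's output field embeds the counted column's name, falling back to the call index for COUNT(*).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountFieldNames {

    // Builds direct-scan field names: "count$<colname>" when the aggregate
    // call references a column, "count$<i>" as a fallback for COUNT(*).
    // A null entry stands for a COUNT(*) call with no argument column.
    static List<String> buildFieldNames(List<String> countedColumns) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < countedColumns.size(); i++) {
            String col = countedColumns.get(i);
            names.add(col == null ? "count$" + i : "count$" + col);
        }
        return names;
    }

    public static void main(String[] args) {
        // count(dir0), count(*) ==> [count$dir0, count$1]
        System.out.println(buildFieldNames(Arrays.asList("dir0", null)));
    }
}
```

With this scheme, `count(dir0), count(*)` would produce the columns `count$dir0` and `count$1` instead of the purely positional `count$0`, `count$1`.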

          githubbot ASF GitHub Bot added a comment -

          Github user jinfengni commented on a diff in the pull request:

          https://github.com/apache/drill/pull/882#discussion_r131449948

          — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java —
          @@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
          @Override
          public void onMatch(RelOptRuleCall call) {
          final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);

    -     final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
    -     final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
    +     final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
    +     final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;

          final GroupScan oldGrpScan = scan.getGroupScan();
          final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());

    -     // Only apply the rule when :
    +     // Only apply the rule when:
          // 1) scan knows the exact row count in getSize() call,
          // 2) No GroupBY key,
    -     // 3) only one agg function (Check if it's count below).
    -     // 4) No distinct agg call.
    +     // 3) No distinct agg call.
          if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
              && agg.getGroupCount() == 0
    -         && agg.getAggCallList().size() == 1
              && !agg.containsDistinctCall())) {
            return;
          }
    -     AggregateCall aggCall = agg.getAggCallList().get(0);
    -
    -     if (aggCall.getAggregation().getName().equals("COUNT") ) {
    -
    -       long cnt = 0;
    -       // count ==> empty arg ==> rowCount
    -       // count(Not-null-input) ==> rowCount
    -       if (aggCall.getArgList().isEmpty() ||
    -           (aggCall.getArgList().size() == 1 &&
    -            !agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
    -         cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
    -       } else if (aggCall.getArgList().size() == 1) {
    -         // count(columnName) ==> Agg ( Scan ) ==> columnValueCount
    -         int index = aggCall.getArgList().get(0);
    -
    -         if (proj != null) {
    -           // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
    -           // For instance,
    -           // Agg - count($0)
    -           //  \
    -           //   Proj - Exp={$1}
    -           //    \
    -           //     Scan (col1, col2).
    -           // return count of "col2" in Scan's metadata, if found.
    -           if (proj.getProjects().get(index) instanceof RexInputRef) {
    -             index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
    -           } else {
    -             return; // do not apply for all other cases.
    -           }
    -         }
    +     final CountsCollector countsCollector = new CountsCollector(settings);
    +     // if counts were not collected, rule won't be applied
    +     if (!countsCollector.collect(agg, scan, project)) {
    +       return;
    +     }
    -         String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
    +     final RelDataType scanRowType = constructDataType(agg);
    -         cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
    -         if (cnt == GroupScan.NO_COLUMN_STATS) {
    -           // if column stats are not available don't apply this rule
    -           return;
    -         }
    -       } else {
    -         return; // do nothing.
    -       }
    +     final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
    +         buildSchema(scanRowType.getFieldNames()),
    +         Collections.singletonList(countsCollector.getCounts()));
    -       RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
    +     final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
    +     final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);
    -       final ScanPrel newScan = ScanPrel.create(scan,
    -           scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
    -           scanRowType);
    +     final ScanPrel newScan = ScanPrel.create(scan,
    +         scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
    +         scanRowType);
    -       List<RexNode> exprs = Lists.newArrayList();
    -       exprs.add(RexInputRef.of(0, scanRowType));
    +     final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
    +         .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());
    -       final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
    -           .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
    +     call.transformTo(newProject);
    +   }
    -     call.transformTo(newProj);
    +   /**
    +    * For each aggregate call creates field with "count$" prefix and bigint type.
    +    * Constructs record type for created fields.
    +    *
    +    * @param aggregateRel aggregate relation expression
    +    * @return record type
    +    */
    +   private RelDataType constructDataType(DrillAggregateRel aggregateRel) {
    +     List<RelDataTypeField> fields = new ArrayList<>();
    +     for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) {
    +       RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i, aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
    +       fields.add(field);
    +     }
    +     return new RelRecordType(fields);
    +   }

          /**
    -      * Class to represent the count aggregate result.
    +      * Builds schema based on given field names.
    +      * Type for each schema is set to long.class.
    +      *
    +      * @param fieldNames field names
    +      * @return schema
           */
    -     public static class CountQueryResult {
    -       public long count;
    +     private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) {
    +       LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
    +       for (String fieldName : fieldNames) {
    +         schema.put(fieldName, long.class);
    +       }
    +       return schema;
    +     }
    -     public CountQueryResult(long cnt) {
    -       this.count = cnt;
    -     }
    +     /**
    +      * For each field creates row expression.
    +      *
    +      * @param rowType row type
    +      * @return list of row expressions
    +      */
    +     private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
    +       List<RexNode> expressions = new ArrayList<>();
    +       for (int i = 0; i < rowType.getFieldCount(); i++) {
    +         expressions.add(RexInputRef.of(i, rowType));
    +       }
    +       return expressions;
    +     }
    -     private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) {
    -       List<RelDataTypeField> fields = Lists.newArrayList();
    -       fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT)));
    +     /**
    +      * Helper class to collect counts based on metadata information.
    +      * For example, for parquet files it can be obtained from parquet footer (total row count)
    +      * or from parquet metadata files (column counts).
    +      */
    +     private class CountsCollector {
    +
    +       private final PlannerSettings settings;
    +       private final Set<String> implicitColumnsNames;
    +       private final List<SchemaPath> columns;
    +       private final List<Long> counts;
    +
    +       CountsCollector(PlannerSettings settings) {
    +         this.settings = settings;
    +         this.implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
    +         this.counts = new ArrayList<>();
    +         this.columns = new ArrayList<>();
    +       }
    -       return new RelRecordType(fields);
    -     }
    +       /**
    +        * Collects counts for each aggregation call.
    +        * Will fail to collect counts if:
    +        * <ol>
    +        * <li>was not able to determine count for at least one aggregation call</li>
    +        * <li>count is used for a file system partition column</li>
    +        * </ol>
    +        *
    +        * @param agg aggregate relational expression
    +        * @param scan scan relational expression
    +        * @param project project relational expression
    +        *
    +        * @return true if counts were collected, false otherwise
    +        */
    +       boolean collect(DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
    +         return calculateCounts(agg, scan, project) && !containsPartitionColumns();
    +       }
    +
    +       /**
    +        * @return list of counts
    +        */
    +       List<Long> getCounts() {
    +         return counts;
    +       }
    +
    +       /**
    +        * For each aggregate call determines if count can be calculated. Collects counts only for the COUNT function.
    +        * For star, not-null expressions and implicit columns sets count to the total record number.
    +        * For other cases obtains counts from the group scan operator.
    +        *
    +        * @param agg aggregate relational expression
    +        * @param scan scan relational expression
    +        * @param project project relational expression
    +        * @return true if counts were collected, false otherwise
    +        */
    +       private boolean calculateCounts(DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
    +         final GroupScan oldGrpScan = scan.getGroupScan();
    +         final long totalRecordCount = oldGrpScan.getScanStats(settings).getRecordCount();
    +
    +         for (AggregateCall aggCall : agg.getAggCallList()) {
    +           long cnt;
    +
    +           // rule can be applied only for count function
    +           if (!"count".equalsIgnoreCase(aggCall.getAggregation().getName())) {
    +             return false;
    +           }
    +
    +           if (containsStarOrNotNullInput(aggCall, agg)) {
    +             cnt = totalRecordCount;
    +
    +           } else if (aggCall.getArgList().size() == 1) {
    +             // count(columnName) ==> Agg ( Scan ) ==> columnValueCount
    +             int index = aggCall.getArgList().get(0);
    +
    +             if (project != null) {
    +               // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
    +               // For instance,
    +               // Agg - count($0)
    +               //  \
    +               //   Proj - Exp={$1}
    +               //    \
    +               //     Scan (col1, col2).
    +               // return count of "col2" in Scan's metadata, if found.
    +               if (!(project.getProjects().get(index) instanceof RexInputRef)) {
    +                 return false; // do not apply for all other cases.
    +               }
    +
    +               index = ((RexInputRef) project.getProjects().get(index)).getIndex();
    +             }
    +
    +             String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
    +
    +             // for implicit columns count will be the same as total record count
    +             if (implicitColumnsNames.contains(columnName)) {
    +               cnt = totalRecordCount;
    +             } else {
    +               SchemaPath simplePath = SchemaPath.getSimplePath(columnName);
    +               columns.add(simplePath);
    +
    +               cnt = oldGrpScan.getColumnValueCount(simplePath);
    +               if (cnt == GroupScan.NO_COLUMN_STATS) {
    +                 // if column stats are not available don't apply this rule
    +                 return false;
    +               }
    +             }
    +           } else {
    +             return false;
    +           }
    +           counts.add(cnt);
    +         }
    +         return true;
    +       }
    -     private GroupScan getCountDirectScan(long cnt) {
    -       CountQueryResult res = new CountQueryResult(cnt);
    +       /**
    +        * Checks if aggregate call contains star or non-null expression:
    +        * <pre>
    +        * count ==> empty arg ==> rowCount
    +        * count(Not-null-input) ==> rowCount
    +        * </pre>
    +        *
    +        * @param aggregateCall aggregate call
    +        * @param aggregate aggregate relation expression
    +        * @return true if aggregate call contains star or non-null expression
    +        */
    +       private boolean containsStarOrNotNullInput(AggregateCall aggregateCall, DrillAggregateRel aggregate) {
    +         return aggregateCall.getArgList().isEmpty() ||
    +             (aggregateCall.getArgList().size() == 1 &&
    +              !aggregate.getInput().getRowType().getFieldList().get(aggregateCall.getArgList().get(0)).getType().isNullable());
    +       }
    -       PojoRecordReader<CountQueryResult> reader = new PojoRecordReader<CountQueryResult>(CountQueryResult.class,
    -           Collections.singleton(res).iterator());
    +       /**
    +        * Checks if the stored list of columns contains file system partition columns.
    +        *
    +        * @return true if it contains partition columns, false otherwise
    +        */
    +       private boolean containsPartitionColumns() {
    +         final ColumnExplorer columnExplorer = new ColumnExplorer(settings.getOptions(), columns);

End diff –

Can ColumnExplorer provide an API to tell if one particular column is a partition column? It seems to me that with such an API, we could exit from the for loop at line 238 instead of first building the column list and then calling this method to check whether the column list contains a file system partition column.
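The suggested early exit could look like the sketch below. `isPartitionColumn` is a hypothetical per-column predicate (not ColumnExplorer's actual signature), matching Drill's `dirN` filesystem partition-column naming with a configurable label prefix; the label is hard-coded to "dir" here for illustration.

```java
import java.util.regex.Pattern;

public class PartitionColumnCheck {

    // Hypothetical per-column predicate: a filesystem partition column is the
    // partition label followed by digits (dir0, dir1, ...). With such a check
    // available inside calculateCounts(), the rule could bail out as soon as a
    // partition column is counted, instead of collecting all columns first and
    // checking the whole list afterwards.
    static boolean isPartitionColumn(String partitionLabel, String columnName) {
        return Pattern.matches(Pattern.quote(partitionLabel) + "\\d+", columnName);
    }

    public static void main(String[] args) {
        System.out.println(isPartitionColumn("dir", "dir0"));  // true
        System.out.println(isPartitionColumn("dir", "name"));  // false
    }
}
```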

          githubbot ASF GitHub Bot added a comment -

          Github user jinfengni commented on a diff in the pull request:

          https://github.com/apache/drill/pull/882#discussion_r131450166

          — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java —
          @@ -0,0 +1,86 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.drill.exec.store.direct;
          +
          +import com.fasterxml.jackson.annotation.JsonTypeName;
          +import org.apache.drill.common.exceptions.ExecutionSetupException;
          +import org.apache.drill.common.expression.SchemaPath;
          +import org.apache.drill.exec.physical.base.GroupScan;
          +import org.apache.drill.exec.physical.base.PhysicalOperator;
          +import org.apache.drill.exec.physical.base.ScanStats;
          +import org.apache.drill.exec.store.RecordReader;
          +
          +import java.util.Collection;
          +import java.util.List;
          +
          +/**
          + * Represents direct scan based on metadata information.
          + * For example, for parquet files it can be obtained from parquet footer (total row count)
          + * or from parquet metadata files (column counts).
          + * Contains reader, statistics and list of scanned files if present.
          + */
          +@JsonTypeName("metadata-direct-scan")
          +public class MetadataDirectGroupScan extends DirectGroupScan {
          +
          + private final Collection<String> files;
          +
          + public MetadataDirectGroupScan(RecordReader reader, Collection<String> files) {
          +   super(reader);
          +   this.files = files;
          + }
          +
          + public MetadataDirectGroupScan(RecordReader reader, Collection<String> files, ScanStats stats) {
          +   super(reader, stats);
          +   this.files = files;
          + }
          +
          + @Override
          + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
          +   assert children == null || children.isEmpty();
          +   return new MetadataDirectGroupScan(reader, files, stats);
          + }
          +
          + @Override
          + public GroupScan clone(List<SchemaPath> columns) {
          +   return this;
          + }

          +
          + /**
          + * <p>
          + * Returns string representation of group scan data.
          + * Includes list of files if present.
          + * </p>
          + *
          + * <p>
          + * Example: [usedMetadata = true, files = [/tmp/0_0_0.parquet], numFiles = 1]
          + * </p>
          + *
          + * @return string representation of group scan data
          + */
          + @Override
          + public String getDigest() {
          + StringBuilder builder = new StringBuilder();
          + builder.append("usedMetadata = true, ");
          — End diff –

          This "useMetadata=true" seems to be redundant, since it's for MetadataDirectGS.

          githubbot ASF GitHub Bot added a comment -

          Github user jinfengni commented on a diff in the pull request:

          https://github.com/apache/drill/pull/882#discussion_r131447579

          — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java —
          @@ -85,109 +91,231 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
          @Override
          public void onMatch(RelOptRuleCall call) {
          final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);

          • final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
          • final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
            + final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
            + final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;

          final GroupScan oldGrpScan = scan.getGroupScan();
          final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());

          • // Only apply the rule when :
            + // Only apply the rule when:
            // 1) scan knows the exact row count in getSize() call,
            // 2) No GroupBY key,
          • // 3) only one agg function (Check if it's count below).
          • // 4) No distinct agg call.
            + // 3) No distinct agg call.
            if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
            && agg.getGroupCount() == 0
          • && agg.getAggCallList().size() == 1
            && !agg.containsDistinctCall())) { return; }
          • AggregateCall aggCall = agg.getAggCallList().get(0);
            -
          • if (aggCall.getAggregation().getName().equals("COUNT") ) {
            -
          • long cnt = 0;
          • // count == > empty arg ==> rowCount
          • // count(Not-null-input) ==> rowCount
          • if (aggCall.getArgList().isEmpty() ||
          • (aggCall.getArgList().size() == 1 &&
          • !agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
          •   cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
          • }

            else if (aggCall.getArgList().size() == 1) {

          • // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
          • int index = aggCall.getArgList().get(0);
            -
          • if (proj != null) {
          • // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
          • // For instance,
          • // Agg - count($0)
          • // \
          • // Proj - Exp={$1}
          • // \
          • // Scan (col1, col2).
          • // return count of "col2" in Scan's metadata, if found.
            -
          • if (proj.getProjects().get(index) instanceof RexInputRef) {
          •   index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
          • } else {
          •   return; // do not apply for all other cases.
          • }
          • }
            + final CountsCollector countsCollector = new CountsCollector(settings);
            + // if counts were not collected, rule won't be applied
            + if (!countsCollector.collect(agg, scan, project)) { + return; + }
          • String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
            + final RelDataType scanRowType = constructDataType(agg);
          • cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
          • if (cnt == GroupScan.NO_COLUMN_STATS) {
          •   // if column stats are not available don't apply this rule
          •   return;
          • }
          • } else {
          •   return; // do nothing.
          • }

            + final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
            + buildSchema(scanRowType.getFieldNames()),
            + Collections.singletonList(countsCollector.getCounts()));

          • RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
            + final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
            + final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);
          • final ScanPrel newScan = ScanPrel.create(scan,
          • scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
          • scanRowType);
            + final ScanPrel newScan = ScanPrel.create(scan,
            + scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
            + scanRowType);
          • List<RexNode> exprs = Lists.newArrayList();
          • exprs.add(RexInputRef.of(0, scanRowType));
            + final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
            + .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());
          • final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
          • .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
            + call.transformTo(newProject);
            + }
          • call.transformTo(newProj);
            + /**
            + * For each aggregate call creates field with "count$" prefix and bigint type.
            + * Constructs record type for created fields.
            + *
            + * @param aggregateRel aggregate relation expression
            + * @return record type
            + */
            + private RelDataType constructDataType(DrillAggregateRel aggregateRel) {
            +   List<RelDataTypeField> fields = new ArrayList<>();
            +   for (int i = 0; i < aggregateRel.getAggCallList().size(); i++) {
            +     RelDataTypeField field = new RelDataTypeFieldImpl("count$" + i, i, aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
            +     fields.add(field);
            +   }
            +   return new RelRecordType(fields);
            + }

          /**

          • * Class to represent the count aggregate result.
            + * Builds schema based on given field names.
            + * Type for each schema is set to long.class.
            + *
            + * @param fieldNames field names
            + * @return schema
            */
          • public static class CountQueryResult {
          • public long count;
            + private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) {
            +   LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
            +   for (String fieldName : fieldNames) {
            +     schema.put(fieldName, long.class);
            +   }
            +   return schema;
            + }
          • public CountQueryResult(long cnt) {
          • this.count = cnt;
            + /**
            + * For each field creates row expression.
            + *
            + * @param rowType row type
            + * @return list of row expressions
            + */
            + private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
            +   List<RexNode> expressions = new ArrayList<>();
            +   for (int i = 0; i < rowType.getFieldCount(); i++) {
            +     expressions.add(RexInputRef.of(i, rowType));
            +   }
            +   return expressions;
            + }
          • private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) {
          • List<RelDataTypeField> fields = Lists.newArrayList();
          • fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT)));
            + /**
            + * Helper class to collect counts based on metadata information.
            + * For example, for parquet files it can be obtained from parquet footer (total row count)
            + * or from parquet metadata files (column counts).
            + */
            + private class CountsCollector {
            +
            + private final PlannerSettings settings;
            + private final Set<String> implicitColumnsNames;
            + private final List<SchemaPath> columns;
            + private final List<Long> counts;
            +
            + CountsCollector(PlannerSettings settings) {
            +   this.settings = settings;
            +   this.implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
            +   this.counts = new ArrayList<>();
            +   this.columns = new ArrayList<>();
            + }
          • return new RelRecordType(fields);
          • }
            + /**
            + * Collects counts for each aggregation call.
            + * Will fail to collect counts if:
            + * <ol>
            + * <li>was not able to determine count for at least one aggregation call</li>
            + * <li>count is used for a file system partition column</li>
            + * </ol>
            + *
            + * @param agg aggregate relational expression
            + * @param scan scan relational expression
            + * @param project project relational expression
            + *
            + * @return true if counts were collected, false otherwise
            + */
            + boolean collect(DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
            + return calculateCounts(agg, scan, project) && !containsPartitionColumns();
              • End diff –

          If a user calls collect(agg, scan, project), the class should clear/reset all the internal collections like "columns" and "counts". Otherwise, what happens if the same CountsCollector instance is reused and collect() is called twice with different inputs?
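
          The reset concern above can be illustrated with a minimal sketch (a hypothetical class, not the actual Drill CountsCollector): clearing internal state at the top of collect() makes the instance safe to reuse.

          ```java
          import java.util.ArrayList;
          import java.util.List;

          // Hypothetical collector illustrating the review point: reset internal
          // state at the start of every collect() call so that a single instance
          // can safely be reused with different inputs.
          class ReusableCollector {
              private final List<Long> counts = new ArrayList<>();

              boolean collect(long... input) {
                  counts.clear();            // reset so a second call starts fresh
                  for (long v : input) {
                      if (v < 0) {
                          return false;      // bail out, mirroring "rule not applied"
                      }
                      counts.add(v);
                  }
                  return true;
              }

              List<Long> getCounts() {
                  return new ArrayList<>(counts);
              }
          }
          ```

          Without the counts.clear() call, a second collect() would append to the results of the first, which is exactly the reuse hazard raised in the comment.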

          githubbot ASF GitHub Bot added a comment -

          Github user arina-ielchiieva closed the pull request at:

          https://github.com/apache/drill/pull/882

          githubbot ASF GitHub Bot added a comment -

          GitHub user arina-ielchiieva opened a pull request:

          https://github.com/apache/drill/pull/900

          DRILL-4735: ConvertCountToDirectScan rule enhancements

          1. ConvertCountToDirectScan rule will be applicable for 2 or more COUNT aggregates.
          To achieve this, DynamicPojoRecordReader was added, which accepts any number of columns,
          in contrast to PojoRecordReader, which depends on class fields.
          AbstractPojoRecordReader class was added to factor out common logic for these two readers.
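
          The dynamic reader idea in point 1 can be sketched as an ordered name-to-type schema plus rows of values (a simplified stand-in with hypothetical names, not Drill's actual DynamicPojoRecordReader API):

          ```java
          import java.util.ArrayList;
          import java.util.LinkedHashMap;
          import java.util.List;

          // Simplified stand-in for a dynamic pojo-style reader: the schema is an
          // ordered name -> type map supplied at runtime, so any number of count
          // columns can be produced, unlike a reader tied to a fixed POJO class.
          class DynamicRowSource {
              private final LinkedHashMap<String, Class<?>> schema;
              private final List<List<Long>> rows;

              DynamicRowSource(LinkedHashMap<String, Class<?>> schema, List<List<Long>> rows) {
                  this.schema = schema;
                  this.rows = rows;
              }

              List<String> columnNames() {
                  return new ArrayList<>(schema.keySet());
              }

              List<List<Long>> read() {
                  return rows;
              }

              // Build a "count$i" schema for n aggregate calls, mirroring the
              // field-naming convention used by the rule.
              static LinkedHashMap<String, Class<?>> countSchema(int n) {
                  LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
                  for (int i = 0; i < n; i++) {
                      schema.put("count$" + i, long.class);
                  }
                  return schema;
              }
          }
          ```

          For a query with two COUNT aggregates, the rule would feed such a source a two-column schema and a single row holding one precomputed count per aggregate call.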

          2. ConvertCountToDirectScan will distinguish between missing, directory and implicit columns.
          For missing columns the count will be set to 0; for implicit columns, to the total record count,
          since implicit columns are based on files and there is no data without a file.
          If a directory column is encountered, the rule won't be applied.

          3. MetadataDirectGroupScan class was introduced to indicate which files' statistics were used.

          Details in Jira DRILL-4735(https://issues.apache.org/jira/browse/DRILL-4735).
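
          The column classification described in point 2 can be sketched as follows (a hedged illustration with hypothetical names, not Drill's actual API): directory (partition) columns make the rule inapplicable, implicit columns resolve to the total record count, and missing columns resolve to 0.

          ```java
          import java.util.ArrayList;
          import java.util.List;
          import java.util.Map;
          import java.util.Optional;
          import java.util.Set;

          // Hypothetical sketch of the per-column count resolution:
          // partition column (dir0, dir1, ...) -> rule not applicable (empty result),
          // implicit column (e.g. filename)    -> total record count,
          // missing column                     -> 0, otherwise the column stats value.
          class CountClassifier {
              static Optional<List<Long>> collectCounts(List<String> countedColumns,
                                                        long totalRecordCount,
                                                        Map<String, Long> columnValueCounts,
                                                        Set<String> implicitColumns,
                                                        Set<String> partitionColumns) {
                  List<Long> counts = new ArrayList<>();
                  for (String col : countedColumns) {
                      if (partitionColumns.contains(col)) {
                          return Optional.empty();        // dir0, dir1, ...: bail out
                      } else if (implicitColumns.contains(col)) {
                          counts.add(totalRecordCount);   // implicit columns are never null
                      } else {
                          counts.add(columnValueCounts.getOrDefault(col, 0L)); // missing -> 0
                      }
                  }
                  return Optional.of(counts);
              }
          }
          ```

          This mirrors why the fixed rule refuses to convert count(dir0) into a direct scan: partition-column counts cannot be answered from file-level metadata alone.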

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/arina-ielchiieva/drill DRILL-4735

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/drill/pull/900.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #900


          commit 031a5b04ae430dbae015d88089c6d10623b9e87a
          Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
          Date: 2017-07-20T16:26:44Z

          DRILL-4735: ConvertCountToDirectScan rule enhancements

          1. ConvertCountToDirectScan rule will be applicable for 2 or more COUNT aggregates.
          To achieve this, DynamicPojoRecordReader was added, which accepts any number of columns,
          in contrast to PojoRecordReader, which depends on class fields.
          AbstractPojoRecordReader class was added to factor out common logic for these two readers.

          2. ConvertCountToDirectScan will distinguish between missing, directory and implicit columns.
          For missing columns the count will be set to 0; for implicit columns, to the total record count,
          since implicit columns are based on files and there is no data without a file.
          If a directory column is encountered, the rule won't be applied.
          CountsCollector class was introduced to encapsulate counts collection logic.

          3. MetadataDirectGroupScan class was introduced to indicate to the user when metadata was used
          during calculation and for which files it was applied.

          commit f3fa3dc2e2a876a21f1ce51b74dfd2544201f6f6
          Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
          Date: 2017-08-08T13:18:37Z

          DRILL-4735: Changes after code review.


          githubbot ASF GitHub Bot added a comment -

          Github user arina-ielchiieva commented on the issue:

          https://github.com/apache/drill/pull/882

          New PR was opened - #900

          githubbot ASF GitHub Bot added a comment -

          Github user arina-ielchiieva commented on the issue:

          https://github.com/apache/drill/pull/900

          @jinfengni addressed code review comments. Please review when possible.

          githubbot ASF GitHub Bot added a comment -

          Github user jinfengni commented on the issue:

          https://github.com/apache/drill/pull/900

          +1

          LGTM. Thanks for the PR!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/drill/pull/900

          jni Jinfeng Ni added a comment -

          Fixed in commit: 8b5642353505d1001d7ec3590a07ad1144ecf7f3

          khfaraaz Khurram Faraaz added a comment -

          Chun Chang re-assigned Jinfeng as reviewer, I will own this from test and add tests and mark it as closed once tests are reviewed and merged. Thanks.

          khfaraaz Khurram Faraaz added a comment -

          Verified on Drill 1.12.0 commit aaff1b35b7339fb4e6ab480dd517994ff9f0a5c5.
three-node (drillbit) cluster

          0: jdbc:drill:schema=dfs.tmp> select count(dir0) from `DRILL_4589`;
          +-----------+
          |  EXPR$0   |
          +-----------+
          | 30148545  |
          +-----------+
          1 row selected (74.548 seconds)
          0: jdbc:drill:schema=dfs.tmp> select count(dir1) from `DRILL_4589`;
          +-----------+
          |  EXPR$0   |
          +-----------+
          | 30144920  |
          +-----------+
          1 row selected (72.543 seconds)
          0: jdbc:drill:schema=dfs.tmp> select count(dir0), count(dir1) from `DRILL_4589`;
          +-----------+-----------+
          |  EXPR$0   |  EXPR$1   |
          +-----------+-----------+
          | 30148545  | 30144920  |
          +-----------+-----------+
          1 row selected (89.369 seconds)
          

  People

  • Assignee: Arina Ielchiieva
  • Reporter: Krystal
  • Reviewer: Jinfeng Ni
  • Votes: 0
  • Watchers: 6