
      Description

      Selecting count(dir0), count(dir1), etc. against a parquet directory returns 0 instead of the actual count.

      select count(dir0) from `min_max_dir`;
      +---------+
      | EXPR$0  |
      +---------+
      | 0       |
      +---------+

      select count(dir1) from `min_max_dir`;
      +---------+
      | EXPR$0  |
      +---------+
      | 0       |
      +---------+

      If I put both dir0 and dir1 in the same select, it returns the expected result:
      select count(dir0), count(dir1) from `min_max_dir`;
      +---------+---------+
      | EXPR$0  | EXPR$1  |
      +---------+---------+
      | 600     | 600     |
      +---------+---------+

      Here is the physical plan for count(dir0) query:

      00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, cumulative cost = {22.0 rows, 22.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1346
      00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1345
      00-02        Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1344
      00-03          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@3da85d3b[columns = null, isStarQuery = false, isSkipQuery = false]]) : rowType = RecordType(BIGINT count): rowcount = 20.0, cumulative cost = {20.0 rows, 20.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1343
      

      Here is part of the explain plan when count(dir0) and count(dir1) are in the same select:

      00-00    Screen : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1206.0 rows, 15606.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1623
      00-01      Project(EXPR$0=[$0], EXPR$1=[$1]) : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1622
      00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[COUNT($1)]) : rowType = RecordType(BIGINT EXPR$0, BIGINT EXPR$1): rowcount = 60.0, cumulative cost = {1200.0 rows, 15600.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1621
      00-03          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1999/Apr/voter20.parquet/0_0_0.parquet], ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1999/MAR/voter15.parquet/0_0_0.parquet], ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1985/jan/voter5.parquet/0_0_0.parquet], ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/1985/apr/voter60.parquet/0_0_0.parquet],..., ReadEntryWithPath [path=maprfs:/drill/testdata/min_max_dir/2014/jul/voter35.parquet/0_0_0.parquet]], selectionRoot=maprfs:/drill/testdata/min_max_dir, numFiles=16, usedMetadataFile=false, columns=[`dir0`, `dir1`]]]) : rowType = RecordType(ANY dir0, ANY dir1): rowcount = 600.0, cumulative cost = {600.0 rows, 1200.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1620
      

      Notice that in the first case, "org.apache.drill.exec.store.pojo.PojoRecordReader" is used.

          Activity

          Rahul Challapalli added a comment -

          Krystal, can you confirm whether this is a regression from 1.6?

          Jinfeng Ni added a comment - edited

          I ran the query on 1.4.0 and saw the same problem. I have not checked earlier versions, but it's likely this problem has been there for a long time.

          This bug also occurs on the 1.0.0 release.

          Khurram Faraaz added a comment -

          Let me know and I can share the data; this is from 1.9.0.

          0: jdbc:drill:schema=dfs.tmp> select count(dir0) from `DRILL_4589`;
          +---------+
          | EXPR$0  |
          +---------+
          | 0       |
          +---------+
          1 row selected (17.973 seconds)
          0: jdbc:drill:schema=dfs.tmp> explain plan for select count(dir0) from `DRILL_4589`;
          +------+------+
          | text | json |
          +------+------+
          | 00-00    Screen
          00-01      Project(EXPR$0=[$0])
          00-02        Project(EXPR$0=[$0])
          00-03          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@2e50797e[columns = null, isStarQuery = false, isSkipQuery = false]])
          
          0: jdbc:drill:schema=dfs.tmp> select count(dir1) from `DRILL_4589`;
          +---------+
          | EXPR$0  |
          +---------+
          | 0       |
          +---------+
          1 row selected (11.24 seconds)
          0: jdbc:drill:schema=dfs.tmp> explain plan for select count(dir1) from `DRILL_4589`;
          +------+------+
          | text | json |
          +------+------+
          | 00-00    Screen
          00-01      Project(EXPR$0=[$0])
          00-02        Project(EXPR$0=[$0])
          00-03          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@60017daf[columns = null, isStarQuery = false, isSkipQuery = false]])
          

          Querying both dir0 and dir1 in the same query returns correct results.

          0: jdbc:drill:schema=dfs.tmp> select count(dir0), count(dir1) from `DRILL_4589`;
          +-----------+-----------+
          |  EXPR$0   |  EXPR$1   |
          +-----------+-----------+
          | 30148545  | 30144920  |
          +-----------+-----------+
          1 row selected (69.032 seconds)
          
          Arina Ielchiieva added a comment -

          Looks like the problem is in the ConvertCountToDirectScan rule, where we check the number of null values in a column. oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName)) returns 0 if the column does not exist, and it also returns 0 if the column has only null values. dir0 and other file system partition or implicit columns are not present in the columnValueCounts map at all.
          It is a good idea to convert to a direct scan when oldGrpScan.getColumnValueCount returns 0, since the count will be 0 anyway and we won't have to spend time reading all table files.
          We might return -1 for the cases when the column is not found and read all table files instead. This would work fine for file system partition and implicit columns, but if the column really doesn't exist we'd read all table files in vain.
          Unfortunately, we can't find out whether a column is a file system partition or implicit column in ConvertCountToDirectScan, since we don't have access to the session OptionManager where the current file system partition and implicit column names are stored (they can be changed at runtime). In ParquetGroupScan we do have access to an OptionManager via formatPlugin.getContext().getOptionManager(), but this is the system option manager and it doesn't hold information about session options (the current file system partition and implicit column names can be changed at session level).
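          The ambiguity can be illustrated with a small self-contained toy model (not the actual Drill source; the map contents, column names, and helper method are illustrative only):

          // Toy model of the described behaviour, not Drill code:
          // the map contents and column names below are illustrative.
          import java.util.HashMap;
          import java.util.Map;

          public class ColumnCountAmbiguity {

            // Per-column value counts gathered from parquet metadata;
            // partition columns such as dir0/dir1 never appear here.
            private static final Map<String, Long> columnValueCounts = new HashMap<>();
            static {
              columnValueCounts.put("voter_id", 600L);
              columnValueCounts.put("name", 600L);
            }

            // Mirrors getColumnValueCount() as described above:
            // an absent column and an all-null column both yield 0.
            static long getColumnValueCount(String column) {
              return columnValueCounts.getOrDefault(column, 0L);
            }

            public static void main(String[] args) {
              System.out.println(getColumnValueCount("name"));        // 600 -> correct direct-scan result
              System.out.println(getColumnValueCount("dir0"));        // 0   -> rule wrongly emits a constant 0
              System.out.println(getColumnValueCount("nonExistCol")); // 0   -> 0 happens to be correct here
            }
          }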

          Arina Ielchiieva added a comment -

          From the discussion with Jinfeng Ni:

          Returning -1 for implicit columns would solve this problem, but it would regress for "select count(nonExistCol)". Basically, there are 4 types of column count statistics:
          1) column exists and metadata has the statistics: return the correct stat
          2) column exists but there is no metadata: return -1
          3) column does not exist ==> count(nonExistCol) = 0: return 0
          4) implicit columns: parquet metadata does not have such a column, but such columns do exist: currently an incorrect 0 is returned; -1 should be returned.

          The ideal solution is to differentiate case 3 from case 4. If we cannot find an ideal solution, we have no choice but to consider something else.
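          A minimal sketch of the four cases and the desired return values (the flags and method name are hypothetical, not Drill API):

          // Hypothetical helper expressing the desired contract; the flags and
          // method name are illustrative, not part of Drill's API.
          public class DesiredColumnValueCount {

            static long desiredColumnValueCount(boolean columnExists,
                                                boolean hasMetadataStat,
                                                boolean isImplicitOrDirectory,
                                                long statFromMetadata) {
              if (isImplicitOrDirectory) {
                return -1;                          // case 4: don't short-circuit to a constant 0
              }
              if (!columnExists) {
                return 0;                           // case 3: count(nonExistCol) is legitimately 0
              }
              return hasMetadataStat
                  ? statFromMetadata                // case 1: use the metadata stat directly
                  : -1;                             // case 2: unknown, fall back to a real scan
            }

            public static void main(String[] args) {
              System.out.println(desiredColumnValueCount(true, true, false, 600L)); // 600
              System.out.println(desiredColumnValueCount(false, false, false, 0L)); // 0
              System.out.println(desiredColumnValueCount(true, false, true, 0L));   // -1
            }
          }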

          Arina Ielchiieva added a comment - edited

          The fix will cover the following aspects:

          1. ConvertCountToDirectScan will be able to distinguish between implicit / directory and non-existent columns; relates to the current Jira DRILL-4735.
          To achieve this, the `Agg_on_scan` and `Agg_on_proj_on_scan` rules will take a new constructor parameter, OptimizerRulesContext, similar to the prune scan rules. It will help determine whether a column is an implicit / directory column or not. OptimizerRulesContext has access to session options through PlannerSettings, which are needed to determine the current implicit / directory column names. After we receive the list of columns to which the rule will be applied, we'll check whether this list contains implicit or directory columns. If it does, we won't apply the rule.

          2. The ConvertCountToDirectScan rule will become applicable for 2 or more COUNT aggregates; relates to DRILL-1691.
          When column statistics are available we use PojoRecordReader to return their values. PojoRecordReader requires an exact model. In our case we need a reader that allows a dynamic model (one where we don't know in advance how many columns it will have). For this purpose DynamicPojoRecordReader will be used. Instead of an exact model it will accept a dynamic model represented by List<LinkedHashMap<String, Object>> records, a list of maps where key -> column name and value -> column value (see the sketch after this list). Common logic between PojoRecordReader and DynamicPojoRecordReader will be extracted into an abstract parent class.

          3. Currently, when ConvertCountToDirectScan is applied, the plan shows the following:

          Scan(groupscan=[org.apache.drill.exec.store.pojo.PojoRecordReader@60017daf[columns = null, isStarQuery = false, isSkipQuery = false]])
          

          The user has no idea that column statistics were used to calculate the result, whether partition pruning took place, etc.; relates to DRILL-5357 and DRILL-3407.
          Currently we use DirectGroupScan to hold our record reader. To include more information we'll extend DirectGroupScan into MetadataDirectGroupScan, which will contain information about the files read, if any. Also, the PojoRecordReader and DynamicPojoRecordReader toString methods will be overridden to show meaningful information to the user.
          Example:

          Scan(groupscan=[usedMetadata = true, files = [/tpch/nation.parquet], DynamicPojoRecordReader{records=[{count0=25, count1=25, count2=25}]}])
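          Below is a self-contained toy illustration (not Drill code) of the dynamic model proposed in point 2, with column names matching the example plan above:

          // Toy illustration of the proposed dynamic model: one record keyed by
          // generated column names, one entry per COUNT aggregate. Not Drill code.
          import java.util.Collections;
          import java.util.LinkedHashMap;
          import java.util.List;

          public class DynamicModelSketch {
            public static void main(String[] args) {
              LinkedHashMap<String, Object> record = new LinkedHashMap<>();
              record.put("count0", 25L);  // value for the first COUNT, taken from metadata
              record.put("count1", 25L);
              record.put("count2", 25L);

              List<LinkedHashMap<String, Object>> records = Collections.singletonList(record);
              System.out.println(records); // [{count0=25, count1=25, count2=25}]
            }
          }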
          
          Jinfeng Ni added a comment -

          Arina Ielchiieva, the proposal looks good to me. It's great that you went beyond the scope of this JIRA and are going to fix a couple of related issues. Thanks.

          Arina Ielchiieva added a comment - edited

          During implementation some details have changed:
          1. It turned out that we can get access to session options directly in ConvertCountToDirectScan using PrelUtil.getPlannerSettings(call.getPlanner()), so there is no need to pass OptimizerRulesContext to ConvertCountToDirectScan. We will skip applying this rule if a directory column is present in the selection; for implicit columns, on the contrary, we'll set the count result to the total record count, since they are based on the files and there is no data without a file. Some refactoring was also done in ConvertCountToDirectScan: the counts collection logic was encapsulated in a helper class, CountsCollector.

          2. We still introduced the DynamicPojoRecordReader class, but it accepts two parameters: first the schema, represented by LinkedHashMap<String, Class<?>>, and second the records themselves, represented by List<List<T>> (see the sketch after this list). We force the user to pass the schema to cover the case when there are no records to read but we still need the schema to proceed. If the records are all of the same type, the user may set T to that type; if the records contain different types, T should be set to Object.

          3. The MetadataDirectGroupScan string representation now also includes the number of files:

          [usedMetadata = true, files = [/tpch/nation.parquet], numFiles = 1]
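          A toy sketch of the revised two-parameter shape described in point 2 (the class and constructor here are illustrative placeholders, not the exact Drill signatures):

          // Illustrative placeholder for the revised shape: explicit schema plus
          // row values. Not the actual DynamicPojoRecordReader signature.
          import java.util.Arrays;
          import java.util.LinkedHashMap;
          import java.util.List;

          public class RevisedDynamicModelSketch {
            public static void main(String[] args) {
              // Schema is passed explicitly so that an empty result still has columns.
              LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
              schema.put("count0", Long.class);
              schema.put("count1", Long.class);

              // All values share one type here, so T = Long; mixed types would use Object.
              List<List<Long>> records = Arrays.asList(Arrays.asList(25L, 25L));

              System.out.println(schema.keySet() + " -> " + records);
            }
          }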
          
          ASF GitHub Bot added a comment -

          GitHub user arina-ielchiieva opened a pull request:

          https://github.com/apache/drill/pull/882

          DRILL-4735: ConvertCountToDirectScan rule enhancements

          1. ConvertCountToDirectScan rule will be applicable for 2 or more COUNT aggregates.
          To achieve this DynamicPojoRecordReader was added which accepts any number of columns,
          on the contrary with PojoRecordReader which depends on class fields.
          AbstractPojoRecordReader class was added to factor out common logic for these two readers.

          2. ConvertCountToDirectScan will distinguish between missing, directory and implicit columns.
          For missing columns count will be set 0, for implicit to the total records count
          since implicit columns are based on files and there is no data without a file.
          If directory column will be encountered, rule won't be applied.
          CountsCollector class was introduced to encapsulate counts collection logic.

          3. MetadataDirectGroupScan class was introduced to indicate to the user when metadata was used
          during calculation and for which files it was applied.

          Details in Jira DRILL-4735(https://issues.apache.org/jira/browse/DRILL-4735).

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/arina-ielchiieva/drill DRILL-4735

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/drill/pull/882.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #882


          commit 7d75460132f41f2f075d9d279bf71e9e43b060b8
          Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
          Date: 2017-07-20T16:26:44Z

          DRILL-4735: ConvertCountToDirectScan rule enhancements

          1. ConvertCountToDirectScan rule will be applicable for 2 or more COUNT aggregates.
          To achieve this DynamicPojoRecordReader was added which accepts any number of columns,
          on the contrary with PojoRecordReader which depends on class fields.
          AbstractPojoRecordReader class was added to factor out common logic for these two readers.

          2. ConvertCountToDirectScan will distinguish between missing, directory and implicit columns.
          For missing columns count will be set 0, for implicit to the total records count
          since implicit columns are based on files and there is no data without a file.
          If directory column will be encountered, rule won't be applied.
          CountsCollector class was introduced to encapsulate counts collection logic.

          3. MetadataDirectGroupScan class was introduced to indicate to the user when metadata was used
          during calculation and for which files it was applied.



            People

            • Assignee: Arina Ielchiieva
            • Reporter: Krystal
            • Reviewer: Jinfeng Ni