HIVE-7012: Wrong RS de-duplication in the ReduceSinkDeDuplication Optimizer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.13.0
    • Fix Version/s: 0.14.0
    • Component/s: Query Processor
    • Labels: None

      Description

      With Hive 0.13.0, run the following test case:

      create table src(key bigint, value string);
      
      select  
         count(distinct key) as col0
      from src
      order by col0;
      

      The following exception will be thrown:

      java.lang.RuntimeException: Error in configuring object
      	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
      	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
      	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
      	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:485)
      	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
      	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:396)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
      	at org.apache.hadoop.mapred.Child.main(Child.java:249)
      Caused by: java.lang.reflect.InvocationTargetException
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
      	... 9 more
      Caused by: java.lang.RuntimeException: Reduce operator initialization failed
      	at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:173)
      	... 14 more
      Caused by: java.lang.RuntimeException: cannot find field _col0 from [0:reducesinkkey0]
      	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef(ObjectInspectorUtils.java:415)
      	at org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector.getStructFieldRef(StandardStructObjectInspector.java:150)
      	at org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator.initialize(ExprNodeColumnEvaluator.java:79)
      	at org.apache.hadoop.hive.ql.exec.GroupByOperator.initializeOp(GroupByOperator.java:288)
      	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
      	at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:166)
      	... 14 more
      

      This issue is related to HIVE-6455. When hive.optimize.reducededuplication is set to false, the issue goes away.
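      Until the fix (shipped in 0.14.0) is available, the optimizer can be disabled per session as a workaround; this restores the extra shuffle stage, trading some performance for correctness:

      ```sql
      -- Workaround: disable ReduceSink de-duplication for this session
      set hive.optimize.reducededuplication=false;

      select count(distinct key) as col0
      from src
      order by col0;
      ```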

      Logical plan when hive.optimize.reducededuplication=false:

      src 
        TableScan (TS_0)
          alias: src
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Select Operator (SEL_1)
            expressions: key (type: bigint)
            outputColumnNames: key
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Group By Operator (GBY_2)
              aggregations: count(DISTINCT key)
              keys: key (type: bigint)
              mode: hash
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              Reduce Output Operator (RS_3)
                DistinctColumnIndices:
                key expressions: _col0 (type: bigint)
                DistributionKeys: 0
                sort order: +
                OutputKeyColumnNames: _col0
                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                Group By Operator (GBY_4)
                  aggregations: count(DISTINCT KEY._col0:0._col0)
                  mode: mergepartial
                  outputColumnNames: _col0
                  Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                  Select Operator (SEL_5)
                    expressions: _col0 (type: bigint)
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator (RS_6)
                      key expressions: _col0 (type: bigint)
                      DistributionKeys: 1
                      sort order: +
                      OutputKeyColumnNames: reducesinkkey0
                      OutputValueColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                      value expressions: _col0 (type: bigint)
                      Extract (EX_7)
                        Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                        File Output Operator (FS_8)
                          compressed: false
                          Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                          table:
                              input format: org.apache.hadoop.mapred.TextInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      

      You will see that RS_3 and RS_6 are not merged.

      Logical plan when hive.optimize.reducededuplication=true:

      src 
        TableScan (TS_0)
          alias: src
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Select Operator (SEL_1)
            expressions: key (type: bigint)
            outputColumnNames: key
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Group By Operator (GBY_2)
              aggregations: count(DISTINCT key)
              keys: key (type: bigint)
              mode: hash
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              Reduce Output Operator (RS_3)
                DistinctColumnIndices:
                key expressions: _col0 (type: bigint)
                DistributionKeys: 1
                sort order: +
                OutputKeyColumnNames: reducesinkkey0
                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                Group By Operator (GBY_4)
                  aggregations: count(DISTINCT KEY._col0:0._col0)
                  mode: mergepartial
                  outputColumnNames: _col0
                  Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                  Select Operator (SEL_5)
                    expressions: _col0 (type: bigint)
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator (FS_8)
                      compressed: false
                      Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      

      You will see that RS_6 has been merged into RS_3. However, the merge is obviously incorrect, because RS_3 and RS_6 have different sort keys: the sort key for RS_3 is key, while the sort key for RS_6 is count(distinct key).

      The problem is that the method sameKeys() reports that both ReduceSinks have the same keys. sameKeys() depends on ExprNodeDescUtils.backtrack() to backtrack a key expression of the child RS (cRS) to the parent RS (pRS).

      I don't understand the reasoning behind the following logic in ExprNodeDescUtils:
      why keep backtracking when the current operator has no mapping for the column?

        private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator<?> current,
            Operator<?> terminal) throws SemanticException {
          ...
          if (mapping == null || !mapping.containsKey(column.getColumn())) {
            // Falls through to the generic overload and keeps backtracking with
            // the column unchanged, even though this operator has no mapping for it.
            return backtrack((ExprNodeDesc)column, current, terminal);
          }
          ...
        }
      

      The process of backtracking _col0 of cRS to pRS:
      RS_6:_col0 --> SEL_5:_col0 --> GBY_4:_col0 (because the colExprMap is null for GBY_4) --> RS_3:_col0 (No mapping for output column _col0), which is a wrong backtrack.
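      The faulty walk can be illustrated with a small stand-alone model (plain Python, not Hive's actual classes; the operator names and column maps are taken from the plan above, everything else is a simplification):

      ```python
      # Simplified model of the backtrack() walk described above (not Hive's
      # real ExprNodeDescUtils).  Each operator is (name, colExprMap), where
      # colExprMap maps an output column to its source column; None models an
      # operator whose colExprMap is null (GBY_4 in the plan above).

      def backtrack(column, parents):
          """Walk `column` from the child RS back through `parents`
          (listed parent-first).  Mirrors the questionable branch: when an
          operator has no mapping for the column, the walk keeps going with
          the column name unchanged instead of giving up."""
          for name, col_expr_map in reversed(parents):
              if col_expr_map is None or column not in col_expr_map:
                  continue  # the suspicious pass-through
              column = col_expr_map[column]
          return column

      # Operators between pRS (RS_3) and cRS (RS_6), parent-first.
      parents = [
          ("RS_3",  {"reducesinkkey0": "_col0"}),  # no mapping for "_col0" itself
          ("GBY_4", None),                         # colExprMap is null
          ("SEL_5", {"_col0": "_col0"}),
      ]

      # Backtracking RS_6's sort key "_col0" still reaches RS_3 as "_col0",
      # so sameKeys() wrongly concludes both ReduceSinks share the same key.
      print(backtrack("_col0", parents))  # -> _col0
      ```

      In the model, the count(distinct key) output of GBY_4 is reported as identical to the grouping key of RS_3 purely because nothing along the path stops the walk.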

      Attachments

      1. HIVE-7012.1.patch.txt (2 kB, Navis)
      2. HIVE-7012.2.patch.txt (19 kB, Navis)

      People

      • Assignee: Navis
      • Reporter: Sun Rui
