HIVE-5358

ReduceSinkDeDuplication should ignore column order when checking the overlapping part of keys between parent and child

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Query Processor
    • Labels: None

      Description

      select key, value from (select key, value from src group by key, value) t group by key, value;
      

      This query can be optimized by ReduceSinkDeDuplication.

      select key, value from (select key, value from src group by key, value) t group by value, key;
      

      However, the SQL above currently can't be optimized by ReduceSinkDeDuplication, because the parent and child operators list their key columns in different orders.
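
      A minimal, self-contained Java sketch of the idea (illustration only; the class and method names are made up, this is not the actual Hive code): an order-insensitive comparison of the parent and child ReduceSink key columns would let the second query above be deduplicated as well.

      import java.util.Arrays;
      import java.util.HashSet;
      import java.util.List;
      import java.util.Set;

      // Hypothetical illustration only, not ReduceSinkDeDuplication itself:
      // checks whether the child ReduceSink's key columns are covered by the
      // parent's key columns regardless of their order.
      public class OrderInsensitiveKeyCheck {

        // Returns true when every child key column appears among the parent
        // key columns, ignoring position. With such a check, a child keyed on
        // [value, key] can still be merged into a parent keyed on [key, value].
        static boolean childKeysCoveredByParent(List<String> childKeys,
                                                List<String> parentKeys) {
          Set<String> parent = new HashSet<>(parentKeys);
          return parent.containsAll(childKeys);
        }

        public static void main(String[] args) {
          List<String> parentKeys = Arrays.asList("key", "value"); // parent GROUP BY key, value
          List<String> childKeys = Arrays.asList("value", "key");  // child GROUP BY value, key

          // A strict prefix comparison rejects this pair; the set-based
          // comparison accepts it because the column sets are identical.
          System.out.println(childKeysCoveredByParent(childKeys, parentKeys)); // true
        }
      }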

      1. HIVE-5358.patch
        39 kB
        Chun Chen
      2. D13113.1.patch
        39 kB
        Phabricator
      3. HIVE-5358.2.patch
        39 kB
        Chun Chen
      4. HIVE-5358.3.patch
        107 kB
        Chun Chen

        Activity

        Hive QA added a comment -

        Overall: -1 at least one tests failed

        Here are the results of testing the latest attachment:
        https://issues.apache.org/jira/secure/attachment/12608892/HIVE-5358.3.patch

        ERROR: -1 due to 1 failed/errored test(s), 4416 tests executed
        Failed tests:

        org.apache.hive.hcatalog.mapreduce.TestMultiOutputFormat.testMultiOutputFormatWithReduce
        

        Test results: https://builds.apache.org/job/PreCommit-HIVE-Build/1157/testReport
        Console output: https://builds.apache.org/job/PreCommit-HIVE-Build/1157/console

        Messages:

        Executing org.apache.hive.ptest.execution.PrepPhase
        Executing org.apache.hive.ptest.execution.ExecutionPhase
        Executing org.apache.hive.ptest.execution.ReportingPhase
        Tests failed with: TestsFailedException: 1 tests failed
        

        This message is automatically generated.

        Chun Chen added a comment -

        Review: https://reviews.facebook.net/D13509
        Chun Chen added a comment -

        Thanks, Yin Huai. I got it. It seems the first method, "adjust the first GBY to construct its key from both key and value of the reduce input", is easier and doesn't waste extra resources sorting the rows.

        Yin Huai added a comment -

        My last example was not good... Let me try another example. The query may not make much sense, but I hope it can make the problem clear.

        select c3, c2 from (select c1, c2, c3, c4 from t2 group by c1, c2, c3, c4) t group by c3, c2;
        

        For the first GBY, we want to group rows based on [c1, c2, c3, c4], and then we want to group the output of the first GBY based on [c3, c2]. We can use [c2, c3] as the partitioning columns to make sure rows are distributed correctly. Then, if we use [c3, c2] as the sorting columns (the key columns in the RS), c1 and c4 will be in the value columns of the RS, and it seems we also need to adjust the first GBY to construct its key from both the key and the value of the reduce input. If we use [c1, c2, c3, c4] as the sorting columns, it seems we need to introduce a sort operator to generate row groups based on [c3, c2].
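
        To make that concrete, here is a tiny, self-contained Java sketch with made-up rows (illustration only, not Hive code): rows sorted on the full key [c1, c2, c3, c4] are not necessarily contiguous with respect to [c3, c2], so a streaming group-by on [c3, c2] at the reducer cannot rely on that sort order alone.

        import java.util.ArrayList;
        import java.util.Comparator;
        import java.util.List;

        // Illustration only (hypothetical data): sorting on the full key
        // [c1, c2, c3, c4] does not keep rows with equal [c3, c2] together.
        public class SortOrderVsGrouping {
          record Row(int c1, int c2, int c3, int c4) {}

          public static void main(String[] args) {
            List<Row> rows = new ArrayList<>(List.of(
                new Row(1, 5, 9, 0),
                new Row(2, 5, 9, 0),   // same (c3, c2) = (9, 5) as the first row
                new Row(1, 7, 3, 0)));

            // Sort on [c1, c2, c3, c4], as the RS key columns above would.
            rows.sort(Comparator.comparingInt(Row::c1)
                .thenComparingInt(Row::c2)
                .thenComparingInt(Row::c3)
                .thenComparingInt(Row::c4));

            // Prints (9, 5), (3, 7), (9, 5): the two (9, 5) rows are separated,
            // so a streaming group-by on (c3, c2) would see that group twice.
            rows.forEach(r ->
                System.out.println("(c3, c2) = (" + r.c3() + ", " + r.c2() + ")"));
          }
        }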

        I am also attaching the plan generated by your .2 patch:

        STAGE DEPENDENCIES:
          Stage-1 is a root stage
          Stage-0 is a root stage
        
        STAGE PLANS:
          Stage: Stage-1
            Map Reduce
              Alias -> Map Operator Tree:
                t:t2 
                  TableScan
                    alias: t2
                    Select Operator
                      expressions:
                            expr: c1
                            type: int
                            expr: c2
                            type: int
                            expr: c3
                            type: int
                            expr: c4
                            type: int
                      outputColumnNames: c1, c2, c3, c4
                      Group By Operator
                        bucketGroup: false
                        keys:
                              expr: c1
                              type: int
                              expr: c2
                              type: int
                              expr: c3
                              type: int
                              expr: c4
                              type: int
                        mode: hash
                        outputColumnNames: _col0, _col1, _col2, _col3
                        Reduce Output Operator
                          key expressions:
                                expr: _col0
                                type: int
                                expr: _col1
                                type: int
                                expr: _col2
                                type: int
                                expr: _col3
                                type: int
                          sort order: ++++
                          Map-reduce partition columns:
                                expr: _col2
                                type: int
                                expr: _col1
                                type: int
                          tag: -1
              Reduce Operator Tree:
                Group By Operator
                  bucketGroup: false
                  keys:
                        expr: KEY._col0
                        type: int
                        expr: KEY._col1
                        type: int
                        expr: KEY._col2
                        type: int
                        expr: KEY._col3
                        type: int
                  mode: mergepartial
                  outputColumnNames: _col0, _col1, _col2, _col3
                  Select Operator
                    expressions:
                          expr: _col2
                          type: int
                          expr: _col1
                          type: int
                    outputColumnNames: _col2, _col1
                    Group By Operator
                      bucketGroup: false
                      keys:
                            expr: _col2
                            type: int
                            expr: _col1
                            type: int
                      mode: complete
                      outputColumnNames: _col0, _col1
                      Select Operator
                        expressions:
                              expr: _col0
                              type: int
                              expr: _col1
                              type: int
                        outputColumnNames: _col0, _col1
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.TextInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
        
          Stage: Stage-0
            Fetch Operator
              limit: -1
        
        Chun Chen added a comment -

        Sorry for misunderstanding the intention of checkExprs in ReduceSinkDeDuplication.
        Ashutosh Chauhan, I will try to preserve the order of the key columns on the RS in those test cases.

        select c3, c2 from (select c1, c2, c3 from t1 order by c1, c2, c3) t group by c3, c2;
        

        Yin Huai, I don't understand what you mean about the SQL above. If we use [c3, c2] as the key columns, what's the problem with that?

        Hive QA added a comment -

        Overall: -1 at least one tests failed

        Here are the results of testing the latest attachment:
        https://issues.apache.org/jira/secure/attachment/12605197/HIVE-5358.2.patch

        ERROR: -1 due to 2 failed/errored test(s), 3162 tests executed
        Failed tests:

        org.apache.hadoop.hive.cli.TestCliDriver.testCliDriver_groupby_grouping_id2
        org.apache.hadoop.hive.cli.TestCliDriver.testCliDriver_reduce_deduplicate_extended
        

        Test results: https://builds.apache.org/job/PreCommit-HIVE-Build/917/testReport
        Console output: https://builds.apache.org/job/PreCommit-HIVE-Build/917/console

        Messages:

        Executing org.apache.hive.ptest.execution.PrepPhase
        Executing org.apache.hive.ptest.execution.ExecutionPhase
        Executing org.apache.hive.ptest.execution.ReportingPhase
        Tests failed with: TestsFailedException: 2 tests failed
        

        This message is automatically generated.

        Yin Huai added a comment -

        Ashutosh Chauhan, I think that for those two cases with hive.optimize.correlation=true, the ordering of the key columns does not matter: in those queries we only need to group rows, so either [key, value] or [value, key] should be fine for the RS. The reason I preserved the ordering in the Correlation Optimizer is that ReduceSinkDeDuplication can merge the RS for an ORDER BY with another RS (for example, one for a GROUP BY), and in that case ordering matters. When the Correlation Optimizer gets the operator tree, it does not know whether the key columns in an RS are only used for grouping or are also used for ordering. I think it may be better to annotate which columns are used for grouping and which are used for sorting.

        Chun Chen, for your change, what will the plan be for the following query?

        select c3, c2 from (select c1, c2, c3 from t1 order by c1, c2, c3) t group by c3, c2;
        

        If we use [c1, c2, c3] as the key columns, rows with the same [c3, c2] are not grouped at the reduce side.

        Based on my understanding, right now checkExprs in ReduceSinkDeDuplication only intends to handle cases where ckeys starts with pkeys or pkeys starts with ckeys, for example pkeys = [c1, c2, c3] and ckeys = [c1, c2].
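
        As a rough, self-contained Java sketch of that kind of prefix rule (illustration only, not the actual checkExprs code): the pair is accepted only when one key list starts with the other.

        import java.util.Arrays;
        import java.util.List;

        // Hypothetical sketch of a prefix-style comparison: pkeys = [c1, c2, c3]
        // with ckeys = [c1, c2] passes, while [key, value] versus [value, key]
        // does not, which is exactly the case this issue wants to relax.
        public class PrefixKeyCheck {

          static boolean oneStartsWithOther(List<String> pkeys, List<String> ckeys) {
            List<String> longer = pkeys.size() >= ckeys.size() ? pkeys : ckeys;
            List<String> shorter = (longer == pkeys) ? ckeys : pkeys;
            return longer.subList(0, shorter.size()).equals(shorter);
          }

          public static void main(String[] args) {
            System.out.println(oneStartsWithOther(
                Arrays.asList("c1", "c2", "c3"), Arrays.asList("c1", "c2")));   // true
            System.out.println(oneStartsWithOther(
                Arrays.asList("key", "value"), Arrays.asList("value", "key"))); // false
          }
        }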

        Ashutosh Chauhan added a comment -

        For queries like:

        select key, value from (select key, value from src group by key, value) t order by value, key;
        select key, value from (select key, value from src order by key, value) t group by value, key;
        

        In these cases the two RSs can also be merged, but the order of the key columns on the RS becomes important and must be preserved. Can you add a test case for this to make sure it works correctly?

        For queries like:

        set hive.optimize.correlation=true;
        select key, value from (select s1.key, s2.value from src s1 join src s2 on s1.key = s2.key and s1.value = s2.value) t group by key, value;
        select key, value from (select s1.key, s2.value from src s1 join src s2 on s1.key = s2.key and s1.value = s2.value) t group by value, key;
        

        For these cases too, the ordering of the key columns in the RS is important.

        cc: Navis, Yin Huai

        Phabricator added a comment -

        chenchun requested code review of "HIVE-5358 [jira] ReduceSinkDeDuplication should ignore column orders when check overlapping part of keys between parent and child".

        Reviewers: JIRA

        HIVE-5358

        select key, value from (select key, value from src group by key, value) t group by key, value;

        This can be optimized by ReduceSinkDeDuplication

        select key, value from (select key, value from src group by key, value) t group by value, key;

        However the sql above can't be optimized by ReduceSinkDeDuplication currently due to different column orders of parent and child operator.

        TEST PLAN
        EMPTY

        REVISION DETAIL
        https://reviews.facebook.net/D13113

        AFFECTED FILES
        ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
        ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnListDesc.java
        ql/src/test/queries/clientpositive/reduce_deduplicate_extended.q
        ql/src/test/results/clientpositive/reduce_deduplicate_extended.q.out

        MANAGE HERALD RULES
        https://reviews.facebook.net/herald/view/differential/

        WHY DID I GET THIS EMAIL?
        https://reviews.facebook.net/herald/transcript/38295/

        To: JIRA, chenchun


          People

          • Assignee: Chun Chen
          • Reporter: Chun Chen
          • Votes: 0
          • Watchers: 4
