Hive / HIVE-474

Support for distinct selection on two or more columns

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: Query Processor
    • Labels: None
    • Hadoop Flags: Reviewed

      Description

      The ability to select distinct values of several individual columns, for example:

      select count(distinct user), count(distinct session) from actions;

      Currently returns the following failure:

      FAILED: Error in semantic analysis: line 2:7 DISTINCT on Different Columns not Supported user

      1. patch-474-3.txt
        254 kB
        Amareshwari Sriramadasu
      2. patch-474-2.txt
        254 kB
        Amareshwari Sriramadasu
      3. patch-474-1.txt
        296 kB
        Amareshwari Sriramadasu
      4. patch-474.txt
        294 kB
        Amareshwari Sriramadasu
      5. hive-474.0.4.2rc.patch
        258 kB
        Mafish


          Activity

Namit Jain added a comment -

Committed. Thanks Amareshwari
Namit Jain added a comment -

running tests
Amareshwari Sriramadasu added a comment -

Patch is updated to trunk.
Namit Jain added a comment -

+1

Otherwise, the changes look good
Namit Jain added a comment -

Can you refresh and regenerate the patch? I am getting some compile errors after applying to trunk:

[javac] /data/users/njain/hive-commit1/ql/build.xml:159: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
[javac] Compiling 622 source files to /data/users/njain/hive-commit1/build/ql/classes
[javac] /data/users/njain/hive-commit1/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java:204: cannot find symbol
[javac] symbol : class StructField
[javac] location: class org.apache.hadoop.hive.ql.exec.GroupByOperator
[javac] List<? extends StructField> sfs =
[javac] ^
[javac] /data/users/njain/hive-commit1/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java:205: cannot find symbol
[javac] symbol : class StandardStructObjectInspector
[javac] location: class org.apache.hadoop.hive.ql.exec.GroupByOperator
[javac] ((StandardStructObjectInspector) rowInspector).getAllStructFieldRefs();
[javac] ^
[javac] /data/users/njain/hive-commit1/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java:207: cannot find symbol
[javac] symbol : class StructField
[javac] location: class org.apache.hadoop.hive.ql.exec.GroupByOperator
[javac] StructField keyField = sfs.get(0);
[javac] ^
[javac] /data/users/njain/hive-commit1/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java:211: cannot find symbol
[javac] symbol : class StandardStructObjectInspector
[javac] location: class org.apache.hadoop.hive.ql.exec.GroupByOperator
[javac] if (keyObjInspector instanceof StandardStructObjectInspector) {
[javac] ^

Most probably, some merge issue
Amareshwari Sriramadasu added a comment -

Not a good idea to ignore skew for multiple distincts.

I agree.

The updated patch throws an error when there are multiple distincts with skew in data, and also adds negative testcases.
Namit Jain added a comment -

Not a good idea to ignore skew for multiple distincts. It might be safer to throw an error right now in such a scenario - we can add a better technique for handling this scenario later.

Can you add a negative testcase for the scenario above?

Otherwise, it looks good.
Amareshwari Sriramadasu added a comment -

Patch incorporates Namit's comments.
Namit Jain added a comment -

1. Add initEvaluators() in Operator.java instead of ReduceSinkOperator.java.
2. ReduceSinkDesc: use numKeys and getNumKeys(), or change numKeys to numDistributionKeys - you may run into problems with serialization/deserialization.
3. Add some comments in initEvaluatorsAndReturnStruct in ReduceSinkOperator - explain that it is the same as the parent in the case of no union for groupby.
4. Can you add more comments in GroupByOperator and SemanticAnalyzer also? It looks OK, but it will help if there are more comments.
Namit Jain added a comment -

Diff at https://review.cloudera.org/r/1052/ for review
Namit Jain added a comment -

I will take a look
Amareshwari Sriramadasu added a comment -

I have reworked the patch from Mafish so that it works on trunk. Now the patch takes care of multiple columns in distinct (HIVE-287) also.
John Sichi added a comment -

I think you'll see the problem if you make sure the key/value entries in each row of your src table are uncorrelated. If you're using data/files/kv1.txt, the value is just the key prefixed by "val_".
Amareshwari Sriramadasu added a comment -

I'm trying to understand the real problem we are solving here. As a first step, I made the changes in SemanticAnalyzer to allow distinct selection on multiple columns and executed the query:

select count(key), count(value), count(distinct key), count(distinct value), count(distinct key, value) from src;

It returns correct results with the current implementation itself. Can you explain the problem with the current implementation?
Namit Jain added a comment -

Once HIVE-537 is committed, the general idea is as listed in the example in HIVE-537.

Say the query is:

select a, count(distinct b), count(distinct c) from T group by a

and the data is:

a1 b1 c1
a1 b1 c2
a1 b2 c2
a1 b2 c1
a2 ...

The mapper will emit a union type:

a1 0:b1
a1 1:c1
a1 0:b1
a1 1:c2
a1 0:b2
a1 1:c2
a1 0:b2
a1 1:c1

Since the sort key is (a, union_tag, (b|c)), the data will come to the reducer in the following order:

a1 0:b1
a1 0:b1
a1 0:b2
a1 0:b2
a1 1:c1
a1 1:c1
a1 1:c2
a1 1:c2

and then the reducer can stream the distincts.
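The streaming idea in the comment above can be sketched as a small simulation. This is hypothetical Python, not Hive code: the mapper emits one tagged record per distinct aggregate, the shuffle sorts by (a, union_tag, value), and the reducer counts each tag's distincts in a single pass without buffering the other column.

```python
from itertools import groupby

# Input rows (a, b, c), matching the example data above.
rows = [
    ("a1", "b1", "c1"),
    ("a1", "b1", "c2"),
    ("a1", "b2", "c2"),
    ("a1", "b2", "c1"),
]

# Map side: emit one record per distinct aggregate,
# tagged with a union tag (0 for column b, 1 for column c).
emitted = []
for a, b, c in rows:
    emitted.append((a, 0, b))
    emitted.append((a, 1, c))

# Shuffle: sort by (a, union_tag, value) - the sort key from the comment.
emitted.sort()

# Reduce side: within each group key a, tags arrive in order and values
# within a tag arrive sorted, so a new value means a new distinct.
result = {}
for a, group in groupby(emitted, key=lambda r: r[0]):
    counts = [0, 0]
    prev = None
    for _, tag, value in group:
        if (tag, value) != prev:
            counts[tag] += 1
            prev = (tag, value)
    result[a] = tuple(counts)

print(result)  # {'a1': (2, 2)}: two distinct b values, two distinct c values
```

The key point the sketch shows is that the reducer never holds more than one (tag, value) pair in memory per group, which is what makes the multi-distinct plan streamable.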
Amareshwari Sriramadasu added a comment -

Will upload a patch once HIVE-537 is committed.
John Sichi added a comment -

Some comments after a brief look:

• The patch is going to need to be rebased against trunk (I guess after HIVE-537 is committed)?
• We should make sure that in the case of a single distinct agg, we leave the plan as it is today, and only use the new plan generation when multiple distincts are present. This may already be the case; I couldn't quite tell from the example plans in the test cases (it would be nice to have some simpler queries for that).
• Regarding moving expression evaluation to the reduce side: in general, this is something that needs cost-based optimization, due to factors like (a) data size before and after expression evaluation and (b) the parallelization benefit of spreading out the computation over lots of mappers (assuming many more mappers than reducers).
John Sichi added a comment -

We'll take a look at this one.
Venkatesh Seetharam added a comment -

Any update on this?
Mafish added a comment -

for 0.4.2rc
Mafish added a comment -

I have uploaded a patch originating from 0.4.2rc; please review.
Mafish added a comment -

We have implemented this feature using the union type, as mentioned as "A2" by Zheng.
Min Zhou added a comment - edited

I think there is another special case here. If the query has multiple distinct operations on the same column, we can push down the evaluation of those expressions into the reducers.

Query:
  select a, count(distinct if(condition, b, null)) as col1, count(distinct if(!condition, null, b)) as col2, count(distinct b) as col3

Plan:
  Job:
    Map side:
      Emit: distribution_key: a, sort_key: a, b, value: nothing
    Reduce side:
      Group By
        a, count col1, col2, col3 by evaluating their expressions
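The push-down above can be illustrated with a toy simulation (hypothetical Python; the predicate `b < 3` stands in for the query's condition, which the comment leaves abstract). Since all three aggregates are distincts over the same column b, the mapper only needs to emit (a, b), and the reducer evaluates every if-expression while streaming distinct b values.

```python
from itertools import groupby

# Toy rows (a, b); the condition is a placeholder for if(condition, b, null).
rows = [("a1", 1), ("a1", 1), ("a1", 2), ("a1", 5), ("a2", 3)]

def condition(b):
    return b < 3  # hypothetical predicate for illustration

# Map side emits (a, b); shuffle sorts by (a, b), the value carries nothing.
emitted = sorted(rows)

# Reduce side: one pass per key; each distinct b is seen once, so all three
# expression-level distinct counts can be evaluated here.
result = {}
for a, group in groupby(emitted, key=lambda r: r[0]):
    col1 = col2 = col3 = 0
    prev = object()
    for _, b in group:
        if b == prev:
            continue           # duplicate b: skip, per distinct semantics
        prev = b
        col3 += 1              # count(distinct b)
        if condition(b):
            # With this predicate, if(condition, b, null) and
            # if(!condition, null, b) are non-null on exactly the same rows,
            # so col1 and col2 coincide here.
            col1 += 1
            col2 += 1
    result[a] = (col1, col2, col3)

print(result)  # {'a1': (2, 2, 3), 'a2': (0, 0, 1)}
```

Because the sort key already orders b within each a, no per-group hash set is needed on the reduce side; that is the benefit the comment is pointing at.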
Zheng Shao added a comment - edited

There are several approaches to solve this problem:

A1: Separate group-bys and join the results.

SELECT COALESCE(t1.key, t2.key), COALESCE(d_a, 0), COALESCE(d_b, 0)
FROM
(SELECT key, count(distinct a) as d_a ...) t1
OUTER JOIN
(SELECT key, count(distinct b) as d_b ...) t2
ON t1.key = t2.key

A2: Take advantage of the union type (HIVE-537).
See HIVE-537 for details.

A3: Take advantage of a partitioned merge join.
Here is a different plan. It depends on partitioned merge join, and the two jobs have to have the same number of reducers.

Query:
  select a, count(distinct b), count(distinct c), sum(d)

Plan:
  Job 1:
    Map side:
      Emit: distribution_key: a, sort_key: a, b, value: d
      Save a, c to temp_file1
    Reduce side:
      Group By:
        a, count(distinct b), sum(d)
    Output: temp_file2
  Job 2: Input: temp_file1
    Map side:
      Emit: distribution_key: a, sort_key: a, c, value: nothing
    Reduce side:
      Group By
        a, count(distinct c)
      Partitioned Merge Join with temp_file2 on a
        a, count(distinct b), sum(d), count(distinct c)
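Approach A1 above can be simulated in a few lines (hypothetical Python over a toy dataset, not Hive code): compute each distinct count in its own group-by, then full-outer-join the two result sets on the key, coalescing a missing side to a count of 0.

```python
from collections import defaultdict

# Toy rows of (key, a, b).
rows = [("k1", "x", "p"), ("k1", "x", "q"), ("k2", "y", "p")]

# t1: SELECT key, count(distinct a) ... GROUP BY key
t1 = defaultdict(set)
for key, a, _ in rows:
    t1[key].add(a)

# t2: SELECT key, count(distinct b) ... GROUP BY key
t2 = defaultdict(set)
for key, _, b in rows:
    t2[key].add(b)

# Full outer join on key, with COALESCE(count, 0) for a missing side.
joined = {
    key: (len(t1.get(key, ())), len(t2.get(key, ())))
    for key in set(t1) | set(t2)
}

print(joined)  # {'k1': (1, 2), 'k2': (1, 1)}
```

The trade-off A1 implies, and the reason the thread ultimately prefers A2, is that it scans the input once per distinct aggregate and pays for an extra join, whereas the union-type plan handles all distincts in a single job.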

People

• Assignee: Amareshwari Sriramadasu
• Reporter: Alexis Rondeau
• Votes: 3
• Watchers: 12
