Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1393603) +++ ivy/libraries.properties (working copy) @@ -64,3 +64,4 @@ slf4j-log4j12.version=1.6.1 velocity.version=1.5 zookeeper.version=3.4.3 +javolution.version=5.5.1 Index: ql/ivy.xml =================================================================== --- ql/ivy.xml (revision 1393603) +++ ql/ivy.xml (working copy) @@ -73,7 +73,8 @@ transitive="false"/> - + + Index: ql/src/test/results/clientpositive/groupby_cube1.q.out =================================================================== --- ql/src/test/results/clientpositive/groupby_cube1.q.out (revision 0) +++ ql/src/test/results/clientpositive/groupby_cube1.q.out (working copy) @@ -0,0 +1,547 @@ +PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@T1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +PREHOOK: type: LOAD +PREHOOK: Output: default@t1 +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@t1 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +ORDER BY key, val +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked 
pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +ORDER BY key, val +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +NULL 11 1 +NULL 12 1 +NULL 13 1 +NULL 17 1 +NULL 18 1 +NULL 28 1 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + 
expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: rand() + type: double + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +ORDER BY key, val +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +ORDER BY key, val +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +NULL 11 1 +NULL 12 1 +NULL 13 1 +NULL 17 1 +NULL 18 1 +NULL 28 1 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + 
outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + sort order: ++ + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col2 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 Index: ql/src/test/results/clientpositive/groupby_rollup1.q.out =================================================================== --- ql/src/test/results/clientpositive/groupby_rollup1.q.out (revision 0) +++ ql/src/test/results/clientpositive/groupby_rollup1.q.out (working copy) @@ -0,0 +1,535 @@ +PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@T1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +PREHOOK: type: LOAD +PREHOOK: Output: default@t1 +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@t1 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR 
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +ORDER BY key, val +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +ORDER BY key, val +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value 
expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: rand() + type: double + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + 
type: string + mode: final + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +ORDER BY key, val +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +ORDER BY key, val +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + sort order: ++ + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col2 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, 
_col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 Index: ql/src/test/results/clientnegative/groupby_cube1.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_cube1.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_cube1.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10209]: Grouping sets aggregations (with rollups or cubes) are not allowed if map-side aggregation is turned off. Set hive.map.aggr=true if you want to use grouping sets Index: ql/src/test/results/clientnegative/groupby_rollup1.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_rollup1.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_rollup1.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10209]: Grouping sets aggregations (with rollups or cubes) are not allowed if map-side aggregation is turned off. Set hive.map.aggr=true if you want to use grouping sets Index: ql/src/test/results/clientnegative/groupby_cube2.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_cube2.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_cube2.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10210]: Grouping sets aggregations (with rollups or cubes) are not allowed if aggregation function parameters overlap with the aggregation functions columns Index: ql/src/test/results/clientnegative/groupby_rollup2.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_rollup2.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_rollup2.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10210]: Grouping sets aggregations (with rollups or cubes) are not allowed if aggregation function parameters overlap with the aggregation functions columns Index: ql/src/test/results/compiler/plan/groupby2.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby2.q.xml (revision 1393603) +++ ql/src/test/results/compiler/plan/groupby2.q.xml (working copy) @@ -720,6 +720,9 @@ 0.5 + + 1 + @@ -730,6 +733,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby4.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby4.q.xml (revision 1393603) +++ ql/src/test/results/compiler/plan/groupby4.q.xml (working copy) @@ -438,6 +438,9 @@ 0.5 + + 1 + @@ -445,6 +448,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby6.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby6.q.xml (revision 1393603) +++ ql/src/test/results/compiler/plan/groupby6.q.xml (working copy) @@ -438,6 
+438,9 @@ 0.5 + + 1 + @@ -445,6 +448,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby1.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby1.q.xml (revision 1393603) +++ ql/src/test/results/compiler/plan/groupby1.q.xml (working copy) @@ -627,6 +627,9 @@ 0.5 + + 1 + @@ -634,6 +637,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby3.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby3.q.xml (revision 1393603) +++ ql/src/test/results/compiler/plan/groupby3.q.xml (working copy) @@ -922,6 +922,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby5.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby5.q.xml (revision 1393603) +++ ql/src/test/results/compiler/plan/groupby5.q.xml (working copy) @@ -493,6 +493,9 @@ 0.5 + + 1 + @@ -500,6 +503,9 @@ + + + 0.9 Index: ql/src/test/queries/clientpositive/groupby_rollup1.q =================================================================== --- ql/src/test/queries/clientpositive/groupby_rollup1.q (revision 0) +++ ql/src/test/queries/clientpositive/groupby_rollup1.q (working copy) @@ -0,0 +1,32 @@ +set hive.map.aggr=true; +set hive.groupby.skewindata=false; + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +ORDER BY key, val; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +ORDER BY key; + +set hive.groupby.skewindata=true; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +ORDER BY key, val; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +ORDER BY key; Index: ql/src/test/queries/clientpositive/groupby_cube1.q =================================================================== --- ql/src/test/queries/clientpositive/groupby_cube1.q (revision 0) +++ ql/src/test/queries/clientpositive/groupby_cube1.q (working copy) @@ -0,0 +1,32 @@ +set hive.map.aggr=true; +set hive.groupby.skewindata=false; + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +ORDER BY key, val; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +ORDER BY key; + +set hive.groupby.skewindata=true; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +ORDER BY key, val; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +ORDER BY key; Index: ql/src/test/queries/clientnegative/groupby_cube1.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_cube1.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_cube1.q 
(working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=false; + +SELECT key, count(distinct value) FROM src GROUP BY key with cube; + Index: ql/src/test/queries/clientnegative/groupby_cube2.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_cube2.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_cube2.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=true; + +SELECT key, value, count(distinct value) FROM src GROUP BY key, value with cube; + Index: ql/src/test/queries/clientnegative/groupby_rollup1.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_rollup1.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_rollup1.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=false; + +SELECT key, value, count(1) FROM src GROUP BY key, value with rollup; + Index: ql/src/test/queries/clientnegative/groupby_rollup2.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_rollup2.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_rollup2.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=true; + +SELECT key, value, count(key) FROM src GROUP BY key, value with rollup; + Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1393603) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy) @@ -32,6 +32,8 @@ import java.util.Map; import java.util.Set; +import javolution.util.FastBitSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -106,7 +109,6 @@ protected transient ArrayList objectInspectors; transient ArrayList fieldNames; - transient KeyWrapperFactory keyWrapperFactory; // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2, // MERGEPARTIAL protected transient KeyWrapper currentKeys; @@ -144,6 +146,12 @@ private long maxMemory; private float memoryThreshold; + private boolean groupingSetsPresent; + private int groupingSetsPosition; + private List groupingSets; + private List groupingSetsBitSet; + transient private List newKeysGroupingSets; + /** * This is used to store the position and field names for variable length * fields. 
@@ -182,6 +190,19 @@ transient int countAfterReport; transient int heartbeatInterval; + public static FastBitSet groupingSet2BitSet(int value) { + FastBitSet bits = new FastBitSet(); + int index = 0; + while (value != 0) { + if (value % 2 != 0) { + bits.set(index); + } + ++index; + value = value >>> 1; + } + return bits; + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { totalMemory = Runtime.getRuntime().totalMemory(); @@ -191,21 +212,42 @@ heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT); countAfterReport = 0; + groupingSetsPresent = conf.isGroupingSetsPresent(); + groupingSets = conf.getListGroupingSets(); + groupingSetsPosition = conf.getGroupingSetPosition(); ObjectInspector rowInspector = inputObjInspectors[0]; // init keyFields - keyFields = new ExprNodeEvaluator[conf.getKeys().size()]; - keyObjectInspectors = new ObjectInspector[conf.getKeys().size()]; - currentKeyObjectInspectors = new ObjectInspector[conf.getKeys().size()]; - for (int i = 0; i < keyFields.length; i++) { + int numKeys = conf.getKeys().size(); + + keyFields = new ExprNodeEvaluator[numKeys]; + keyObjectInspectors = new ObjectInspector[numKeys]; + currentKeyObjectInspectors = new ObjectInspector[numKeys]; + for (int i = 0; i < numKeys; i++) { keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i)); keyObjectInspectors[i] = keyFields[i].initialize(rowInspector); currentKeyObjectInspectors[i] = ObjectInspectorUtils - .getStandardObjectInspector(keyObjectInspectors[i], - ObjectInspectorCopyOption.WRITABLE); + .getStandardObjectInspector(keyObjectInspectors[i], + ObjectInspectorCopyOption.WRITABLE); } + // Initialize the constants for the grouping sets, so that they can be re-used for + // each row + if (groupingSetsPresent) { + newKeysGroupingSets = new ArrayList(); + groupingSetsBitSet = new ArrayList(); + + for (Integer groupingSet: groupingSets) { + // Create the mapping corresponding to the grouping set + ExprNodeEvaluator groupingSetValueEvaluator = + ExprNodeEvaluatorFactory.get(new ExprNodeConstantDesc(String.valueOf(groupingSet))); + + newKeysGroupingSets.add(groupingSetValueEvaluator.evaluate(null)); + groupingSetsBitSet.add(groupingSet2BitSet(groupingSet)); + } + } + // initialize unionExpr for reduce-side // reduce KEY has union field as the last field if there are distinct // aggregates in group-by. 
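
The grouping-set machinery added above encodes each grouping set as an integer bitmask over the group-by key positions: groupingSet2BitSet() decodes the mask, and processOp() later emits one copy of the key array per grouping set with the positions outside the mask left null. A minimal standalone sketch of that decoding, using java.util.BitSet in place of javolution's FastBitSet (the class and method names in the demo are illustrative, not part of the patch):

import java.util.Arrays;
import java.util.BitSet;

// Standalone illustration of the grouping-set decoding used by GroupByOperator.
// java.util.BitSet stands in for javolution's FastBitSet.
public class GroupingSetDemo {

  // Mirrors groupingSet2BitSet(): bit i is set when the i-th group-by key
  // participates in this grouping set.
  static BitSet groupingSet2BitSet(int value) {
    BitSet bits = new BitSet();
    for (int index = 0; value != 0; value >>>= 1, index++) {
      if ((value & 1) != 0) {
        bits.set(index);
      }
    }
    return bits;
  }

  // Mirrors the per-row expansion in processOp(): keys outside the grouping
  // set stay null, keys inside it are copied from the input row.
  static Object[] projectKeys(Object[] rowKeys, int groupingSet) {
    Object[] out = new Object[rowKeys.length];
    BitSet bits = groupingSet2BitSet(groupingSet);
    for (int pos = bits.nextSetBit(0); pos >= 0; pos = bits.nextSetBit(pos + 1)) {
      out[pos] = rowKeys[pos];
    }
    return out;
  }

  public static void main(String[] args) {
    Object[] row = {"8", "18"};            // one (key, val) row from T1
    for (int set : new int[] {3, 1, 0}) {  // grouping sets for a rollup of (key, val)
      System.out.println(set + " -> " + Arrays.toString(projectKeys(row, set)));
    }
    // prints: 3 -> [8, 18], 1 -> [8, null], 0 -> [null, null]
  }
}

For GROUP BY key, val WITH ROLLUP the masks are 3, 1 and 0, which is how the (key, val), (key, NULL) and (NULL, NULL) rows in groupby_rollup1.q.out are produced.
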
@@ -328,7 +370,8 @@ } aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][]; - if (conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) { + if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) && + (!groupingSetsPresent)) { aggregations = newAggregations(); hashAggr = false; } else { @@ -371,7 +414,8 @@ outputObjInspector = ObjectInspectorFactory .getStandardStructObjectInspector(fieldNames, objectInspectors); - keyWrapperFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors); + KeyWrapperFactory keyWrapperFactory = + new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors); newKeys = keyWrapperFactory.getKeyWrapper(); @@ -699,6 +743,24 @@ } } + private void processKey(Object row, + ObjectInspector rowInspector) throws HiveException { + if (hashAggr) { + newKeys.setHashKey(); + processHashAggr(row, rowInspector, newKeys); + } else { + processAggr(row, rowInspector, newKeys); + } + + firstRowInGroup = false; + + if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0 + && (reporter != null)) { + reporter.progress(); + countAfterReport = 0; + } + } + @Override public void processOp(Object row, int tag) throws HiveException { firstRow = false; @@ -728,21 +790,32 @@ try { countAfterReport++; - newKeys.getNewKey(row, rowInspector); - if (hashAggr) { - newKeys.setHashKey(); - processHashAggr(row, rowInspector, newKeys); - } else { - processAggr(row, rowInspector, newKeys); - } - firstRowInGroup = false; + if (groupingSetsPresent) { + Object[] newKeysArray = newKeys.getKeyArray(); + Object[] cloneNewKeysArray = new Object[newKeysArray.length]; + for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) { + cloneNewKeysArray[keyPos] = newKeysArray[keyPos]; + } - if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0 - && (reporter != null)) { - reporter.progress(); - countAfterReport = 0; + for (int groupingSetPos = 0; groupingSetPos < groupingSets.size(); groupingSetPos++) { + for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) { + newKeysArray[keyPos] = null; + } + + FastBitSet bitset = groupingSetsBitSet.get(groupingSetPos); + // Some keys need to be left to null corresponding to that grouping set. 
+ for (int keyPos = bitset.nextSetBit(0); keyPos >= 0; + keyPos = bitset.nextSetBit(keyPos+1)) { + newKeysArray[keyPos] = cloneNewKeysArray[keyPos]; + } + + newKeysArray[groupingSetsPosition] = newKeysGroupingSets.get(groupingSetPos); + processKey(row, rowInspector); + } + } else { + processKey(row, rowInspector); } } catch (HiveException e) { throw e; @@ -794,7 +867,8 @@ } // Non-hash aggregation - private void processAggr(Object row, ObjectInspector rowInspector, + private void processAggr(Object row, + ObjectInspector rowInspector, KeyWrapper newKeys) throws HiveException { // Prepare aggs for updating AggregationBuffer[] aggs = null; @@ -968,18 +1042,19 @@ * The keys in the record * @throws HiveException */ - protected void forward(Object[] keys, AggregationBuffer[] aggs) - throws HiveException { - int totalFields = keys.length+ aggs.length; + protected void forward(Object[] keys, + AggregationBuffer[] aggs) throws HiveException { + + int totalFields = keys.length + aggs.length; if (forwardCache == null) { forwardCache = new Object[totalFields]; } + for (int i = 0; i < keys.length; i++) { forwardCache[i] = keys[i]; } for (int i = 0; i < aggs.length; i++) { - forwardCache[keys.length + i] = aggregationEvaluators[i] - .evaluate(aggs[i]); + forwardCache[keys.length + i] = aggregationEvaluators[i].evaluate(aggs[i]); } forward(forwardCache, outputObjInspector); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (revision 1393603) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; @@ -57,6 +58,9 @@ private boolean bucketGroup; private ArrayList keys; + private List listGroupingSets; + private boolean groupingSetsPresent; + private int groupingSetPosition; private ArrayList aggregators; private ArrayList outputColumnNames; private float groupByMemoryUsage; @@ -70,9 +74,15 @@ final ArrayList outputColumnNames, final ArrayList keys, final ArrayList aggregators, - final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) { + final boolean groupKeyNotReductionKey, + final float groupByMemoryUsage, + final float memoryThreshold, + final List listGroupingSets, + final boolean groupingSetsPresent, + final int groupingSetsPosition) { this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey, - false, groupByMemoryUsage, memoryThreshold); + false, groupByMemoryUsage, memoryThreshold, listGroupingSets, + groupingSetsPresent, groupingSetsPosition); } public GroupByDesc( @@ -80,7 +90,13 @@ final ArrayList outputColumnNames, final ArrayList keys, final ArrayList aggregators, - final boolean groupKeyNotReductionKey, final boolean bucketGroup,float groupByMemoryUsage, float memoryThreshold) { + final boolean groupKeyNotReductionKey, + final boolean bucketGroup, + final float groupByMemoryUsage, + final float memoryThreshold, + final List listGroupingSets, + final boolean groupingSetsPresent, + final int groupingSetsPosition) { this.mode = mode; this.outputColumnNames = outputColumnNames; this.keys = keys; @@ -89,6 +105,9 @@ this.bucketGroup = bucketGroup; this.groupByMemoryUsage = groupByMemoryUsage; this.memoryThreshold = memoryThreshold; + this.listGroupingSets = 
listGroupingSets; + this.groupingSetsPresent = groupingSetsPresent; + this.groupingSetPosition = groupingSetsPosition; } public Mode getMode() { @@ -200,4 +219,34 @@ } return true; } + + // Consider a query like: + // select a, b, count(distinct c) from T group by a,b with rollup; + // Assume that hive.map.aggr is set to true and hive.groupby.skewindata is false, + // in which case the group by would execute as a single map-reduce job. + // For the group-by, the group by keys should be: a,b,groupingSet(for rollup), c + // So, the starting position of grouping set need to be known + public List getListGroupingSets() { + return listGroupingSets; + } + + public void setListGroupingSets(final List listGroupingSets) { + this.listGroupingSets = listGroupingSets; + } + + public boolean isGroupingSetsPresent() { + return groupingSetsPresent; + } + + public void setGroupingSetsPresent(boolean groupingSetsPresent) { + this.groupingSetsPresent = groupingSetsPresent; + } + + public int getGroupingSetPosition() { + return groupingSetPosition; + } + + public void setGroupingSetPosition(int groupingSetPosition) { + this.groupingSetPosition = groupingSetPosition; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1393603) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -160,6 +160,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.ql.util.ObjectPair; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; @@ -218,6 +219,8 @@ //Max characters when auto generating the column name with func name private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20; + private static final String GROUPING_SET_KEY = "GROUPING_SET_KEY"; + private static class Phase1Ctx { String dest; int nextNum; @@ -791,6 +794,8 @@ break; case HiveParser.TOK_GROUPBY: + case HiveParser.TOK_ROLLUP_GROUPBY: + case HiveParser.TOK_CUBE_GROUPBY: // Get the groupby aliases - these are aliased to the entries in the // select list queryProperties.setHasGroupBy(true); @@ -803,6 +808,13 @@ } qbp.setGroupByExprForClause(ctx_1.dest, ast); skipRecursion = true; + + // Rollup and Cubes are syntactic sugar on top of grouping sets + if (ast.getToken().getType() == HiveParser.TOK_ROLLUP_GROUPBY) { + qbp.getDestRollups().add(ctx_1.dest); + } else if (ast.getToken().getType() == HiveParser.TOK_CUBE_GROUPBY) { + qbp.getDestCubes().add(ctx_1.dest); + } break; case HiveParser.TOK_HAVING: @@ -1980,6 +1992,53 @@ } } + private List getGroupingSetsForRollup(int size) { + List groupingSetKeys = new ArrayList(); + for (int i = 0; i <= size; i++) { + int groupingSet = 0; + for (int pos = 0; pos < i; pos++) { + groupingSet += Math.pow(2, pos); + } + + groupingSetKeys.add(groupingSet); + } + + return groupingSetKeys; + } + + private List getGroupingSetsForCube(int size) { + List results = new ArrayList(); + getGroupingSetsForCube(results, 0, size-1); + return results; + } + + private void getGroupingSetsForCube(List results, int initValue, int size) { + if (size == 0) { + results.add(initValue); + results.add(initValue + 1); + return; + } + + getGroupingSetsForCube(results, initValue, 
size-1); + getGroupingSetsForCube(results, initValue + (int)Math.pow(2, size), size-1); + } + + // This function returns the grouping sets along with the grouping expressions + // Even if rollups and cubes are present in the query, they are converted to + // grouping sets at this point + private ObjectPair, List> getGroupByGroupingSetsForClause( + QBParseInfo parseInfo, String dest) { + List groupingSets = new ArrayList(); + List groupByExprs = getGroupByForClause(parseInfo, dest); + if (parseInfo.getDestRollups().contains(dest)) { + groupingSets = getGroupingSetsForRollup(groupByExprs.size()); + } else if (parseInfo.getDestCubes().contains(dest)) { + groupingSets = getGroupingSetsForCube(groupByExprs.size()); + } + + return new ObjectPair, List>(groupByExprs, groupingSets); + } + /** * This function is a wrapper of parseInfo.getGroupByForClause which * automatically translates SELECT DISTINCT a,b,c to SELECT a,b,c GROUP BY @@ -2652,14 +2711,75 @@ } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), - reduceSinkOperatorInfo), groupByOutputRowResolver); + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false,groupByMemoryUsage,memoryThreshold, null, false, 0), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), + reduceSinkOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } + // Add the grouping set key to the group by operator. + // This is not the first group by operator, but it is a subsequent group by operator + // which is forwarding the grouping keys introduced by the grouping sets. + // For eg: consider: select key, value, count(1) from T group by key, value with rollup. + // Assuming map-side aggregation and no skew, the plan would look like: + // + // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink + // + // This function is called for GroupBy2 to pass the additional grouping keys introduced by + // GroupBy1 for the grouping set (corresponding to the rollup). + private void addGroupingSetKey(List groupByKeys, + RowResolver groupByInputRowResolver, + RowResolver groupByOutputRowResolver, + List outputColumnNames, + Map colExprMap) throws SemanticException { + // For grouping sets, add a dummy grouping key + String groupingSetColumnName = + groupByInputRowResolver.get(null, GROUPING_SET_KEY).getInternalName(); + ExprNodeDesc inputExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, + groupingSetColumnName, null, false); + groupByKeys.add(inputExpr); + + String field = getColumnInternalName(groupByKeys.size() - 1); + outputColumnNames.add(field); + groupByOutputRowResolver.put(null, GROUPING_SET_KEY, + new ColumnInfo(field, TypeInfoFactory.stringTypeInfo, "", false)); + colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); + } + + // Process grouping set for the reduce sink operator + // For eg: consider: select key, value, count(1) from T group by key, value with rollup. 
+ // Assuming map-side aggregation and no skew, the plan would look like: + // + // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink + // + // This function is called for ReduceSink to add the additional grouping keys introduced by + // GroupBy1 into the reduce keys. + private void processGroupingSetReduceSinkOperator(RowResolver reduceSinkInputRowResolver, + RowResolver reduceSinkOutputRowResolver, + List reduceKeys, + List outputKeyColumnNames, + Map colExprMap) throws SemanticException { + // add a key for reduce sink + String groupingSetColumnName = + reduceSinkInputRowResolver.get(null, GROUPING_SET_KEY).getInternalName(); + ExprNodeDesc inputExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, + groupingSetColumnName, null, false); + reduceKeys.add(inputExpr); + + outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1)); + String field = Utilities.ReduceField.KEY.toString() + "." + + getColumnInternalName(reduceKeys.size() - 1); + ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get( + reduceKeys.size() - 1).getTypeInfo(), null, false); + reduceSinkOutputRowResolver.put(null, GROUPING_SET_KEY, colInfo); + colExprMap.put(colInfo.getInternalName(), inputExpr); + } + + /** * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)). * The new GroupByOperator will be a child of the reduceSinkOperatorInfo. @@ -2677,7 +2797,8 @@ private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo, GroupByDesc.Mode mode, Map genericUDAFEvaluators, - boolean distPartAgg) throws SemanticException { + boolean distPartAgg, + boolean groupingSetsPresent) throws SemanticException { ArrayList outputColumnNames = new ArrayList(); RowResolver groupByInputRowResolver = opParseCtx .get(reduceSinkOperatorInfo).getRowResolver(); @@ -2705,6 +2826,16 @@ colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } + // For grouping sets, add a dummy grouping key + if (groupingSetsPresent) { + addGroupingSetKey( + groupByKeys, + groupByInputRowResolver, + groupByOutputRowResolver, + outputColumnNames, + colExprMap); + } + HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); // get the last colName for the reduce KEY @@ -2726,7 +2857,7 @@ ArrayList aggParameters = new ArrayList(); boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI); - // If the function is distinct, partial aggregartion has not been done on + // If the function is distinct, partial aggregation has not been done on // the client side. // If distPartAgg is set, the client is letting us know that partial // aggregation has not been done. @@ -2814,11 +2945,15 @@ } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + // Nothing special needs to be done for grouping sets. + // This is the final group by operator, so multiple rows corresponding to the + // grouping sets have been generated upstream. 
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - distPartAgg,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver - .getColumnInfos()), reduceSinkOperatorInfo), - groupByOutputRowResolver); + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + distPartAgg,groupByMemoryUsage,memoryThreshold, null, false, 0), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo), + groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } @@ -2837,10 +2972,14 @@ * @return the new GroupByOperator */ @SuppressWarnings("nls") - private Operator genGroupByPlanMapGroupByOperator(QB qb, String dest, - Operator inputOperatorInfo, GroupByDesc.Mode mode, - Map genericUDAFEvaluators) - throws SemanticException { + private Operator genGroupByPlanMapGroupByOperator(QB qb, + String dest, + List grpByExprs, + Operator inputOperatorInfo, + GroupByDesc.Mode mode, + Map genericUDAFEvaluators, + List groupingSetKeys, + boolean groupingSetsPresent) throws SemanticException { RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo) .getRowResolver(); @@ -2851,44 +2990,65 @@ ArrayList outputColumnNames = new ArrayList(); ArrayList aggregations = new ArrayList(); Map colExprMap = new HashMap(); - List grpByExprs = getGroupByForClause(parseInfo, dest); for (int i = 0; i < grpByExprs.size(); ++i) { ASTNode grpbyExpr = grpByExprs.get(i); ExprNodeDesc grpByExprNode = genExprNodeDesc(grpbyExpr, - groupByInputRowResolver); + groupByInputRowResolver); groupByKeys.add(grpByExprNode); String field = getColumnInternalName(i); outputColumnNames.add(field); groupByOutputRowResolver.putExpression(grpbyExpr, - new ColumnInfo(field, grpByExprNode.getTypeInfo(), "", false)); + new ColumnInfo(field, grpByExprNode.getTypeInfo(), "", false)); colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } + // The grouping set key is present after the grouping keys, before the distinct keys + int groupingSetsPosition = groupByKeys.size(); + + // For grouping sets, add a dummy grouping key + // This dummy key needs to be added as a reduce key + // For eg: consider: select key, value, count(1) from T group by key, value with rollup. + // Assuming map-side aggregation and no skew, the plan would look like: + // + // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink + // + // This function is called for GroupBy1 to create an additional grouping key + // for the grouping set (corresponding to the rollup). + if (groupingSetsPresent) { + // The value for the constant does not matter. It is replaced by the grouping set + // value for the actual implementation + ExprNodeConstantDesc constant = new ExprNodeConstantDesc("0"); + groupByKeys.add(constant); + String field = getColumnInternalName(groupByKeys.size() - 1); + outputColumnNames.add(field); + groupByOutputRowResolver.put(null, GROUPING_SET_KEY, + new ColumnInfo(field, TypeInfoFactory.stringTypeInfo, "", false)); + colExprMap.put(field, constant); + } + // If there is a distinctFuncExp, add all parameters to the reduceKeys. 
if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) { List list = parseInfo.getDistinctFuncExprsForClause(dest); - int numDistn = 0; for(ASTNode value: list) { // 0 is function name for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); if (groupByOutputRowResolver.getExpression(parameter) == null) { ExprNodeDesc distExprNode = genExprNodeDesc(parameter, - groupByInputRowResolver); + groupByInputRowResolver); groupByKeys.add(distExprNode); - numDistn++; - String field = getColumnInternalName(grpByExprs.size() + numDistn - - 1); + String field = getColumnInternalName(groupByKeys.size()-1); outputColumnNames.add(field); groupByOutputRowResolver.putExpression(parameter, new ColumnInfo( - field, distExprNode.getTypeInfo(), "", false)); + field, distExprNode.getTypeInfo(), "", false)); colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } } } } + // For each aggregation HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); @@ -2903,7 +3063,7 @@ for (int i = 1; i < value.getChildCount(); i++) { ASTNode paraExpr = (ASTNode) value.getChild(i); ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr, - groupByInputRowResolver); + groupByInputRowResolver); aggParameters.add(paraExprNode); } @@ -2934,9 +3094,11 @@ float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), - inputOperatorInfo), groupByOutputRowResolver); + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false,groupByMemoryUsage,memoryThreshold, + groupingSetKeys, groupingSetsPresent, groupingSetsPosition), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), + inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } @@ -2956,9 +3118,15 @@ * @throws SemanticException */ @SuppressWarnings("nls") - private Operator genGroupByPlanReduceSinkOperator(QB qb, String dest, - Operator inputOperatorInfo, int numPartitionFields, int numReducers, - boolean mapAggrDone) throws SemanticException { + private Operator genGroupByPlanReduceSinkOperator(QB qb, + String dest, + Operator inputOperatorInfo, + List grpByExprs, + int numPartitionFields, + boolean changeNumPartitionFields, + int numReducers, + boolean mapAggrDone, + boolean groupingSetsPresent) throws SemanticException { RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo) .getRowResolver(); @@ -2970,12 +3138,26 @@ List outputKeyColumnNames = new ArrayList(); List outputValueColumnNames = new ArrayList(); - List grpByExprs = getGroupByForClause(parseInfo, dest); ArrayList reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest, - reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, + reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, + colExprMap); + + // add a key for reduce sink + if (groupingSetsPresent) { + // Process grouping set for the reduce sink operator + processGroupingSetReduceSinkOperator( + reduceSinkInputRowResolver, + reduceSinkOutputRowResolver, + reduceKeys, + outputKeyColumnNames, colExprMap); + if (changeNumPartitionFields) { + numPartitionFields++; + } + } + List> distinctColIndices = 
         getDistinctColIndicesForReduceSink(parseInfo, dest, reduceKeys,
         reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
@@ -3006,11 +3188,14 @@
     }
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
-        grpByExprs.size(), reduceValues, distinctColIndices,
-        outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
-        numReducers), new RowSchema(reduceSinkOutputRowResolver
-        .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
+        OperatorFactory.getAndMakeChild(
+            PlanUtils.getReduceSinkDesc(reduceKeys,
+                groupingSetsPresent ? grpByExprs.size() + 1 : grpByExprs.size(),
+                reduceValues, distinctColIndices,
+                outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
+                numReducers),
+            new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), inputOperatorInfo),
+        reduceSinkOutputRowResolver);
     rsOp.setColumnExprMap(colExprMap);
     return rsOp;
   }
@@ -3025,7 +3210,7 @@
     for (int i = 0; i < grpByExprs.size(); ++i) {
       ASTNode grpbyExpr = grpByExprs.get(i);
       ExprNodeDesc inputExpr = genExprNodeDesc(grpbyExpr,
-          reduceSinkInputRowResolver);
+          reduceSinkInputRowResolver);
       reduceKeys.add(inputExpr);
       if (reduceSinkOutputRowResolver.getExpression(grpbyExpr) == null) {
         outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
@@ -3045,7 +3230,7 @@
   }
   private List> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest,
-      ArrayList reduceKeys, RowResolver reduceSinkInputRowResolver,
+      List reduceKeys, RowResolver reduceSinkInputRowResolver,
       RowResolver reduceSinkOutputRowResolver, List outputKeyColumnNames)
       throws SemanticException {
@@ -3235,8 +3420,11 @@
    */
   @SuppressWarnings("nls")
   private Operator genGroupByPlanReduceSinkOperator2MR(QBParseInfo parseInfo,
-      String dest, Operator groupByOperatorInfo, int numPartitionFields,
-      int numReducers) throws SemanticException {
+      String dest,
+      Operator groupByOperatorInfo,
+      int numPartitionFields,
+      int numReducers,
+      boolean groupingSetsPresent) throws SemanticException {
     RowResolver reduceSinkInputRowResolver2 = opParseCtx.get(
         groupByOperatorInfo).getRowResolver();
     RowResolver reduceSinkOutputRowResolver2 = new RowResolver();
@@ -3260,6 +3448,19 @@
       reduceSinkOutputRowResolver2.putExpression(grpbyExpr, colInfo);
       colExprMap.put(colInfo.getInternalName(), inputExpr);
     }
+
+    // add a key for reduce sink
+    if (groupingSetsPresent) {
+      // Note that partitioning fields dont need to change, since it is either
+      // partitioned randomly, or by all grouping keys + distinct keys
+      processGroupingSetReduceSinkOperator(
+          reduceSinkInputRowResolver2,
+          reduceSinkOutputRowResolver2,
+          reduceKeys,
+          outputColumnNames,
+          colExprMap);
+    }
+
     // Get partial aggregation results and store in reduceValues
     ArrayList reduceValues = new ArrayList();
     int inputField = reduceKeys.size();
@@ -3305,9 +3506,12 @@
    */
   @SuppressWarnings("nls")
   private Operator genGroupByPlanGroupByOperator2MR(QBParseInfo parseInfo,
-      String dest, Operator reduceSinkOperatorInfo2, GroupByDesc.Mode mode,
-      Map genericUDAFEvaluators)
-      throws SemanticException {
+      String dest,
+      Operator reduceSinkOperatorInfo2,
+      GroupByDesc.Mode mode,
+      Map genericUDAFEvaluators,
+      boolean groupingSetsPresent) throws SemanticException {
+
     RowResolver groupByInputRowResolver2 = opParseCtx.get(
         reduceSinkOperatorInfo2).getRowResolver();
     RowResolver groupByOutputRowResolver2 = new RowResolver();
@@ -3333,6 +3537,17 @@
             new ColumnInfo(field,
             exprInfo.getType(), "", false));
       colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
     }
+
+    // For grouping sets, add a dummy grouping key
+    if (groupingSetsPresent) {
+      addGroupingSetKey(
+          groupByKeys,
+          groupByInputRowResolver2,
+          groupByOutputRowResolver2,
+          outputColumnNames,
+          colExprMap);
+    }
+
     HashMap aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
     for (Map.Entry entry : aggregationTrees.entrySet()) {
@@ -3374,10 +3589,12 @@
     }
     float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
     float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-        false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
-        reduceSinkOperatorInfo2), groupByOutputRowResolver2);
+        new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+            false,groupByMemoryUsage,memoryThreshold, null, false, 0),
+        new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
+        reduceSinkOperatorInfo2), groupByOutputRowResolver2);
     op.setColumnExprMap(colExprMap);
     return op;
   }
@@ -3414,14 +3631,32 @@
     QBParseInfo parseInfo = qb.getParseInfo();
     int numReducers = -1;
-    List grpByExprs = getGroupByForClause(parseInfo, dest);
+    ObjectPair, List> grpByExprsGroupingSets =
+        getGroupByGroupingSetsForClause(parseInfo, dest);
+
+    List grpByExprs = grpByExprsGroupingSets.getFirst();
+    List groupingSets = grpByExprsGroupingSets.getSecond();
+
     if (grpByExprs.isEmpty()) {
       numReducers = 1;
     }
+    // Grouping sets are not allowed
+    if (!groupingSets.isEmpty()) {
+      throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOMAPAGGR.getMsg());
+    }
+
     // ////// 1. Generate ReduceSinkOperator
-    Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
-        dest, input, grpByExprs.size(), numReducers, false);
+    Operator reduceSinkOperatorInfo =
+        genGroupByPlanReduceSinkOperator(qb,
+            dest,
+            input,
+            grpByExprs,
+            grpByExprs.size(),
+            false,
+            numReducers,
+            false,
+            false);
     // ////// 2. Generate GroupbyOperator
     Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
@@ -3572,19 +3807,19 @@
     // ////// 2. Generate GroupbyOperator
     Operator groupByOperatorInfo = genGroupByPlanGroupByOperator1(parseInfo,
-        dest, input, GroupByDesc.Mode.HASH, genericUDAFEvaluators, true);
+        dest, input, GroupByDesc.Mode.HASH, genericUDAFEvaluators, true, false);
     int numReducers = -1;
     List grpByExprs = getGroupByForClause(parseInfo, dest);
     // ////// 3. Generate ReduceSinkOperator2
     Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
-        parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers);
+        parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false);
     // ////// 4. Generate GroupbyOperator2
     Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(parseInfo,
         dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
-        genericUDAFEvaluators);
+        genericUDAFEvaluators, false);
     return groupByOperatorInfo2;
   }
@@ -3636,6 +3871,19 @@
     QBParseInfo parseInfo = qb.getParseInfo();
+    ObjectPair, List> grpByExprsGroupingSets =
+        getGroupByGroupingSetsForClause(parseInfo, dest);
+
+    List grpByExprs = grpByExprsGroupingSets.getFirst();
+    List groupingSets = grpByExprsGroupingSets.getSecond();
+
+    // Grouping sets are not allowed
+    // This restriction can be lifted in future.
+    // HIVE-3508 has been filed for this
+    if (!groupingSets.isEmpty()) {
+      throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOMAPAGGR.getMsg());
+    }
+
     // ////// 1. Generate ReduceSinkOperator
     // There is a special case when we want the rows to be randomly distributed
     // to
@@ -3643,31 +3891,37 @@
     // DISTINCT
     // operator. We set the numPartitionColumns to -1 for this purpose. This is
    // captured by WritableComparableHiveObject.hashCode() function.
-    Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
-        dest, input, (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ?
-        -1 : Integer.MAX_VALUE), -1, false);
+    Operator reduceSinkOperatorInfo =
+        genGroupByPlanReduceSinkOperator(qb,
+            dest,
+            input,
+            grpByExprs,
+            (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ? -1 : Integer.MAX_VALUE),
+            false,
+            -1,
+            false,
+            false);
     // ////// 2. Generate GroupbyOperator
     Map genericUDAFEvaluators = new LinkedHashMap();
     GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanGroupByOperator(
-        parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1,
-        genericUDAFEvaluators);
+        parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1,
+        genericUDAFEvaluators);
     int numReducers = -1;
-    List grpByExprs = getGroupByForClause(parseInfo, dest);
     if (grpByExprs.isEmpty()) {
       numReducers = 1;
     }
     // ////// 3. Generate ReduceSinkOperator2
     Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
-        parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers);
+        parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false);
     // ////// 4. Generate GroupbyOperator2
     Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(parseInfo,
         dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
-        genericUDAFEvaluators);
+        dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
+        genericUDAFEvaluators, false);
     return groupByOperatorInfo2;
   }
@@ -3685,6 +3939,83 @@
     return true;
   }
+  static private void extractColumns(Set colNamesExprs,
+      ExprNodeDesc exprNode) throws SemanticException {
+    if (exprNode instanceof ExprNodeColumnDesc) {
+      colNamesExprs.add(((ExprNodeColumnDesc)exprNode).getColumn());
+      return;
+    }
+
+    if (exprNode instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc)exprNode;
+      for (ExprNodeDesc childExpr: funcDesc.getChildExprs()) {
+        extractColumns(colNamesExprs, childExpr);
+      }
+    }
+  }
+
+  static private boolean hasCommonElement(Set set1, Set set2) {
+    for (String elem1 : set1) {
+      if (set2.contains(elem1)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private void checkExpressionsForGroupingSet(List grpByExprs,
+      List distinctGrpByExprs,
+      Map aggregationTrees,
+      RowResolver inputRowResolver) throws SemanticException {
+
+    Set colNamesGroupByExprs = new HashSet();
+    Set colNamesGroupByDistinctExprs = new HashSet();
+    Set colNamesAggregateParameters = new HashSet();
+
+    // The columns in the group by expressions should not intersect with the columns in the
+    // distinct expressions
+    for (ASTNode grpByExpr : grpByExprs) {
+      extractColumns(colNamesGroupByExprs, genExprNodeDesc(grpByExpr, inputRowResolver));
+    }
+
+    // If there is a distinctFuncExp, add all parameters to the reduceKeys.
+    if (!distinctGrpByExprs.isEmpty()) {
+      for(ASTNode value: distinctGrpByExprs) {
+        // 0 is function name
+        for (int i = 1; i < value.getChildCount(); i++) {
+          ASTNode parameter = (ASTNode) value.getChild(i);
+          ExprNodeDesc distExprNode = genExprNodeDesc(parameter, inputRowResolver);
+          // extract all the columns
+          extractColumns(colNamesGroupByDistinctExprs, distExprNode);
+        }
+
+        if (hasCommonElement(colNamesGroupByExprs, colNamesGroupByDistinctExprs)) {
+          throw
+            new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg());
+        }
+      }
+    }
+
+    for (Map.Entry entry : aggregationTrees.entrySet()) {
+      ASTNode value = entry.getValue();
+      ArrayList aggParameters = new ArrayList();
+      // 0 is the function name
+      for (int i = 1; i < value.getChildCount(); i++) {
+        ASTNode paraExpr = (ASTNode) value.getChild(i);
+        ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr, inputRowResolver);
+
+        // extract all the columns
+        extractColumns(colNamesAggregateParameters, paraExprNode);
+      }
+
+      if (hasCommonElement(colNamesGroupByExprs, colNamesAggregateParameters)) {
+        throw
+          new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg());
+      }
+    }
+  }
+
   /**
    * Generate a Group-By plan using 1 map-reduce job. First perform a map-side
    * partial aggregation (to reduce the amount of data), at this point of time,
@@ -3708,28 +4039,55 @@
       Operator inputOperatorInfo) throws SemanticException {
     QBParseInfo parseInfo = qb.getParseInfo();
+    ObjectPair, List> grpByExprsGroupingSets =
+        getGroupByGroupingSetsForClause(parseInfo, dest);
+    List grpByExprs = grpByExprsGroupingSets.getFirst();
+    List groupingSets = grpByExprsGroupingSets.getSecond();
+    boolean groupingSetsPresent = !groupingSets.isEmpty();
+
+    if (groupingSetsPresent) {
+      checkExpressionsForGroupingSet(grpByExprs,
+          parseInfo.getDistinctFuncExprsForClause(dest),
+          parseInfo.getAggregationExprsForClause(dest),
+          opParseCtx.get(inputOperatorInfo).getRowResolver());
+    }
+
     // ////// Generate GroupbyOperator for a map-side partial aggregation
     Map genericUDAFEvaluators = new LinkedHashMap();
-    GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator(
-        qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH,
-        genericUDAFEvaluators);
+    GroupByOperator groupByOperatorInfo =
+        (GroupByOperator) genGroupByPlanMapGroupByOperator(
+            qb,
+            dest,
+            grpByExprs,
+            inputOperatorInfo,
+            GroupByDesc.Mode.HASH,
+            genericUDAFEvaluators,
+            groupingSets,
+            groupingSetsPresent);
     groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
-        inputOperatorInfo).getRowResolver().getTableNames());
+        inputOperatorInfo).getRowResolver().getTableNames());
     int numReducers = -1;
     // Optimize the scenario when there are no grouping keys - only 1 reducer is
     // needed
-    List grpByExprs = getGroupByForClause(parseInfo, dest);
     if (grpByExprs.isEmpty()) {
       numReducers = 1;
     }
     // ////// Generate ReduceSink Operator
-    Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
-        dest, groupByOperatorInfo, grpByExprs.size(), numReducers, true);
+    Operator reduceSinkOperatorInfo =
+        genGroupByPlanReduceSinkOperator(qb,
+            dest,
+            groupByOperatorInfo,
+            grpByExprs,
+            grpByExprs.size(),
+            true,
+            numReducers,
+            true,
+            groupingSetsPresent);
     // This is a 1-stage map-reduce processing of the groupby. Tha map-side
     // aggregates was just used to
@@ -3739,8 +4097,8 @@
     // used, and merge is invoked
     // on the reducer.
     return genGroupByPlanGroupByOperator1(parseInfo, dest,
-        reduceSinkOperatorInfo, GroupByDesc.Mode.MERGEPARTIAL,
-        genericUDAFEvaluators, false);
+        reduceSinkOperatorInfo, GroupByDesc.Mode.MERGEPARTIAL,
+        genericUDAFEvaluators, false, groupingSetsPresent);
   }
   /**
@@ -3780,12 +4138,27 @@
     QBParseInfo parseInfo = qb.getParseInfo();
+    ObjectPair, List> grpByExprsGroupingSets =
+        getGroupByGroupingSetsForClause(parseInfo, dest);
+
+    List grpByExprs = grpByExprsGroupingSets.getFirst();
+    List groupingSets = grpByExprsGroupingSets.getSecond();
+    boolean groupingSetsPresent = !groupingSets.isEmpty();
+
+    if (groupingSetsPresent) {
+      checkExpressionsForGroupingSet(grpByExprs,
+          parseInfo.getDistinctFuncExprsForClause(dest),
+          parseInfo.getAggregationExprsForClause(dest),
+          opParseCtx.get(inputOperatorInfo).getRowResolver());
+    }
+
     // ////// Generate GroupbyOperator for a map-side partial aggregation
     Map genericUDAFEvaluators = new LinkedHashMap();
-    GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator(
-        qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH,
-        genericUDAFEvaluators);
+    GroupByOperator groupByOperatorInfo =
+        (GroupByOperator) genGroupByPlanMapGroupByOperator(
+            qb, dest, grpByExprs, inputOperatorInfo, GroupByDesc.Mode.HASH,
+            genericUDAFEvaluators, groupingSets, groupingSetsPresent);
     groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
         inputOperatorInfo).getRowResolver().getTableNames());
@@ -3793,40 +4166,57 @@
     // map-reduce jobs are not needed
     // For eg: select count(1) from T where t.ds = ....
     if (!optimizeMapAggrGroupBy(dest, qb)) {
+      List distinctFuncExprs = parseInfo.getDistinctFuncExprsForClause(dest);
       // ////// Generate ReduceSink Operator
-      Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
-          dest, groupByOperatorInfo, (parseInfo
-          .getDistinctFuncExprsForClause(dest).isEmpty() ? -1
-          : Integer.MAX_VALUE), -1, true);
+      Operator reduceSinkOperatorInfo =
+          genGroupByPlanReduceSinkOperator(qb,
+              dest,
+              groupByOperatorInfo,
+              grpByExprs,
+              distinctFuncExprs.isEmpty() ? -1 : Integer.MAX_VALUE,
+              false,
+              -1,
+              true,
+              groupingSetsPresent);
       // ////// Generate GroupbyOperator for a partial aggregation
       Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo,
-          dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS,
-          genericUDAFEvaluators, false);
+          dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS,
+          genericUDAFEvaluators, false, groupingSetsPresent);
      int numReducers = -1;
-      List grpByExprs = getGroupByForClause(parseInfo, dest);
      if (grpByExprs.isEmpty()) {
        numReducers = 1;
      }
      // ////// Generate ReduceSinkOperator2
      Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
-          parseInfo, dest, groupByOperatorInfo2, grpByExprs.size(), numReducers);
+          parseInfo, dest, groupByOperatorInfo2, grpByExprs.size(), numReducers,
+          groupingSetsPresent);
      // ////// Generate GroupbyOperator3
      return genGroupByPlanGroupByOperator2MR(parseInfo, dest,
-          reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
-          genericUDAFEvaluators);
+          reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL,
+          genericUDAFEvaluators, groupingSetsPresent);
    } else {
+      // If there are no grouping keys, grouping sets cannot be present
+      assert !groupingSetsPresent;
+
      // ////// Generate ReduceSink Operator
-      Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb,
-          dest, groupByOperatorInfo, getGroupByForClause(parseInfo, dest)
-          .size(), 1, true);
+      Operator reduceSinkOperatorInfo =
+          genGroupByPlanReduceSinkOperator(qb,
+              dest,
+              groupByOperatorInfo,
+              grpByExprs,
+              grpByExprs.size(),
+              false,
+              1,
+              true,
+              groupingSetsPresent);
      return genGroupByPlanGroupByOperator2MR(parseInfo, dest,
-          reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators);
+          reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators, false);
    }
  }
@@ -5257,9 +5647,10 @@
    float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
    float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
    Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-        false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()),
-        inputOperatorInfo), groupByOutputRowResolver);
+        new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
+            false,groupByMemoryUsage,memoryThreshold, null, false, 0),
+        new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+        inputOperatorInfo), groupByOutputRowResolver);
    op.setColumnExprMap(colExprMap);
    return op;
@@ -5894,6 +6285,11 @@
    List oldASTList = null;
    for (String dest : ks) {
+      // If a grouping set aggregation is present, common processing is not possible
+      if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()) {
+        return null;
+      }
+
      // If a filter is present, common processing is not possible
      if (qbp.getWhrForClause(dest) != null) {
        return null;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java	(revision 1393603)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java	(working copy)
@@ -35,7 +35,6 @@
 * Implementation of the parse information related to a query block.
 *
 **/
-
 public class QBParseInfo {
   private final boolean isSubQ;
@@ -49,6 +48,8 @@
   private final Map destToSelExpr;
   private final HashMap destToWhereExpr;
   private final HashMap destToGroupby;
+  private final Set destRollups;
+  private final Set destCubes;
   private final Map destToHaving;
   private final HashSet insertIntoTables;
@@ -105,6 +106,8 @@
     destToOrderby = new HashMap();
     destToLimit = new HashMap();
     insertIntoTables = new HashSet();
+    destRollups = new HashSet();
+    destCubes = new HashSet();
     destToAggregationExprs = new LinkedHashMap>();
     destToDistinctFuncExprs = new HashMap>();
@@ -241,6 +244,14 @@
     return destToGroupby.get(clause);
   }
+  public Set getDestRollups() {
+    return destRollups;
+  }
+
+  public Set getDestCubes() {
+    return destCubes;
+  }
+
   public HashMap getDestToGroupBy() {
     return destToGroupby;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g	(revision 1393603)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g	(working copy)
@@ -74,6 +74,8 @@
 TOK_EXPLIST;
 TOK_ALIASLIST;
 TOK_GROUPBY;
+TOK_ROLLUP_GROUPBY;
+TOK_CUBE_GROUPBY;
 TOK_HAVING;
 TOK_ORDERBY;
 TOK_CLUSTERBY;
@@ -1792,6 +1794,9 @@
     KW_GROUP KW_BY
     groupByExpression
     ( COMMA groupByExpression )*
+    ((rollup=KW_WITH KW_ROLLUP) | (cube=KW_WITH KW_CUBE)) ?
+    -> {rollup != null}? ^(TOK_ROLLUP_GROUPBY groupByExpression+)
+    -> {cube != null}? ^(TOK_CUBE_GROUPBY groupByExpression+)
    -> ^(TOK_GROUPBY groupByExpression+)
    ;
@@ -2431,6 +2436,8 @@
 KW_RESTRICT: 'RESTRICT';
 KW_CASCADE: 'CASCADE';
 KW_SKEWED: 'SKEWED';
+KW_ROLLUP: 'ROLLUP';
+KW_CUBE: 'CUBE';
 // Operators
Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java	(revision 1393603)
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java	(working copy)
@@ -270,6 +270,13 @@
   HIVE_INTERNAL_DDL_LIST_BUCKETING_DISABLED(10208,
     "List Bucketing DDL is not allowed to use since feature is not completed yet."),
+  HIVE_GROUPING_SETS_AGGR_NOMAPAGGR(10209,
+    "Grouping sets aggregations (with rollups or cubes) are not allowed if map-side " +
+    " aggregation is turned off. Set hive.map.aggr=true if you want to use grouping sets"),
+  HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID(10210,
+    "Grouping sets aggregations (with rollups or cubes) are not allowed if aggregation function " +
+    "parameters overlap with the aggregation functions columns"),
+
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
     + "It may have crashed with an error."),
Index: ql/build.xml
===================================================================
--- ql/build.xml	(revision 1393603)
+++ ql/build.xml	(working copy)
@@ -207,6 +207,13 @@
+
+
+
+
+
+
+
@@ -219,6 +226,7 @@
+
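
For reference, a minimal usage sketch of the syntax the Hive.g change above enables; this is not part of the patch. The table and column names are placeholders taken from the rollup example in the genGroupByPlanMapGroupByOperator comment, and the SET line reflects the new HIVE_GROUPING_SETS_AGGR_NOMAPAGGR check, which rejects rollup/cube aggregations when map-side aggregation is turned off.

  -- rollup/cube queries require map-side aggregation (see error 10209 above)
  SET hive.map.aggr=true;

  -- WITH ROLLUP aggregates over (key, value), (key) and the overall total
  SELECT key, value, count(1) FROM T GROUP BY key, value WITH ROLLUP;

  -- WITH CUBE additionally aggregates over (value) alone
  SELECT key, value, count(1) FROM T GROUP BY key, value WITH CUBE;

Per the grammar rule, such queries produce a TOK_ROLLUP_GROUPBY or TOK_CUBE_GROUPBY node instead of TOK_GROUPBY, and the semantic analyzer then threads the extra grouping-set key described in genGroupByPlanMapGroupByOperator through the group-by and reduce-sink operators.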