Index: ql/src/test/results/clientpositive/groupby_cube1.q.out =================================================================== --- ql/src/test/results/clientpositive/groupby_cube1.q.out (revision 0) +++ ql/src/test/results/clientpositive/groupby_cube1.q.out (working copy) @@ -0,0 +1,539 @@ +PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@T1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +PREHOOK: type: LOAD +PREHOOK: Output: default@t1 +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@t1 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +NULL 11 1 +NULL 12 1 +NULL 13 1 +NULL 17 1 +NULL 18 1 +NULL 28 1 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, 
count(distinct val) FROM T1 GROUP BY key with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: rand() + type: double + tag: -1 + value expressions: + expr: 
_col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +NULL 11 1 +NULL 12 1 +NULL 13 1 +NULL 17 1 +NULL 18 1 +NULL 28 1 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_CUBE_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group 
By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + sort order: ++ + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col2 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with cube +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 Index: ql/src/test/results/clientpositive/groupby_rollup1.q.out =================================================================== --- ql/src/test/results/clientpositive/groupby_rollup1.q.out (revision 0) +++ ql/src/test/results/clientpositive/groupby_rollup1.q.out (working copy) @@ -0,0 +1,527 @@ +PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@T1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +PREHOOK: type: LOAD +PREHOOK: Output: default@t1 +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@t1 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + 
type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: 
SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 +PREHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL val)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL val)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: key + type: string + expr: val + type: string + expr: '0' + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: rand() + type: double + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + expr: KEY._col2 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col3 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, val, count(1) FROM T1 GROUP BY key, 
val with rollup +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL NULL 6 +1 NULL 1 +1 11 1 +2 NULL 1 +2 12 1 +3 NULL 1 +3 13 1 +7 NULL 1 +7 17 1 +8 NULL 2 +8 18 1 +8 28 1 +PREHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL val)))) (TOK_ROLLUP_GROUPBY (TOK_TABLE_OR_COL key)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t1 + TableScan + alias: t1 + Select Operator + expressions: + expr: key + type: string + expr: val + type: string + outputColumnNames: key, val + Group By Operator + aggregations: + expr: count(DISTINCT val) + bucketGroup: false + keys: + expr: key + type: string + expr: '0' + type: string + expr: val + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + sort order: +++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: string + tag: -1 + value expressions: + expr: _col3 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(DISTINCT KEY._col2:0._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: partials + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + sort order: ++ + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col2 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: final + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +NULL 6 +1 1 +2 1 +3 1 +7 1 +8 2 Index: ql/src/test/results/clientnegative/groupby_cube1.q.out =================================================================== 
--- ql/src/test/results/clientnegative/groupby_cube1.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_cube1.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10209]: Grouping sets aggreations (with rollups or cubes) are not allowed if map-side aggregation is turned off. set hive.map.aggr=true if you want to use grouping sets Index: ql/src/test/results/clientnegative/groupby_rollup1.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_rollup1.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_rollup1.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10209]: Grouping sets aggreations (with rollups or cubes) are not allowed if map-side aggregation is turned off. set hive.map.aggr=true if you want to use grouping sets Index: ql/src/test/results/clientnegative/groupby_cube2.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_cube2.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_cube2.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10210]: Grouping sets aggreations (with rollups or cubes) are not allowed if aggregation function parameters overlap with the aggregation functions columns Index: ql/src/test/results/clientnegative/groupby_rollup2.q.out =================================================================== --- ql/src/test/results/clientnegative/groupby_rollup2.q.out (revision 0) +++ ql/src/test/results/clientnegative/groupby_rollup2.q.out (working copy) @@ -0,0 +1 @@ +FAILED: SemanticException [Error 10210]: Grouping sets aggreations (with rollups or cubes) are not allowed if aggregation function parameters overlap with the aggregation functions columns Index: ql/src/test/results/compiler/plan/groupby2.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby2.q.xml (revision 1391855) +++ ql/src/test/results/compiler/plan/groupby2.q.xml (working copy) @@ -720,6 +720,9 @@ 0.5 + + 1 + @@ -730,6 +733,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby4.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby4.q.xml (revision 1391855) +++ ql/src/test/results/compiler/plan/groupby4.q.xml (working copy) @@ -438,6 +438,9 @@ 0.5 + + 1 + @@ -445,6 +448,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby6.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby6.q.xml (revision 1391855) +++ ql/src/test/results/compiler/plan/groupby6.q.xml (working copy) @@ -438,6 +438,9 @@ 0.5 + + 1 + @@ -445,6 +448,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby1.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby1.q.xml (revision 1391855) +++ ql/src/test/results/compiler/plan/groupby1.q.xml (working copy) @@ -627,6 +627,9 @@ 0.5 + + 1 + @@ -634,6 +637,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby3.q.xml =================================================================== --- ql/src/test/results/compiler/plan/groupby3.q.xml (revision 1391855) +++ ql/src/test/results/compiler/plan/groupby3.q.xml (working copy) @@ -922,6 +922,9 @@ + + + 0.9 Index: ql/src/test/results/compiler/plan/groupby5.q.xml =================================================================== --- 
ql/src/test/results/compiler/plan/groupby5.q.xml (revision 1391855) +++ ql/src/test/results/compiler/plan/groupby5.q.xml (working copy) @@ -493,6 +493,9 @@ 0.5 + + 1 + @@ -500,6 +503,9 @@ + + + 0.9 Index: ql/src/test/queries/clientpositive/groupby_rollup1.q =================================================================== --- ql/src/test/queries/clientpositive/groupby_rollup1.q (revision 0) +++ ql/src/test/queries/clientpositive/groupby_rollup1.q (working copy) @@ -0,0 +1,28 @@ +set hive.map.aggr=true; +set hive.groupby.skewindata=false; + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup; + +set hive.groupby.skewindata=true; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with rollup; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with rollup; Index: ql/src/test/queries/clientpositive/groupby_cube1.q =================================================================== --- ql/src/test/queries/clientpositive/groupby_cube1.q (revision 0) +++ ql/src/test/queries/clientpositive/groupby_cube1.q (working copy) @@ -0,0 +1,28 @@ +set hive.map.aggr=true; +set hive.groupby.skewindata=false; + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube; + +set hive.groupby.skewindata=true; + +EXPLAIN +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube; + +SELECT key, val, count(1) FROM T1 GROUP BY key, val with cube; + +EXPLAIN +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube; + +SELECT key, count(distinct val) FROM T1 GROUP BY key with cube; Index: ql/src/test/queries/clientnegative/groupby_cube1.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_cube1.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_cube1.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=false; + +SELECT key, count(distinct value) FROM src GROUP BY key with cube; + Index: ql/src/test/queries/clientnegative/groupby_cube2.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_cube2.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_cube2.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=true; + +SELECT key, value, count(distinct value) FROM src GROUP BY key, value with cube; + Index: ql/src/test/queries/clientnegative/groupby_rollup1.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_rollup1.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_rollup1.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=false; + +SELECT key, value, count(1) FROM src GROUP BY key, value with rollup; + Index: 
ql/src/test/queries/clientnegative/groupby_rollup2.q =================================================================== --- ql/src/test/queries/clientnegative/groupby_rollup2.q (revision 0) +++ ql/src/test/queries/clientnegative/groupby_rollup2.q (working copy) @@ -0,0 +1,4 @@ +set hive.map.aggr=true; + +SELECT key, value, count(key) FROM src GROUP BY key, value with rollup; + Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1391855) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy) @@ -25,6 +25,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -40,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -144,6 +146,12 @@ private long maxMemory; private float memoryThreshold; + private boolean groupingSetsPresent; + private int groupingSetsPosition; + private List groupingSets; + transient private List groupingSetsBitSet; + transient private List groupingSetKeys; + /** * This is used to store the position and field names for variable length * fields. @@ -191,21 +199,40 @@ heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT); countAfterReport = 0; + groupingSetsPresent = conf.isGroupingSetsPresent(); + groupingSets = conf.getListGroupingSets(); + groupingSetsPosition = conf.getGroupingSetPosition(); ObjectInspector rowInspector = inputObjInspectors[0]; // init keyFields - keyFields = new ExprNodeEvaluator[conf.getKeys().size()]; - keyObjectInspectors = new ObjectInspector[conf.getKeys().size()]; - currentKeyObjectInspectors = new ObjectInspector[conf.getKeys().size()]; - for (int i = 0; i < keyFields.length; i++) { + int numKeys = conf.getKeys().size(); + + keyFields = new ExprNodeEvaluator[numKeys]; + keyObjectInspectors = new ObjectInspector[numKeys]; + currentKeyObjectInspectors = new ObjectInspector[numKeys]; + for (int i = 0; i < numKeys; i++) { keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i)); keyObjectInspectors[i] = keyFields[i].initialize(rowInspector); currentKeyObjectInspectors[i] = ObjectInspectorUtils - .getStandardObjectInspector(keyObjectInspectors[i], - ObjectInspectorCopyOption.WRITABLE); + .getStandardObjectInspector(keyObjectInspectors[i], + ObjectInspectorCopyOption.WRITABLE); } + // Initialize the constants for the grouping sets, so that they can be re-used for + // each row + if (groupingSetsPresent) { + groupingSetKeys = new ArrayList(); + groupingSetsBitSet = new ArrayList(); + + for (Integer groupingSet: groupingSets) { + ExprNodeEvaluator bitsetEvaluator = + ExprNodeEvaluatorFactory.get(new ExprNodeConstantDesc(String.valueOf(groupingSet))); + groupingSetKeys.add(bitsetEvaluator.evaluate(null)); + groupingSetsBitSet.add(convert(groupingSet)); + } + } + // initialize unionExpr for reduce-side // reduce KEY has union field as the last field if there are distinct // aggregates in group-by. 
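Note on the grouping-set ids initialized above (an illustrative sketch, not part of the patch): each grouping set is an integer bitmask over the group-by key positions, so for GROUP BY key, val a rollup expands to the sets 0, 1, 3 and a cube to 0, 1, 2, 3 (see getGroupingSetsForRollup and getGroupingSetsForCube in the SemanticAnalyzer changes later in this patch). The standalone sketch below reuses the convert() logic added to this file to show which key positions each id keeps; the example class name is hypothetical.

import java.util.BitSet;

public class GroupingSetBitmaskExample {
  // Same logic as the convert() helper added to GroupByOperator in this patch:
  // bit i set means the group-by key at position i is kept; unset means it is nulled.
  static BitSet convert(int value) {
    BitSet bits = new BitSet();
    int index = 0;
    while (value != 0) {
      if (value % 2 != 0) {
        bits.set(index);
      }
      ++index;
      value = value >>> 1;
    }
    return bits;
  }

  public static void main(String[] args) {
    // GROUP BY key, val WITH ROLLUP produces grouping sets 0, 1, 3:
    //   0 -> {}      both keys null (grand-total row)
    //   1 -> {0}     only key kept, val is null
    //   3 -> {0, 1}  ordinary group-by row
    // WITH CUBE additionally produces 2 -> {1} (only val kept).
    for (int groupingSet : new int[] {0, 1, 2, 3}) {
      System.out.println(groupingSet + " -> " + convert(groupingSet));
    }
  }
}

In this patch, forward() emits one output row per grouping set and carries the id in the extra string grouping-set key, which is the constant '0' key column visible in the expected EXPLAIN plans above.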
@@ -794,8 +821,9 @@ } // Non-hash aggregation - private void processAggr(Object row, ObjectInspector rowInspector, - KeyWrapper newKeys) throws HiveException { + private void processAggr(Object row, + ObjectInspector rowInspector, + KeyWrapper newKeys) throws HiveException { // Prepare aggs for updating AggregationBuffer[] aggs = null; Object[][] lastInvoke = null; @@ -961,6 +989,19 @@ transient Object[] forwardCache; + public static BitSet convert(int value) { + BitSet bits = new BitSet(); + int index = 0; + while (value != 0) { + if (value % 2 != 0) { + bits.set(index); + } + ++index; + value = value >>> 1; + } + return bits; + } + /** * Forward a record of keys and aggregation results. * @@ -968,21 +1009,52 @@ * The keys in the record * @throws HiveException */ - protected void forward(Object[] keys, AggregationBuffer[] aggs) - throws HiveException { - int totalFields = keys.length+ aggs.length; + protected void forward(Object[] keys, + AggregationBuffer[] aggs) throws HiveException { + + int totalFields = keys.length + aggs.length; if (forwardCache == null) { forwardCache = new Object[totalFields]; } - for (int i = 0; i < keys.length; i++) { - forwardCache[i] = keys[i]; + + if (groupingSetsPresent) { + // For all the rows, set the keys after the grouping set position and aggregation + // function parameters + for (int i = groupingSetsPosition + 1; i < keys.length; i++) { + forwardCache[i] = keys[i]; + } + + for (int i = 0; i < aggs.length; i++) { + forwardCache[keys.length + i] = aggregationEvaluators[i].evaluate(aggs[i]); + } + + for (int bitsetPosition = 0; bitsetPosition < groupingSets.size(); bitsetPosition++) { + for (int i = 0; i < groupingSetsPosition; i++) { + forwardCache[i] = null; + } + + BitSet bitset = groupingSetsBitSet.get(bitsetPosition); + + // Some keys need to be left to null corresponding to that grouping set. 
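+          // For example, with GROUP BY key, val WITH ROLLUP: for grouping set 1 only bit 0
+          // is set, so only keys[0] (key) is copied back; val stays null, and the grouping-set
+          // id column assigned just after this loop carries the value "1" for that row.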
+ for (int i= bitset.nextSetBit(0); i>=0; i= bitset.nextSetBit(i+1)) { + forwardCache[i] = keys[i]; + } + + forwardCache[groupingSetsPosition] = groupingSetKeys.get(bitsetPosition); + forward(forwardCache, outputObjInspector); + } } - for (int i = 0; i < aggs.length; i++) { - forwardCache[keys.length + i] = aggregationEvaluators[i] - .evaluate(aggs[i]); + else { + for (int i = 0; i < keys.length; i++) { + forwardCache[i] = keys[i]; + } + for (int i = 0; i < aggs.length; i++) { + forwardCache[keys.length + i] = aggregationEvaluators[i] + .evaluate(aggs[i]); + } + + forward(forwardCache, outputObjInspector); } - - forward(forwardCache, outputObjInspector); } /** Index: ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (revision 1391855) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; @@ -57,6 +58,9 @@ private boolean bucketGroup; private ArrayList keys; + private List listGroupingSets; + private boolean groupingSetsPresent; + private int groupingSetPosition; private ArrayList aggregators; private ArrayList outputColumnNames; private float groupByMemoryUsage; @@ -70,9 +74,15 @@ final ArrayList outputColumnNames, final ArrayList keys, final ArrayList aggregators, - final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) { + final boolean groupKeyNotReductionKey, + final float groupByMemoryUsage, + final float memoryThreshold, + final List listGroupingSets, + final boolean groupingSetsPresent, + final int groupingSetsPosition) { this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey, - false, groupByMemoryUsage, memoryThreshold); + false, groupByMemoryUsage, memoryThreshold, listGroupingSets, + groupingSetsPresent, groupingSetsPosition); } public GroupByDesc( @@ -80,7 +90,13 @@ final ArrayList outputColumnNames, final ArrayList keys, final ArrayList aggregators, - final boolean groupKeyNotReductionKey, final boolean bucketGroup,float groupByMemoryUsage, float memoryThreshold) { + final boolean groupKeyNotReductionKey, + final boolean bucketGroup, + final float groupByMemoryUsage, + final float memoryThreshold, + final List listGroupingSets, + final boolean groupingSetsPresent, + final int groupingSetsPosition) { this.mode = mode; this.outputColumnNames = outputColumnNames; this.keys = keys; @@ -89,6 +105,11 @@ this.bucketGroup = bucketGroup; this.groupByMemoryUsage = groupByMemoryUsage; this.memoryThreshold = memoryThreshold; + + // hard-code grouping sets to rollup for now + this.listGroupingSets = listGroupingSets; + this.groupingSetsPresent = groupingSetsPresent; + this.groupingSetPosition = groupingSetsPosition; } public Mode getMode() { @@ -200,4 +221,34 @@ } return true; } + + // Consider a query like: + // select a, b, count(distinct c) from T group by a,b with rollup; + // Assume that hive.map.aggr is set to true and hive.groupby.skewindata is false, + // in which case the group by would execute as a single map-reduce job. 
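+  // The plan is: TableScan -> Select -> GroupBy1 -> ReduceSink -> GroupBy2 -> Select -> FileSink.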
+ // For the group-by, the group by keys should be: a,b,groupingSet(for rollup), c + // So, the starting position of grouping set need to be known + public List getListGroupingSets() { + return listGroupingSets; + } + + public void setListGroupingSets(final List listGroupingSets) { + this.listGroupingSets = listGroupingSets; + } + + public boolean isGroupingSetsPresent() { + return groupingSetsPresent; + } + + public void setGroupingSetsPresent(boolean groupingSetsPresent) { + this.groupingSetsPresent = groupingSetsPresent; + } + + public int getGroupingSetPosition() { + return groupingSetPosition; + } + + public void setGroupingSetPosition(int groupingSetPosition) { + this.groupingSetPosition = groupingSetPosition; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1391855) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -160,6 +160,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.ql.util.ObjectPair; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; @@ -217,6 +218,8 @@ //Max characters when auto generating the column name with func name private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20; + private static final String GROUPING_SET_KEY = "GROUPING_SET_KEY"; + private static class Phase1Ctx { String dest; int nextNum; @@ -790,6 +793,8 @@ break; case HiveParser.TOK_GROUPBY: + case HiveParser.TOK_ROLLUP_GROUPBY: + case HiveParser.TOK_CUBE_GROUPBY: // Get the groupby aliases - these are aliased to the entries in the // select list queryProperties.setHasGroupBy(true); @@ -802,6 +807,14 @@ } qbp.setGroupByExprForClause(ctx_1.dest, ast); skipRecursion = true; + + // Rollup and Cubes are syntactic sugar on top of grouping sets + if (ast.getToken().getType() == HiveParser.TOK_ROLLUP_GROUPBY) { + qbp.getDestRollups().add(ctx_1.dest); + } + else if (ast.getToken().getType() == HiveParser.TOK_CUBE_GROUPBY) { + qbp.getDestCubes().add(ctx_1.dest); + } break; case HiveParser.TOK_HAVING: @@ -1979,6 +1992,54 @@ } } + private List getGroupingSetsForRollup(int size) { + List groupingSetKeys = new ArrayList(); + for (int i = 0; i <= size; i++) { + int groupingSet = 0; + for (int pos = 0; pos < i; pos++) { + groupingSet += Math.pow(2, pos); + } + + groupingSetKeys.add(groupingSet); + } + + return groupingSetKeys; + } + + private List getGroupingSetsForCube(int size) { + List results = new ArrayList(); + getGroupingSetsForCube(results, 0, size-1); + return results; + } + + private void getGroupingSetsForCube(List results, int initValue, int size) { + if (size == 0) { + results.add(initValue); + results.add(initValue + 1); + return; + } + + getGroupingSetsForCube(results, initValue, size-1); + getGroupingSetsForCube(results, initValue + (int)Math.pow(2, size), size-1); + } + + // This function returns the grouping sets along with the grouping expressions + // Even if rollups and cubes are present in the query, they are converted to + // grouping sets at this point + private ObjectPair, List> getGroupByGroupingSetsForClause( + QBParseInfo parseInfo, String dest) { + List groupingSets = new ArrayList(); + List 
groupByExprs = getGroupByForClause(parseInfo, dest); + if (parseInfo.getDestRollups().contains(dest)) { + groupingSets = getGroupingSetsForRollup(groupByExprs.size()); + } + else if (parseInfo.getDestCubes().contains(dest)) { + groupingSets = getGroupingSetsForCube(groupByExprs.size()); + } + + return new ObjectPair, List>(groupByExprs, groupingSets); + } + /** * This function is a wrapper of parseInfo.getGroupByForClause which * automatically translates SELECT DISTINCT a,b,c to SELECT a,b,c GROUP BY @@ -2595,14 +2656,77 @@ } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false,groupByMemoryUsage,memoryThreshold, null, false, 0), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } + // Add the grouping set key to the group by operator. + // This is not the first group by operator, but it is a subsequent group by operator + // which is forwarding the grouping keys introduced by the grouping sets. + // For eg: consider: select key, value, count(1) from T group by key, value with rollup. + // Assuming map-side aggregation and no skew, the plan would look like: + // + // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink + // + // This function is called for GroupBy2 to pass the additional grouping keys introduced by + // GroupBy1 for the grouping set (corresponding to the rollup). + private void addGroupingSetKey( + List groupByKeys, + RowResolver groupByInputRowResolver, + RowResolver groupByOutputRowResolver, + List outputColumnNames, + Map colExprMap) throws SemanticException { + // For grouping sets, add a dummy grouping key + String groupingSetColumnName = + groupByInputRowResolver.get(null, GROUPING_SET_KEY).getInternalName(); + ExprNodeDesc inputExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, + groupingSetColumnName, null, false); + groupByKeys.add(inputExpr); + + String field = getColumnInternalName(groupByKeys.size() - 1); + outputColumnNames.add(field); + groupByOutputRowResolver.put(null, GROUPING_SET_KEY, + new ColumnInfo(field, TypeInfoFactory.stringTypeInfo, "", false)); + colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); + } + + // Process grouping set for the reduce sink operator + // For eg: consider: select key, value, count(1) from T group by key, value with rollup. + // Assuming map-side aggregation and no skew, the plan would look like: + // + // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink + // + // This function is called for ReduceSink to add the additional grouping keys introduced by + // GroupBy1 into the reduce keys. 
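+  // The grouping-set id is added as a reduce key (not a value) so that the rows GroupBy1
+  // produces for different grouping sets of the same key prefix are aggregated as
+  // separate groups by GroupBy2.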
+ private void processGroupingSetReduceSinkOperator( + RowResolver reduceSinkInputRowResolver, + RowResolver reduceSinkOutputRowResolver, + List reduceKeys, + List outputKeyColumnNames, + Map colExprMap) throws SemanticException { + // add a key for reduce sink + String groupingSetColumnName = + reduceSinkInputRowResolver.get(null, GROUPING_SET_KEY).getInternalName(); + ExprNodeDesc inputExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, + groupingSetColumnName, null, false); + reduceKeys.add(inputExpr); + + outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1)); + String field = Utilities.ReduceField.KEY.toString() + "." + + getColumnInternalName(reduceKeys.size() - 1); + ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get( + reduceKeys.size() - 1).getTypeInfo(), null, false); + reduceSinkOutputRowResolver.put(null, GROUPING_SET_KEY, colInfo); + colExprMap.put(colInfo.getInternalName(), inputExpr); + } + + /** * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)). * The new GroupByOperator will be a child of the reduceSinkOperatorInfo. @@ -2618,9 +2742,10 @@ */ @SuppressWarnings("nls") private Operator genGroupByPlanGroupByOperator1(QBParseInfo parseInfo, - String dest, Operator reduceSinkOperatorInfo, GroupByDesc.Mode mode, - Map genericUDAFEvaluators, - boolean distPartAgg) throws SemanticException { + String dest, Operator reduceSinkOperatorInfo, GroupByDesc.Mode mode, + Map genericUDAFEvaluators, + boolean distPartAgg, + boolean groupingSetsPresent) throws SemanticException { ArrayList outputColumnNames = new ArrayList(); RowResolver groupByInputRowResolver = opParseCtx .get(reduceSinkOperatorInfo).getRowResolver(); @@ -2648,6 +2773,16 @@ colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } + // For grouping sets, add a dummy grouping key + if (groupingSetsPresent) { + addGroupingSetKey( + groupByKeys, + groupByInputRowResolver, + groupByOutputRowResolver, + outputColumnNames, + colExprMap); + } + HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); // get the last colName for the reduce KEY @@ -2667,7 +2802,7 @@ ArrayList aggParameters = new ArrayList(); boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI); - // If the function is distinct, partial aggregartion has not been done on + // If the function is distinct, partial aggregation has not been done on // the client side. // If distPartAgg is set, the client is letting us know that partial // aggregation has not been done. @@ -2744,9 +2879,14 @@ } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + // Nothing special needs to be done for grouping sets. + // This is the final group by operator, so multiple rows corresponding to the + // grouping sets have been generated upstream. 
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - distPartAgg,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver + distPartAgg,groupByMemoryUsage,memoryThreshold, null, false, 0), + new RowSchema(groupByOutputRowResolver .getColumnInfos()), reduceSinkOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); @@ -2767,10 +2907,14 @@ * @return the new GroupByOperator */ @SuppressWarnings("nls") - private Operator genGroupByPlanMapGroupByOperator(QB qb, String dest, - Operator inputOperatorInfo, GroupByDesc.Mode mode, - Map genericUDAFEvaluators) - throws SemanticException { + private Operator genGroupByPlanMapGroupByOperator(QB qb, + String dest, + List grpByExprs, + Operator inputOperatorInfo, + GroupByDesc.Mode mode, + Map genericUDAFEvaluators, + List groupingSetKeys, + boolean groupingSetsPresent) throws SemanticException { RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo) .getRowResolver(); @@ -2781,7 +2925,6 @@ ArrayList outputColumnNames = new ArrayList(); ArrayList aggregations = new ArrayList(); Map colExprMap = new HashMap(); - List grpByExprs = getGroupByForClause(parseInfo, dest); for (int i = 0; i < grpByExprs.size(); ++i) { ASTNode grpbyExpr = grpByExprs.get(i); ExprNodeDesc grpByExprNode = genExprNodeDesc(grpbyExpr, @@ -2795,30 +2938,52 @@ colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } + // The grouping set key is present after the grouping keys, before the distinct keys + int groupingSetsPosition = groupByKeys.size(); + + // For grouping sets, add a dummy grouping key + // This dummy key needs to be added as a reduce key + // For eg: consider: select key, value, count(1) from T group by key, value with rollup. + // Assuming map-side aggregation and no skew, the plan would look like: + // + // TableScan --> Select --> GroupBy1 --> ReduceSink --> GroupBy2 --> Select --> FileSink + // + // This function is called for GroupBy1 to create an additional grouping key + // for the grouping set (corresponding to the rollup). + if (groupingSetsPresent) { + // The value for the constant does not matter. It is replaced by the grouping set + // value for the actual implementation + ExprNodeConstantDesc constant = new ExprNodeConstantDesc("0"); + groupByKeys.add(constant); + String field = getColumnInternalName(groupByKeys.size() - 1); + outputColumnNames.add(field); + groupByOutputRowResolver.put(null, GROUPING_SET_KEY, + new ColumnInfo(field, TypeInfoFactory.stringTypeInfo, "", false)); + colExprMap.put(field, constant); + } + // If there is a distinctFuncExp, add all parameters to the reduceKeys. 
if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) { List list = parseInfo.getDistinctFuncExprsForClause(dest); - int numDistn = 0; for(ASTNode value: list) { // 0 is function name for (int i = 1; i < value.getChildCount(); i++) { ASTNode parameter = (ASTNode) value.getChild(i); if (groupByOutputRowResolver.getExpression(parameter) == null) { ExprNodeDesc distExprNode = genExprNodeDesc(parameter, - groupByInputRowResolver); + groupByInputRowResolver); groupByKeys.add(distExprNode); - numDistn++; - String field = getColumnInternalName(grpByExprs.size() + numDistn - - 1); + String field = getColumnInternalName(groupByKeys.size()-1); outputColumnNames.add(field); groupByOutputRowResolver.putExpression(parameter, new ColumnInfo( - field, distExprNode.getTypeInfo(), "", false)); + field, distExprNode.getTypeInfo(), "", false)); colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } } } } + // For each aggregation HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); @@ -2833,7 +2998,7 @@ for (int i = 1; i < value.getChildCount(); i++) { ASTNode paraExpr = (ASTNode) value.getChild(i); ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr, - groupByInputRowResolver); + groupByInputRowResolver); aggParameters.add(paraExprNode); } @@ -2864,9 +3029,11 @@ float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), - inputOperatorInfo), groupByOutputRowResolver); + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false,groupByMemoryUsage,memoryThreshold, + groupingSetKeys, groupingSetsPresent, groupingSetsPosition), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), + inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; } @@ -2886,9 +3053,15 @@ * @throws SemanticException */ @SuppressWarnings("nls") - private Operator genGroupByPlanReduceSinkOperator(QB qb, String dest, - Operator inputOperatorInfo, int numPartitionFields, int numReducers, - boolean mapAggrDone) throws SemanticException { + private Operator genGroupByPlanReduceSinkOperator(QB qb, + String dest, + Operator inputOperatorInfo, + List grpByExprs, + int numPartitionFields, + boolean changeNumPartitionFields, + int numReducers, + boolean mapAggrDone, + boolean groupingSetsPresent) throws SemanticException { RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo) .getRowResolver(); @@ -2900,12 +3073,26 @@ List outputKeyColumnNames = new ArrayList(); List outputValueColumnNames = new ArrayList(); - List grpByExprs = getGroupByForClause(parseInfo, dest); ArrayList reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest, - reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, + reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, + colExprMap); + + // add a key for reduce sink + if (groupingSetsPresent) { + // Process grouping set for the reduce sink operator + processGroupingSetReduceSinkOperator( + reduceSinkInputRowResolver, + reduceSinkOutputRowResolver, + reduceKeys, + outputKeyColumnNames, colExprMap); + if (changeNumPartitionFields) { + numPartitionFields++; + } + } + List> distinctColIndices = 
getDistinctColIndicesForReduceSink(parseInfo, dest, reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames); @@ -2936,11 +3123,15 @@ } ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - grpByExprs.size(), reduceValues, distinctColIndices, - outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields, - numReducers), new RowSchema(reduceSinkOutputRowResolver - .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver); + OperatorFactory.getAndMakeChild( + PlanUtils.getReduceSinkDesc(reduceKeys, + groupingSetsPresent ? grpByExprs.size() + 1 : grpByExprs.size(), + reduceValues, distinctColIndices, + outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields, + numReducers), + new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), + inputOperatorInfo), + reduceSinkOutputRowResolver); rsOp.setColumnExprMap(colExprMap); return rsOp; } @@ -2955,7 +3146,7 @@ for (int i = 0; i < grpByExprs.size(); ++i) { ASTNode grpbyExpr = grpByExprs.get(i); ExprNodeDesc inputExpr = genExprNodeDesc(grpbyExpr, - reduceSinkInputRowResolver); + reduceSinkInputRowResolver); reduceKeys.add(inputExpr); if (reduceSinkOutputRowResolver.getExpression(grpbyExpr) == null) { outputKeyColumnNames.add(getColumnInternalName(reduceKeys.size() - 1)); @@ -2975,7 +3166,7 @@ } private List> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest, - ArrayList reduceKeys, RowResolver reduceSinkInputRowResolver, + List reduceKeys, RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver, List outputKeyColumnNames) throws SemanticException { @@ -3165,8 +3356,11 @@ */ @SuppressWarnings("nls") private Operator genGroupByPlanReduceSinkOperator2MR(QBParseInfo parseInfo, - String dest, Operator groupByOperatorInfo, int numPartitionFields, - int numReducers) throws SemanticException { + String dest, + Operator groupByOperatorInfo, + int numPartitionFields, + int numReducers, + boolean groupingSetsPresent) throws SemanticException { RowResolver reduceSinkInputRowResolver2 = opParseCtx.get( groupByOperatorInfo).getRowResolver(); RowResolver reduceSinkOutputRowResolver2 = new RowResolver(); @@ -3190,6 +3384,19 @@ reduceSinkOutputRowResolver2.putExpression(grpbyExpr, colInfo); colExprMap.put(colInfo.getInternalName(), inputExpr); } + + // add a key for reduce sink + if (groupingSetsPresent) { + // Note that partitioning fields dont need to change, since it is either + // partitioned randomly, or by all grouping keys + distinct keys + processGroupingSetReduceSinkOperator( + reduceSinkInputRowResolver2, + reduceSinkOutputRowResolver2, + reduceKeys, + outputColumnNames, + colExprMap); + } + // Get partial aggregation results and store in reduceValues ArrayList reduceValues = new ArrayList(); int inputField = reduceKeys.size(); @@ -3235,9 +3442,12 @@ */ @SuppressWarnings("nls") private Operator genGroupByPlanGroupByOperator2MR(QBParseInfo parseInfo, - String dest, Operator reduceSinkOperatorInfo2, GroupByDesc.Mode mode, - Map genericUDAFEvaluators) - throws SemanticException { + String dest, + Operator reduceSinkOperatorInfo2, + GroupByDesc.Mode mode, + Map genericUDAFEvaluators, + boolean groupingSetsPresent) throws SemanticException { + RowResolver groupByInputRowResolver2 = opParseCtx.get( reduceSinkOperatorInfo2).getRowResolver(); RowResolver groupByOutputRowResolver2 = new RowResolver(); @@ -3263,6 +3473,17 @@ new ColumnInfo(field, 
exprInfo.getType(), "", false)); colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1)); } + + // For grouping sets, add a dummy grouping key + if (groupingSetsPresent) { + addGroupingSetKey( + groupByKeys, + groupByInputRowResolver2, + groupByOutputRowResolver2, + outputColumnNames, + colExprMap); + } + HashMap aggregationTrees = parseInfo .getAggregationExprsForClause(dest); for (Map.Entry entry : aggregationTrees.entrySet()) { @@ -3304,9 +3525,12 @@ } float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver2.getColumnInfos()), + false,groupByMemoryUsage,memoryThreshold, null, + false, 0), + new RowSchema(groupByOutputRowResolver2.getColumnInfos()), reduceSinkOperatorInfo2), groupByOutputRowResolver2); op.setColumnExprMap(colExprMap); return op; @@ -3344,14 +3568,32 @@ QBParseInfo parseInfo = qb.getParseInfo(); int numReducers = -1; - List grpByExprs = getGroupByForClause(parseInfo, dest); + ObjectPair, List> grpByExprsGroupingSets = + getGroupByGroupingSetsForClause(parseInfo, dest); + + List grpByExprs = grpByExprsGroupingSets.getFirst(); + List groupingSets = grpByExprsGroupingSets.getSecond(); + if (grpByExprs.isEmpty()) { numReducers = 1; } + // Grouping sets are not allowed + if (!groupingSets.isEmpty()) { + throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOMAPAGGR.getMsg()); + } + // ////// 1. Generate ReduceSinkOperator - Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, - dest, input, grpByExprs.size(), numReducers, false); + Operator reduceSinkOperatorInfo = + genGroupByPlanReduceSinkOperator(qb, + dest, + input, + grpByExprs, + grpByExprs.size(), + false, + numReducers, + false, + false); // ////// 2. Generate GroupbyOperator Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo, @@ -3502,19 +3744,19 @@ // ////// 2. Generate GroupbyOperator Operator groupByOperatorInfo = genGroupByPlanGroupByOperator1(parseInfo, - dest, input, GroupByDesc.Mode.HASH, genericUDAFEvaluators, true); + dest, input, GroupByDesc.Mode.HASH, genericUDAFEvaluators, true, false); int numReducers = -1; List grpByExprs = getGroupByForClause(parseInfo, dest); // ////// 3. Generate ReduceSinkOperator2 Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR( - parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers); + parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false); // ////// 4. Generate GroupbyOperator2 Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL, - genericUDAFEvaluators); + genericUDAFEvaluators, false); return groupByOperatorInfo2; } @@ -3566,6 +3808,19 @@ QBParseInfo parseInfo = qb.getParseInfo(); + ObjectPair, List> grpByExprsGroupingSets = + getGroupByGroupingSetsForClause(parseInfo, dest); + + List grpByExprs = grpByExprsGroupingSets.getFirst(); + List groupingSets = grpByExprsGroupingSets.getSecond(); + + // Grouping sets are not allowed + // This restriction can be lifted in future. + // HIVE-3508 has been filed for this + if (!groupingSets.isEmpty()) { + throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOMAPAGGR.getMsg()); + } + // ////// 1. 
Generate ReduceSinkOperator // There is a special case when we want the rows to be randomly distributed // to @@ -3573,31 +3828,37 @@ // DISTINCT // operator. We set the numPartitionColumns to -1 for this purpose. This is // captured by WritableComparableHiveObject.hashCode() function. - Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, - dest, input, (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ? - -1 : Integer.MAX_VALUE), -1, false); + Operator reduceSinkOperatorInfo = + genGroupByPlanReduceSinkOperator(qb, + dest, + input, + grpByExprs, + (parseInfo.getDistinctFuncExprsForClause(dest).isEmpty() ? -1 : Integer.MAX_VALUE), + false, + -1, + false, + false); // ////// 2. Generate GroupbyOperator Map genericUDAFEvaluators = new LinkedHashMap(); GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanGroupByOperator( - parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1, - genericUDAFEvaluators); + parseInfo, dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIAL1, + genericUDAFEvaluators); int numReducers = -1; - List grpByExprs = getGroupByForClause(parseInfo, dest); if (grpByExprs.isEmpty()) { numReducers = 1; } // ////// 3. Generate ReduceSinkOperator2 Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR( - parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers); + parseInfo, dest, groupByOperatorInfo, grpByExprs.size(), numReducers, false); // ////// 4. Generate GroupbyOperator2 Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(parseInfo, - dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL, - genericUDAFEvaluators); + dest, reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL, + genericUDAFEvaluators, false); return groupByOperatorInfo2; } @@ -3615,6 +3876,87 @@ return true; } + private void extractColumns( + Set colNamesExprs, + ExprNodeDesc exprNode) throws SemanticException { + if (exprNode instanceof ExprNodeColumnDesc) { + colNamesExprs.add(((ExprNodeColumnDesc)exprNode).getColumn()); + return; + } + + if (exprNode instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc)exprNode; + for (ExprNodeDesc childExpr: funcDesc.getChildExprs()) { + extractColumns(colNamesExprs, childExpr); + } + } + } + + private boolean intersect(Set set1, Set set2) { + for (String elem1 : set1) { + if (set2.contains(elem1)) { + return true; + } + } + + return false; + } + + private void checkExpressionsForGroupingSet( + List grpByExprs, + List distinctGrpByExprs, + Map aggregationTrees, + RowResolver inputRowResolver + ) + throws SemanticException { + + Set colNamesGroupByExprs = new HashSet(); + Set colNamesGroupByDistinctExprs = new HashSet(); + Set colNamesAggregateParameters = new HashSet(); + + // The columns in the group by expressions should not intersect with the columns in the + // distinct expressions + for (ASTNode grpByExpr : grpByExprs) { + extractColumns(colNamesGroupByExprs, genExprNodeDesc(grpByExpr, inputRowResolver)); + } + + // If there is a distinctFuncExp, add all parameters to the reduceKeys. 
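Note: checkExpressionsForGroupingSet continues below with the distinct-parameter and aggregation-parameter loops, which feed the same two helpers defined just above. As an aside for readers, here is a stand-alone model of the whole overlap check in plain Java; Col and Func are toy stand-ins for Hive's ExprNodeDesc tree, not real Hive classes:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Toy model of the grouping-set overlap check: collect every column referenced
    // by an expression tree, then test whether the group-by columns intersect the
    // aggregation-parameter columns. Col/Func stand in for Hive's ExprNodeDesc tree.
    public class GroupingSetOverlapCheck {
      interface Expr {}
      record Col(String name) implements Expr {}
      record Func(String name, List<Expr> children) implements Expr {}

      static void extractColumns(Set<String> out, Expr e) {
        if (e instanceof Col c) {
          out.add(c.name());
        } else if (e instanceof Func f) {
          for (Expr child : f.children()) {
            extractColumns(out, child);
          }
        }
      }

      static boolean intersect(Set<String> a, Set<String> b) {
        for (String x : a) {
          if (b.contains(x)) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        // e.g. GROUP BY key WITH CUBE ... count(upper(key)):
        // the aggregate reads a grouping column, so the query is rejected.
        Set<String> groupByCols = new HashSet<>();
        extractColumns(groupByCols, new Col("key"));

        Set<String> aggParamCols = new HashSet<>();
        extractColumns(aggParamCols, new Func("upper", List.of(new Col("key"))));

        System.out.println(intersect(groupByCols, aggParamCols)); // true -> invalid
      }
    }

The likely rationale for the restriction: the grouping-set expansion rewrites grouping columns (nulling them out per set) before the aggregates run, so an aggregate or distinct parameter that reads one of those columns would see the rewritten values rather than the original data.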
+ if (!distinctGrpByExprs.isEmpty()) { + for(ASTNode value: distinctGrpByExprs) { + // 0 is function name + for (int i = 1; i < value.getChildCount(); i++) { + ASTNode parameter = (ASTNode) value.getChild(i); + ExprNodeDesc distExprNode = genExprNodeDesc(parameter, inputRowResolver); + // extract all the columns + extractColumns(colNamesGroupByDistinctExprs, distExprNode); + } + + if (intersect(colNamesGroupByExprs, colNamesGroupByDistinctExprs)) { + throw + new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg()); + } + } + } + + for (Map.Entry entry : aggregationTrees.entrySet()) { + ASTNode value = entry.getValue(); + ArrayList aggParameters = new ArrayList(); + // 0 is the function name + for (int i = 1; i < value.getChildCount(); i++) { + ASTNode paraExpr = (ASTNode) value.getChild(i); + ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr, inputRowResolver); + + // extract all the columns + extractColumns(colNamesAggregateParameters, paraExprNode); + } + + if (intersect(colNamesGroupByExprs, colNamesAggregateParameters)) { + throw + new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID.getMsg()); + } + } + } + /** * Generate a Group-By plan using 1 map-reduce job. First perform a map-side * partial aggregation (to reduce the amount of data), at this point of time, @@ -3635,31 +3977,58 @@ */ @SuppressWarnings("nls") private Operator genGroupByPlanMapAggr1MR(String dest, QB qb, - Operator inputOperatorInfo) throws SemanticException { + Operator inputOperatorInfo) throws SemanticException { QBParseInfo parseInfo = qb.getParseInfo(); + ObjectPair, List> grpByExprsGroupingSets = + getGroupByGroupingSetsForClause(parseInfo, dest); + List grpByExprs = grpByExprsGroupingSets.getFirst(); + List groupingSets = grpByExprsGroupingSets.getSecond(); + boolean groupingSetsPresent = !groupingSets.isEmpty(); + + if (groupingSetsPresent) { + checkExpressionsForGroupingSet(grpByExprs, + parseInfo.getDistinctFuncExprsForClause(dest), + parseInfo.getAggregationExprsForClause(dest), + opParseCtx.get(inputOperatorInfo).getRowResolver()); + } + // ////// Generate GroupbyOperator for a map-side partial aggregation Map genericUDAFEvaluators = new LinkedHashMap(); - GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator( - qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH, - genericUDAFEvaluators); + GroupByOperator groupByOperatorInfo = + (GroupByOperator) genGroupByPlanMapGroupByOperator( + qb, + dest, + grpByExprs, + inputOperatorInfo, + GroupByDesc.Mode.HASH, + genericUDAFEvaluators, + groupingSets, + groupingSetsPresent); groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get( - inputOperatorInfo).getRowResolver().getTableNames()); + inputOperatorInfo).getRowResolver().getTableNames()); int numReducers = -1; // Optimize the scenario when there are no grouping keys - only 1 reducer is // needed - List grpByExprs = getGroupByForClause(parseInfo, dest); if (grpByExprs.isEmpty()) { numReducers = 1; } // ////// Generate ReduceSink Operator - Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, - dest, groupByOperatorInfo, grpByExprs.size(), numReducers, true); + Operator reduceSinkOperatorInfo = + genGroupByPlanReduceSinkOperator(qb, + dest, + groupByOperatorInfo, + grpByExprs, + grpByExprs.size(), + true, + numReducers, + true, + groupingSetsPresent); // This is a 1-stage map-reduce processing of the groupby. 
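With the plan wiring above in place, the interesting work is expected to happen in the map-side hash GroupByOperator: when grouping sets are present it evaluates each input row once per grouping set, with grouping columns outside the set treated as null and the set id carried as the extra key column added to the reduce sink (the operator-side change itself is not part of this excerpt). A stand-alone sketch of that per-row expansion for a two-column cube, using a bitmask per set as an illustrative encoding only:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative expansion of one input row into one row per grouping set,
    // the way a map-side group-by evaluates CUBE(key, val). Each grouping set
    // is encoded here as a bitmask over the group-by columns (an assumption
    // made for illustration, not necessarily the patch's exact encoding).
    public class CubeExpansion {
      public static void main(String[] args) {
        List<String> groupByCols = Arrays.asList("key", "val");
        List<String> row = Arrays.asList("8", "18");

        int n = groupByCols.size();
        for (int mask = 0; mask < (1 << n); mask++) {
          List<String> expanded = new ArrayList<>();
          for (int i = 0; i < n; i++) {
            // Columns not in this grouping set contribute null to the key.
            expanded.add(((mask >> i) & 1) == 1 ? row.get(i) : null);
          }
          System.out.println("set " + mask + " -> " + expanded + " count += 1");
        }
      }
    }

This is also why the reduce-sink key count becomes grpByExprs.size() + 1 when grouping sets are present, and presumably why the non-map-aggregation plans above reject grouping sets outright: without a map-side group-by operator there is nothing to perform this expansion.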
Tha map-side // aggregates was just used to @@ -3669,8 +4038,8 @@ // used, and merge is invoked // on the reducer. return genGroupByPlanGroupByOperator1(parseInfo, dest, - reduceSinkOperatorInfo, GroupByDesc.Mode.MERGEPARTIAL, - genericUDAFEvaluators, false); + reduceSinkOperatorInfo, GroupByDesc.Mode.MERGEPARTIAL, + genericUDAFEvaluators, false, groupingSetsPresent); } /** @@ -3706,16 +4075,31 @@ */ @SuppressWarnings("nls") private Operator genGroupByPlanMapAggr2MR(String dest, QB qb, - Operator inputOperatorInfo) throws SemanticException { + Operator inputOperatorInfo) throws SemanticException { QBParseInfo parseInfo = qb.getParseInfo(); + ObjectPair, List> grpByExprsGroupingSets = + getGroupByGroupingSetsForClause(parseInfo, dest); + + List grpByExprs = grpByExprsGroupingSets.getFirst(); + List groupingSets = grpByExprsGroupingSets.getSecond(); + boolean groupingSetsPresent = !groupingSets.isEmpty(); + + if (groupingSetsPresent) { + checkExpressionsForGroupingSet(grpByExprs, + parseInfo.getDistinctFuncExprsForClause(dest), + parseInfo.getAggregationExprsForClause(dest), + opParseCtx.get(inputOperatorInfo).getRowResolver()); + } + // ////// Generate GroupbyOperator for a map-side partial aggregation Map genericUDAFEvaluators = new LinkedHashMap(); - GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator( - qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH, - genericUDAFEvaluators); + GroupByOperator groupByOperatorInfo = + (GroupByOperator) genGroupByPlanMapGroupByOperator( + qb, dest, grpByExprs, inputOperatorInfo, GroupByDesc.Mode.HASH, + genericUDAFEvaluators, groupingSets, groupingSetsPresent); groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get( inputOperatorInfo).getRowResolver().getTableNames()); @@ -3723,40 +4107,57 @@ // map-reduce jobs are not needed // For eg: select count(1) from T where t.ds = .... if (!optimizeMapAggrGroupBy(dest, qb)) { + List distinctFuncExprs = parseInfo.getDistinctFuncExprsForClause(dest); // ////// Generate ReduceSink Operator - Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, - dest, groupByOperatorInfo, (parseInfo - .getDistinctFuncExprsForClause(dest).isEmpty() ? -1 - : Integer.MAX_VALUE), -1, true); + Operator reduceSinkOperatorInfo = + genGroupByPlanReduceSinkOperator(qb, + dest, + groupByOperatorInfo, + grpByExprs, + distinctFuncExprs.isEmpty() ? 
-1 : Integer.MAX_VALUE, + false, + -1, + true, + groupingSetsPresent); // ////// Generate GroupbyOperator for a partial aggregation Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, - dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS, - genericUDAFEvaluators, false); + dest, reduceSinkOperatorInfo, GroupByDesc.Mode.PARTIALS, + genericUDAFEvaluators, false, groupingSetsPresent); int numReducers = -1; - List grpByExprs = getGroupByForClause(parseInfo, dest); if (grpByExprs.isEmpty()) { numReducers = 1; } // ////// Generate ReduceSinkOperator2 Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR( - parseInfo, dest, groupByOperatorInfo2, grpByExprs.size(), numReducers); + parseInfo, dest, groupByOperatorInfo2, grpByExprs.size(), numReducers, + groupingSetsPresent); // ////// Generate GroupbyOperator3 return genGroupByPlanGroupByOperator2MR(parseInfo, dest, - reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL, - genericUDAFEvaluators); + reduceSinkOperatorInfo2, GroupByDesc.Mode.FINAL, + genericUDAFEvaluators, groupingSetsPresent); } else { + // If there are no grouping keys, grouping sets cannot be present + assert !groupingSetsPresent; + // ////// Generate ReduceSink Operator - Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(qb, - dest, groupByOperatorInfo, getGroupByForClause(parseInfo, dest) - .size(), 1, true); + Operator reduceSinkOperatorInfo = + genGroupByPlanReduceSinkOperator(qb, + dest, + groupByOperatorInfo, + grpByExprs, + grpByExprs.size(), + false, + 1, + true, + groupingSetsPresent); return genGroupByPlanGroupByOperator2MR(parseInfo, dest, - reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators); + reduceSinkOperatorInfo, GroupByDesc.Mode.FINAL, genericUDAFEvaluators, false); } } @@ -5187,8 +5588,9 @@ float groupByMemoryUsage = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); float memoryThreshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( - new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false,groupByMemoryUsage,memoryThreshold), new RowSchema(groupByOutputRowResolver.getColumnInfos()), + new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, + false,groupByMemoryUsage,memoryThreshold, null, false, 0), + new RowSchema(groupByOutputRowResolver.getColumnInfos()), inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); @@ -5824,6 +6226,11 @@ List oldASTList = null; for (String dest : ks) { + // If a grouping set aggregation is present, common processing is not possible + if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()) { + return null; + } + // If a filter is present, common processing is not possible if (qbp.getWhrForClause(dest) != null) { return null; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (revision 1391855) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (working copy) @@ -35,7 +35,6 @@ * Implementation of the parse information related to a query block. 
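Before the QBParseInfo class body changes below: the new destRollups/destCubes sets let later phases tell whether any destination clause used rollup or cube (they are presumably populated during phase-1 analysis, which is not shown in this excerpt), and the common-processing guard added in the SemanticAnalyzer hunk above reads them via getDestCubes()/getDestRollups(). A minimal stand-alone model of that bookkeeping, with illustrative names rather than the real QBParseInfo:

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative model of the new QBParseInfo bookkeeping: each destination
    // (insert clause) that uses WITH ROLLUP or WITH CUBE is recorded, and the
    // multi-group-by common-processing optimization is skipped whenever any
    // destination does.
    public class ParseInfoModel {
      private final Set<String> destRollups = new HashSet<>();
      private final Set<String> destCubes = new HashSet<>();

      void recordRollup(String dest) { destRollups.add(dest); }
      void recordCube(String dest)   { destCubes.add(dest); }

      boolean commonGroupByProcessingPossible() {
        // Mirrors the guard added above: grouping-set aggregations opt out of
        // the shared group-by plan.
        return destRollups.isEmpty() && destCubes.isEmpty();
      }

      public static void main(String[] args) {
        ParseInfoModel info = new ParseInfoModel();
        System.out.println(info.commonGroupByProcessingPossible()); // true
        info.recordCube("insclause-0");
        System.out.println(info.commonGroupByProcessingPossible()); // false
      }
    }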
* **/ - public class QBParseInfo { private final boolean isSubQ; @@ -49,6 +48,8 @@ private final Map destToSelExpr; private final HashMap destToWhereExpr; private final HashMap destToGroupby; + private final Set destRollups; + private final Set destCubes; private final Map destToHaving; private final HashSet insertIntoTables; @@ -105,6 +106,8 @@ destToOrderby = new HashMap(); destToLimit = new HashMap(); insertIntoTables = new HashSet(); + destRollups = new HashSet(); + destCubes = new HashSet(); destToAggregationExprs = new LinkedHashMap>(); destToDistinctFuncExprs = new HashMap>(); @@ -241,6 +244,14 @@ return destToGroupby.get(clause); } + public Set getDestRollups() { + return destRollups; + } + + public Set getDestCubes() { + return destCubes; + } + public HashMap getDestToGroupBy() { return destToGroupby; } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 1391855) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy) @@ -74,6 +74,8 @@ TOK_EXPLIST; TOK_ALIASLIST; TOK_GROUPBY; +TOK_ROLLUP_GROUPBY; +TOK_CUBE_GROUPBY; TOK_HAVING; TOK_ORDERBY; TOK_CLUSTERBY; @@ -1792,6 +1794,9 @@ KW_GROUP KW_BY groupByExpression ( COMMA groupByExpression )* + ((rollup=KW_WITH KW_ROLLUP)? | (cube=KW_WITH KW_CUBE)?) + -> {rollup != null}? ^(TOK_ROLLUP_GROUPBY groupByExpression+) + -> {cube != null}? ^(TOK_CUBE_GROUPBY groupByExpression+) -> ^(TOK_GROUPBY groupByExpression+) ; @@ -2417,6 +2422,8 @@ KW_RESTRICT: 'RESTRICT'; KW_CASCADE: 'CASCADE'; KW_SKEWED: 'SKEWED'; +KW_ROLLUP: 'ROLLUP'; +KW_CUBE: 'CUBE'; // Operators Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (revision 1391855) +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (working copy) @@ -270,6 +270,13 @@ HIVE_INTERNAL_DDL_LIST_BUCKETING_DISABLED(10208, "List Bucketing DDL is not allowed to use since feature is not completed yet."), + HIVE_GROUPING_SETS_AGGR_NOMAPAGGR(10209, + "Grouping sets aggregations (with rollups or cubes) are not allowed if map-side " + + "aggregation is turned off. Set hive.map.aggr=true if you want to use grouping sets"), + HIVE_GROUPING_SETS_AGGR_EXPRESSION_INVALID(10210, + "Grouping sets aggregations (with rollups or cubes) are not allowed if aggregation function " + + "parameters overlap with the group by key columns"), + SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " + "It may have crashed with an error."),
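The grammar change above only tags the GROUP BY clause with TOK_ROLLUP_GROUPBY or TOK_CUBE_GROUPBY; the grouping sets themselves are enumerated during semantic analysis, which is outside this excerpt. For n group-by keys, ROLLUP denotes the n+1 leading prefixes (including the grand total) and CUBE denotes all 2^n subsets. A stand-alone sketch of that enumeration, representing each set as a bitmask over the key positions (the bitmask encoding is an illustrative assumption, not necessarily the patch's internal representation):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative enumeration of the grouping sets denoted by WITH ROLLUP and
    // WITH CUBE over n group-by keys. Each set is a bitmask whose i-th bit says
    // whether key i participates in that set.
    public class GroupingSetEnumeration {
      static List<Integer> rollup(int n) {
        List<Integer> sets = new ArrayList<>();
        int mask = 0;
        sets.add(mask);                      // () : the grand total
        for (int i = 0; i < n; i++) {
          mask |= (1 << i);                  // (k0), (k0,k1), ..., (k0..kn-1)
          sets.add(mask);
        }
        return sets;
      }

      static List<Integer> cube(int n) {
        List<Integer> sets = new ArrayList<>();
        for (int mask = 0; mask < (1 << n); mask++) {
          sets.add(mask);                    // every subset of the keys
        }
        return sets;
      }

      public static void main(String[] args) {
        System.out.println(rollup(2));       // [0, 1, 3] -> (), (key), (key, val)
        System.out.println(cube(2));         // [0, 1, 2, 3] -> all subsets
      }
    }

Only the set arithmetic is shown here; how SemanticAnalyzer walks the new tokens and represents the resulting sets is not part of this patch excerpt.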