Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template	(revision 1463842)
+++ conf/hive-default.xml.template	(working copy)
@@ -534,6 +534,15 @@
 </property>
 
 <property>
+  <name>hive.map.groupby.sorted.testmode</name>
+  <value>false</value>
+  <description>If the bucketing/sorting properties of the table exactly match the grouping key, whether to
+  perform the group by in the mapper by using BucketizedHiveInputFormat. If the test mode is set, the plan
+  is not converted, but a query property is set to denote the same.
+  </description>
+</property>
+
+<property>
   <name>hive.new.job.grouping.set.cardinality</name>
   <value>30</value>
   <description>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1463842)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -423,6 +423,7 @@
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
     HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
     HIVE_MAP_GROUPBY_SORT("hive.map.groupby.sorted", false),
+    HIVE_MAP_GROUPBY_SORT_TESTMODE("hive.map.groupby.sorted.testmode", false),
     HIVE_GROUPBY_ORDERBY_POSITION_ALIAS("hive.groupby.orderby.position.alias", false),
     HIVE_NEW_JOB_GROUPING_SET_CARDINALITY("hive.new.job.grouping.set.cardinality", 30),
 
@@ -765,7 +766,7 @@
     // ptf partition constants
     HIVE_PTF_PARTITION_PERSISTENCE_CLASS("hive.ptf.partition.persistence",
         "org.apache.hadoop.hive.ql.exec.PTFPersistence$PartitionedByteBasedList"),
-    HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize", 
+    HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize",
         (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB
     ;
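
For reference, the new variable is read through the standard HiveConf accessor, exactly as the
GroupByOptimizer change further down does. A minimal self-contained sketch (the class and main
method here are illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class TestModeFlagExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Defaults to false, matching both the ConfVars default above and
        // hive-default.xml.template.
        boolean testMode =
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE);
        System.out.println("hive.map.groupby.sorted.testmode = " + testMode);
      }
    }
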
Index: ql/src/test/results/clientpositive/groupby_sort_test_1.q.out
===================================================================
--- ql/src/test/results/clientpositive/groupby_sort_test_1.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/groupby_sort_test_1.q.out	(working copy)
@@ -0,0 +1,127 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1
+PREHOOK: type: LOAD
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t1
+PREHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: CREATE TABLE outputTbl1(key int, cnt int)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE outputTbl1(key int, cnt int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@outputTbl1
+POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: -- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key. However, in test mode, the group by won't be converted.
+EXPLAIN
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key. However, in test mode, the group by won't be converted.
+EXPLAIN
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: t1.key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1.val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME outputTbl1))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL key))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        t1 
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Group By Operator
+                aggregations:
+                      expr: count(1)
+                bucketGroup: false
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: string
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: UDFToInteger(_col0)
+                  type: int
+                  expr: UDFToInteger(_col1)
+                  type: int
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.outputtbl1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.outputtbl1
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
Index: ql/src/test/queries/clientpositive/groupby_sort_test_1.q
===================================================================
--- ql/src/test/queries/clientpositive/groupby_sort_test_1.q	(revision 0)
+++ ql/src/test/queries/clientpositive/groupby_sort_test_1.q	(working copy)
@@ -0,0 +1,21 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.map.groupby.sorted.testmode=true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1;
+
+CREATE TABLE outputTbl1(key int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key. However, in test mode, the group by won't be converted.
+EXPLAIN
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
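
The expected output above shows an ordinary two-stage plan precisely because test mode suppresses
the rewrite while still recording the decision. A condensed restatement of that gate (the class
and method names here are hypothetical stand-ins for the GroupByOptimizer internals shown in the
next diff):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.parse.ParseContext;

    // Illustrative sketch only, not part of the patch.
    public class TestModeGateSketch {
      void convertIfSorted(HiveConf conf, ParseContext ctx) {
        // Always record that the map-side sorted group by would apply...
        ctx.getQueryProperties().setHasMapGroupBy(true);
        // ...but in test mode leave the plan untouched, which is why the
        // EXPLAIN output above still shows the two-stage group by.
        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
          return;
        }
        // (the real code converts the plan and enables BucketizedHiveInputFormat here)
      }
    }
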
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java	(revision 1463842)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java	(working copy)
@@ -178,7 +178,7 @@
       // Dont remove the operator for distincts
       if (useMapperSort && !groupByOp.getConf().isDistinct() &&
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-        convertGroupByMapSideSortedGroupBy(groupByOp, depth);
+        convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
       }
       else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
@@ -455,7 +455,14 @@
 
     // Convert the group by to a map-side group by
     // The operators specified by depth and removed from the tree.
-    protected void convertGroupByMapSideSortedGroupBy(GroupByOperator groupByOp, int depth) {
+    protected void convertGroupByMapSideSortedGroupBy(
+        HiveConf conf, GroupByOperator groupByOp, int depth) {
+      // In test mode, don't change the query plan. However, set up a query property.
+      pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+        return;
+      }
+
       if (groupByOp.removeChildren(depth)) {
         // Use bucketized hive input format - that makes sure that one mapper reads the entire file
         groupByOp.setUseBucketizedHiveInputFormat(true);
Index: ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java	(revision 1463842)
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java	(working copy)
@@ -46,6 +46,7 @@
   boolean hasDistributeBy = false;
   boolean hasClusterBy = false;
   boolean mapJoinRemoved = false;
+  boolean hasMapGroupBy = false;
 
   public boolean hasJoin() {
     return hasJoin;
@@ -134,4 +135,12 @@
   public void setMapJoinRemoved(boolean mapJoinRemoved) {
     this.mapJoinRemoved = mapJoinRemoved;
   }
+
+  public boolean isHasMapGroupBy() {
+    return hasMapGroupBy;
+  }
+
+  public void setHasMapGroupBy(boolean hasMapGroupBy) {
+    this.hasMapGroupBy = hasMapGroupBy;
+  }
 }
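
Since hasMapGroupBy is a plain bean property, hooks or tests can assert on it directly. A small
self-contained sketch (the wrapper class is illustrative, not part of the patch):

    import org.apache.hadoop.hive.ql.QueryProperties;

    public class QueryPropertiesSketch {
      public static void main(String[] args) {
        QueryProperties props = new QueryProperties();
        // Defaults to false; the optimizer flips it when the sorted-table
        // map-side group by applies (or would apply, in test mode).
        System.out.println(props.isHasMapGroupBy()); // false
        props.setHasMapGroupBy(true);
        System.out.println(props.isHasMapGroupBy()); // true
      }
    }
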
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 1463842)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -352,7 +352,8 @@
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput,
-        reduceSinkOperatorsAddedByEnforceBucketingSorting);
+        reduceSinkOperatorsAddedByEnforceBucketingSorting,
+        queryProperties);
   }
 
   @SuppressWarnings("nls")
@@ -8660,7 +8661,7 @@
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput,
-        reduceSinkOperatorsAddedByEnforceBucketingSorting);
+        reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
 
     // Generate table access stats if required
     if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java	(revision 1463842)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java	(working copy)
@@ -28,6 +28,7 @@
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -113,6 +114,7 @@
   private List<Task<? extends Serializable>> rootTasks;
 
   private FetchTask fetchTask;
+  private QueryProperties queryProperties;
 
   public ParseContext() {
   }
@@ -180,7 +182,8 @@
       HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks,
       Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner,
       Map<String, ReadEntity> viewAliasToInput,
-      List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
+      List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting,
+      QueryProperties queryProperties) {
     this.conf = conf;
     this.qb = qb;
     this.ast = ast;
@@ -212,6 +215,7 @@
     this.viewAliasToInput = viewAliasToInput;
     this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
         reduceSinkOperatorsAddedByEnforceBucketingSorting;
+    this.queryProperties = queryProperties;
   }
 
   /**
@@ -623,4 +627,12 @@
   public Map<String, ReadEntity> getViewAliasToInput() {
     return viewAliasToInput;
   }
+
+  public QueryProperties getQueryProperties() {
+    return queryProperties;
+  }
+
+  public void setQueryProperties(QueryProperties queryProperties) {
+    this.queryProperties = queryProperties;
+  }
 }
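
With queryProperties now threaded from SemanticAnalyzer into ParseContext, any later compilation
phase holding the context can inspect the marker. A hypothetical consumer, not part of the patch:

    import org.apache.hadoop.hive.ql.parse.ParseContext;

    // Illustrative sketch: check the marker set by GroupByOptimizer.
    public class MapGroupBySketch {
      static boolean usedMapSideSortedGroupBy(ParseContext pCtx) {
        return pCtx.getQueryProperties() != null
            && pCtx.getQueryProperties().isHasMapGroupBy();
      }
    }
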