Index: ql/src/test/results/clientpositive/groupby_sort_8.q.out
===================================================================
--- ql/src/test/results/clientpositive/groupby_sort_8.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/groupby_sort_8.q.out	(working copy)
@@ -0,0 +1,124 @@
+PREHOOK: query: CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@T1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@t1
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@t1
+POSTHOOK: Output: default@t1@ds=1
+PREHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t1@ds=1
+PREHOOK: Output: default@t1@ds=1
+POSTHOOK: query: -- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t1@ds=1
+POSTHOOK: Output: default@t1@ds=1
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: -- The plan is not converted to a map-side group by: although the sorting columns and grouping
+-- columns match, the user is issuing a distinct
+EXPLAIN
+select count(distinct key) from T1
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The plan is not converted to a map-side group by: although the sorting columns and grouping
+-- columns match, the user is issuing a distinct
+EXPLAIN
+select count(distinct key) from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME T1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        t1
+          TableScan
+            alias: t1
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Group By Operator
+                aggregations:
+                      expr: count(DISTINCT key)
+                bucketGroup: true
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(DISTINCT KEY._col0:0._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: bigint
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select count(distinct key) from T1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Input: default@t1@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct key) from T1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Input: default@t1@ds=1
+#### A masked pattern was here ####
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
+5
+PREHOOK: query: DROP TABLE T1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: DROP TABLE T1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+POSTHOOK: Lineage: t1 PARTITION(ds=1).key SIMPLE [(t1)t1.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1 PARTITION(ds=1).val SIMPLE [(t1)t1.FieldSchema(name:val, type:string, comment:null), ]
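A note on the plan above (commentary, not part of the patch): the map-side Group By Operator runs in hash mode and emits only per-mapper partials; count(DISTINCT key) becomes final only in the mergepartial Group By after the shuffle. A toy model of those two stages in Java, assuming the usual contents of T1.txt (distinct keys 1, 2, 3, 7, 8; the split across the two buckets below is arbitrary):

    import java.util.*;

    // Not Hive code: a sketch of the hash -> mergepartial shape shown in the plan.
    public class DistinctCountPlanSketch {

      // Map-side Group By (mode: hash): each mapper reduces its bucket to a partial.
      static Set<String> mapSideHash(List<String> bucketKeys) {
        return new TreeSet<>(bucketKeys); // the distinct keys this mapper saw
      }

      // Reduce-side Group By (mode: mergepartial): merge partials, then finalize.
      static long reduceSideMergePartial(List<Set<String>> partials) {
        Set<String> merged = new TreeSet<>();
        partials.forEach(merged::addAll);
        return merged.size();
      }

      public static void main(String[] args) {
        List<String> bucket0 = Arrays.asList("2", "8", "8"); // assumed bucket contents
        List<String> bucket1 = Arrays.asList("1", "3", "7");
        List<Set<String>> partials =
            Arrays.asList(mapSideHash(bucket0), mapSideHash(bucket1));
        System.out.println(reduceSideMergePartial(partials)); // 5, matching the test
      }
    }

Removing the reduce stage, as the sorted map-side conversion would, leaves nowhere for that merge to happen; the optimizer change below therefore keeps the conversion off when a distinct is present.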
Index: ql/src/test/queries/clientpositive/groupby_sort_8.q
===================================================================
--- ql/src/test/queries/clientpositive/groupby_sort_8.q	(revision 0)
+++ ql/src/test/queries/clientpositive/groupby_sort_8.q	(working copy)
@@ -0,0 +1,20 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+
+CREATE TABLE T1(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1 PARTITION (ds='1');
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 PARTITION (ds='1') select key, val from T1 where ds = '1';
+
+-- The plan is not converted to a map-side group by: although the sorting columns and grouping
+-- columns match, the user is issuing a distinct
+EXPLAIN
+select count(distinct key) from T1;
+select count(distinct key) from T1;
+
+DROP TABLE T1;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java	(revision 1454644)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java	(working copy)
@@ -175,7 +175,9 @@
       boolean useMapperSort =
           HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
 
-      if (useMapperSort && (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
+      // Don't remove the operator for distincts
+      if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+          (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
         convertGroupByMapSideSortedGroupBy(groupByOp, depth);
       } else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
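The same gate as a self-contained sketch (the stub enum and descriptor below stand in for the real Hive classes; only the shape of the condition is taken from the hunk above):

    // Sketch only: GroupByOptimizerSortMatch and GroupByDesc are stand-ins here.
    public class GroupByGateSketch {
      enum GroupByOptimizerSortMatch { NO_MATCH, PARTIAL_MATCH, COMPLETE_MATCH }

      static class GroupByDesc {
        private final boolean isDistinct;
        GroupByDesc(boolean isDistinct) { this.isDistinct = isDistinct; }
        boolean isDistinct() { return isDistinct; }
      }

      // Convert to a purely map-side group by only when the mapper-sort feature
      // is on, the grouping keys fully match the sort/bucket keys, and no
      // aggregation is distinct.
      static boolean shouldConvert(boolean useMapperSort, GroupByDesc conf,
          GroupByOptimizerSortMatch match) {
        return useMapperSort && !conf.isDistinct()
            && match == GroupByOptimizerSortMatch.COMPLETE_MATCH;
      }

      public static void main(String[] args) {
        GroupByOptimizerSortMatch m = GroupByOptimizerSortMatch.COMPLETE_MATCH;
        System.out.println(shouldConvert(true, new GroupByDesc(false), m)); // true
        System.out.println(shouldConvert(true, new GroupByDesc(true), m));  // false: the new test
      }
    }

When the condition is false, the distinct query keeps its reduce stage; the .q.out above reflects this, showing bucketGroup: true on the map side with the reduce-side merge intact.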
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java	(revision 1454644)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java	(working copy)
@@ -65,6 +65,7 @@
   private ArrayList<String> outputColumnNames;
   private float groupByMemoryUsage;
   private float memoryThreshold;
+  transient private boolean isDistinct;
 
   public GroupByDesc() {
   }
@@ -79,10 +80,11 @@
       final float memoryThreshold,
       final List<Integer> listGroupingSets,
       final boolean groupingSetsPresent,
-      final int groupingSetsPosition) {
+      final int groupingSetsPosition,
+      final boolean isDistinct) {
     this(mode, outputColumnNames, keys, aggregators,
         groupKeyNotReductionKey, false, groupByMemoryUsage, memoryThreshold, listGroupingSets,
-        groupingSetsPresent, groupingSetsPosition);
+        groupingSetsPresent, groupingSetsPosition, isDistinct);
   }
 
   public GroupByDesc(
@@ -96,7 +98,8 @@
       final float memoryThreshold,
       final List<Integer> listGroupingSets,
       final boolean groupingSetsPresent,
-      final int groupingSetsPosition) {
+      final int groupingSetsPosition,
+      final boolean isDistinct) {
     this.mode = mode;
     this.outputColumnNames = outputColumnNames;
     this.keys = keys;
@@ -108,6 +111,7 @@
     this.listGroupingSets = listGroupingSets;
     this.groupingSetsPresent = groupingSetsPresent;
     this.groupingSetPosition = groupingSetsPosition;
+    this.isDistinct = isDistinct;
   }
 
   public Mode getMode() {
@@ -249,4 +253,8 @@
   public void setGroupingSetPosition(int groupingSetPosition) {
     this.groupingSetPosition = groupingSetPosition;
   }
+
+  public boolean isDistinct() {
+    return isDistinct;
+  }
 }
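A guess at the intent of marking the new field transient (commentary, not part of the patch): only the compile-time GroupByOptimizer reads the flag, so it need not survive serialization of the plan into the task; the patch also adds a getter but no setter, so bean-style serializers would skip it regardless. The transient semantics, demonstrated with plain Java serialization (Hive's actual plan-serialization mechanism differs):

    import java.io.*;

    // Not Hive code: shows that a transient flag reverts to its default after a
    // serialize/deserialize round trip, i.e. it is compile-time-only metadata.
    public class TransientFlagDemo {
      static class Desc implements Serializable {
        transient boolean isDistinct;
        Desc(boolean d) { isDistinct = d; }
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
          oos.writeObject(new Desc(true));
        }
        try (ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
          Desc copy = (Desc) ois.readObject();
          System.out.println(copy.isDistinct); // false: the flag did not round-trip
        }
      }
    }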
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 1454644)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -2849,7 +2849,7 @@
 
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+            false, groupByMemoryUsage, memoryThreshold, null, false, 0, numDistinctUDFs > 0),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         reduceSinkOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -3023,11 +3023,13 @@
       reduceValues = ((ReduceSinkDesc) reduceSinkOperatorInfo.getConf()).getValueCols();
     }
     int numDistinctUDFs = 0;
+    boolean containsDistinctAggr = false;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = unescapeIdentifier(value.getChild(0).getText());
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
       boolean isDistinct = (value.getType() == HiveParser.TOK_FUNCTIONDI);
+      containsDistinctAggr = containsDistinctAggr || isDistinct;
 
       // If the function is distinct, partial aggregation has not been done on
       // the client side.
@@ -3129,7 +3131,7 @@
             distPartAgg, groupByMemoryUsage, memoryThreshold,
             groupingSets, groupingSetsPresent && groupingSetsNeedAdditionalMRJob,
-            groupingSetsPosition),
+            groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         reduceSinkOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -3250,6 +3252,7 @@
         .getAggregationExprsForClause(dest);
     assert (aggregationTrees != null);
 
+    boolean containsDistinctAggr = false;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ASTNode value = entry.getValue();
       String aggName = unescapeIdentifier(value.getChild(0).getText());
@@ -3265,6 +3268,7 @@
       }
 
       boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      containsDistinctAggr = containsDistinctAggr || isDistinct;
       boolean isAllColumns = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
       Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
@@ -3293,7 +3297,7 @@
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
             false, groupByMemoryUsage, memoryThreshold,
-            groupingSetKeys, groupingSetsPresent, groupingSetsPosition),
+            groupingSetKeys, groupingSetsPresent, groupingSetsPosition, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
     op.setColumnExprMap(colExprMap);
@@ -3752,6 +3756,7 @@
     HashMap<String, ASTNode> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
+    boolean containsDistinctAggr = false;
     for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
       ArrayList<ExprNodeDesc> aggParameters = new ArrayList<ExprNodeDesc>();
       ASTNode value = entry.getValue();
@@ -3768,6 +3773,7 @@
       String aggName = unescapeIdentifier(value.getChild(0).getText());
 
       boolean isDistinct = value.getType() == HiveParser.TOK_FUNCTIONDI;
+      containsDistinctAggr = containsDistinctAggr || isDistinct;
       boolean isStar = value.getType() == HiveParser.TOK_FUNCTIONSTAR;
       Mode amode = groupByDescModeToUDAFMode(mode, isDistinct);
       GenericUDAFEvaluator genericUDAFEvaluator = genericUDAFEvaluators
@@ -3795,7 +3801,7 @@
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+            false, groupByMemoryUsage, memoryThreshold, null, false, 0, containsDistinctAggr),
         new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
         reduceSinkOperatorInfo2), groupByOutputRowResolver2);
     op.setColumnExprMap(colExprMap);
@@ -5998,7 +6004,7 @@
         .getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
     Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
-            false, groupByMemoryUsage, memoryThreshold, null, false, 0),
+            false, groupByMemoryUsage, memoryThreshold, null, false, 0, false),
         new RowSchema(groupByOutputRowResolver.getColumnInfos()),
         inputOperatorInfo), groupByOutputRowResolver);
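Every SemanticAnalyzer call site above computes the flag the same way: an OR-fold of isDistinct over the aggregation trees for the clause, passed as the new last constructor argument (the final site hard-codes false, presumably because no distinct aggregation can reach that operator). The fold reduced to its core, self-contained (the token constant values below are invented; Hive's real constant is HiveParser.TOK_FUNCTIONDI):

    import java.util.*;

    // Not Hive code: the containsDistinctAggr accumulation pattern in isolation.
    public class ContainsDistinctSketch {
      static final int TOK_FUNCTION = 1;    // plain aggregation, e.g. sum(val)
      static final int TOK_FUNCTIONDI = 2;  // distinct aggregation, e.g. count(distinct key)

      static boolean containsDistinctAggr(List<Integer> aggregationTokenTypes) {
        boolean contains = false;
        for (int type : aggregationTokenTypes) {
          boolean isDistinct = (type == TOK_FUNCTIONDI);
          contains = contains || isDistinct; // same OR-fold as the patch
        }
        return contains;
      }

      public static void main(String[] args) {
        // One distinct aggregation anywhere in the clause marks the whole GroupByDesc.
        System.out.println(containsDistinctAggr(
            Arrays.asList(TOK_FUNCTIONDI, TOK_FUNCTION))); // true
        System.out.println(containsDistinctAggr(
            Arrays.asList(TOK_FUNCTION)));                 // false
      }
    }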