diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
index c9848da..223a17d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
@@ -19,12 +19,17 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -32,12 +37,13 @@
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBParseInfo;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 
 /**
  * This optimizer is used to reduce the input size for the query for queries which are
@@ -63,9 +69,6 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
     Map<String, SplitSample> nameToSplitSample = pctx.getNameToSplitSample();
 
-    QB qb = pctx.getQB();
-    QBParseInfo qbParseInfo = qb.getParseInfo();
-
     // determine the query qualifies reduce input size for LIMIT
     // The query only qualifies when there are only one top operator
     // and there is no transformer or UDTF and no block sampling
@@ -86,15 +89,17 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     //    FROM ... LIMIT...
     //    SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2);
     //
-    Integer tempGlobalLimit = checkQbpForGlobalLimit(qb);
+    TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
+    Integer tempGlobalLimit = checkQbpForGlobalLimit(ts);
 
     // query qualify for the optimization
     if (tempGlobalLimit != null && tempGlobalLimit != 0) {
-      TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
       Table tab = ts.getConf().getTableMetadata();
 
       if (!tab.isPartitioned()) {
-        if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
+        Set<FilterOperator> filterOps =
+            OperatorUtils.findOperators(ts, FilterOperator.class);
+        if (filterOps.size() == 0) {
           globalLimitCtx.enableOpt(tempGlobalLimit);
         }
       } else {
@@ -130,48 +135,53 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
   }
 
   /**
-   * Recursively check the limit number in all sub queries
+   * Check the limit number in all sub queries
    *
-   * @param qbParseInfo
    * @return if there is one and only one limit for all subqueries, return the limit
    *         if there is no limit, return 0
    *         otherwise, return null
    */
-  private Integer checkQbpForGlobalLimit(QB localQb) {
-    QBParseInfo qbParseInfo = localQb.getParseInfo();
-    if (localQb.getNumSelDi() == 0 && qbParseInfo.getDestToClusterBy().isEmpty()
-        && qbParseInfo.getDestToDistributeBy().isEmpty()
-        && qbParseInfo.getDestToOrderBy().isEmpty()
-        && qbParseInfo.getDestToSortBy().isEmpty()
-        && qbParseInfo.getDestToAggregationExprs().size() <= 1
-        && qbParseInfo.getDestToDistinctFuncExprs().size() <= 1
-        && qbParseInfo.getNameToSample().isEmpty()) {
-      if ((qbParseInfo.getDestToAggregationExprs().size() < 1 ||
-          qbParseInfo.getDestToAggregationExprs().values().iterator().next().isEmpty()) &&
-          (qbParseInfo.getDestToDistinctFuncExprs().size() < 1 ||
-          qbParseInfo.getDestToDistinctFuncExprs().values().iterator().next().isEmpty())
-          && qbParseInfo.getDestToLimit().size() <= 1) {
-        Integer retValue;
-        if (qbParseInfo.getDestToLimit().size() == 0) {
-          retValue = 0;
-        } else {
-          retValue = qbParseInfo.getDestToLimit().values().iterator().next();
+  private static Integer checkQbpForGlobalLimit(
+      TableScanOperator ts) {
+    Set<ReduceSinkOperator> reduceSinkOps =
+        OperatorUtils.findOperators(ts, ReduceSinkOperator.class);
+    for (ReduceSinkOperator reduceSinkOp : reduceSinkOps) {
+      ReduceSinkDesc reduceSinkConf = reduceSinkOp.getConf();
+      if (reduceSinkConf.getPartitionCols() != null &&
+          !reduceSinkConf.getPartitionCols().isEmpty()) {
+        return null;
       }
-
-        for (String alias : localQb.getSubqAliases()) {
-          Integer limit = checkQbpForGlobalLimit(localQb.getSubqForAlias(alias).getQB());
-          if (limit == null) {
+      if (reduceSinkConf.getOrder() != null && !reduceSinkConf.getOrder().isEmpty()) {
         return null;
-          } else if (retValue > 0 && limit > 0) {
-            // Any query has more than one LIMITs shown in the query is not
-            // qualified to this optimization
+      }
+    }
+    Set<GroupByOperator> groupByOps =
+        OperatorUtils.findOperators(ts, GroupByOperator.class);
+    for (GroupByOperator groupByOp : groupByOps) {
+      GroupByDesc groupByConf = groupByOp.getConf();
+      if (groupByConf.getAggregators() != null &&
+          !groupByConf.getAggregators().isEmpty()) {
        return null;
-          } else if (limit > 0) {
-            retValue = limit;
-          }
      }
-        return retValue;
-      }
+      if (groupByConf.isDistinct() || groupByConf.isDistinctLike()) {
+        return null;
+      }
+    }
+    Set<FilterOperator> filterOps =
+        OperatorUtils.findOperators(ts, FilterOperator.class);
+    for (FilterOperator filterOp : filterOps) {
+      FilterDesc filterOpConf = filterOp.getConf();
+      if (filterOpConf.getIsSamplingPred()) {
+        return null;
+      }
+    }
+    Set<LimitOperator> limitOps =
+        OperatorUtils.findOperators(ts, LimitOperator.class);
+    if (limitOps.size() == 1) {
+      return limitOps.iterator().next().getConf().getLimit();
+    }
+    else if (limitOps.size() == 0) {
+      return 0;
    }
     return null;
   }
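
For reviewers who want the new operator-tree-based qualification in one place: below is a minimal sketch (not part of the patch) that mirrors the unpartitioned path of transform() plus the single-LIMIT check from checkQbpForGlobalLimit(), using the same calls the diff introduces (OperatorUtils.findOperators, ts.getConf().getTableMetadata(), getConf().getLimit()). The class and method names in the sketch are hypothetical, and it deliberately skips the partitioned-table branch and the ReduceSink/GroupBy/sampling-predicate checks that the real code performs.

// Illustrative sketch only -- not part of the patch. The helper below is
// hypothetical; the Hive classes and calls are the ones used in the diff above.
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.Table;

class GlobalLimitCheckSketch {

  // Returns the LIMIT value that could bound the scan's input for an
  // unpartitioned, filter-free table scan with exactly one LIMIT, or null if
  // the scan does not qualify. The partitioned-table branch and the
  // "no limit at all" case are handled separately in the real transform().
  static Integer qualifyingLimit(TableScanOperator ts) {
    Table tab = ts.getConf().getTableMetadata();
    if (tab.isPartitioned()) {
      return null;
    }
    // Any FilterOperator under the scan may drop rows, so blindly trimming
    // the input could change the query result.
    Set<FilterOperator> filterOps = OperatorUtils.findOperators(ts, FilterOperator.class);
    if (!filterOps.isEmpty()) {
      return null;
    }
    // Exactly one LimitOperator means the whole query is bounded by that limit.
    Set<LimitOperator> limitOps = OperatorUtils.findOperators(ts, LimitOperator.class);
    if (limitOps.size() != 1) {
      return null;
    }
    return limitOps.iterator().next().getConf().getLimit();
  }
}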