diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 4d9d1da..288dc6a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -27,7 +27,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.mapred.OutputCollector;
 
 public class OperatorUtils {
@@ -147,4 +150,108 @@ private static void iterateParents(Operator operator, Function<Operator<?>> f
   public static boolean sameRowSchema(Operator<?> operator1, Operator<?> operator2) {
     return operator1.getSchema().equals(operator2.getSchema());
   }
+
+  /**
+   * Returns whether an order on rows is imposed starting at the given operator.
+   * Order may be imposed by any RS operator.
+   *
+   * @return true if order is imposed, false otherwise
+   */
+  public static boolean existsOrdering(Operator<?> op) {
+    Set<ReduceSinkOperator> reduceSinkOps = findOperators(op, ReduceSinkOperator.class);
+    for (ReduceSinkOperator reduceSinkOp : reduceSinkOps) {
+      ReduceSinkDesc reduceSinkConf = reduceSinkOp.getConf();
+      if (reduceSinkConf.getOrder() != null && !reduceSinkConf.getOrder().isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns whether any partitioning exists starting at the given operator.
+   * Partitioning may be imposed by any RS operator.
+   *
+   * @return true if partitioning exists, false otherwise
+   */
+  public static boolean existsPartitioning(Operator<?> op) {
+    Set<ReduceSinkOperator> reduceSinkOps = findOperators(op, ReduceSinkOperator.class);
+    for (ReduceSinkOperator reduceSinkOp : reduceSinkOps) {
+      ReduceSinkDesc reduceSinkConf = reduceSinkOp.getConf();
+      if (reduceSinkConf.getPartitionCols() != null &&
+          !reduceSinkConf.getPartitionCols().isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns whether any aggregation is performed starting at the given operator.
+   * Aggregation functions are applied in GBY operators.
+   *
+   * @return true if there is at least one aggregation, false otherwise
+   */
+  public static boolean existsAggregate(Operator<?> op) {
+    Set<GroupByOperator> groupByOps = OperatorUtils.findOperators(op, GroupByOperator.class);
+    for (GroupByOperator groupByOp : groupByOps) {
+      GroupByDesc groupByConf = groupByOp.getConf();
+      if (groupByConf.getAggregators() != null &&
+          !groupByConf.getAggregators().isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns whether there is a distinct aggregate starting at the given operator.
+   *
+   * @return true if there is at least one distinct aggregate, false otherwise
+   */
+  public static boolean existsDistinctAggregate(Operator<?> op) {
+    Set<GroupByOperator> groupByOps = OperatorUtils.findOperators(op, GroupByOperator.class);
+    for (GroupByOperator groupByOp : groupByOps) {
+      GroupByDesc groupByConf = groupByOp.getConf();
+      if (groupByConf.isDistinct()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns whether there is a filter with a sampling predicate starting
+   * at the given operator.
+   *
+   * @return true if there is at least one sampling predicate, false otherwise
+   */
+  public static boolean existsSamplingFilter(Operator<?> op) {
+    Set<FilterOperator> filterOps = OperatorUtils.findOperators(op, FilterOperator.class);
+    for (FilterOperator filterOp : filterOps) {
+      FilterDesc filterOpConf = filterOp.getConf();
+      if (filterOpConf.getIsSamplingPred()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the limit (if any) on the number of rows returned starting at the given operator.
+   *
+   * @return if there is one and only one limit starting at op, return the limit
+   *         if there is no limit, return 0
+   *         otherwise, return null
+   */
+  public static Integer findSingleLimit(Operator<?> op) {
+    Set<LimitOperator> limitOps = OperatorUtils.findOperators(
+        op, LimitOperator.class);
+    if (limitOps.size() == 1) {
+      return limitOps.iterator().next().getConf().getLimit();
+    } else if (limitOps.isEmpty()) {
+      return 0;
+    }
+    return null;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
index c9848da..c0b9b45 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -32,8 +34,6 @@
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBParseInfo;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -63,9 +63,6 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
     Map<String, SplitSample> nameToSplitSample = pctx.getNameToSplitSample();
 
-    QB qb = pctx.getQB();
-    QBParseInfo qbParseInfo = qb.getParseInfo();
-
     // determine the query qualifies reduce input size for LIMIT
     // The query only qualifies when there are only one top operator
     // and there is no transformer or UDTF and no block sampling
@@ -86,15 +83,17 @@
     //    FROM ... LIMIT...
     //    SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2);
     //
-    Integer tempGlobalLimit = checkQbpForGlobalLimit(qb);
+    TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
+    Integer tempGlobalLimit = checkQbpForGlobalLimit(ts);
 
     // query qualify for the optimization
     if (tempGlobalLimit != null && tempGlobalLimit != 0) {
-      TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
       Table tab = ts.getConf().getTableMetadata();
 
       if (!tab.isPartitioned()) {
-        if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
+        Set<FilterOperator> filterOps =
+            OperatorUtils.findOperators(ts, FilterOperator.class);
+        if (filterOps.isEmpty()) {
           globalLimitCtx.enableOpt(tempGlobalLimit);
         }
       } else {
@@ -130,49 +129,33 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
   }
 
   /**
-   * Recursively check the limit number in all sub queries
+   * Check the limit number in the query and all its subqueries
    *
-   * @param qbParseInfo
    * @return if there is one and only one limit for all subqueries, return the limit
    *         if there is no limit, return 0
    *         otherwise, return null
    */
-  private Integer checkQbpForGlobalLimit(QB localQb) {
-    QBParseInfo qbParseInfo = localQb.getParseInfo();
-    if (localQb.getNumSelDi() == 0 && qbParseInfo.getDestToClusterBy().isEmpty()
-        && qbParseInfo.getDestToDistributeBy().isEmpty()
-        && qbParseInfo.getDestToOrderBy().isEmpty()
-        && qbParseInfo.getDestToSortBy().isEmpty()
-        && qbParseInfo.getDestToAggregationExprs().size() <= 1
-        && qbParseInfo.getDestToDistinctFuncExprs().size() <= 1
-        && qbParseInfo.getNameToSample().isEmpty()) {
-      if ((qbParseInfo.getDestToAggregationExprs().size() < 1 ||
-          qbParseInfo.getDestToAggregationExprs().values().iterator().next().isEmpty()) &&
-          (qbParseInfo.getDestToDistinctFuncExprs().size() < 1 ||
-          qbParseInfo.getDestToDistinctFuncExprs().values().iterator().next().isEmpty())
-          && qbParseInfo.getDestToLimit().size() <= 1) {
-        Integer retValue;
-        if (qbParseInfo.getDestToLimit().size() == 0) {
-          retValue = 0;
-        } else {
-          retValue = qbParseInfo.getDestToLimit().values().iterator().next();
-        }
-
-        for (String alias : localQb.getSubqAliases()) {
-          Integer limit = checkQbpForGlobalLimit(localQb.getSubqForAlias(alias).getQB());
-          if (limit == null) {
-            return null;
-          } else if (retValue > 0 && limit > 0) {
-            // Any query has more than one LIMITs shown in the query is not
-            // qualified to this optimization
-            return null;
-          } else if (limit > 0) {
-            retValue = limit;
-          }
-        }
-        return retValue;
-      }
+  private static Integer checkQbpForGlobalLimit(TableScanOperator ts) {
+    // To apply this optimization, the following must hold for the input query:
+    // - There cannot exist any order by/sort by clause,
+    //   thus existsOrdering should be false.
+    // - There cannot exist any distribute by clause, thus
+    //   existsPartitioning should be false.
+    // - There cannot exist any cluster by clause, thus
+    //   existsOrdering AND existsPartitioning should be false.
+    if (OperatorUtils.existsOrdering(ts) ||
+        OperatorUtils.existsPartitioning(ts)) {
+      return null;
+    }
+    // - There cannot exist any (distinct) aggregate.
+    if (OperatorUtils.existsAggregate(ts) ||
+        OperatorUtils.existsDistinctAggregate(ts)) {
+      return null;
+    }
+    // - There cannot exist any sampling predicate.
+    if (OperatorUtils.existsSamplingFilter(ts)) {
+      return null;
    }
-    return null;
+    return OperatorUtils.findSingleLimit(ts);
   }
 }
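
A note on the pattern behind the change: every helper added to OperatorUtils walks the plan downstream from a starting operator via OperatorUtils.findOperators, then inspects the descriptor of each collected operator; GlobalLimitOptimizer then composes those probes instead of re-walking the QB tree. The standalone sketch below illustrates that traversal-plus-predicate pattern and findSingleLimit's tri-state contract outside of Hive. Node, ScanNode, OrderNode, LimitNode, findNodes, and GlobalLimitSketch are hypothetical stand-ins invented for illustration, not Hive API.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for Hive's Operator subclasses; only the traversal
// pattern mirrors OperatorUtils.findOperators.
abstract class Node {
  final List<Node> children = new ArrayList<>();
  Node child(Node c) { children.add(c); return this; }
}
class ScanNode extends Node {}
class OrderNode extends Node {          // analogue of ReduceSinkOperator
  final String order;                   // cf. ReduceSinkDesc.getOrder()
  OrderNode(String order) { this.order = order; }
}
class LimitNode extends Node {          // analogue of LimitOperator
  final int limit;
  LimitNode(int limit) { this.limit = limit; }
}

public class GlobalLimitSketch {
  // Breadth-first walk from start, collecting all nodes of the given class.
  static <T> Set<T> findNodes(Node start, Class<T> clazz) {
    Set<T> found = new HashSet<>();
    Set<Node> visited = new HashSet<>();
    Deque<Node> queue = new ArrayDeque<>();
    queue.add(start);
    while (!queue.isEmpty()) {
      Node n = queue.remove();
      if (!visited.add(n)) {
        continue;                       // plans are DAGs; skip shared children
      }
      if (clazz.isInstance(n)) {
        found.add(clazz.cast(n));
      }
      queue.addAll(n.children);
    }
    return found;
  }

  // Analogue of existsOrdering: does any downstream node impose an order?
  static boolean existsOrdering(Node start) {
    for (OrderNode o : findNodes(start, OrderNode.class)) {
      if (o.order != null && !o.order.isEmpty()) {
        return true;
      }
    }
    return false;
  }

  // Analogue of findSingleLimit's tri-state contract:
  // exactly one limit -> its value; no limit -> 0; several limits -> null.
  static Integer findSingleLimit(Node start) {
    Set<LimitNode> limits = findNodes(start, LimitNode.class);
    if (limits.size() == 1) {
      return limits.iterator().next().limit;
    } else if (limits.isEmpty()) {
      return 0;
    }
    return null;
  }

  public static void main(String[] args) {
    Node qualifying = new ScanNode().child(new LimitNode(10));
    Node ordered = new ScanNode().child(new OrderNode("+").child(new LimitNode(10)));

    System.out.println(findSingleLimit(qualifying));  // 10
    System.out.println(existsOrdering(qualifying));   // false -> optimizable
    System.out.println(existsOrdering(ordered));      // true  -> not optimizable
  }
}

The tri-state Integer return (0 = no limit anywhere, a value = the single limit, null = disqualified) is what lets the optimizer distinguish "nothing to optimize" from "multiple LIMITs, bail out" with a single probe, matching the contract documented on findSingleLimit above.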