diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 4d9d1da..49d64ec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -30,6 +31,9 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.mapred.OutputCollector;
 
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
 public class OperatorUtils {
 
   private static final Log LOG = LogFactory.getLog(OperatorUtils.class);
@@ -145,6 +149,70 @@ private static void iterateParents(Operator<?> operator, Function<Operator<?>> f
   }
 
   public static boolean sameRowSchema(Operator<?> operator1, Operator<?> operator2) {
-    return operator1.getSchema().equals(operator2.getSchema());
+    return operator1.getSchema().equals(operator2.getSchema());
+  }
+
+  /**
+   * Given an operator and a set of classes, it classifies the operators it finds
+   * in the stream depending on the classes they instantiate.
+   *
+   * @param start the start operator
+   * @param classes the set of classes
+   * @return a multimap from each of the classes to the operators that instantiate
+   * them
+   */
+  public static Multimap<Class<? extends Operator<?>>, Operator<?>> classifyOperators(
+      Operator<?> start, Set<Class<? extends Operator<?>>> classes) {
+    ImmutableMultimap.Builder<Class<? extends Operator<?>>, Operator<?>> resultMap =
+        new ImmutableMultimap.Builder<Class<? extends Operator<?>>, Operator<?>>();
+    List<Operator<?>> ops = new ArrayList<Operator<?>>();
+    ops.add(start);
+    while (!ops.isEmpty()) {
+      List<Operator<?>> allChildren = new ArrayList<Operator<?>>();
+      for (Operator<?> op: ops) {
+        for (Class<? extends Operator<?>> clazz: classes) {
+          if (clazz.isInstance(op)) {
+            resultMap.put(clazz, op);
+          }
+        }
+        if (op.getChildOperators() != null) {
+          allChildren.addAll(op.getChildOperators());
+        }
+      }
+      ops = allChildren;
+    }
+    return resultMap.build();
+  }
+
+  /**
+   * Given an operator and a set of classes, it classifies the operators it finds
+   * upstream depending on the classes they instantiate.
+   *
+   * @param start the start operator
+   * @param classes the set of classes
+   * @return a multimap from each of the classes to the operators that instantiate
+   * them
+   */
+  public static Multimap<Class<? extends Operator<?>>, Operator<?>> classifyOperatorsUpstream(
+      Operator<?> start, Set<Class<? extends Operator<?>>> classes) {
+    ImmutableMultimap.Builder<Class<? extends Operator<?>>, Operator<?>> resultMap =
+        new ImmutableMultimap.Builder<Class<? extends Operator<?>>, Operator<?>>();
+    List<Operator<?>> ops = new ArrayList<Operator<?>>();
+    ops.add(start);
+    while (!ops.isEmpty()) {
+      List<Operator<?>> allParent = new ArrayList<Operator<?>>();
+      for (Operator<?> op: ops) {
+        for (Class<? extends Operator<?>> clazz: classes) {
+          if (clazz.isInstance(op)) {
+            resultMap.put(clazz, op);
+          }
+        }
+        if (op.getParentOperators() != null) {
+          allParent.addAll(op.getParentOperators());
+        }
+      }
+      ops = allParent;
+    }
+    return resultMap.build();
   }
 }
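
Usage note (not part of the patch): classifyOperators and classifyOperatorsUpstream do a breadth-first walk from the start operator, downstream and upstream respectively, and bucket every visited operator that is an instance of one of the requested classes. A minimal sketch of a caller, assuming `ts` is a TableScanOperator already present in a compiled plan and the usual Hive exec and Guava imports are available:

    // Sketch only; not part of the patch. Assumes `ts` is an existing TableScanOperator
    // and imports for ReduceSinkOperator, LimitOperator, ImmutableSet and Multimap.
    Set<Class<? extends Operator<?>>> classes =
        new ImmutableSet.Builder<Class<? extends Operator<?>>>()
            .add(ReduceSinkOperator.class)
            .add(LimitOperator.class)
            .build();
    Multimap<Class<? extends Operator<?>>, Operator<?>> classified =
        OperatorUtils.classifyOperators(ts, classes);
    for (Operator<?> op : classified.get(LimitOperator.class)) {
      // every LimitOperator reachable downstream of ts
    }

The same call shape works for classifyOperatorsUpstream when starting from a sink operator and walking toward the table scans.
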
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
index c9848da..c0a2592 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -32,12 +38,16 @@
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBParseInfo;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
 
 /**
  * This optimizer is used to reduce the input size for the query for queries which are
@@ -63,9 +73,6 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
     Map<String, SplitSample> nameToSplitSample = pctx.getNameToSplitSample();
 
-    QB qb = pctx.getQB();
-    QBParseInfo qbParseInfo = qb.getParseInfo();
-
     // determine the query qualifies reduce input size for LIMIT
     // The query only qualifies when there are only one top operator
     // and there is no transformer or UDTF and no block sampling
@@ -86,15 +93,17 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       //                               FROM ... LIMIT...
       //    SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2);
       //
-      Integer tempGlobalLimit = checkQbpForGlobalLimit(qb);
+      TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
+      Integer tempGlobalLimit = checkQbpForGlobalLimit(ts);
 
       // query qualify for the optimization
       if (tempGlobalLimit != null && tempGlobalLimit != 0) {
-        TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
         Table tab = ts.getConf().getTableMetadata();
 
         if (!tab.isPartitioned()) {
-          if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
+          Set<FilterOperator> filterOps =
+              OperatorUtils.findOperators(ts, FilterOperator.class);
+          if (filterOps.size() == 0) {
             globalLimitCtx.enableOpt(tempGlobalLimit);
           }
         } else {
@@ -130,49 +139,59 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
   }
 
   /**
-   * Recursively check the limit number in all sub queries
+   * Check the limit number in all sub queries
    *
-   * @param qbParseInfo
    * @return if there is one and only one limit for all subqueries, return the limit
    *         if there is no limit, return 0
   *          otherwise, return null
    */
-  private Integer checkQbpForGlobalLimit(QB localQb) {
-    QBParseInfo qbParseInfo = localQb.getParseInfo();
-    if (localQb.getNumSelDi() == 0 && qbParseInfo.getDestToClusterBy().isEmpty()
-        && qbParseInfo.getDestToDistributeBy().isEmpty()
-        && qbParseInfo.getDestToOrderBy().isEmpty()
-        && qbParseInfo.getDestToSortBy().isEmpty()
-        && qbParseInfo.getDestToAggregationExprs().size() <= 1
-        && qbParseInfo.getDestToDistinctFuncExprs().size() <= 1
-        && qbParseInfo.getNameToSample().isEmpty()) {
-      if ((qbParseInfo.getDestToAggregationExprs().size() < 1 ||
-          qbParseInfo.getDestToAggregationExprs().values().iterator().next().isEmpty()) &&
-          (qbParseInfo.getDestToDistinctFuncExprs().size() < 1 ||
-          qbParseInfo.getDestToDistinctFuncExprs().values().iterator().next().isEmpty())
-          && qbParseInfo.getDestToLimit().size() <= 1) {
-        Integer retValue;
-        if (qbParseInfo.getDestToLimit().size() == 0) {
-          retValue = 0;
-        } else {
-          retValue = qbParseInfo.getDestToLimit().values().iterator().next();
-        }
-
-        for (String alias : localQb.getSubqAliases()) {
-          Integer limit = checkQbpForGlobalLimit(localQb.getSubqForAlias(alias).getQB());
-          if (limit == null) {
-            return null;
-          } else if (retValue > 0 && limit > 0) {
-            // Any query has more than one LIMITs shown in the query is not
-            // qualified to this optimization
-            return null;
-          } else if (limit > 0) {
-            retValue = limit;
-          }
-        }
-        return retValue;
+  private static Integer checkQbpForGlobalLimit(TableScanOperator ts) {
+    Set<Class<? extends Operator<?>>> searchedClasses =
+        new ImmutableSet.Builder<Class<? extends Operator<?>>>()
+          .add(ReduceSinkOperator.class)
+          .add(GroupByOperator.class)
+          .add(FilterOperator.class)
+          .add(LimitOperator.class)
+          .build();
+    Multimap<Class<? extends Operator<?>>, Operator<?>> ops =
+        OperatorUtils.classifyOperators(ts, searchedClasses);
+    // To apply this optimization, in the input query:
+    // - There cannot exist any order by/sort by clause,
+    // thus existsOrdering should be false.
+    // - There cannot exist any distribute by clause, thus
+    // existsPartitioning should be false.
+    // - There cannot exist any cluster by clause, thus
+    // existsOrdering AND existsPartitioning should be false.
+    for (Operator<?> op : ops.get(ReduceSinkOperator.class)) {
+      ReduceSinkDesc reduceSinkConf = ((ReduceSinkOperator) op).getConf();
+      if (reduceSinkConf.isOrdering() || reduceSinkConf.isPartitioning()) {
+        return null;
       }
     }
+    // - There cannot exist any (distinct) aggregate.
+    for (Operator<?> op : ops.get(GroupByOperator.class)) {
+      GroupByDesc groupByConf = ((GroupByOperator) op).getConf();
+      if (groupByConf.isAggregate() || groupByConf.isDistinct()) {
+        return null;
+      }
+    }
+    // - There cannot exist any sampling predicate.
+    for (Operator<?> op : ops.get(FilterOperator.class)) {
+      FilterDesc filterConf = ((FilterOperator) op).getConf();
+      if (filterConf.getIsSamplingPred()) {
+        return null;
+      }
+    }
+    // If there is one and only one limit starting at op, return the limit
+    // If there is no limit, return 0
+    // Otherwise, return null
+    Collection<Operator<?>> limitOps = ops.get(LimitOperator.class);
+    if (limitOps.size() == 1) {
+      return ((LimitOperator) limitOps.iterator().next()).getConf().getLimit();
+    }
+    else if (limitOps.size() == 0) {
+      return 0;
+    }
     return null;
   }
 }
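
Reviewer note (not part of the patch): the operator-level checks above are intended to mirror the old QBParseInfo checks. ORDER BY / SORT BY and DISTRIBUTE BY / CLUSTER BY surface as ReduceSinkOperators whose descriptors report ordering or partitioning, aggregations and DISTINCT surface as GroupByOperators, TABLESAMPLE surfaces as a sampling FilterOperator, and a single LIMIT surfaces as exactly one LimitOperator. A hypothetical condensed form of the same decision, using only methods introduced or referenced in this patch (the `qualifiesForGlobalLimit` name is made up for illustration, and the same imports as GlobalLimitOptimizer are assumed):

    // Hypothetical mirror of checkQbpForGlobalLimit's rejection rules; illustration only.
    static boolean qualifiesForGlobalLimit(Multimap<Class<? extends Operator<?>>, Operator<?>> ops) {
      for (Operator<?> op : ops.get(ReduceSinkOperator.class)) {
        ReduceSinkDesc rs = ((ReduceSinkOperator) op).getConf();
        if (rs.isOrdering() || rs.isPartitioning()) {
          return false;   // order by / sort by / distribute by / cluster by present
        }
      }
      for (Operator<?> op : ops.get(GroupByOperator.class)) {
        GroupByDesc gb = ((GroupByOperator) op).getConf();
        if (gb.isAggregate() || gb.isDistinct()) {
          return false;   // aggregation or DISTINCT present
        }
      }
      for (Operator<?> op : ops.get(FilterOperator.class)) {
        if (((FilterOperator) op).getConf().getIsSamplingPred()) {
          return false;   // table sampling present
        }
      }
      return ops.get(LimitOperator.class).size() <= 1;   // at most one LIMIT in the tree
    }
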
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
index f031b28..8804258 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hive.common.util.AnnotationUtils;
@@ -222,6 +223,13 @@ public void setAggregators(
     this.aggregators = aggregators;
   }
 
+  public boolean isAggregate() {
+    if (this.aggregators != null && !this.aggregators.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
   public boolean getGroupKeyNotReductionKey() {
     return groupKeyNotReductionKey;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 43f8321..28cb3ba 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -238,6 +238,13 @@ public void setPartitionCols(
     this.partitionCols = partitionCols;
   }
 
+  public boolean isPartitioning() {
+    if (partitionCols != null && !partitionCols.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
   @Explain(displayName = "tag", normalExplain = false)
   public int getTag() {
     return tag;
@@ -338,6 +345,13 @@ public void setOrder(String orderStr) {
         orderStr);
   }
 
+  public boolean isOrdering() {
+    if (this.getOrder() != null && !this.getOrder().isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
   public List<List<Integer>> getDistinctColumnIndices() {
     return distinctColumnIndices;
   }
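
Usage note (not part of the patch): GroupByDesc.isAggregate(), ReduceSinkDesc.isPartitioning(), and ReduceSinkDesc.isOrdering() are plain null/empty checks over state the descriptors already carry (the aggregator list, the partition columns, and the key sort-order string), so optimizer code can test plan shape without reaching into those collections directly. A minimal sketch, assuming `rsOp` is an existing ReduceSinkOperator taken from a compiled plan:

    // Sketch only: branch on what a reduce sink implies for the query.
    ReduceSinkDesc conf = rsOp.getConf();
    if (conf.isOrdering()) {
      // non-empty key sort order, which this patch treats as ORDER BY / SORT BY
    }
    if (conf.isPartitioning()) {
      // non-empty partition columns, i.e. DISTRIBUTE BY / CLUSTER BY style redistribution
    }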