diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
new file mode 100644
index 0000000..bce92d5
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBParseInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * This optimizer reduces the input size for queries that specify a LIMIT.
+ *
+ * For example, for a query of the form:
+ *
+ *   select expr from T where <filter> limit 100;
+ *
+ * the whole table T most likely does not need to be scanned:
+ * chances are that even scanning just the first file of T would yield the 100 rows
+ * needed by this query.
+ * This optimizer step populates the GlobalLimitCtx which is used later on to prune the inputs.
+ */
+public class GlobalLimitOptimizer implements Transform {
+
+ private final Log LOG = LogFactory.getLog(GlobalLimitOptimizer.class.getName());
+
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ Context ctx = pctx.getContext();
+ Map<String, Operator<? extends Serializable>> topOps = pctx.getTopOps();
+ GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx();
+ Map<TableScanOperator, ExprNodeDesc> opToPartPruner = pctx.getOpToPartPruner();
+ Map<TableScanOperator, PrunedPartitionList> opToPartList = pctx.getOpToPartList();
+ Map<String, PrunedPartitionList> prunedPartitions = pctx.getPrunedPartitions();
+ Map<String, SplitSample> nameToSplitSample = pctx.getNameToSplitSample();
+ Map<TableScanOperator, Table> topToTable = pctx.getTopToTable();
+
+ QB qb = pctx.getQB();
+ HiveConf conf = pctx.getConf();
+ QBParseInfo qbParseInfo = qb.getParseInfo();
+
+ // Determine whether the query qualifies for reducing the input size for LIMIT.
+ // The query only qualifies when there is exactly one top operator,
+ // no transform script or UDTF, and no block sampling is used.
+ if (ctx.getTryCount() == 0 && topOps.size() == 1
+ && !globalLimitCtx.ifHasTransformOrUDTF() &&
+ nameToSplitSample.isEmpty()) {
+
+ // Here we recursively check:
+ // 1. whether there is exactly one LIMIT in the query, and
+ // 2. whether there is no aggregation, group-by, distinct, sort-by,
+ // distribute-by, or table sampling in any of the sub-queries.
+ // The query only qualifies if both conditions are satisfied.
+ //
+ // Example qualifying queries:
+ // CREATE TABLE ... AS SELECT col1, col2 FROM tbl LIMIT ..
+ // INSERT OVERWRITE TABLE ... SELECT col1, hash(col2), split(col1)
+ // FROM ... LIMIT ...
+ // SELECT * FROM (SELECT col1 AS col2 FROM (SELECT * FROM ...) t1 LIMIT ...) t2;
+ //
+ Integer tempGlobalLimit = checkQbpForGlobalLimit(qb);
+
+ // the query qualifies for the optimization
+ if (tempGlobalLimit != null && tempGlobalLimit != 0) {
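+ // topOps has exactly one entry here (checked above), so this is the query's
+ // single TableScanOperator and the table it scans.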
+ TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
+ Table tab = topToTable.get(ts);
+
+ if (!tab.isPartitioned()) {
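+ // With no partitioning, only a query without any WHERE clause qualifies:
+ // a residual filter could discard most of the rows the reduced input provides.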
+ if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
+ globalLimitCtx.enableOpt(tempGlobalLimit);
+ }
+ } else {
+ // check if the pruner only contains partition columns
+ if (PartitionPruner.onlyContainsPartnCols(tab,
+ opToPartPruner.get(ts))) {
+
+ PrunedPartitionList partsList = null;
+ try {
+ partsList = opToPartList.get(ts);
+ if (partsList == null) {
+ partsList = PartitionPruner.prune(tab,
+ opToPartPruner.get(ts), conf, (String) topOps.keySet()
+ .toArray()[0], prunedPartitions);
+ opToPartList.put(ts, partsList);
+ }
+ } catch (HiveException e) {
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new SemanticException(e.getMessage(), e);
+ }
+
+ // "Unknown" partitions are those the compile-time pruner could not decide on;
+ // they need a map-reduce job to evaluate the filter, so the optimization is
+ // only enabled when every partition is fully resolved.
+ if ((partsList.getUnknownPartns().size() == 0)) {
+ globalLimitCtx.enableOpt(tempGlobalLimit);
+ }
+ }
+ }
+ if (globalLimitCtx.isEnable()) {
+ LOG.info("Qualify the optimize that reduces input size for 'limit' for limit "
+ + globalLimitCtx.getGlobalLimit());
+ }
+ }
+ }
+ return pctx;
+ }
+
+ /**
+ * Recursively check the LIMIT clauses in the query and all of its sub-queries.
+ *
+ * @param localQb the query block to check
+ * @return the limit if there is one and only one LIMIT across all sub-queries;
+ * 0 if there is no LIMIT at all;
+ * null otherwise (the query does not qualify)
+ */
+ private Integer checkQbpForGlobalLimit(QB localQb) {
+ QBParseInfo qbParseInfo = localQb.getParseInfo();
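+ // This query block qualifies only if it has no select-distinct, no
+ // cluster/distribute/order/sort-by, no table sampling, and (checked below)
+ // no aggregation or distinct expressions and at most one LIMIT.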
+ if (localQb.getNumSelDi() == 0 && qbParseInfo.getDestToClusterBy().isEmpty()
+ && qbParseInfo.getDestToDistributeBy().isEmpty()
+ && qbParseInfo.getDestToOrderBy().isEmpty()
+ && qbParseInfo.getDestToSortBy().isEmpty()
+ && qbParseInfo.getDestToAggregationExprs().size() <= 1
+ && qbParseInfo.getDestToDistinctFuncExprs().size() <= 1
+ && qbParseInfo.getNameToSample().isEmpty()) {
+ if ((qbParseInfo.getDestToAggregationExprs().size() < 1 ||
+ qbParseInfo.getDestToAggregationExprs().values().iterator().next().isEmpty()) &&
+ (qbParseInfo.getDestToDistinctFuncExprs().size() < 1 ||
+ qbParseInfo.getDestToDistinctFuncExprs().values().iterator().next().isEmpty())
+ && qbParseInfo.getDestToLimit().size() <= 1) {
+ Integer retValue;
+ if (qbParseInfo.getDestToLimit().size() == 0) {
+ retValue = 0;
+ } else {
+ retValue = qbParseInfo.getDestToLimit().values().iterator().next();
+ }
+
+ for (String alias : localQb.getSubqAliases()) {
+ Integer limit = checkQbpForGlobalLimit(localQb.getSubqForAlias(alias).getQB());
+ if (limit == null) {
+ return null;
+ } else if (retValue > 0 && limit > 0) {
+ // A query with more than one LIMIT does not qualify for this optimization.
+ return null;
+ } else if (limit > 0) {
+ retValue = limit;
+ }
+ }
+ return retValue;
+ }
+ }
+ return null;
+ }
+}
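
The return contract of checkQbpForGlobalLimit above (null = disqualified, 0 = no LIMIT anywhere, positive = the single LIMIT found) is easy to misread, so here is a minimal, self-contained sketch of just that recursion over a toy query-block tree. It models only the LIMIT counting, not the aggregation/sort/sampling checks, and the Block class and names are hypothetical, not Hive types.

```java
import java.util.Collections;
import java.util.List;

public class LimitCheckSketch {

  /** Toy stand-in for a query block: its own optional LIMIT plus nested sub-blocks. */
  static class Block {
    final Integer limit;          // null when the block has no LIMIT clause
    final List<Block> subBlocks;

    Block(Integer limit, List<Block> subBlocks) {
      this.limit = limit;
      this.subBlocks = subBlocks;
    }
  }

  /**
   * Mirrors the recursion above: return the single LIMIT if there is exactly one,
   * 0 if there is none, and null as soon as a second LIMIT (or a disqualified
   * sub-block) is found.
   */
  static Integer check(Block b) {
    Integer ret = (b.limit == null) ? 0 : b.limit;
    for (Block sub : b.subBlocks) {
      Integer childLimit = check(sub);
      if (childLimit == null) {
        return null;                        // a sub-query was already disqualified
      } else if (ret > 0 && childLimit > 0) {
        return null;                        // two LIMITs -> not qualified
      } else if (childLimit > 0) {
        ret = childLimit;
      }
    }
    return ret;
  }

  public static void main(String[] args) {
    Block inner = new Block(null, Collections.<Block>emptyList());
    Block outer = new Block(100, Collections.singletonList(inner));
    System.out.println(check(outer));       // prints 100: exactly one LIMIT, so it qualifies
  }
}
```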
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 9fd1869..94a5037 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -74,6 +74,9 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
transformations.add(new ReduceSinkDeDuplication());
}
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
+ transformations.add(new GlobalLimitOptimizer());
+ }
}
/**
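
The new transformation only runs when the corresponding flag is on. Below is a minimal sketch of enabling it from client code; it assumes HIVELIMITOPTENABLE is backed by the usual hive.limit.optimize.enable property (the property name is an assumption here, HiveConf is authoritative).

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class EnableLimitOptSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Equivalent to "SET hive.limit.optimize.enable=true;" in a Hive session
    // (assumed property name; HiveConf.ConfVars.HIVELIMITOPTENABLE is authoritative).
    conf.setBoolVar(HiveConf.ConfVars.HIVELIMITOPTENABLE, true);
    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVELIMITOPTENABLE));
  }
}
```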
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
new file mode 100644
index 0000000..9e67a85
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+
+/**
+ * Context for the global LIMIT optimization: records whether the optimization is
+ * enabled, the LIMIT value it was enabled with, and the last reduce-side LimitDesc.
+ */
+public class GlobalLimitCtx {
+
+ private boolean enable = false;
+ private int globalLimit = -1;
+ private boolean hasTransformOrUDTF = false;
+ private LimitDesc lastReduceLimitDesc = null;
+
+ public int getGlobalLimit() {
+ return globalLimit;
+ }
+
+ public boolean ifHasTransformOrUDTF() {
+ return hasTransformOrUDTF;
+ }
+
+ public void setHasTransformOrUDTF(boolean hasTransformOrUDTF) {
+ this.hasTransformOrUDTF = hasTransformOrUDTF;
+ }
+
+ public LimitDesc getLastReduceLimitDesc() {
+ return lastReduceLimitDesc;
+ }
+
+ public void setLastReduceLimitDesc(LimitDesc lastReduceLimitDesc) {
+ this.lastReduceLimitDesc = lastReduceLimitDesc;
+ }
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void enableOpt(int globalLimit) {
+ this.enable = true;
+ this.globalLimit = globalLimit;
+ }
+
+ public void disableOpt() {
+ this.enable = false;
+ this.globalLimit = -1;
+ this.lastReduceLimitDesc = null;
+ }
+}
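
A small usage sketch of the state this class encodes; the driver code below is illustrative only, not the actual SemanticAnalyzer call sites.

```java
import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;

public class GlobalLimitCtxSketch {
  public static void main(String[] args) {
    GlobalLimitCtx ctx = new GlobalLimitCtx();
    System.out.println(ctx.isEnable() + " " + ctx.getGlobalLimit()); // false -1

    // A qualifying query such as SELECT col1 FROM t LIMIT 100 enables the
    // optimization with the LIMIT value that was found.
    ctx.enableOpt(100);
    System.out.println(ctx.isEnable() + " " + ctx.getGlobalLimit()); // true 100

    // disableOpt() resets everything, e.g. when the query has to be re-planned
    // against the full input (compare the ctx.getTryCount() == 0 check above).
    ctx.disableOpt();
    System.out.println(ctx.isEnable() + " " + ctx.getGlobalLimit()); // false -1
  }
}
```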
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 8157bcd..ea5e3de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -95,7 +95,7 @@ public class ParseContext {
// a map-reduce job
private boolean hasNonPartCols;
- private SemanticAnalyzer.GlobalLimitCtx globalLimitCtx;
+ private GlobalLimitCtx globalLimitCtx;
private HashSet<ReadEntity> semanticInputs;
private List<Task<? extends Serializable>> rootTasks;
@@ -159,7 +159,7 @@ public class ParseContext {
Map<GroupByOperator, Set<String>> groupOpToInputTables,
Map<String, PrunedPartitionList> prunedPartitions,
HashMap<TableScanOperator, sampleDesc> opToSamplePruner,
- SemanticAnalyzer.GlobalLimitCtx globalLimitCtx,
+ GlobalLimitCtx globalLimitCtx,
HashMap<String, SplitSample> nameToSplitSample,
HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks) {
this.conf = conf;
@@ -516,11 +516,11 @@ public class ParseContext {
this.mapJoinContext = mapJoinContext;
}
- public SemanticAnalyzer.GlobalLimitCtx getGlobalLimitCtx() {
+ public GlobalLimitCtx getGlobalLimitCtx() {
return globalLimitCtx;
}
- public void setGlobalLimitCtx(SemanticAnalyzer.GlobalLimitCtx globalLimitCtx) {
+ public void setGlobalLimitCtx(GlobalLimitCtx globalLimitCtx) {
this.globalLimitCtx = globalLimitCtx;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7a87e80..4d32d46 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -216,49 +216,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
//Max characters when auto generating the column name with func name
private static final int AUTOGEN_COLALIAS_PRFX_MAXLENGTH = 20;
- public static class GlobalLimitCtx {
- private boolean enable = false;
- private int globalLimit = -1;
- private boolean hasTransformOrUDTF = false;
- private LimitDesc lastReduceLimitDesc = null;
-
- public int getGlobalLimit() {
- return globalLimit;
- }
-
- public boolean ifHasTransformOrUDTF() {
- return hasTransformOrUDTF;
- }
-
- public void setHasTransformOrUDTF(boolean hasTransformOrUDTF) {
- this.hasTransformOrUDTF = hasTransformOrUDTF;
- }
-
- public LimitDesc getLastReduceLimitDesc() {
- return lastReduceLimitDesc;
- }
-
- public void setLastReduceLimitDesc(LimitDesc lastReduceLimitDesc) {
- this.lastReduceLimitDesc = lastReduceLimitDesc;
- }
-
-
- public boolean isEnable() {
- return enable;
- }
-
- public void enableOpt(int globalLimit) {
- this.enable = true;
- this.globalLimit = globalLimit;
- }
-
- public void disableOpt() {
- this.enable = false;
- this.globalLimit = -1;
- this.lastReduceLimitDesc = null;
- }
- }
-
private static class Phase1Ctx {
String dest;
int nextNum;
@@ -6939,53 +6896,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
-
- /**
- * Recursively check the limit number in all sub queries
- * @param qbParseInfo
- * @return if there is one and only one limit for all subqueries, return the limit
- * if there is no limit, return 0
- * otherwise, return null
- */
- private Integer checkQbpForGlobalLimit(QB localQb) {
- QBParseInfo qbParseInfo = localQb.getParseInfo();
- if (localQb.getNumSelDi() == 0 && qbParseInfo.getDestToClusterBy().isEmpty()
- && qbParseInfo.getDestToDistributeBy().isEmpty()
- && qbParseInfo.getDestToOrderBy().isEmpty()
- && qbParseInfo.getDestToSortBy().isEmpty()
- && qbParseInfo.getDestToAggregationExprs().size() <= 1
- && qbParseInfo.getDestToDistinctFuncExprs().size() <= 1
- && qbParseInfo.getNameToSample().isEmpty()) {
- if ((qbParseInfo.getDestToAggregationExprs().size() < 1 ||
- qbParseInfo.getDestToAggregationExprs().values().iterator().next().isEmpty()) &&
- (qbParseInfo.getDestToDistinctFuncExprs().size() < 1 ||
- qbParseInfo.getDestToDistinctFuncExprs().values().iterator().next().isEmpty())
- && qbParseInfo.getDestToLimit().size() <= 1) {
- Integer retValue;
- if (qbParseInfo.getDestToLimit().size() == 0) {
- retValue = 0;
- } else {
- retValue = qbParseInfo.getDestToLimit().values().iterator().next().intValue();
- }
-
- for (String alias : localQb.getSubqAliases()) {
- Integer limit = checkQbpForGlobalLimit(localQb.getSubqForAlias(alias).getQB());
- if (limit == null) {
- return null;
- } else if (retValue > 0 && limit > 0) {
- // Any query has more than one LIMITs shown in the query is not
- // qualified to this optimization
- return null;
- } else if (limit > 0) {
- retValue = limit;
- }
- }
- return retValue;
- }
- }
- return null;
- }
-
@SuppressWarnings("nls")
private void genMapRedTasks(QB qb) throws SemanticException {
FetchWork fetch = null;
@@ -7076,73 +6986,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- // determine the query qualifies reduce input size for LIMIT
- // The query only qualifies when there are only one top operator
- // and there is no transformer or UDTF and no block sampling
- // is used.
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVELIMITOPTENABLE)
- && ctx.getTryCount() == 0 && topOps.size() == 1
- && !globalLimitCtx.ifHasTransformOrUDTF() &&
- nameToSplitSample.isEmpty()) {
-
- // Here we recursively check:
- // 1. whether there are exact one LIMIT in the query
- // 2. whether there is no aggregation, group-by, distinct, sort by,
- // distributed by, or table sampling in any of the sub-query.
- // The query only qualifies if both conditions are satisfied.
- //
- // Example qualified queries:
- // CREATE TABLE ... AS SELECT col1, col2 FROM tbl LIMIT ..
- // INSERT OVERWRITE TABLE ... SELECT col1, hash(col2), split(col1)
- // FROM ... LIMIT...
- // SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2);
- //
- Integer tempGlobalLimit = checkQbpForGlobalLimit(qb);
-
- // query qualify for the optimization
- if (tempGlobalLimit != null && tempGlobalLimit != 0) {
- TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
- Table tab = topToTable.get(ts);
-
- if (!tab.isPartitioned()) {
- if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
- globalLimitCtx.enableOpt(tempGlobalLimit);
- }
- } else {
- // check if the pruner only contains partition columns
- if (PartitionPruner.onlyContainsPartnCols(tab,
- opToPartPruner.get(ts))) {
-
- PrunedPartitionList partsList = null;
- try {
- partsList = opToPartList.get(ts);
- if (partsList == null) {
- partsList = PartitionPruner.prune(tab,
- opToPartPruner.get(ts), conf, (String) topOps.keySet()
- .toArray()[0], prunedPartitions);
- opToPartList.put(ts, partsList);
- }
- } catch (HiveException e) {
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
- }
-
- // If there is any unknown partition, create a map-reduce job for
- // the filter to prune correctly
- if ((partsList.getUnknownPartns().size() == 0)) {
- globalLimitCtx.enableOpt(tempGlobalLimit);
- }
- }
- }
- if (globalLimitCtx.isEnable()) {
- LOG.info("Qualify the optimize that reduces input size for 'limit' for limit "
- + globalLimitCtx.getGlobalLimit());
- }
- }
- }
-
// In case of a select, use a fetch task instead of a move task
if (qb.getIsQuery()) {
if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) {