diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
index 650792b..247641d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
@@ -39,7 +39,6 @@
   boolean noScanAnalyzeCommand;
   boolean analyzeRewrite;
   boolean ctas;
-  int outerQueryLimit;
 
   boolean hasJoin = false;
   boolean hasGroupBy = false;
@@ -116,14 +115,6 @@
   public void setCTAS(boolean ctas) {
     this.ctas = ctas;
   }
-  public int getOuterQueryLimit() {
-    return outerQueryLimit;
-  }
-
-  public void setOuterQueryLimit(int outerQueryLimit) {
-    this.outerQueryLimit = outerQueryLimit;
-  }
-
   public boolean hasJoin() {
     return (noOfJoins > 0);
   }
@@ -282,7 +273,6 @@ public void clear() {
     noScanAnalyzeCommand = false;
     analyzeRewrite = false;
     ctas = false;
-    outerQueryLimit = -1;
 
     hasJoin = false;
     hasGroupBy = false;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index cea99e1..96471e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -533,10 +533,10 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
       retPathList = new ArrayList();
@@ -618,7 +618,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
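Since outerQueryLimit disappears from QueryProperties here (and from QBParseInfo further down), the outermost LIMIT/OFFSET of a query is carried only by GlobalLimitCtx after this patch. A minimal sketch of the resulting read pattern, assuming the GlobalLimitCtx accessors shown later in this diff; the wrapper class and method names are illustrative only, not part of the patch:

    import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
    import org.apache.hadoop.hive.ql.parse.ParseContext;

    class OuterLimitReadSketch {
      /** Illustrative only: consumers read the outer LIMIT/OFFSET from GlobalLimitCtx. */
      static int[] outerLimitAndOffset(ParseContext pctx) {
        GlobalLimitCtx limitCtx = pctx.getGlobalLimitCtx();
        // After reset(), getGlobalLimit() is -1 and getGlobalOffset() is 0 until
        // GlobalLimitOptimizer records the outermost LIMIT/OFFSET of the query.
        return new int[] { limitCtx.getGlobalLimit(), limitCtx.getGlobalOffset() };
      }
    }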

diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
 * For eg. for a query of type:
 *
@@ -66,6 +73,49 @@
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
+    // 1) Set last limit and offset if operator is present
+    setGlobalQueryLimit(pctx);
+
+    // 2) If we have enabled input pruning optimization, we verify whether it is applicable
+    if (HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
+      pruneInputsOptimization(pctx);
+    }
+    return pctx;
+  }
+
+  private void setGlobalQueryLimit(ParseContext pctx) throws SemanticException {
+    Map<String, TableScanOperator> topOps = pctx.getTopOps();
+    Set<FileSinkOperator> found = OperatorUtils.findOperators(
+        Lists.<Operator<?>>newArrayList(topOps.values()), FileSinkOperator.class);
+    if (found.size() != 1) {
+      // We bail out
+      return;
+    }
+    // We traverse down the plan checking whether we find a limit that validates
+    // for global limit
+    Set<Class<? extends Operator<?>>> searchedClasses =
+        new ImmutableSet.Builder<Class<? extends Operator<?>>>()
+            .add(SelectOperator.class)
+            .add(FilterOperator.class)
+            .add(LimitOperator.class)
+            .build();
+    Operator<?> child = found.iterator().next().getParentOperators().get(0);
+    while (searchedClasses.contains(child.getClass())) {
+      if (child instanceof LimitOperator) {
+        break;
+      }
+      child = child.getParentOperators().get(0);
+    }
+    if (child instanceof LimitOperator) {
+      // We set the offset/limit
+      LimitOperator globalLimit = (LimitOperator) child;
+      Integer tempOffset = globalLimit.getConf().getOffset();
+      pctx.getGlobalLimitCtx().setGlobalLimitOffset(globalLimit.getConf().getLimit(),
+          (tempOffset == null) ? 0 : tempOffset);
+    }
+  }
+
+  private void pruneInputsOptimization(ParseContext pctx) throws SemanticException {
     Context ctx = pctx.getContext();
     Map<String, TableScanOperator> topOps = pctx.getTopOps();
     GlobalLimitCtx globalLimitCtx = pctx.getGlobalLimitCtx();
@@ -76,7 +126,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     // and there is no transformer or UDTF and no block sampling
     // is used.
     if (ctx.getTryCount() == 0 && topOps.size() == 1
-        && !globalLimitCtx.ifHasTransformOrUDTF() &&
+        && !globalLimitCtx.isHasTransformOrUDTF() &&
         nameToSplitSample.isEmpty()) {
 
       // Here we recursively check:
@@ -103,8 +153,9 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
         if (!tab.isPartitioned()) {
           if (filterOps.size() == 0) {
             Integer tempOffset = tempGlobalLimitDesc.getOffset();
-            globalLimitCtx.enableOpt(tempGlobalLimitDesc.getLimit(),
+            globalLimitCtx.enableInputsPruning(tempGlobalLimitDesc.getLimit(),
                 (tempOffset == null) ? 0 : tempOffset);
+            globalLimitCtx.setLastReduceLimitDesc(tempGlobalLimitDesc);
           }
         } else {
           // check if the pruner only contains partition columns
@@ -112,17 +163,18 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
           String alias = (String) topOps.keySet().toArray()[0];
           PrunedPartitionList partsList = pctx.getPrunedPartitions(alias, ts);
-
+
           // If there is any unknown partition, create a map-reduce job for
           // the filter to prune correctly
           if (!partsList.hasUnknownPartitions()) {
             Integer tempOffset = tempGlobalLimitDesc.getOffset();
-            globalLimitCtx.enableOpt(tempGlobalLimitDesc.getLimit(),
+            globalLimitCtx.enableInputsPruning(tempGlobalLimitDesc.getLimit(),
                 (tempOffset == null) ? 0 : tempOffset);
+            globalLimitCtx.setLastReduceLimitDesc(tempGlobalLimitDesc);
           }
         }
       }
-      if (globalLimitCtx.isEnable()) {
+      if (globalLimitCtx.isInputsPruningEnabled()) {
         LOG.info("Qualify the optimize that reduces input size for 'offset' for offset "
             + globalLimitCtx.getGlobalOffset());
         LOG.info("Qualify the optimize that reduces input size for 'limit' for limit "
@@ -130,7 +182,6 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
         }
       }
     }
-    return pctx;
   }
 
   private boolean onlyContainsPartnCols(Table table, Set<FilterOperator> filters) {
@@ -146,7 +197,6 @@ private boolean onlyContainsPartnCols(Table table, Set<FilterOperator> filters)
    * Check the limit number in all sub queries
    *
    * @return if there is one and only one limit for all subqueries, return the limit
-   *         if there is no limit, return 0
    *         otherwise, return null
    */
   private static LimitOperator checkQbpForGlobalLimit(TableScanOperator ts) {
@@ -187,7 +237,6 @@ private static LimitOperator checkQbpForGlobalLimit(TableScanOperator ts) {
       }
     }
     // If there is one and only one limit starting at op, return the limit
-    // If there is no limit, return 0
     // Otherwise, return null
     Collection<Operator<? extends OperatorDesc>> limitOps = ops.get(LimitOperator.class);
     if (limitOps.size() == 1) {
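At its core, the new setGlobalQueryLimit step walks upwards from the query's single FileSinkOperator, passing through SELECT and FILTER operators until it either reaches a LimitOperator or leaves that small set of operator types. A condensed sketch of that walk, assuming the plan has exactly one file sink and skipping the error handling; the standalone class and method names are illustrative only, even though the Hive operator classes are the ones used in the patch:

    import java.util.Set;

    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.exec.FilterOperator;
    import org.apache.hadoop.hive.ql.exec.LimitOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.SelectOperator;

    import com.google.common.collect.ImmutableSet;

    final class GlobalLimitWalkSketch {

      // Operator types the walk is allowed to pass through.
      private static final Set<Class<? extends Operator<?>>> SEARCHED =
          ImmutableSet.<Class<? extends Operator<?>>>of(
              SelectOperator.class, FilterOperator.class, LimitOperator.class);

      /** Returns the query-level LimitOperator above the sink, or null if there is none. */
      static LimitOperator findGlobalLimit(FileSinkOperator sink) {
        Operator<?> current = sink.getParentOperators().get(0);
        while (SEARCHED.contains(current.getClass()) && !(current instanceof LimitOperator)) {
          current = current.getParentOperators().get(0);
        }
        return (current instanceof LimitOperator) ? (LimitOperator) current : null;
      }
    }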
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index eaf0abc..84c0fa6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -201,9 +201,7 @@ public void initialize(HiveConf hiveConf) {
         && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
       transformations.add(new IdentityProjectRemover());
     }
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
-      transformations.add(new GlobalLimitOptimizer());
-    }
+    transformations.add(new GlobalLimitOptimizer());
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION)
         && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)
         && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index eb0ba7b..043e684 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -27,8 +27,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +44,6 @@
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
@@ -82,6 +79,8 @@
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tries to convert simple fetch query to single fetch task, which fetches rows directly
@@ -126,7 +125,11 @@ private FetchTask optimize(ParseContext pctx, String alias, TableScanOperator so
         pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION);
     boolean aggressive = "more".equals(mode);
-    final int limit = pctx.getQueryProperties().getOuterQueryLimit();
+    final int limit = pctx.getGlobalLimitCtx().getGlobalLimit();
+    if (pctx.getGlobalLimitCtx().getGlobalOffset() != 0) {
+      // Bail out: SimpleFetchOptimizer does not currently work with offset
+      return null;
+    }
     FetchData fetch = checkTree(aggressive, pctx, alias, source);
     if (fetch != null && checkThreshold(fetch, limit, pctx)) {
       FetchWork fetchWork = fetch.convertToWork();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 0ead9be..fd17917 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -46,7 +46,6 @@
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -149,18 +148,9 @@ private OpAttr clone(Operator... inputs) {
     }
   }
 
-  private void handleTopLimit(Operator rootOp) {
-    if (rootOp instanceof LimitOperator) {
-      // this can happen only on top most limit, not while visiting Limit Operator
-      // since that can be within subquery.
-      this.semanticAnalyzer.getQB().getParseInfo().setOuterQueryLimit(((LimitOperator) rootOp).getConf().getLimit());
-    }
-  }
-
   public Operator convert(RelNode root) throws SemanticException {
     OpAttr opAf = dispatch(root);
     Operator rootOp = opAf.inputs.get(0);
-    handleTopLimit(rootOp);
     return rootOp;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
index c37f9ce..ab18425 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
@@ -25,9 +25,9 @@
  */
 public class GlobalLimitCtx {
 
-  private boolean enable;
   private int globalLimit;
   private int globalOffset;
+  private boolean inputsPruningEnabled;
   private boolean hasTransformOrUDTF;
   private LimitDesc lastReduceLimitDesc;
 
@@ -43,7 +43,7 @@ public int getGlobalOffset() {
     return globalOffset;
   }
 
-  public boolean ifHasTransformOrUDTF() {
+  public boolean isHasTransformOrUDTF() {
     return hasTransformOrUDTF;
  }
 
@@ -59,25 +59,27 @@ public void setLastReduceLimitDesc(LimitDesc lastReduceLimitDesc) {
     this.lastReduceLimitDesc = lastReduceLimitDesc;
   }
 
-  public boolean isEnable() {
-    return enable;
+  public boolean isInputsPruningEnabled() {
+    return inputsPruningEnabled;
   }
 
-  public void enableOpt(int globalLimit, int globalOffset) {
-    this.enable = true;
+  public void enableInputsPruning(int globalLimit, int globalOffset) {
+    this.inputsPruningEnabled = true;
+    this.setGlobalLimitOffset(globalLimit, globalOffset);
+  }
+
+  public void setGlobalLimitOffset(int globalLimit, int globalOffset) {
     this.globalLimit = globalLimit;
     this.globalOffset = globalOffset;
   }
 
-  public void disableOpt() {
-    this.enable = false;
-    this.globalLimit = -1;
-    this.globalOffset = 0;
+  public void disableInputsPruning() {
+    this.inputsPruningEnabled = false;
     this.lastReduceLimitDesc = null;
   }
 
   public void reset() {
-    enable = false;
+    inputsPruningEnabled = false;
     globalLimit = -1;
     globalOffset = 0;
     hasTransformOrUDTF = false;
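The reworked GlobalLimitCtx separates recording the query-level LIMIT/OFFSET (setGlobalLimitOffset, now filled in by GlobalLimitOptimizer whenever an outermost LIMIT is found) from opting into the input-pruning optimization (enableInputsPruning / disableInputsPruning). A small usage sketch under the assumption that the class keeps its no-arg construction; only the methods visible in this diff are used, and the wrapper class is illustrative:

    import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;

    class GlobalLimitCtxUsageSketch {
      /** Illustrative only: record LIMIT 10 OFFSET 5 and toggle input pruning around it. */
      static void example() {
        GlobalLimitCtx ctx = new GlobalLimitCtx();
        ctx.setGlobalLimitOffset(10, 5);   // recorded whenever an outermost LIMIT/OFFSET is found
        ctx.enableInputsPruning(10, 5);    // additionally lets the input-pruning path rely on it
        assert ctx.isInputsPruningEnabled() && ctx.getGlobalOffset() == 5;
        ctx.disableInputsPruning();        // pruning is off; the recorded limit/offset are kept
        assert !ctx.isInputsPruningEnabled() && ctx.getGlobalLimit() == 10;
      }
    }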
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index d7a56e5..cbc997b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -208,7 +208,7 @@ public boolean accept(Path file) {
 
     long estimatedInput;
 
-    if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
+    if (globalLimitCtx != null && globalLimitCtx.isInputsPruningEnabled()) {
       // If the global limit optimization is triggered, we will
       // estimate input data actually needed based on limit rows.
       // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a0402e..ab1df67 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -106,7 +106,6 @@
   // KEY of SimpleEntry: offset
   // VALUE of SimpleEntry: rowcount
   private final HashMap<String, SimpleEntry<Integer, Integer>> destToLimit;
-  private int outerQueryLimit;
 
   // used by GroupBy
   private final LinkedHashMap<String, LinkedHashMap<String, ASTNode>> destToAggregationExprs;
@@ -147,7 +146,6 @@ public QBParseInfo(String alias, boolean isSubQ) {
 
     this.alias = alias;
     this.isSubQ = isSubQ;
-    outerQueryLimit = -1;
 
     aliasToLateralViews = new HashMap<String, ArrayList<ASTNode>>();
@@ -459,21 +457,6 @@ public Integer getDestLimitOffset(String dest) {
     return destToLimit.get(dest) == null ? 0 : destToLimit.get(dest).getKey();
   }
 
-  /**
-   * @return the outerQueryLimit
-   */
-  public int getOuterQueryLimit() {
-    return outerQueryLimit;
-  }
-
-  /**
-   * @param outerQueryLimit
-   *          the outerQueryLimit to set
-   */
-  public void setOuterQueryLimit(int outerQueryLimit) {
-    this.outerQueryLimit = outerQueryLimit;
-  }
-
   public boolean isTopLevelSimpleSelectStarQuery() {
     if (alias != null || destToSelExpr.size() != 1 || !isSimpleSelectQuery()) {
       return false;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9d58193..4f51e8d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -430,7 +430,7 @@ protected void reset(boolean clearPartsCache) {
     viewsExpanded = null;
     viewSelect = null;
     ctesExpanded = null;
-    globalLimitCtx.disableOpt();
+    globalLimitCtx.disableInputsPruning();
     viewAliasToInput.clear();
     reduceSinkOperatorsAddedByEnforceBucketingSorting.clear();
     topToTableProps.clear();
@@ -7244,8 +7244,6 @@ private Operator genLimitPlan(String dest, QB qb, Operator input, int offset, in
     RowResolver inputRR = opParseCtx.get(input).getRowResolver();
 
     LimitDesc limitDesc = new LimitDesc(offset, limit);
-    globalLimitCtx.setLastReduceLimitDesc(limitDesc);
-
     Operator limitMap = putOpInsertMap(OperatorFactory.getAndMakeChild(
         limitDesc, new RowSchema(inputRR.getColumnInfos()), input), inputRR);
@@ -9476,7 +9474,6 @@ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb,
         curr = genLimitMapRedPlan(dest, qb, curr, offset.intValue(), limit.intValue(),
             extraMRStep);
-        qb.getParseInfo().setOuterQueryLimit(limit.intValue());
       }
       if (!queryState.getHiveOperation().equals(HiveOperation.CREATEVIEW)) {
         curr = genFileSinkPlan(dest, qb, curr);
@@ -13116,7 +13113,6 @@ private void copyInfoToQueryProperties(QueryProperties queryProperties) {
       queryProperties.setCTAS(qb.getTableDesc() != null);
       queryProperties.setHasOuterOrderBy(!qb.getParseInfo().getIsSubQ() &&
           !qb.getParseInfo().getDestToOrderBy().isEmpty());
-      queryProperties.setOuterQueryLimit(qb.getParseInfo().getOuterQueryLimit());
       queryProperties.setMaterializedView(qb.getViewDesc() != null);
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 97cf585..c1f82df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,18 +20,12 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
-import java.util.Stack;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,7 +36,6 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -54,7 +47,6 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -76,6 +68,8 @@
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
 import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
@@ -113,7 +107,7 @@ public void compile(final ParseContext pCtx, final List
     List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
 
     boolean isCStats = pCtx.getQueryProperties().isAnalyzeRewrite();
-    int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit();
+    int outerQueryLimit = globalLimitCtx.getGlobalLimit();
 
     if (pCtx.getFetchTask() != null) {
       if (pCtx.getFetchTask().getTblDesc() == null) {
@@ -191,11 +185,10 @@ public void compile(final ParseContext pCtx, final List
-    if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
+    if (globalLimitCtx.isInputsPruningEnabled() && globalLimitCtx.getGlobalLimit() > fetchLimit) {
       LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit
           + ". Doesn't qualify limit optimiztion.");
-      globalLimitCtx.disableOpt();
-
+      globalLimitCtx.disableInputsPruning();
     }
     if (outerQueryLimit == 0) {
       // Believe it or not, some tools do generate queries with limit 0 and than expect
@@ -330,14 +323,16 @@ public void compile(final ParseContext pCtx, final List
       List<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks);
       for (ExecDriver tsk : mrTasks) {
         tsk.setRetryCmdWhenFail(true);