diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index 86519a6..77e2be5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -34,6 +34,7 @@
   private static final long serialVersionUID = 1L;
 
   protected transient int limit;
+  protected transient int offset;
   protected transient int leastRow;
   protected transient int currCount;
   protected transient boolean isMap;
@@ -43,6 +44,7 @@
     Collection<Future<?>> result = super.initializeOp(hconf);
     limit = conf.getLimit();
     leastRow = conf.getLeastRows();
+    offset = conf.getOffset();
     currCount = 0;
     isMap = hconf.getBoolean("mapred.task.is.map", true);
     return result;
@@ -50,9 +52,11 @@
 
   @Override
   public void process(Object row, int tag) throws HiveException {
-    if (currCount < limit) {
+    if (offset <= currCount && currCount < (offset + limit)) {
       forward(row, inputObjInspectors[tag]);
       currCount++;
+    } else if (offset > currCount) {
+      currCount++;
     } else {
       setDone(true);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
index 41bb84c..1d6d1a5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GlobalLimitOptimizer.java
@@ -93,16 +93,17 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       //    SELECT * FROM (SELECT col1 as col2 (SELECT * FROM ...) t1 LIMIT ...) t2);
       //
       TableScanOperator ts = (TableScanOperator) topOps.values().toArray()[0];
-      Integer tempGlobalLimit = checkQbpForGlobalLimit(ts);
+      LimitOperator tempGlobalLimit = checkQbpForGlobalLimit(ts);
 
       // query qualify for the optimization
-      if (tempGlobalLimit != null && tempGlobalLimit != 0) {
+      if (tempGlobalLimit != null) {
         Table tab = ts.getConf().getTableMetadata();
         Set<FilterOperator> filterOps = OperatorUtils.findOperators(ts, FilterOperator.class);
 
         if (!tab.isPartitioned()) {
           if (filterOps.size() == 0) {
-            globalLimitCtx.enableOpt(tempGlobalLimit);
+            globalLimitCtx.enableOpt(tempGlobalLimit.getConf().getLimit(),
+                tempGlobalLimit.getConf().getOffset());
           }
         } else {
           // check if the pruner only contains partition columns
@@ -114,7 +115,8 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
             // If there is any unknown partition, create a map-reduce job for
             // the filter to prune correctly
             if (!partsList.hasUnknownPartitions()) {
-              globalLimitCtx.enableOpt(tempGlobalLimit);
+              globalLimitCtx.enableOpt(tempGlobalLimit.getConf().getLimit(),
+                  tempGlobalLimit.getConf().getOffset());
             }
           }
         }
@@ -143,7 +145,7 @@ private boolean onlyContainsPartnCols(Table table, Set<FilterOperator> filters)
    *         if there is no limit, return 0
    *         otherwise, return null
    */
-  private static Integer checkQbpForGlobalLimit(TableScanOperator ts) {
+  private static LimitOperator checkQbpForGlobalLimit(TableScanOperator ts) {
     Set<Class<? extends Operator<?>>> searchedClasses =
         new ImmutableSet.Builder<Class<? extends Operator<?>>>()
             .add(ReduceSinkOperator.class)
@@ -185,10 +187,10 @@ private static Integer checkQbpForGlobalLimit(TableScanOperator ts) {
     //   Otherwise, return null
     Collection<Operator<?>> limitOps = ops.get(LimitOperator.class);
     if (limitOps.size() == 1) {
-      return ((LimitOperator) limitOps.iterator().next()).getConf().getLimit();
+      return (LimitOperator) limitOps.iterator().next();
     } else if (limitOps.size() == 0) {
-      return 0;
+      return null;
     }
     return null;
   }
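
With the LimitOperator change above, only rows whose running index falls in [offset, offset + limit) are forwarded: the first offset rows are counted and dropped, the next limit rows are emitted, and the first row past the window marks the operator as done. The following is a minimal, self-contained sketch of that same window check in plain Java; the class and method names are illustrative only and are not Hive APIs.

import java.util.stream.IntStream;

// Sketch of the offset/limit window that the patched LimitOperator.process applies.
public class OffsetLimitWindow {
  private final int offset;   // rows to skip; 0 when no offset was given
  private final int limit;    // rows to emit after the offset
  private int currCount = 0;  // rows seen so far

  public OffsetLimitWindow(int offset, int limit) {
    this.offset = offset;
    this.limit = limit;
  }

  /** Returns true if the current row should be forwarded, false if dropped. */
  public boolean accept() {
    if (offset <= currCount && currCount < offset + limit) {
      currCount++;
      return true;        // row falls inside [offset, offset + limit)
    } else if (offset > currCount) {
      currCount++;        // still skipping the leading offset rows
      return false;
    }
    return false;         // past the window; the real operator calls setDone(true)
  }

  public static void main(String[] args) {
    // LIMIT 3,5 : skip rows 0-2, forward rows 3-7, drop everything after
    OffsetLimitWindow w = new OffsetLimitWindow(3, 5);
    IntStream.range(0, 10)
        .forEach(row -> System.out.println(row + " -> " + (w.accept() ? "forward" : "drop")));
  }
}
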
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 9c731b8..e044434 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -2294,12 +2294,14 @@ private RelNode genLimitLogicalPlan(QB qb, RelNode srcRel) throws SemanticException {
       HiveRelNode sortRel = null;
       QBParseInfo qbp = getQBParseInfo(qb);
       Integer limit = qbp.getDestToLimit().get(qbp.getClauseNames().iterator().next());
+      Integer offset = qbp.getDestToLimitOffset().get(qbp.getClauseNames().iterator().next());
 
       if (limit != null) {
+        RexNode offsetRN = cluster.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(offset));
         RexNode fetch = cluster.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit));
         RelTraitSet traitSet = cluster.traitSetOf(HiveRelNode.CONVENTION);
         RelCollation canonizedCollation = traitSet.canonize(RelCollations.EMPTY);
-        sortRel = new HiveSortLimit(cluster, traitSet, srcRel, canonizedCollation, null, fetch);
+        sortRel = new HiveSortLimit(cluster, traitSet, srcRel, canonizedCollation, offsetRN, fetch);
 
         RowResolver outputRR = new RowResolver();
         if (!RowResolver.add(outputRR, relToHiveRR.get(srcRel))) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
index 6cd636c..4433f4a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GlobalLimitCtx.java
@@ -27,6 +27,7 @@
 
   private boolean enable;
   private int globalLimit;
+  private int globalOffset;
   private boolean hasTransformOrUDTF;
   private LimitDesc lastReduceLimitDesc;
 
@@ -38,6 +39,8 @@
   public int getGlobalLimit() {
     return globalLimit;
   }
 
+  public int getGlobalOffset() { return globalOffset; }
+
   public boolean ifHasTransformOrUDTF() {
     return hasTransformOrUDTF;
   }
@@ -58,20 +61,23 @@
   public boolean isEnable() {
     return enable;
   }
 
-  public void enableOpt(int globalLimit) {
+  public void enableOpt(int globalLimit, int globalOffset) {
     this.enable = true;
     this.globalLimit = globalLimit;
+    this.globalOffset = globalOffset;
   }
 
   public void disableOpt() {
     this.enable = false;
     this.globalLimit = -1;
+    this.globalOffset = 0;
     this.lastReduceLimitDesc = null;
   }
 
   public void reset() {
     enable = false;
     globalLimit = -1;
+    globalOffset = 0;
     hasTransformOrUDTF = false;
     lastReduceLimitDesc = null;
   }
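
In the CBO path (genLimitLogicalPlan above), the offset becomes a second exact literal next to the existing fetch literal and is passed to HiveSortLimit in the argument that was previously hard-coded to null. Below is a minimal sketch of how those two literals are built, assuming Calcite is on the classpath; the real code obtains its RexBuilder from the planner's RelOptCluster and goes on to construct the HiveSortLimit, which is omitted here.

import java.math.BigDecimal;

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;

// Sketch: deriving the offset and fetch RexNodes used by the limit RelNode.
public class LimitLiteralsSketch {
  public static void main(String[] args) {
    // Stand-in for cluster.getRexBuilder(); a stock type factory is enough for literals.
    RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());

    int offset = 3;  // from QBParseInfo.getDestToLimitOffset()
    int limit = 5;   // from QBParseInfo.getDestToLimit()

    RexNode offsetRN = rexBuilder.makeExactLiteral(BigDecimal.valueOf(offset));
    RexNode fetch = rexBuilder.makeExactLiteral(BigDecimal.valueOf(limit));

    // These two literals fill the offset and fetch slots of HiveSortLimit.
    System.out.println("offset literal: " + offsetRN + ", fetch literal: " + fetch);
  }
}
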
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 3df67e9..a76dab8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -2348,7 +2348,7 @@ limitClause
 @init { pushMsg("limit clause", state); }
 @after { popMsg(state); }
    :
-   KW_LIMIT num=Number -> ^(TOK_LIMIT $num)
+   KW_LIMIT ((offset=Number COMMA)? num=Number) -> ^(TOK_LIMIT ($offset)? $num)
    ;
 
 //DELETE FROM <tableName> WHERE ...;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 14a7e9c..db28d17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -101,6 +101,7 @@
   /* Order by clause */
   private final HashMap<String, ASTNode> destToOrderby;
   private final HashMap<String, Integer> destToLimit;
+  private final HashMap<String, Integer> destToLimitOffset;
   private int outerQueryLimit;
 
   // used by GroupBy
@@ -130,6 +131,7 @@ public QBParseInfo(String alias, boolean isSubQ) {
     destToSortby = new HashMap<String, ASTNode>();
     destToOrderby = new HashMap<String, ASTNode>();
     destToLimit = new HashMap<String, Integer>();
+    destToLimitOffset = new HashMap<String, Integer>();
     insertIntoTables = new HashSet<String>();
     destRollups = new HashSet<String>();
     destCubes = new HashSet<String>();
@@ -449,6 +451,14 @@ public Integer getDestLimit(String dest) {
     return destToLimit.get(dest);
   }
 
+  public void setDestLimitOffset(String dest, Integer offset) {
+    destToLimitOffset.put(dest, offset);
+  }
+
+  public Integer getDestLimitOffset(String dest) {
+    return destToLimitOffset.get(dest);
+  }
+
   /**
    * @return the outerQueryLimit
    */
@@ -579,6 +589,10 @@ public TableSpec getTableSpec() {
     return destToLimit;
   }
 
+  public HashMap<String, Integer> getDestToLimitOffset() {
+    return destToLimitOffset;
+  }
+
   public LinkedHashMap<String, LinkedHashMap<String, ASTNode>> getDestToAggregationExprs() {
     return destToAggregationExprs;
   }
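
After the grammar change above, TOK_LIMIT carries either one child (the row count, as before) or two children (offset first, then row count), and the SemanticAnalyzer hunk that follows branches on exactly that child count in doPhase1. Here is a self-contained sketch of the branching, using plain strings in place of ASTNode children; the names are illustrative, not Hive APIs.

import java.util.Arrays;
import java.util.List;

// Sketch: how the offset/limit pair is read off a TOK_LIMIT node's children.
public class TokLimitChildren {
  static int[] readLimit(List<String> tokLimitChildren) {
    int offset;
    int limit;
    if (tokLimitChildren.size() == 2) {
      offset = Integer.parseInt(tokLimitChildren.get(0));  // "LIMIT 3,5" -> offset 3
      limit = Integer.parseInt(tokLimitChildren.get(1));   //              -> limit 5
    } else {
      offset = 0;                                          // "LIMIT 10" -> offset defaults to 0
      limit = Integer.parseInt(tokLimitChildren.get(0));
    }
    return new int[] { offset, limit };
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(readLimit(Arrays.asList("3", "5"))));  // [3, 5]
    System.out.println(Arrays.toString(readLimit(Arrays.asList("10"))));      // [0, 10]
  }
}

So LIMIT 3,5 yields offset 3 and limit 5, while LIMIT 10 yields offset 0 and limit 10.
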
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c5f39d3..d424c16 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1315,7 +1315,13 @@ public boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1, PlannerContext plannerCtx)
       break;
 
     case HiveParser.TOK_LIMIT:
-      qbp.setDestLimit(ctx_1.dest, new Integer(ast.getChild(0).getText()));
+      if (ast.getChildCount() == 2) {
+        qbp.setDestLimitOffset(ctx_1.dest, new Integer(ast.getChild(0).getText()));
+        qbp.setDestLimit(ctx_1.dest, new Integer(ast.getChild(1).getText()));
+      } else {
+        qbp.setDestLimitOffset(ctx_1.dest, new Integer(0));
+        qbp.setDestLimit(ctx_1.dest, new Integer(ast.getChild(0).getText()));
+      }
       break;
 
     case HiveParser.TOK_ANALYZE:
@@ -6858,7 +6864,7 @@ Operator genConversionSelectOperator(String dest, QB qb, Operator input,
   }
 
   @SuppressWarnings("nls")
-  private Operator genLimitPlan(String dest, QB qb, Operator input, int limit)
+  private Operator genLimitPlan(String dest, QB qb, Operator input, int offset, int limit)
       throws SemanticException {
     // A map-only job can be optimized - instead of converting it to a
     // map-reduce job, we can have another map
@@ -6869,7 +6875,7 @@ private Operator genLimitPlan(String dest, QB qb, Operator input, int limit)
 
     RowResolver inputRR = opParseCtx.get(input).getRowResolver();
 
-    LimitDesc limitDesc = new LimitDesc(limit);
+    LimitDesc limitDesc = new LimitDesc(offset, limit);
     globalLimitCtx.setLastReduceLimitDesc(limitDesc);
 
     Operator limitMap = putOpInsertMap(OperatorFactory.getAndMakeChild(
@@ -6979,14 +6985,14 @@ private Operator genUDTFPlan(GenericUDTF genericUDTF,
 
   @SuppressWarnings("nls")
   private Operator genLimitMapRedPlan(String dest, QB qb, Operator input,
-      int limit, boolean extraMRStep) throws SemanticException {
+      int offset, int limit, boolean extraMRStep) throws SemanticException {
     // A map-only job can be optimized - instead of converting it to a
     // map-reduce job, we can have another map
    // job to do the same to avoid the cost of sorting in the map-reduce phase.
    // A better approach would be to
    // write into a local file and then have a map-only job.
    // Add the limit operator to get the value fields
-    Operator curr = genLimitPlan(dest, qb, input, limit);
+    Operator curr = genLimitPlan(dest, qb, input, offset, limit);
 
     // the client requested that an extra map-reduce step be performed
     if (!extraMRStep) {
@@ -6995,7 +7001,7 @@ private Operator genLimitMapRedPlan(String dest, QB qb, Operator input,
 
     // Create a reduceSink operator followed by another limit
     curr = genReduceSinkPlan(dest, qb, curr, 1, false);
-    return genLimitPlan(dest, qb, curr, limit);
+    return genLimitPlan(dest, qb, curr, offset, limit);
   }
 
   private ArrayList<ExprNodeDesc> getPartitionColsFromBucketCols(String dest, QB qb, Table tab,
@@ -8939,6 +8945,7 @@ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb,
       curr = genSelectPlan(dest, qb, curr, gbySource);
 
       Integer limit = qbp.getDestLimit(dest);
+      Integer offset = qbp.getDestLimitOffset(dest) == null ? 0 : qbp.getDestLimitOffset(dest);
 
       // Expressions are not supported currently without a alias.
 
@@ -8983,7 +8990,8 @@ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb,
         if (limit != null) {
           // In case of order by, only 1 reducer is used, so no need of
           // another shuffle
-          curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), !hasOrderBy);
+          curr = genLimitMapRedPlan(dest, qb, curr, offset.intValue(),
+              limit.intValue(), !hasOrderBy);
         }
       } else {
         // exact limit can be taken care of by the fetch operator
@@ -8996,8 +9004,8 @@ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb,
           extraMRStep = false;
         }
 
-        curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(),
-            extraMRStep);
+        curr = genLimitMapRedPlan(dest, qb, curr, offset.intValue(),
+            limit.intValue(), extraMRStep);
         qb.getParseInfo().setOuterQueryLimit(limit.intValue());
       }
       if (!SessionState.get().getHiveOperation().equals(HiveOperation.CREATEVIEW)) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
index f88bf63..a8af42a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
@@ -27,6 +27,7 @@
 @Explain(displayName = "Limit", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class LimitDesc extends AbstractOperatorDesc {
   private static final long serialVersionUID = 1L;
+  private int offset = 0;
   private int limit;
   private int leastRows = -1;
 
@@ -37,6 +38,20 @@ public LimitDesc(final int limit) {
     this.limit = limit;
   }
 
+  public LimitDesc(final int offset, final int limit) {
+    this.offset = offset;
+    this.limit = limit;
+  }
+
+  @Explain(displayName = "Offset of rows", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public int getOffset() {
+    return offset;
+  }
+
+  public void setOffset(final int offset) {
+    this.offset = offset;
+  }
+
   @Explain(displayName = "Number of rows", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public int getLimit() {
     return limit;
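
LimitDesc now records the offset alongside the row count and surfaces it in EXPLAIN output as "Offset of rows" next to the existing "Number of rows". A small usage sketch of the constructors and getters shown above, assuming the patched hive-exec classes are on the classpath:

import org.apache.hadoop.hive.ql.plan.LimitDesc;

// Sketch: the descriptor LimitOperator reads its offset/limit window from.
public class LimitDescExample {
  public static void main(String[] args) {
    LimitDesc desc = new LimitDesc(3, 5);   // LIMIT 3,5 - constructor added by this patch
    System.out.println("offset = " + desc.getOffset());  // 3
    System.out.println("limit  = " + desc.getLimit());   // 5

    LimitDesc plain = new LimitDesc(10);    // LIMIT 10 - offset defaults to 0
    System.out.println("offset = " + plain.getOffset()); // 0
    System.out.println("limit  = " + plain.getLimit());  // 10
  }
}
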
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseDriver.java ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseDriver.java
new file mode 100644
index 0000000..a330266
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseDriver.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestParseDriver {
+  static HiveConf conf;
+
+  private static String LIMIT_QUERY_1 = "EXPLAIN INSERT OVERWRITE TABLE dest1 SELECT src.key, sum(substr(src.value,5)) FROM src GROUP BY src.key LIMIT 3,5";
+  private static String LIMIT_QUERY_2 = "select * from dummy_table limit 10";
+//  private static String LIMIT_QUERY_1 = "select * from dummy_table limit 1, 10";
+
+  ParseDriver pd;
+
+  @BeforeClass
+  public static void initialize() {
+  }
+
+  @Before
+  public void setup() throws SemanticException {
+    pd = new ParseDriver();
+  }
+
+  ASTNode parse(String query) throws ParseException {
+    ASTNode nd = pd.parse(query);
+    return (ASTNode) nd.getChild(0);
+  }
+
+  private void printLimit(ASTNode nd) {
+
+    for(int i=0;i