Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (revision 17445)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (working copy)
@@ -34,6 +34,7 @@
   protected transient int limit;
   protected transient int leastRow;
   protected transient int currCount;
+  protected transient boolean isMap;

   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
@@ -41,6 +42,7 @@
     limit = conf.getLimit();
     leastRow = conf.getLeastRows();
     currCount = 0;
+    isMap = hconf.getBoolean("mapred.task.is.map", true);
   }

   @Override
@@ -65,7 +67,7 @@

   @Override
   public void closeOp(boolean abort) throws HiveException {
-    if (currCount < leastRow) {
+    if (!isMap && currCount < leastRow) {
       throw new HiveException("No sufficient row found");
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 17445)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -27,9 +27,9 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;

@@ -92,7 +92,6 @@
 import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
@@ -102,6 +101,7 @@
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -122,7 +122,6 @@
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
@@ -145,12 +144,13 @@
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -158,9 +158,9 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -6935,7 +6935,7 @@
       }
     }

-    decideExecMode(rootTasks, ctx);
+    decideExecMode(rootTasks, ctx, globalLimitCtx);

     if (qb.isCTAS()) {
       // generate a DDL task and make it a dependent task of the leaf
@@ -7959,7 +7959,8 @@
     }
   }

-  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx)
+  private void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+      GlobalLimitCtx globalLimitCtx)
    throws SemanticException {

    // bypass for explain queries for now
@@ -7990,14 +7991,32 @@
            (ctx, (MapredWork)mrtask.getWork(), p);
          int numReducers = getNumberOfReducers(mrtask.getWork(), conf);

+         long estimatedInput;
+
+         if (globalLimitCtx != null && globalLimitCtx.isEnable()) {
+           // If the global limit optimization is triggered, we will
+           // estimate input data actually needed based on limit rows.
+           // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2)
+           //
+           long sizePerRow = HiveConf.getLongVar(conf,
+               HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
+           estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow;
+           long minSplitSize = HiveConf.getLongVar(conf,
+               HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+           long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1;
+           estimatedInput = estimatedInput * (estimatedNumMap + 1);
+         } else {
+           estimatedInput = inputSummary.getLength();
+         }
+
          if (LOG.isDebugEnabled()) {
            LOG.debug("Task: " + mrtask.getId() + ", Summary: " +
                     inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
-                    + numReducers);
+                    + numReducers + ", estimated Input: " + estimatedInput);
          }

          if(MapRedTask.isEligibleForLocalMode(conf, numReducers,
-            inputSummary.getLength(), inputSummary.getFileCount()) != null) {
+            estimatedInput, inputSummary.getFileCount()) != null) {
            hasNonLocalJob = true;
            break;
          }else{
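For reference, here is a minimal standalone sketch (not part of the patch) of the estimated-input arithmetic that decideExecMode() applies when the global limit optimization is enabled. The concrete numbers below are assumptions chosen only to illustrate the effect; in the patch the values come from globalLimitCtx, HiveConf.ConfVars.HIVELIMITMAXROWSIZE, HiveConf.ConfVars.MAPREDMINSPLITSIZE, and the job's ContentSummary.

// EstimatedInputSketch.java -- illustrative only, assumed example values.
public class EstimatedInputSketch {
  public static void main(String[] args) {
    long globalLimit  = 10;                  // e.g. a query ending in LIMIT 10 (assumed)
    long sizePerRow   = 100000;              // stand-in for HIVELIMITMAXROWSIZE (assumed)
    long minSplitSize = 64L * 1024 * 1024;   // stand-in for MAPREDMINSPLITSIZE (assumed)
    long inputLength  = 1024L * 1024 * 1024; // 1 GB of raw input reported by ContentSummary

    // Same arithmetic as the patch:
    //   estimatedNumMap = inputLength / minSplitSize + 1
    //   estimatedInput  = (globalLimit * sizePerRow) * (estimatedNumMap + 1)
    long estimatedNumMap = inputLength / minSplitSize + 1;
    long estimatedInput  = globalLimit * sizePerRow * (estimatedNumMap + 1);

    // With these numbers the estimate (18,000,000 bytes, about 18 MB) is far
    // below the raw 1 GB input, which is what allows isEligibleForLocalMode()
    // to choose local mode for limited queries instead of launching a cluster job.
    System.out.println("estimatedNumMap = " + estimatedNumMap);
    System.out.println("estimatedInput  = " + estimatedInput + " bytes");
  }
}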