Index: java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
===================================================================
--- java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (revision 16351)
+++ java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (working copy)
@@ -60,6 +60,10 @@
   private transient ContentSummary inputSummary = null;
   private transient boolean runningViaChild = false;
 
+  private transient boolean inputSizeEstimated = false;
+  private transient long totalInputFileSize;
+  private transient long totalInputNumFiles;
+
   public MapRedTask() {
     super();
   }
@@ -91,16 +95,19 @@
         inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
       }
 
+      estimateSampledInputSize();
+
       // at this point the number of reducers is precisely defined in the plan
       int numReducers = work.getNumReduceTasks();
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Task: " + getId() + ", Summary: " +
-                  inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+                  totalInputFileSize + "," + totalInputNumFiles + ","
                   + numReducers);
       }
 
-      String reason = MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers);
+      String reason = MapRedTask.isEligibleForLocalMode(conf, numReducers,
+          totalInputFileSize, totalInputNumFiles);
       if (reason == null) {
         // clone configuration before modifying it on per-task basis
         cloneConf();
@@ -366,9 +373,32 @@
       inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
     }
 
-    long totalInputFileSize = inputSummary.getLength();
+    // if all inputs are sampled, we should shrink the size of reducers accordingly.
+    estimateSampledInputSize();
 
-    // if all inputs are sampled, we should shrink the size of reducers accordingly.
+    if (totalInputFileSize != inputSummary.getLength()) {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+    } else {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+    }
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+    return reducers;
+  }
+
+  private void estimateSampledInputSize() {
+    if (inputSizeEstimated) {
+      return;
+    }
+
+    totalInputFileSize = inputSummary.getLength();
+    totalInputNumFiles = inputSummary.getFileCount();
+
+    // if all inputs are sampled, we should shrink the size of the input accordingly
     double highestSamplePercentage = 0;
     boolean allSample = false;
     for (String alias : work.getAliasToWork().keySet()) {
@@ -390,37 +420,33 @@
       // guess and there is no guarantee.
       totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
           , totalInputFileSize);
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
-    } else {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+          , totalInputNumFiles);
     }
 
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-    return reducers;
+    inputSizeEstimated = true;
   }
 
   /**
   * Find out if a job can be run in local mode based on it's characteristics
   *
   * @param conf Hive Configuration
-  * @param inputSummary summary about the input files for this job
   * @param numReducers total number of reducers for this job
+  * @param inputLength the size of the input, in bytes
+  * @param inputFileCount the number of input files
   * @return String null if job is eligible for local mode, reason otherwise
   */
  public static String isEligibleForLocalMode(HiveConf conf,
-      ContentSummary inputSummary,
-      int numReducers) {
+      int numReducers,
+      long inputLength,
+      long inputFileCount) {
 
    long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
    long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
 
    // check for max input size
-    if (inputSummary.getLength() > maxBytes) {
-      return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+    if (inputLength > maxBytes) {
+      return "Input Size (= " + inputLength + ") is larger than " +
        HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
    }
 
@@ -428,8 +454,8 @@
    // in the absence of an easy way to get the number of splits - do this
    // based on the total number of files (pessimistically assumming that
    // splits are equal to number of files in worst case)
-    if (inputSummary.getFileCount() > maxTasks) {
-      return "Number of Input Files (= " + inputSummary.getFileCount() +
+    if (inputFileCount > maxTasks) {
+      return "Number of Input Files (= " + inputFileCount +
        ") is larger than " +
        HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
    }
Index: java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 16351)
+++ java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -7968,7 +7968,8 @@
            + numReducers);
      }
 
-      if(MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers) != null) {
+      if (MapRedTask.isEligibleForLocalMode(conf, numReducers,
+          inputSummary.getLength(), inputSummary.getFileCount()) != null) {
        hasNonLocalJob = true;
        break;
      }else{
Index: test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
===================================================================
--- test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java (revision 0)
+++ test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java (revision 0)
@@ -0,0 +1,25 @@
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+
+public class VerifyIsLocalModeHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+      List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
+      for (TaskRunner taskRunner : taskRunners) {
+        Task task = taskRunner.getTask();
+        if (task.isMapRedTask()) {
+          Assert.assertTrue("VerifyIsLocalModeHook fails because isLocalMode was not set for a task.",
+              task.isLocalMode());
+        }
+      }
+    }
+  }
+}
Index: test/queries/clientpositive/sample_islocalmode_hook.q
===================================================================
--- test/queries/clientpositive/sample_islocalmode_hook.q (revision 0)
+++ test/queries/clientpositive/sample_islocalmode_hook.q (revision 0)
@@ -0,0 +1,42 @@
+drop table if exists sih_i_part;
+drop table if exists sih_src;
+drop table if exists sih_src2;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+set mapred.max.split.size=300;
+set mapred.min.split.size=300;
+set mapred.min.split.size.per.node=300;
+set mapred.min.split.size.per.rack=300;
+set hive.exec.mode.local.auto=true;
+set hive.merge.smallfiles.avgsize=1;
+
+-- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string);
+insert overwrite table sih_i_part partition (p='1') select key, value from src;
+insert overwrite table sih_i_part partition (p='2') select key+10000, value from src;
+insert overwrite table sih_i_part partition (p='3') select key+20000, value from src;
+create table sih_src as select key, value from sih_i_part;
+create table sih_src2 as select key, value from sih_src;
+
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyIsLocalModeHook;
+set mapred.job.tracker=does.notexist.com:666;
+set hive.exec.mode.local.auto.tasks.max=1;
+
+-- sample split, running locally limited by num tasks
+select key, value from sih_src tablesample(1 percent);
+
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample two tables
+select a.key, b.value from sih_src tablesample(1 percent)a join sih_src2 tablesample(1 percent)b on a.key = b.key;
+
+set hive.exec.mode.local.auto.inputbytes.max=1000;
+set hive.exec.mode.local.auto.tasks.max=4;
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample split, running locally limited by max bytes
+select key, value from sih_src tablesample(1 percent);
+
+drop table sih_i_part;
+drop table sih_src;
+drop table sih_src2;
\ No newline at end of file
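
Reviewer note, not part of the patch: below is a minimal standalone sketch of the
estimation logic this change introduces. The class name and all concrete numbers
(input size, sample percentage, bytesPerReducer, maxReducers) are hypothetical and
for illustration only; the two formulas mirror estimateSampledInputSize() and the
reducer computation in the hunks above.

public class ReducerEstimateSketch {
  public static void main(String[] args) {
    long totalInputFileSize = 1000000L;   // hypothetical ContentSummary.getLength()
    double highestSamplePercentage = 1.0; // all aliases read via TABLESAMPLE(1 PERCENT)
    long bytesPerReducer = 4000L;         // hypothetical hive.exec.reducers.bytes.per.reducer
    int maxReducers = 999;                // hypothetical hive.exec.reducers.max

    // When every input alias is sampled, shrink the size estimate proportionally,
    // never letting it exceed the full size (the patch applies the same shrink to
    // the file count).
    totalInputFileSize = Math.min(
        (long) (totalInputFileSize * highestSamplePercentage / 100D), totalInputFileSize);

    // Ceiling division, then clamp the reducer count to [1, maxReducers].
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.max(1, reducers);
    reducers = Math.min(maxReducers, reducers);

    System.out.println(reducers); // 1000000 * 1% = 10000 bytes; ceil(10000 / 4000) = 3
  }
}

Without the sampling shrink, the same input would yield ceil(1000000 / 4000) = 250
reducers, and the full unshrunk size would be compared against
hive.exec.mode.local.auto.inputbytes.max, which is why sampled queries previously
failed to qualify for local mode.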