diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
index 53769a0..e687b1a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
@@ -60,6 +60,10 @@ public class MapRedTask extends ExecDriver implements Serializable {
   private transient ContentSummary inputSummary = null;
   private transient boolean runningViaChild = false;
 
+  private transient boolean inputSizeEstimated = false;
+  private transient long totalInputFileSize;
+  private transient long totalInputNumFiles;
+
   public MapRedTask() {
     super();
   }
@@ -91,16 +95,21 @@ public class MapRedTask extends ExecDriver implements Serializable {
           inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
         }
 
+        // set the values of totalInputFileSize and totalInputNumFiles, estimating them
+        // if percentage block sampling is being used
+        estimateInputSize();
+
         // at this point the number of reducers is precisely defined in the plan
         int numReducers = work.getNumReduceTasks();
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Task: " + getId() + ", Summary: " +
-                    inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+                    totalInputFileSize + "," + totalInputNumFiles + ","
                     + numReducers);
         }
 
-        String reason = MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers);
+        String reason = MapRedTask.isEligibleForLocalMode(conf, numReducers,
+            totalInputFileSize, totalInputNumFiles);
         if (reason == null) {
           // clone configuration before modifying it on per-task basis
           cloneConf();
@@ -366,9 +375,50 @@ public class MapRedTask extends ExecDriver implements Serializable {
       inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
     }
 
-    long totalInputFileSize = inputSummary.getLength();
-
-    // if all inputs are sampled, we should shrink the size of reducers accordingly.
+    estimateInputSize();
+
+    if (totalInputFileSize != inputSummary.getLength()) {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+    } else {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+    }
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+    return reducers;
+  }
+
+  /**
+   * Sets the values of totalInputFileSize and totalInputNumFiles. If percentage
+   * block sampling is used, these values are estimates based on the highest
+   * percentage being used for sampling multiplied by the value obtained from the
+   * input summary. Otherwise, these values are set to the exact values obtained
+   * from the input summary.
+   *
+   * Once the function completes, inputSizeEstimated is set so that the logic is
+   * never run more than once.
+   */
+  private void estimateInputSize() {
+    if (inputSizeEstimated) {
+      // If we've already run this function, return
+      return;
+    }
+
+    // Initialize the values to be those taken from the input summary
+    totalInputFileSize = inputSummary.getLength();
+    totalInputNumFiles = inputSummary.getFileCount();
+
+    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+      // If percentage block sampling wasn't used, we don't need to do any estimation
+      inputSizeEstimated = true;
+      return;
+    }
+
+    // if all inputs are sampled, we should shrink the size of the input accordingly
     double highestSamplePercentage = 0;
     boolean allSample = false;
     for (String alias : work.getAliasToWork().keySet()) {
@@ -385,42 +435,38 @@ public class MapRedTask extends ExecDriver implements Serializable {
     }
 
     if (allSample) {
       // This is a little bit dangerous if inputs turns out not to be able to be sampled.
-      // In that case, we significantly underestimate number of reducers.
-      // It's the same as other cases of estimateNumberOfReducers(). It's just our best
+      // In that case, we significantly underestimate the input.
+      // It's the same as estimateNumberOfReducers(). It's just our best
       // guess and there is no guarantee.
       totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
           , totalInputFileSize);
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
-    } else {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+          , totalInputNumFiles);
     }
 
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-    return reducers;
+    inputSizeEstimated = true;
   }
 
   /**
   * Find out if a job can be run in local mode based on it's characteristics
   *
   * @param conf Hive Configuration
-  * @param inputSummary summary about the input files for this job
   * @param numReducers total number of reducers for this job
+  * @param inputLength the size of the input
+  * @param inputFileCount the number of input files
   * @return String null if job is eligible for local mode, reason otherwise
   */
  public static String isEligibleForLocalMode(HiveConf conf,
-    ContentSummary inputSummary,
-    int numReducers) {
+    int numReducers,
+    long inputLength,
+    long inputFileCount) {
    long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
    long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
 
    // check for max input size
-   if (inputSummary.getLength() > maxBytes) {
-     return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+   if (inputLength > maxBytes) {
+     return "Input Size (= " + inputLength + ") is larger than " +
      HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
    }
 
@@ -428,8 +474,8 @@ public class MapRedTask extends ExecDriver implements Serializable {
    // in the absence of an easy way to get the number of splits - do this
    // based on the total number of files (pessimistically assumming that
    // splits are equal to number of files in worst case)
-   if (inputSummary.getFileCount() > maxTasks) {
-     return "Number of Input Files (= " + inputSummary.getFileCount() +
+   if (inputFileCount > maxTasks) {
+     return "Number of Input Files (= " + inputFileCount +
      ") is larger than " +
      HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")";
    }
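
Reviewer sketch (illustrative, not part of the patch): the estimation above is plain arithmetic. When every alias carries a percentage sample, estimateInputSize() scales the ContentSummary totals by the highest sample percentage, and estimateNumberOfReducers() then takes a ceiling division against bytesPerReducer, clamped to [1, maxReducers]. A self-contained Java rendering of that arithmetic, with every input value invented for the example:

// Standalone rendering of the arithmetic in estimateInputSize() and
// estimateNumberOfReducers(). All values below are invented for
// illustration; nothing here is part of the patch.
public class EstimateSketch {
  public static void main(String[] args) {
    long summaryLength = 3000000000L;   // as from ContentSummary.getLength()
    long summaryFiles = 300;            // as from ContentSummary.getFileCount()
    double highestSamplePercentage = 1; // highest TABLESAMPLE(n PERCENT) among aliases

    // shrink the input, as estimateInputSize() does when all aliases are sampled
    long totalInputFileSize = Math.min(
        (long) (summaryLength * highestSamplePercentage / 100D), summaryLength);
    long totalInputNumFiles = Math.min(
        (long) (summaryFiles * highestSamplePercentage / 100D), summaryFiles);

    long bytesPerReducer = 1000000000L; // assumed hive.exec.reducers.bytes.per.reducer
    int maxReducers = 999;              // assumed hive.exec.reducers.max

    // ceiling division, clamped to [1, maxReducers], as in estimateNumberOfReducers()
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.min(maxReducers, Math.max(1, reducers));

    // prints "30000000 bytes, 3 files, 1 reducer(s)": small enough that the
    // local-mode check in isEligibleForLocalMode() can now pass
    System.out.println(totalInputFileSize + " bytes, " + totalInputNumFiles
        + " files, " + reducers + " reducer(s)");
  }
}
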
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cd3de76..af43a72 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7968,7 +7968,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 + numReducers);
           }
 
-          if(MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers) != null) {
+          if(MapRedTask.isEligibleForLocalMode(conf, numReducers,
+              inputSummary.getLength(), inputSummary.getFileCount()) != null) {
             hasNonLocalJob = true;
             break;
           }else{
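
For context on the new test hook below (illustrative, not part of the patch): any class implementing ExecuteWithHookContext and named in hive.exec.post.hooks runs after query execution. A minimal hypothetical sibling of VerifyIsLocalModeHook that logs task locality instead of asserting it, assuming only the HookContext accessors the patch itself uses:

package org.apache.hadoop.hive.ql.hooks;

import java.util.List;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;

// Hypothetical example, not part of the patch: logs whether each completed
// MapReduce task ran in local mode, using the same HookContext methods as
// the VerifyIsLocalModeHook test hook below.
public class LogLocalModeHook implements ExecuteWithHookContext {

  public void run(HookContext hookContext) {
    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
      List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
      for (TaskRunner taskRunner : taskRunners) {
        Task task = taskRunner.getTask();
        if (task.isMapRedTask()) {
          System.out.println("Task " + task.getId() + " isLocalMode=" + task.isLocalMode());
        }
      }
    }
  }
}
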
diff --git ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
new file mode 100644
index 0000000..166bd5b
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+
+public class VerifyIsLocalModeHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+      List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
+      for (TaskRunner taskRunner : taskRunners) {
+        Task task = taskRunner.getTask();
+        if (task.isMapRedTask()) {
+          Assert.assertTrue("VerifyIsLocalModeHook fails because isLocalMode was not set for a task.",
+              task.isLocalMode());
+        }
+      }
+    }
+  }
+}
diff --git ql/src/test/queries/clientpositive/sample_islocalmode_hook.q ql/src/test/queries/clientpositive/sample_islocalmode_hook.q
new file mode 100644
index 0000000..6fab1e6
--- /dev/null
+++ ql/src/test/queries/clientpositive/sample_islocalmode_hook.q
@@ -0,0 +1,42 @@
+drop table if exists sih_i_part;
+drop table if exists sih_src;
+drop table if exists sih_src2;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+set mapred.max.split.size=300;
+set mapred.min.split.size=300;
+set mapred.min.split.size.per.node=300;
+set mapred.min.split.size.per.rack=300;
+set hive.exec.mode.local.auto=true;
+set hive.merge.smallfiles.avgsize=1;
+
+-- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string);
+insert overwrite table sih_i_part partition (p='1') select key, value from src;
+insert overwrite table sih_i_part partition (p='2') select key+10000, value from src;
+insert overwrite table sih_i_part partition (p='3') select key+20000, value from src;
+create table sih_src as select key, value from sih_i_part;
+create table sih_src2 as select key, value from sih_src;
+
+set hive.exec.post.hooks = org.apache.hadoop.hive.ql.hooks.VerifyIsLocalModeHook ;
+set mapred.job.tracker=does.notexist.com:666;
+set hive.exec.mode.local.auto.tasks.max=1;
+
+-- sample split, running locally limited by num tasks
+select key, value from sih_src tablesample(1 percent);
+
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample two tables
+select a.key, b.value from sih_src tablesample(1 percent)a join sih_src2 tablesample(1 percent)b on a.key = b.key;
+
+set hive.exec.mode.local.auto.inputbytes.max=1000;
+set hive.exec.mode.local.auto.tasks.max=4;
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample split, running locally limited by max bytes
+select key, value from sih_src tablesample(1 percent);
+
+drop table sih_i_part;
+drop table sih_src;
+drop table sih_src2;
\ No newline at end of file
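
Reviewer sketch (illustrative, not part of the patch): the test points mapred.job.tracker at the bogus does.notexist.com:666, so any job that escapes local mode fails outright, and the post-hook then asserts isLocalMode on every MapReduce task that did run. A hypothetical direct probe of the thresholds the .q file sets, using the new isEligibleForLocalMode overload; the 900-byte / 3-file figures stand in for a sampled estimate and are invented:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapRedTask;

// Hypothetical harness, not part of the patch: probes the new overload with
// limits mirroring the final "set" commands in sample_islocalmode_hook.q.
public class LocalModeThresholdCheck {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // mirrors: set hive.exec.mode.local.auto.inputbytes.max=1000;
    conf.setLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES, 1000);
    // mirrors: set hive.exec.mode.local.auto.tasks.max=4;
    conf.setIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS, 4);

    // invented sampled estimate: 900 bytes in 3 files, 1 reducer
    // -> under both limits, so the reason is null and the job stays local
    System.out.println(MapRedTask.isEligibleForLocalMode(conf, 1, 900, 3));

    // invented unsampled size: 90000 bytes -> prints an "Input Size ..." reason
    System.out.println(MapRedTask.isEligibleForLocalMode(conf, 1, 90000, 3));
  }
}
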