diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 8759661..619dd49 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -30,6 +30,10 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,6 +78,48 @@ private static final String CLASS_NAME = CombineHiveInputFormat.class.getName(); public static final Log LOG = LogFactory.getLog(CLASS_NAME); + // max number of threads we can use to check non-combinable paths + private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50; + private static final int DEFAULT_NUM_PATH_PER_THREAD = 100; + + private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> { + private final Path[] paths; + private final int start; + private final int length; + private final JobConf conf; + + public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) { + this.paths = paths; + this.start = start; + this.length = length; + this.conf = conf; + } + + @Override + public Set<Integer> call() throws Exception { + Set<Integer> nonCombinablePathIndices = new HashSet<Integer>(); + for (int i = 0; i < length; i++) { + PartitionDesc part = + HiveFileFormatUtils.getPartitionDescFromPathRecursively( + pathToPartitionInfo, paths[i + start], + IOPrepareCache.get().allocatePartitionDescMap()); + // Use HiveInputFormat if any of the paths is not splittable + Class inputFormatClass = part.getInputFileFormatClass(); + InputFormat inputFormat = + getInputFormatFromCache(inputFormatClass, conf); + if (inputFormat instanceof AvoidSplitCombination && + ((AvoidSplitCombination) 
inputFormat).shouldSkipCombine(paths[i + start], conf)) { + if (LOG.isDebugEnabled()) { + LOG.debug("The path [" + paths[i + start] + + "] is being parked for HiveInputFormat.getSplits"); + } + nonCombinablePathIndices.add(i); + } + } + return nonCombinablePathIndices; + } + } + /** * CombineHiveInputSplit encapsulates an InputSplit with its corresponding * inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks @@ -278,8 +324,6 @@ public int hashCode() { private InputSplit[] getCombineSplits(JobConf job, int numSplits, Map<String, PartitionDesc> pathToPartitionInfo) throws IOException { - PerfLogger perfLogger = PerfLogger.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); init(job); Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); Map<String, Operator<? extends OperatorDesc>> aliasToWork = @@ -290,7 +334,6 @@ public int hashCode() { InputSplit[] splits = null; if (combine == null) { splits = super.getSplits(job, numSplits); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; } @@ -349,7 +392,6 @@ public int hashCode() { } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) { //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine) splits = super.getSplits(job, numSplits); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; } @@ -363,7 +405,6 @@ public int hashCode() { fStatus[idx].getPath()) != null) { //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine) splits = super.getSplits(job, numSplits); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; } } @@ -373,7 +414,6 @@ public int hashCode() { //don't combine if inputformat is a SymlinkTextInputFormat if (inputFormat instanceof SymlinkTextInputFormat) { splits = super.getSplits(job, numSplits); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return splits; } @@ -451,7 +491,6 @@ public int hashCode() { } LOG.info("number of splits " + result.size()); - perfLogger.PerfLogEnd(CLASS_NAME, 
PerfLogger.GET_SPLITS); return result.toArray(new CombineHiveInputSplit[result.size()]); } @@ -460,6 +499,8 @@ public int hashCode() { */ @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + PerfLogger perfLogger = PerfLogger.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); init(job); ArrayList<InputSplit> result = new ArrayList<InputSplit>(); @@ -469,26 +510,37 @@ public int hashCode() { List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2); List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2); - for (Path path : paths) { - - PartitionDesc part = - HiveFileFormatUtils.getPartitionDescFromPathRecursively( - pathToPartitionInfo, path, - IOPrepareCache.get().allocatePartitionDescMap()); - - // Use HiveInputFormat if any of the paths is not splittable - Class inputFormatClass = part.getInputFileFormatClass(); - InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); - if (inputFormat instanceof AvoidSplitCombination && - ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) { - if (LOG.isDebugEnabled()) { - LOG.debug("The split [" + path + - "] is being parked for HiveInputFormat.getSplits"); + int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, + (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); + int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads); + LOG.info("Total number of paths: " + paths.length + + ", launching " + numThreads + " threads to check non-combinable ones."); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads); + try { + for (int i = 0; i < numThreads; i++) { + int start = i * numPathPerThread; + int length = i != numThreads - 1 ? 
numPathPerThread : paths.length - start; + futureList.add(executor.submit( + new CheckNonCombinablePathCallable(paths, start, length, job))); + } + Set<Integer> nonCombinablePathIndices = new HashSet<Integer>(); + for (Future<Set<Integer>> future : futureList) { + nonCombinablePathIndices.addAll(future.get()); + } + for (int i = 0; i < paths.length; i++) { + if (nonCombinablePathIndices.contains(i)) { + nonCombinablePaths.add(paths[i]); + } else { + combinablePaths.add(paths[i]); } - nonCombinablePaths.add(path); - } else { - combinablePaths.add(path); } + } catch (Exception e) { + LOG.error("Error checking non-combinable path", e); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + throw new IOException(e); + } finally { + executor.shutdownNow(); } // Store the previous value for the path specification @@ -528,6 +580,7 @@ public int hashCode() { job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths); } LOG.info("Number of all splits " + result.size()); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); return result.toArray(new InputSplit[result.size()]); }