diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 8759661..b8e980b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -30,6 +30,11 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +47,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -74,6 +80,48 @@
   private static final String CLASS_NAME = CombineHiveInputFormat.class.getName();
   public static final Log LOG = LogFactory.getLog(CLASS_NAME);
 
+  // max number of threads we can use to check non-combinable paths
+  private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
+  private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
+
+  private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
+    private final Path[] paths;
+    private final int start;
+    private final int length;
+    private final JobConf conf;
+
+    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+      this.paths = paths;
+      this.start = start;
+      this.length = length;
+      this.conf = conf;
+    }
+
+    @Override
+    public Set<Integer> call() throws Exception {
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (int i = 0; i < length; i++) {
+        PartitionDesc part =
+            HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+                pathToPartitionInfo, paths[i + start],
+                IOPrepareCache.get().allocatePartitionDescMap());
+        // Use HiveInputFormat if any of the paths is not splittable
+        Class inputFormatClass = part.getInputFileFormatClass();
+        InputFormat inputFormat =
+            getInputFormatFromCache(inputFormatClass, conf);
+        if (inputFormat instanceof AvoidSplitCombination &&
+            ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The path [" + paths[i + start] +
+                "] is being parked for HiveInputFormat.getSplits");
+          }
+          nonCombinablePathIndices.add(i + start); // absolute index into paths, not chunk offset
+        }
+      }
+      return nonCombinablePathIndices;
+    }
+  }
+
   /**
    * CombineHiveInputSplit encapsulates an InputSplit with its corresponding
    * inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks
@@ -469,26 +517,36 @@ public int hashCode() {
     List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
     List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
-    for (Path path : paths) {
-
-      PartitionDesc part =
-          HiveFileFormatUtils.getPartitionDescFromPathRecursively(
-              pathToPartitionInfo, path,
-              IOPrepareCache.get().allocatePartitionDescMap());
-
-      // Use HiveInputFormat if any of the paths is not splittable
-      Class inputFormatClass = part.getInputFileFormatClass();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      if (inputFormat instanceof AvoidSplitCombination &&
-          ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The split [" + path +
-              "] is being parked for HiveInputFormat.getSplits");
+    int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
+        (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
+    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
+    LOG.info("Total number of paths: " + paths.length +
+        ", launching " + numThreads + " threads to check non-combinable ones.");
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>();
+    try {
+      for (int i = 0; i < numThreads; i++) {
+        int start = i * numPathPerThread;
+        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+        futureList.add(executor.submit(
+            new CheckNonCombinablePathCallable(paths, start, length, job)));
+      }
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (Future<Set<Integer>> future : futureList) {
+        nonCombinablePathIndices.addAll(future.get());
+      }
+      for (int i = 0; i < paths.length; i++) {
+        if (nonCombinablePathIndices.contains(i)) {
+          nonCombinablePaths.add(paths[i]);
+        } else {
+          combinablePaths.add(paths[i]);
         }
-        nonCombinablePaths.add(path);
-      } else {
-        combinablePaths.add(path);
       }
+    } catch (Exception e) {
+      LOG.error("Error checking non-combinable path", e);
+      throw new IOException(e);
+    } finally {
+      executor.shutdownNow();
     }
 
     // Store the previous value for the path specification