diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0161c20..95e6f94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -174,6 +175,7 @@
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -186,8 +188,11 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -2988,32 +2993,32 @@ public static double getHighestSamplePercentage (MapWork work) {
       LOG.info("Processing alias " + alias);
 
       // The alias may not have any path
-      Path path = null;
+      boolean isEmptyTable = true;
       boolean hasLogged = false;
       // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
       for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
         List<String> aliases = work.getPathToAliases().get(file);
         if (aliases.contains(alias)) {
-          path = file;
+          if (file != null) {
+            isEmptyTable = false;
+          }
 
           // Multiple aliases can point to the same path - it should be
           // processed only once
-          if (pathsProcessed.contains(path)) {
+          if (pathsProcessed.contains(file)) {
             continue;
           }
 
-          pathsProcessed.add(path);
+          pathsProcessed.add(file);
+
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding input file " + path);
+            LOG.debug("Adding input file " + file);
           } else if (!hasLogged) {
             hasLogged = true;
             LOG.info("Adding " + work.getPathToAliases().size()
-                + " inputs; the first input is " + path);
-          }
-          if (!skipDummy && isEmptyPath(job, path, ctx)) {
-            path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir);
+                + " inputs; the first input is " + file);
           }
-          pathsToAdd.add(path);
+          pathsToAdd.add(file);
         }
       }
 
@@ -3025,12 +3030,82 @@ public static double getHighestSamplePercentage (MapWork work) {
       // T2) x;
       // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
       // rows)
-      if (path == null && !skipDummy) {
-        path = createDummyFileForEmptyTable(job, work, hiveScratchDir, alias);
-        pathsToAdd.add(path);
+      if (isEmptyTable && !skipDummy) {
+        pathsToAdd.add(createDummyFileForEmptyTable(job, work, hiveScratchDir, alias));
       }
     }
-    return pathsToAdd;
+
+    // Every collected path goes through GetInputPathsCallable, which substitutes the result
+    // of createDummyFileForEmptyPartition for paths reported empty (unless skipDummy is set).
+    // Results are gathered in the iteration order of pathsToAdd.
+    List<Path> finalPathsToAdd = new LinkedList<>();
+    int numThreads = job.getInt("mapred.dfsclient.parallelism.max", 0);
+    if (numThreads <= 0) {
+      // No parallelism requested (0 is the default). Negative values take this branch too,
+      // because Executors.newFixedThreadPool rejects a non-positive pool size.
+      for (Path path : pathsToAdd) {
+        finalPathsToAdd.add(
+            new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
+      }
+      return finalPathsToAdd;
+    }
+
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
+    try {
+      List<Future<Path>> futures = new LinkedList<>();
+      for (Path path : pathsToAdd) {
+        futures.add(pool.submit(
+            new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy)));
+      }
+      for (Future<Path> future : futures) {
+        finalPathsToAdd.add(future.get());
+      }
+    } finally {
+      // The pool exists only for this call; without a shutdown its worker threads would
+      // linger after the method returns. shutdownNow() also drops queued tasks if one failed.
+      pool.shutdownNow();
+    }
+    return finalPathsToAdd;
+  }
+
+  /**
+   * Produces the path to use for a single input of {@code getInputPaths}: the path itself, or
+   * the result of {@code createDummyFileForEmptyPartition} when dummies are not skipped and
+   * {@code isEmptyPath} reports the path as empty. Written as a {@link Callable} so the check
+   * can be run inline or submitted to an executor.
+   */
+  private static class GetInputPathsCallable implements Callable<Path> {
+
+    private final Path path;
+    private final JobConf job;
+    private final MapWork work;
+    private final Path hiveScratchDir;
+    private final Context ctx;
+    private final boolean skipDummy;
+
+    private GetInputPathsCallable(Path path, JobConf job, MapWork work, Path hiveScratchDir,
+        Context ctx, boolean skipDummy) {
+      this.path = path;
+      this.job = job;
+      this.work = work;
+      this.hiveScratchDir = hiveScratchDir;
+      this.ctx = ctx;
+      this.skipDummy = skipDummy;
+    }
+
+    @Override
+    public Path call() throws Exception {
+      if (!skipDummy && isEmptyPath(job, path, ctx)) {
+        // createDummyFileForEmptyPartition may modify the path-to-alias map of the shared
+        // MapWork, and several of these tasks can run at once on the pool, so calls into it
+        // are serialized on the MapWork instance.
+        synchronized (work) {
+          return createDummyFileForEmptyPartition(path, job, work, hiveScratchDir);
+        }
+      }
+      return path;
+    }
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})