diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0161c20..06b7a17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -174,6 +176,7 @@
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -186,8 +189,11 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -2981,6 +2987,19 @@ public static double getHighestSamplePercentage (MapWork work) {
   public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
       Context ctx, boolean skipDummy) throws Exception {
 
+    int numThreads = job.getInt("mapred.dfsclient.parallelism.max", 0);
+    ExecutorService pool = null;
+    if (numThreads > 1) {
+      pool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
+    }
+    return getInputPaths(job, work, hiveScratchDir, ctx, skipDummy, pool);
+  }
+
+  @VisibleForTesting
+  static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
+      Context ctx, boolean skipDummy, ExecutorService pool) throws Exception {
+
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
     // AliasToWork contains all the aliases
@@ -2988,32 +3007,35 @@ public static double getHighestSamplePercentage (MapWork work) {
       LOG.info("Processing alias " + alias);
 
       // The alias may not have any path
-      Path path = null;
+      boolean isEmptyTable = true;
       boolean hasLogged = false;
       // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
       for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
         List<String> aliases = work.getPathToAliases().get(file);
         if (aliases.contains(alias)) {
-          path = file;
+          if (file != null) {
+            isEmptyTable = false;
+          } else {
+            LOG.warn("Found a null path for alias " + alias);
+            continue;
+          }
 
           // Multiple aliases can point to the same path - it should be
           // processed only once
-          if (pathsProcessed.contains(path)) {
+          if (pathsProcessed.contains(file)) {
             continue;
           }
 
-          pathsProcessed.add(path);
+          pathsProcessed.add(file);
+
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding input file " + path);
+            LOG.debug("Adding input file " + file);
           } else if (!hasLogged) {
             hasLogged = true;
             LOG.info("Adding " + work.getPathToAliases().size()
-                + " inputs; the first input is " + path);
-          }
-          if (!skipDummy && isEmptyPath(job, path, ctx)) {
-            path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir);
+                + " inputs; the first input is " + file);
           }
-          pathsToAdd.add(path);
+          pathsToAdd.add(file);
         }
       }
 
@@ -3025,12 +3047,56 @@ public static double getHighestSamplePercentage (MapWork work) {
       // T2) x;
       // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
       // rows)
-      if (path == null && !skipDummy) {
-        path = createDummyFileForEmptyTable(job, work, hiveScratchDir, alias);
-        pathsToAdd.add(path);
+      if (isEmptyTable && !skipDummy) {
+        pathsToAdd.add(createDummyFileForEmptyTable(job, work, hiveScratchDir, alias));
+      }
+    }
+
+    List<Path> finalPathsToAdd = new LinkedList<>();
+    List<Future<Path>> futures = new LinkedList<>();
+    for (final Path path : pathsToAdd) {
+      if (pool == null) {
+        finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
+      } else {
+        futures.add(pool.submit(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy)));
+      }
+    }
+
+    if (pool != null) {
+      for (Future<Path> future : futures) {
+        finalPathsToAdd.add(future.get());
+      }
+    }
+
+    return finalPathsToAdd;
+  }
+
+  private static class GetInputPathsCallable implements Callable<Path> {
+
+    private final Path path;
+    private final JobConf job;
+    private final MapWork work;
+    private final Path hiveScratchDir;
+    private final Context ctx;
+    private final boolean skipDummy;
+
+    private GetInputPathsCallable(Path path, JobConf job, MapWork work, Path hiveScratchDir,
+        Context ctx, boolean skipDummy) {
+      this.path = path;
+      this.job = job;
+      this.work = work;
+      this.hiveScratchDir = hiveScratchDir;
+      this.ctx = ctx;
+      this.skipDummy = skipDummy;
+    }
+
+    @Override
+    public Path call() throws Exception {
+      if (!this.skipDummy && isEmptyPath(this.job, this.path, this.ctx)) {
+        return createDummyFileForEmptyPartition(this.path, this.job, this.work, this.hiveScratchDir);
       }
+      return this.path;
     }
-    return pathsToAdd;
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index bd067aa..e444946 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -33,6 +33,8 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -289,4 +291,66 @@ public void testGetInputPathsWithEmptyTables() throws Exception {
       }
     }
   }
+
+  /**
+   * Test for {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean, ExecutorService)} with a single
+   * threaded {@link ExecutorService}.
+   */
+  @Test
+  public void testGetInputPathsWithPool() throws Exception {
+    ExecutorService pool = Executors.newSingleThreadExecutor();
+
+    JobConf jobConf = new JobConf();
+    MapWork mapWork = new MapWork();
+    Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR));
+
+    String testTableName = "testTable";
+    String testPartitionName = "testPartition";
+
+    Path testTablePath = new Path(testTableName);
+    Path testPartitionPath = new Path(testTablePath, testPartitionName);
+    Path testFileTablePath = new Path(testTablePath, "test.txt");
+    Path testFilePartitionPath = new Path(testPartitionPath, "test.txt");
+
+    TableDesc mockTableDesc = mock(TableDesc.class);
+
+    when(mockTableDesc.isNonNative()).thenReturn(false);
+    when(mockTableDesc.getProperties()).thenReturn(new Properties());
+
+    LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>();
+    pathToAliasTable.put(testTablePath, Lists.newArrayList(testTableName));
+    mapWork.setPathToAliases(pathToAliasTable);
+
+    mapWork.getAliasToWork().put(testTableName, (Operator<?>) mock(Operator.class));
+
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    try {
+      fs.mkdirs(testTablePath);
+      fs.create(testFileTablePath).close();
+
+      // Run a test with an un-partitioned table with a single file as the input
+      List<Path> tableInputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, mock(Context.class), false,
+          pool);
+      assertEquals(tableInputPaths.size(), 1);
+      assertEquals(tableInputPaths.get(0), testTablePath);
+
+      LinkedHashMap<Path, ArrayList<String>> pathToAliasPartition = new LinkedHashMap<>();
+      pathToAliasPartition.put(testPartitionPath, Lists.newArrayList(testTableName));
+      mapWork.setPathToAliases(pathToAliasPartition);
+
+      fs.delete(testFileTablePath, false);
+      fs.mkdirs(testPartitionPath);
+      fs.create(testFilePartitionPath).close();
+
+      // Run a test with a partitioned table with a single partition and a single file as the input
+      List<Path> tablePartitionInputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, mock(Context.class),
+          false, pool);
+      assertEquals(tablePartitionInputPaths.size(), 1);
+      assertEquals(tablePartitionInputPaths.get(0), testPartitionPath);
+    } finally {
+      if (fs.exists(testTablePath)) {
+        fs.delete(testTablePath, true);
+      }
+    }
+  }
 }
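
For context, the patch applies a plain submit-then-collect idiom: the public getInputPaths() builds a fixed pool only when mapred.dfsclient.parallelism.max is greater than 1, each path's emptiness check becomes a GetInputPathsCallable, and Future.get() gathers the results in submission order; with no pool, each Callable runs inline on the calling thread. The daemon flag on the ThreadFactoryBuilder keeps the worker threads from pinning the JVM at exit, since no explicit shutdown of the pool appears in the hunks above. A minimal standalone sketch of the same idiom follows; the class and method names (SubmitThenCollectSketch, checkPath, process) are illustrative and not Hive code, while ThreadFactoryBuilder and the java.util.concurrent types are the same APIs the patch uses.

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative stand-in for the patch's structure: one Callable per input
// path, an optional pool to fan the work out, and an ordered collect over
// the resulting Futures.
public class SubmitThenCollectSketch {

  // Stand-in for the per-path work (isEmptyPath() plus the possible
  // dummy-file substitution in the real patch).
  private static Callable<String> checkPath(final String path) {
    return new Callable<String>() {
      @Override
      public String call() {
        return path; // real code may return a substitute dummy path here
      }
    };
  }

  public static List<String> process(List<String> paths, ExecutorService pool) throws Exception {
    List<String> results = new LinkedList<>();
    List<Future<String>> futures = new LinkedList<>();
    for (String path : paths) {
      if (pool == null) {
        // No pool configured: run inline on the caller's thread, as the
        // patched getInputPaths() does when numThreads <= 1.
        results.add(checkPath(path).call());
      } else {
        futures.add(pool.submit(checkPath(path)));
      }
    }
    // Future.get() blocks until each task finishes, so results come back in
    // submission order; a worker exception surfaces here wrapped in an
    // ExecutionException.
    for (Future<String> future : futures) {
      results.add(future.get());
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    // Daemon threads keep the pool from blocking JVM shutdown, matching the
    // ThreadFactoryBuilder settings in the patch.
    ExecutorService pool = Executors.newFixedThreadPool(4,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build());
    System.out.println(process(Arrays.asList("/warehouse/t1", "/warehouse/t2"), pool));
    pool.shutdown();
  }
}

Running main prints [/warehouse/t1, /warehouse/t2]; swapping in a pool of size 1 mirrors the single-threaded configuration exercised by testGetInputPathsWithPool() above.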