diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a18a6d7ec2..8a9fe3cefa 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2094,6 +2094,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Allow synthetic file ID in splits on file systems that don't have a native one."),
     HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE("hive.orc.cache.stripe.details.mem.size",
         "256Mb", new SizeValidator(), "Maximum size of orc splits cached in the client."),
+    /**
+     * @deprecated Use HiveConf.HIVE_COMPUTE_SPLITS_NUM_THREADS
+     */
+    @Deprecated
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,
         "How many threads orc should use to create splits in parallel."),
     HIVE_ORC_CACHE_USE_SOFT_REFERENCES("hive.orc.cache.use.soft.references", false,
@@ -4827,6 +4831,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.SECONDS),
         "Timeout for Running Query in seconds. A nonpositive value means infinite. " +
         "If the query timeout is also set by thrift API call, the smaller one will be taken."),
+    HIVE_COMPUTE_SPLITS_NUM_THREADS("hive.compute.splits.num.threads", 10,
+        "How many threads Input Format should use to create splits in parallel.",
+        HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname),
     HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true),
         "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 233bd1ebc4..96a48de42e 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -72,6 +73,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -80,6 +82,12 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static java.lang.Integer.min;
 
 /**
  * HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -478,7 +486,8 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf,
       pushFilters(conf, tableScan, this.mrwork);
     }
 
-    List<Path> dirsWithFileOriginals = new ArrayList<>(), finalDirs = new ArrayList<>();
+    List<Path> dirsWithFileOriginals = Collections.synchronizedList(new ArrayList<>()),
+        finalDirs = Collections.synchronizedList(new ArrayList<>());
     processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, dirsWithFileOriginals);
     if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) {
       // This is for transactional tables.
@@ -577,9 +586,41 @@ public static void processPathsForMmRead(List<Path> dirs, Configuration conf,
       return;
     }
     boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS);
-    for (Path dir : dirs) {
-      processForWriteIdsForMmRead(
-          dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+
+    int numThreads = min(HiveConf.getIntVar(conf, ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS), dirs.size());
+    List<Future<Void>> pathFutures = new ArrayList<>();
+    ExecutorService pool = null;
+    if (numThreads > 1) {
+      pool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MM-Split-Paths-%d").build());
+    }
+
+    try {
+      for (Path dir : dirs) {
+        if (pool != null) {
+          Future<Void> pathFuture = pool.submit(() -> {
+            processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+            return null;
+          });
+          pathFutures.add(pathFuture);
+        } else {
+          processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+        }
+      }
+      try {
+        for (Future<Void> pathFuture : pathFutures) {
+          pathFuture.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        for (Future<Void> future : pathFutures) {
+          future.cancel(true);
+        }
+        throw new IOException(e);
+      }
+    } finally {
+      if (pool != null) {
+        pool.shutdown();
+      }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7c8f4796bd..1059cb227f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -672,7 +672,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     splitStrategyBatchMs = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS);
     long cacheMemSize = HiveConf.getSizeVar(
         conf, ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE);
-    int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+    int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS);
     boolean useSoftReference = HiveConf.getBoolVar(
         conf, ConfVars.HIVE_ORC_CACHE_USE_SOFT_REFERENCES);
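Note for reviewers: the rewritten processPathsForMmRead follows a standard bounded fan-out pattern. Below is a minimal standalone sketch of that pattern under stated assumptions: the class and helper names (ParallelPathSketch, processDir) are hypothetical, java.nio.file.Path stands in for Hadoop's Path, and Guava's ThreadFactoryBuilder is assumed on the classpath, as it is in Hive.

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    import java.io.IOException;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelPathSketch {

      // Hypothetical stand-in for processForWriteIdsForMmRead: classifies one
      // directory and appends to the shared result lists.
      static void processDir(Path dir, List<Path> finalPaths, List<Path> withOriginals) {
        finalPaths.add(dir);
      }

      public static void process(List<Path> dirs, int configuredThreads) throws IOException {
        // Shared result lists must tolerate concurrent appends once multiple
        // workers write to them, hence Collections.synchronizedList (the same
        // reason the patch wraps the lists in addSplitsForGroup).
        List<Path> finalPaths = Collections.synchronizedList(new ArrayList<>());
        List<Path> withOriginals = Collections.synchronizedList(new ArrayList<>());

        // Never start more threads than there are directories to process.
        int numThreads = Math.min(configuredThreads, dirs.size());
        List<Future<Void>> futures = new ArrayList<>();
        ExecutorService pool = null;
        if (numThreads > 1) {
          // Daemon threads so a stuck worker cannot keep the JVM alive.
          pool = Executors.newFixedThreadPool(numThreads,
              new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Split-Paths-%d").build());
        }

        try {
          for (Path dir : dirs) {
            if (pool != null) {
              futures.add(pool.submit(() -> {
                processDir(dir, finalPaths, withOriginals);
                return null;
              }));
            } else {
              // Zero or one directory (or threads disabled): stay on the caller's thread.
              processDir(dir, finalPaths, withOriginals);
            }
          }
          try {
            for (Future<Void> f : futures) {
              f.get(); // propagate the first failure
            }
          } catch (InterruptedException | ExecutionException e) {
            for (Future<Void> f : futures) {
              f.cancel(true); // best-effort cancellation of remaining work
            }
            throw new IOException(e);
          }
        } finally {
          if (pool != null) {
            pool.shutdown();
          }
        }
      }
    }

Capping the pool size at dirs.size() avoids idle threads for small inputs, and falling back to the caller's thread when at most one worker would be created skips pool setup entirely; the synchronized result lists are what make the concurrent appends safe.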
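Separately, the new HIVE_COMPUTE_SPLITS_NUM_THREADS entry passes HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname as its alternate (deprecated) name, so configs that still set hive.orc.compute.splits.num.threads should keep taking effect. A rough sketch of that fallback semantics, assuming the alternate name behaves as a read-time fallback (hypothetical helper, not HiveConf's actual implementation):

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical illustration of alternate-key fallback; HiveConf wires this
    // up internally when a ConfVars entry is declared with a deprecated name.
    final class AliasFallbackSketch {
      static int getIntWithFallback(Configuration conf, String name,
          String deprecatedName, int defaultValue) {
        String v = conf.get(name);        // prefer the new key
        if (v == null) {
          v = conf.get(deprecatedName);   // fall back to the deprecated key
        }
        return v == null ? defaultValue : Integer.parseInt(v);
      }
    }

Under that reading, existing deployments keep their tuned thread count while new deployments set hive.compute.splits.num.threads, which OrcInputFormat now reads as well.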