diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3d4e9e023b..3c2ab1bb57 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2630,6 +2630,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Whether to allow original files in MM tables. Conversion to MM may be expensive if\n" +
         "this is set to false, however unless MAPREDUCE-7086 fix is present, queries that\n" +
         "read MM tables with original files will fail. The default in Hive 3.0 is false."),
+    HIVE_MM_COMPUTE_SPLITS_NUM_THREADS("hive.mm.compute.splits.num.threads", 10,
+        "How many threads the Hive input format should use to process splits in parallel when reading MM tables."),
 
     // Zookeeper related configs
     HIVE_ZOOKEEPER_USE_KERBEROS("hive.zookeeper.kerberos.enabled", true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 233bd1ebc4..1733bdcaaf 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -60,6 +62,7 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
@@ -71,6 +74,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -79,7 +83,14 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static java.lang.Integer.min;
 
 /**
  * HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -577,10 +588,102 @@ public static void processPathsForMmRead(List<Path> dirs, Configuration conf,
       return;
     }
     boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS);
-    for (Path dir : dirs) {
-      processForWriteIdsForMmRead(
-          dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+
+    int numThreads = min(HiveConf.getIntVar(conf, ConfVars.HIVE_MM_COMPUTE_SPLITS_NUM_THREADS), dirs.size());
+    List<Future<MMPathInfo>> pathFutures = Lists.newArrayList();
+    ExecutorService pool = null;
+    if (numThreads > 1) {
+      pool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MM-Split-Paths-%d").build());
+    }
+
+    try {
+      for (Path dir : dirs) {
+        if (pool != null) {
+          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+          ProcessForWriteIdsForMmReadCallable processPaths = new ProcessForWriteIdsForMmReadCallable(dir, conf,
+              validWriteIdList, allowOriginals, ugi);
+          pathFutures.add(pool.submit(processPaths));
+        } else {
+          processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
+        }
+      }
+      for (Future<MMPathInfo> pathFuture : pathFutures) {
+        finalPaths.addAll(pathFuture.get().getFinalPaths());
+        pathsWithFileOriginals.addAll(pathFuture.get().getPathsWithFileOriginals());
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      for (Future<MMPathInfo> future : pathFutures) {
+        future.cancel(true);
+      }
+      throw new IOException(e);
+    } finally {
+      if (pool != null) {
+        pool.shutdown();
+      }
+    }
+  }
+
+  static final class MMPathInfo {
+    MMPathInfo(List<Path> finalPaths,
+        List<Path> pathsWithFileOriginals) {
+      this.finalPaths = finalPaths;
+      this.pathsWithFileOriginals = pathsWithFileOriginals;
+    }
+
+    private final List<Path> finalPaths;
+    private final List<Path> pathsWithFileOriginals;
+
+    public List<Path> getFinalPaths() {
+      return finalPaths;
     }
+
+    public List<Path> getPathsWithFileOriginals() {
+      return pathsWithFileOriginals;
+    }
+  }
+
+  static final class ProcessForWriteIdsForMmReadCallable implements Callable<MMPathInfo> {
+
+    private final Path dir;
+    private final Configuration conf;
+    private final ValidWriteIdList validWriteIdList;
+    private final boolean allowOriginals;
+    private final UserGroupInformation ugi;
+
+    ProcessForWriteIdsForMmReadCallable(Path dir, Configuration conf, ValidWriteIdList validWriteIdList,
+        boolean allowOriginals, UserGroupInformation ugi) {
+      this.dir = dir;
+      this.conf = conf;
+      this.validWriteIdList = validWriteIdList;
+      this.allowOriginals = allowOriginals;
+      this.ugi = ugi;
+
+    }
+
+    @Override
+    public MMPathInfo call() throws Exception {
+      if (ugi == null) {
+        return processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals);
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<MMPathInfo>() {
+          @Override
+          public MMPathInfo run() throws Exception {
+            return processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals);
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  private static MMPathInfo processForWriteIdsForMmRead(Path dir, Configuration conf,
+      ValidWriteIdList validWriteIdList, boolean allowOriginals) throws IOException {
+    MMPathInfo pathLists = new MMPathInfo(Lists.newArrayList(), Lists.newArrayList());
+    processForWriteIdsForMmRead(dir, conf, validWriteIdList, allowOriginals, pathLists.getFinalPaths(), pathLists.getPathsWithFileOriginals());
+    return pathLists;
   }
 
   private static void processForWriteIdsForMmRead(Path dir, Configuration conf,
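
Note (not part of the patch): a minimal sketch of how the new hive.mm.compute.splits.num.threads knob could be exercised, assuming the standard HiveConf accessors (setIntVar/getIntVar); the class name below is invented for illustration. A value of 1, or a table with a single input directory, keeps the old serial path, since processPathsForMmRead only builds a thread pool when numThreads > 1.

// Hypothetical usage sketch only; not part of the patch above.
import org.apache.hadoop.hive.conf.HiveConf;

public class MmSplitThreadsExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Equivalent to "SET hive.mm.compute.splits.num.threads=16;" in a Hive session.
    conf.setIntVar(HiveConf.ConfVars.HIVE_MM_COMPUTE_SPLITS_NUM_THREADS, 16);
    // The input format clamps this to the number of MM directories being read.
    System.out.println(conf.getIntVar(HiveConf.ConfVars.HIVE_MM_COMPUTE_SPLITS_NUM_THREADS));
  }
}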