diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java index 10fa561..ca945ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java @@ -21,9 +21,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,13 +38,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.thrift.TException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Verify that the information in the metastore matches what is on the * filesystem. Return a CheckResult object containing lists of missing and any @@ -286,9 +296,10 @@ void checkTable(Table table, List<Partition> parts, * Result object * @throws IOException * Thrown if we fail at fetching listings from the fs. 
+ * @throws HiveException */ void findUnknownPartitions(Table table, Set<Path> partPaths, - CheckResult result) throws IOException { + CheckResult result) throws IOException, HiveException { Path tablePath = table.getPath(); // now check the table folder and see if we find anything @@ -353,28 +364,61 @@ private String getPartitionName(Path tablePath, Path partitionPath) { * This set will contain the leaf paths at the end. * @throws IOException * Thrown if we can't get lists from the fs. + * @throws HiveException */ - private void getAllLeafDirs(Path basePath, Set<Path> allDirs) - throws IOException { - getAllLeafDirs(basePath, allDirs, basePath.getFileSystem(conf)); + private void getAllLeafDirs(Path basePath, Set<Path> allDirs) throws IOException, HiveException { + ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>(); + basePaths.add(basePath); + Set<Path> dirSet = Collections.synchronizedSet(new HashSet<Path>()); + getAllLeafDirs(basePaths, dirSet, basePath.getFileSystem(conf)); + allDirs.addAll(dirSet); } - private void getAllLeafDirs(Path basePath, Set<Path> allDirs, FileSystem fs) - throws IOException { - - FileStatus[] statuses = fs.listStatus(basePath, FileUtils.HIDDEN_FILES_PATH_FILTER); - boolean directoryFound=false; + // process the basePaths in parallel and then the next level of basePaths + private void getAllLeafDirs(ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs, + final FileSystem fs) throws IOException, HiveException { + final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>(); + final List<Future<Boolean>> futures = new LinkedList<>(); + // here we just reuse the THREAD_COUNT configuration for HIVE_MOVE_FILES_THREAD_COUNT + final ExecutorService pool = Executors.newFixedThreadPool( + conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("GetLeaf-Thread-%d").build()); + for (final Path path : basePaths) { + futures.add(pool.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + 
FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); + boolean directoryFound = false; + + for (FileStatus status : statuses) { + if (status.isDir()) { + directoryFound = true; + nextLevel.add(status.getPath()); + } + } - for (FileStatus status : statuses) { - if (status.isDir()) { - directoryFound = true; - getAllLeafDirs(status.getPath(), allDirs, fs); + if (!directoryFound) { + allDirs.add(path); + } + return true; + } + })); + } + pool.shutdown(); + for (Future<Boolean> future : futures) { + try { + future.get(); + } catch (Exception e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new HiveException(e.getCause()); } } - - if(!directoryFound){ - allDirs.add(basePath); + if (nextLevel.isEmpty()) { + return; + } else { + getAllLeafDirs(nextLevel, allDirs, fs); } }