diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java index 3420ef8..a0d3389 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java @@ -28,8 +28,10 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import com.google.common.annotations.VisibleForTesting; @@ -52,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.thrift.TException; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -411,30 +414,21 @@ private void checkPartitionDirs(Path basePath, Set allDirs, int maxDepth) // pool here the smaller sized pool of the two becomes a bottleneck int poolSize = conf.getInt(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT.varname, 15); - // Check if too low config is provided for move files. 2x CPU is reasonable max count. - poolSize = poolSize == 0 ? poolSize : Math.max(poolSize, - getMinPoolSize()); - - // Fixed thread pool on need basis - final ThreadPoolExecutor pool = poolSize > 0 ? 
(ThreadPoolExecutor) - Executors.newFixedThreadPool(poolSize, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null; - - if (pool == null) { - LOG.debug("Not-using threaded version of MSCK-GetPaths"); - Queue basePaths = new LinkedList<>(); - basePaths.add(basePath); - checkPartitionDirsSingleThreaded(basePaths, allDirs, basePath.getFileSystem(conf), maxDepth, - maxDepth); + ExecutorService executor; + if (poolSize <= 0) { + LOG.debug("Using single-threaded version of MSCK-GetPaths"); + executor = MoreExecutors.sameThreadExecutor(); } else { - LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " - + pool.getMaximumPoolSize()); - checkPartitionDirsInParallel((ThreadPoolExecutor) pool, basePath, allDirs, - basePath.getFileSystem(conf), maxDepth); - } - if (pool != null) { - pool.shutdown(); + // Check if too low config is provided for the MSCK-GetPaths thread pool. 2x CPU is reasonable max count. + poolSize = poolSize == 0 ? poolSize : Math.max(poolSize, getMinPoolSize()); + LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build(); + executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory); } + checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth); + + executor.shutdown(); } @VisibleForTesting @@ -515,7 +509,7 @@ private Path processPathDepthInfo(final PathDepthInfo pd) } } - private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool, + private void checkPartitionDirs(final ExecutorService executor, final Path basePath, final Set result, final FileSystem fs, final int maxDepth) throws HiveException { try { @@ -534,7 +528,7 @@ private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool, //process each level in parallel while(!nextLevel.isEmpty()) { futures.add( - 
pool.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue))); + executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue))); } while(!futures.isEmpty()) { Path p = futures.poll().get(); @@ -547,52 +541,8 @@ private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool, } } catch (InterruptedException | ExecutionException e) { LOG.error(e.getMessage()); - pool.shutdownNow(); + executor.shutdownNow(); throw new HiveException(e.getCause()); } } - - /* - * Original recursive implementation works well for single threaded use-case but has limitations - * if we attempt to parallelize this directly - */ - private void checkPartitionDirsSingleThreaded(Queue basePaths, final Set allDirs, - final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException { - for (final Path path : basePaths) { - FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); - final Queue nextLevel = new LinkedList<>(); - boolean fileFound = false; - for (FileStatus status : statuses) { - if (status.isDirectory()) { - nextLevel.add(status.getPath()); - } else { - fileFound = true; - } - } - if (depth != 0) { - // we are in the middle of the search and we find a file - if (fileFound) { - if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException( - "MSCK finds a file rather than a folder when it searches for " + path.toString()); - } else { - LOG.warn("MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } - } - if (!nextLevel.isEmpty()) { - checkPartitionDirsSingleThreaded(nextLevel, allDirs, fs, depth - 1, maxDepth); - } else if (depth != maxDepth) { - // since nextLevel is empty, we are missing partition columns. 
- if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException("MSCK is missing partition columns under " + path.toString()); - } else { - LOG.warn("MSCK is missing partition columns under " + path.toString()); - } - } - } else { - allDirs.add(path); - } - } - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java index f95c130..5e5987b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java +++ ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.metadata; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -25,9 +24,6 @@ import java.util.List; import java.util.Map; -import com.google.common.collect.Lists; -import junit.framework.TestCase; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,6 +38,10 @@ import org.apache.thrift.TException; import org.mockito.Mockito; +import com.google.common.collect.Lists; + +import junit.framework.TestCase; + /** * TestHiveMetaStoreChecker. 
* @@ -419,18 +419,22 @@ public void testErrorForMissingPartitionColumn() throws AlreadyExistsException, createDirectory(sb.toString()); //check result now CheckResult result = new CheckResult(); + Exception exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); createFile(sb.toString(), "dummyFile"); result = new CheckResult(); + exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); } /* @@ -451,20 +455,22 @@ public void testErrorForMissingPartitionsSingleThreaded() createDirectory(sb.toString()); // check result now CheckResult result = new CheckResult(); + Exception exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), - e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); createFile(sb.toString(), "dummyFile"); result = new CheckResult(); + exception = null; try { checker.checkMetastore(dbName, tableName, null, result); } catch (Exception e) { - assertTrue("Expected exception HiveException got " + e.getClass(), - e instanceof HiveException); + exception = e; } + assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException); } /** * Creates a test partitioned table with the required level of nested partitions and number of