diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java index 57f731f..31de976 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java @@ -28,7 +28,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -400,24 +399,31 @@ static String getPartitionName(Path tablePath, Path partitionPath, * Specify how deep the search goes. * @throws IOException * Thrown if we can't get lists from the fs. - * @throws HiveException + * @throws HiveException */ private void checkPartitionDirs(Path basePath, Set allDirs, int maxDepth) throws IOException, HiveException { ConcurrentLinkedQueue basePaths = new ConcurrentLinkedQueue<>(); basePaths.add(basePath); - Set dirSet = Collections.newSetFromMap(new ConcurrentHashMap()); + Set dirSet = Collections.newSetFromMap(new ConcurrentHashMap()); // Here we just reuse the THREAD_COUNT configuration for // HIVE_MOVE_FILES_THREAD_COUNT - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors - .newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) - : null; + int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15); + + // Check if too low config is provided for move files. 2x CPU is reasonable max count. + poolSize = poolSize == 0 ? poolSize : Math.max(poolSize, + Runtime.getRuntime().availableProcessors() * 2); + + // Fixed thread pool on need basis + final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor) + Executors.newFixedThreadPool(poolSize, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build()) : null; + if (pool == null) { LOG.debug("Not-using threaded version of MSCK-GetPaths"); } else { LOG.debug("Using threaded version of MSCK-GetPaths with number of threads " - + ((ThreadPoolExecutor) pool).getPoolSize()); + + pool.getMaximumPoolSize()); } checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth); if (pool != null) { @@ -427,11 +433,28 @@ private void checkPartitionDirs(Path basePath, Set allDirs, int maxDepth) } // process the basePaths in parallel and then the next level of basePaths - private void checkPartitionDirs(final ExecutorService pool, + private void checkPartitionDirs(final ThreadPoolExecutor pool, final ConcurrentLinkedQueue basePaths, final Set allDirs, final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException { final ConcurrentLinkedQueue nextLevel = new ConcurrentLinkedQueue<>(); - if (null == pool) { + + // Check if thread pool can be used. + boolean useThreadPool = false; + if (pool != null) { + synchronized (pool) { + if (LOG.isDebugEnabled()) { + LOG.debug("threadpool active count:" + pool.getActiveCount() + + ", max:" + pool.getMaximumPoolSize()); + } + + // In case of recursive calls, it is possible to deadlock with TP. Check TP usage here. + if (pool.getActiveCount() < pool.getMaximumPoolSize()) { + useThreadPool = true; + } + } + } + + if (null == pool || !useThreadPool) { for (final Path path : basePaths) { FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); boolean fileFound = false; diff --git ql/src/test/queries/clientpositive/msck_repair_batchsize.q ql/src/test/queries/clientpositive/msck_repair_batchsize.q index 06e4507..e56e97a 100644 --- ql/src/test/queries/clientpositive/msck_repair_batchsize.q +++ ql/src/test/queries/clientpositive/msck_repair_batchsize.q @@ -20,3 +20,13 @@ MSCK REPAIR TABLE default.repairtable; MSCK TABLE repairtable; DROP TABLE default.repairtable; + + +dfs ${system:test.dfs.mkdir} -p ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b; +CREATE TABLE `repairtable`( `col` string) PARTITIONED BY ( `p1` string, `p2` string) location '${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/'; + +dfs -touchz ${system:test.tmp.dir}/apps/hive/warehouse/test.db/repairtable/p1=c/p2=a/p3=b/datafile; +set hive.mv.files.thread=1; +MSCK TABLE repairtable; + +DROP TABLE default.repairtable; diff --git ql/src/test/results/clientpositive/msck_repair_batchsize.q.out ql/src/test/results/clientpositive/msck_repair_batchsize.q.out index a0180b7..ba99024 100644 --- ql/src/test/results/clientpositive/msck_repair_batchsize.q.out +++ ql/src/test/results/clientpositive/msck_repair_batchsize.q.out @@ -47,3 +47,28 @@ POSTHOOK: query: DROP TABLE default.repairtable POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@repairtable POSTHOOK: Output: default@repairtable +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +#### A masked pattern was here #### +PREHOOK: Output: database:default +PREHOOK: Output: default@repairtable +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@repairtable +PREHOOK: query: MSCK TABLE repairtable +PREHOOK: type: MSCK +PREHOOK: Output: default@repairtable +POSTHOOK: query: MSCK TABLE repairtable +POSTHOOK: type: MSCK +POSTHOOK: Output: default@repairtable +Partitions not in metastore: repairtable:p1=c/p2=a +PREHOOK: query: DROP TABLE default.repairtable +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@repairtable +PREHOOK: Output: default@repairtable +POSTHOOK: query: DROP TABLE default.repairtable +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@repairtable +POSTHOOK: Output: default@repairtable