From 82c0e3f4890f51a50ce250cb94441607d5a751d7 Mon Sep 17 00:00:00 2001 From: Kamoor Date: Wed, 12 Oct 2016 16:01:29 -0400 Subject: [PATCH] HIVE-14925: MSCK failing on multithreaded execution --- .../hive/ql/metadata/HiveMetaStoreChecker.java | 154 ++++++++++----------- 1 file changed, 77 insertions(+), 77 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java index 047589a..e995179 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java @@ -426,91 +426,22 @@ private void checkPartitionDirs(Path basePath, Set allDirs, int maxDepth) // process the basePaths in parallel and then the next level of basePaths private void checkPartitionDirs(final ExecutorService pool, - final ConcurrentLinkedQueue basePaths, final Set allDirs, - final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException { + final ConcurrentLinkedQueue basePaths, final Set allDirs, + final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException { final ConcurrentLinkedQueue nextLevel = new ConcurrentLinkedQueue<>(); if (null == pool) { for (final Path path : basePaths) { - FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); - boolean fileFound = false; - for (FileStatus status : statuses) { - if (status.isDirectory()) { - nextLevel.add(status.getPath()); - } else { - fileFound = true; - } - } - if (depth != 0) { - // we are in the middle of the search and we find a file - if (fileFound) { - if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException( - "MSCK finds a file rather than a folder when it searches for " + path.toString()); - } else { - LOG.warn("MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } - } - if (!nextLevel.isEmpty()) { - checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth); - } else if (depth != maxDepth) { - // since nextLevel is empty, we are missing partition columns. - if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException("MSCK is missing partition columns under " + path.toString()); - } else { - LOG.warn("MSCK is missing partition columns under " + path.toString()); - } - } - } else { - allDirs.add(path); + new ProcessPath(allDirs, fs, depth, maxDepth, nextLevel, path).callSync(); + if (!nextLevel.isEmpty()) { + checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth); } + } } else { + LOG.info("Threaded mode enabled"); final List> futures = new LinkedList<>(); for (final Path path : basePaths) { - futures.add(pool.submit(new Callable() { - @Override - public Void call() throws Exception { - FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); - boolean fileFound = false; - for (FileStatus status : statuses) { - if (status.isDirectory()) { - nextLevel.add(status.getPath()); - } else { - fileFound = true; - } - } - if (depth != 0) { - // we are in the middle of the search and we find a file - if (fileFound) { - if ("throw".equals(HiveConf.getVar(conf, - HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException( - "MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } else { - LOG.warn("MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } - } - if (!nextLevel.isEmpty()) { - checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth); - } else if (depth != maxDepth) { - // since nextLevel is empty, we are missing partition columns. - if ("throw".equals(HiveConf.getVar(conf, - HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException("MSCK is missing partition columns under " - + path.toString()); - } else { - LOG.warn("MSCK is missing partition columns under " + path.toString()); - } - } - } else { - allDirs.add(path); - } - return null; - } - })); + futures.add(pool.submit(new ProcessPath(allDirs, fs, depth, maxDepth, nextLevel, path))); } for (Future future : futures) { try { @@ -526,4 +457,73 @@ public Void call() throws Exception { } } } + + class ProcessPath implements Callable{ + + Set allDirs; + FileSystem fs; + int depth; + int maxDepth; + ConcurrentLinkedQueue nextLevel; + Path path; + + ProcessPath(Set allDirs, + FileSystem fs, int depth, int maxDepth, ConcurrentLinkedQueue nextLevel, + Path path){ + this.allDirs = allDirs; + this.fs = fs; + this.depth = depth; + this.maxDepth = maxDepth; + this.nextLevel = nextLevel; + this.path = path; + } + + public Void callSync()throws HiveException{ + try{ + return call(); + }catch(Exception e){ + throw new HiveException(e); + } + } + + @Override + public Void call() throws Exception { + FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); + boolean fileFound = false; + for (FileStatus status : statuses) { + if (status.isDirectory()) { + nextLevel.add(status.getPath()); + } else { + fileFound = true; + } + } + if (depth != 0) { + // we are in the middle of the search and we find a file + if (fileFound) { + if ("throw".equals(HiveConf.getVar(conf, + HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { + throw new HiveException( + "MSCK finds a file rather than a folder when it searches for " + + path.toString()); + } else { + LOG.warn("MSCK finds a file rather than a folder when it searches for " + + path.toString()); + } + } + if (depth != maxDepth) { + // since nextLevel is empty, we are missing partition columns. + if ("throw".equals(HiveConf.getVar(conf, + HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { + throw new HiveException("MSCK is missing partition columns under " + + path.toString()); + } else { + LOG.warn("MSCK is missing partition columns under " + path.toString()); + } + } + } else { + allDirs.add(path); + } + return null; + } + } } -- 2.5.4 (Apple Git-61)