diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a92ef3b499..e9966e6364 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Serializable; import java.net.URI; import java.net.URL; import java.net.URLClassLoader; @@ -4356,7 +4357,9 @@ public static void handleDirectInsertTableFinalPath(Path specPath, String unionS } } - cleanDirectInsertDirectoriesConcurrently(directInsertDirectories, committed, fs, hconf, unionSuffix, lbLevels); + for (Path path : directInsertDirectories) { + cleanDirectInsertDirectory(path, fs, unionSuffix, lbLevels, committed); + } if (!committed.isEmpty()) { throw new HiveException("The following files were committed but not found: " + committed); @@ -4390,61 +4393,14 @@ public static void handleDirectInsertTableFinalPath(Path specPath, String unionS } } - private static void cleanDirectInsertDirectoriesConcurrently( - List directInsertDirectories, Set committed, FileSystem fs, Configuration hconf, String unionSuffix, int lbLevels) - throws IOException, HiveException { - - ExecutorService executor = createCleanTaskExecutor(hconf, directInsertDirectories.size()); - List> cleanTaskFutures = submitCleanTasksForExecution(executor, directInsertDirectories, committed, fs, unionSuffix, lbLevels); - waitForCleanTasksToComplete(executor, cleanTaskFutures); - } - - private static ExecutorService createCleanTaskExecutor(Configuration hconf, int numOfDirectories) { - int threadCount = Math.min(numOfDirectories, HiveConf.getIntVar(hconf, ConfVars.HIVE_MOVE_FILES_THREAD_COUNT)); - return Executors.newFixedThreadPool(threadCount, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Clean-Direct-Insert-Dirs-Thread-%d").build()); - } - - private static List> submitCleanTasksForExecution(ExecutorService executor, List directInsertDirectories, - Set committed, FileSystem fs, String unionSuffix, int lbLevels) { - List> cleanTaskFutures = new ArrayList<>(directInsertDirectories.size()); - for (Path directory : directInsertDirectories) { - Future cleanTaskFuture = executor.submit(() -> { - cleanDirectInsertDirectory(directory, fs, unionSuffix, lbLevels, committed); - return null; - }); - cleanTaskFutures.add(cleanTaskFuture); - } - return cleanTaskFutures; - } - - private static void waitForCleanTasksToComplete(ExecutorService executor, List> cleanTaskFutures) - throws IOException, HiveException { - executor.shutdown(); - for (Future cleanFuture : cleanTaskFutures) { - try { - cleanFuture.get(); - } catch (InterruptedException | ExecutionException e) { - executor.shutdownNow(); - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - if (e.getCause() instanceof HiveException) { - throw (HiveException) e.getCause(); - } - } - } - } - - private static final class PathOnlyFileStatus extends FileStatus { public PathOnlyFileStatus(Path path) { super(0, true, 0, 0, 0, path); } } - private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, int lbLevels, Set committed) - throws IOException, HiveException { + private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, + int lbLevels, HashSet committed) throws IOException, HiveException { for (FileStatus child : fs.listStatus(dir)) { Path childPath = child.getPath(); if (lbLevels > 0) {