diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index e9966e6364..b9ff12c336 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.net.URI; import java.net.URL; import java.net.URLClassLoader; @@ -4357,8 +4356,8 @@ public static void handleDirectInsertTableFinalPath(Path specPath, String unionS } } - for (Path path : directInsertDirectories) { - cleanDirectInsertDirectory(path, fs, unionSuffix, lbLevels, committed); + if (!directInsertDirectories.isEmpty()) { + cleanDirectInsertDirectoriesConcurrently(directInsertDirectories, committed, fs, hconf, unionSuffix, lbLevels); } if (!committed.isEmpty()) { @@ -4393,14 +4392,62 @@ public static void handleDirectInsertTableFinalPath(Path specPath, String unionS } } + private static void cleanDirectInsertDirectoriesConcurrently( + List directInsertDirectories, Set committed, FileSystem fs, Configuration hconf, String unionSuffix, int lbLevels) + throws IOException, HiveException { + + ExecutorService executor = createCleanTaskExecutor(hconf, directInsertDirectories.size()); + List> cleanTaskFutures = submitCleanTasksForExecution(executor, directInsertDirectories, committed, fs, unionSuffix, lbLevels); + waitForCleanTasksToComplete(executor, cleanTaskFutures); + } + + private static ExecutorService createCleanTaskExecutor(Configuration hconf, int numOfDirectories) { + int threadCount = Math.min(numOfDirectories, HiveConf.getIntVar(hconf, ConfVars.HIVE_MOVE_FILES_THREAD_COUNT)); + threadCount = threadCount <= 0 ? 1 : threadCount; + return Executors.newFixedThreadPool(threadCount, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Clean-Direct-Insert-Dirs-Thread-%d").build()); + } + + private static List> submitCleanTasksForExecution(ExecutorService executor, List directInsertDirectories, + Set committed, FileSystem fs, String unionSuffix, int lbLevels) { + List> cleanTaskFutures = new ArrayList<>(directInsertDirectories.size()); + for (Path directory : directInsertDirectories) { + Future cleanTaskFuture = executor.submit(() -> { + cleanDirectInsertDirectory(directory, fs, unionSuffix, lbLevels, committed); + return null; + }); + cleanTaskFutures.add(cleanTaskFuture); + } + return cleanTaskFutures; + } + + private static void waitForCleanTasksToComplete(ExecutorService executor, List> cleanTaskFutures) + throws IOException, HiveException { + executor.shutdown(); + for (Future cleanFuture : cleanTaskFutures) { + try { + cleanFuture.get(); + } catch (InterruptedException | ExecutionException e) { + executor.shutdownNow(); + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + if (e.getCause() instanceof HiveException) { + throw (HiveException) e.getCause(); + } + } + } + } + + private static final class PathOnlyFileStatus extends FileStatus { public PathOnlyFileStatus(Path path) { super(0, true, 0, 0, 0, path); } } - private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, - int lbLevels, HashSet committed) throws IOException, HiveException { + private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, int lbLevels, Set committed) + throws IOException, HiveException { for (FileStatus child : fs.listStatus(dir)) { Path childPath = child.getPath(); if (lbLevels > 0) {