From 5b97e045aeae7b1e5d5cae56f9ba21853fbcc1df Mon Sep 17 00:00:00 2001 From: Ashutosh Chauhan Date: Mon, 9 May 2016 18:31:15 -0700 Subject: [PATCH] HIVE-13726 : Improve dynamic partition loading VI --- .../org/apache/hadoop/hive/common/FileUtils.java | 58 +++------------------- .../org/apache/hadoop/hive/ql/metadata/Hive.java | 49 ++++++++++++++++-- 2 files changed, 54 insertions(+), 53 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index b65c35b..5cf4d39 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -50,7 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Collection of file manipulation utilities common across Hive. */ @@ -575,73 +574,32 @@ public static boolean copy(FileSystem srcFS, Path src, } /** - * Trashes or deletes all files under a directory. Leaves the directory as is. - * @param fs FileSystem to use - * @param f path of directory - * @param conf hive configuration - * @param forceDelete whether to force delete files if trashing does not succeed - * @return true if deletion successful - * @throws FileNotFoundException - * @throws IOException - */ - public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf, - boolean forceDelete) throws FileNotFoundException, IOException { - FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER); - boolean result = true; - for (FileStatus status : statuses) { - result = result & moveToTrash(fs, status.getPath(), conf, forceDelete); - } - return result; - } - - /** - * Move a particular file or directory to the trash. If for a certain reason the trashing fails - * it will force deletes the file or directory - * @param fs FileSystem to use - * @param f path of file or directory to move to trash. - * @param conf - * @return true if move successful - * @throws IOException - */ - public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf) throws IOException { - return moveToTrash(fs, f, conf, true); - } - - /** * Move a particular file or directory to the trash. * @param fs FileSystem to use * @param f path of file or directory to move to trash. * @param conf - * @param forceDelete whether force delete the file or directory if trashing fails * @return true if move successful * @throws IOException */ - public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean forceDelete) + public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf) throws IOException { LOG.debug("deleting " + f); - boolean result = false; try { result = Trash.moveToAppropriateTrash(fs, f, conf); if (result) { - LOG.info("Moved to trash: " + f); + LOG.trace("Moved to trash: " + f); return true; } } catch (IOException ioe) { - if (forceDelete) { - // for whatever failure reason including that trash has lower encryption zone - // retry with force delete - LOG.warn(ioe.getMessage() + "; Force to delete it."); - } else { - throw ioe; - } + // for whatever failure reason including that trash has lower encryption zone + // retry with force delete + LOG.warn(ioe.getMessage() + "; Force to delete it."); } - if (forceDelete) { - result = fs.delete(f, true); - if (!result) { - LOG.error("Failed to delete " + f); - } + result = fs.delete(f, true); + if (!result) { + LOG.error("Failed to delete " + f); } return result; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index b5e660b..dcfc2b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -44,6 +44,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -1528,7 +1529,7 @@ public Partition loadPartition(Path loadPath, Table tbl, } List newFiles = null; if (replace || (oldPart == null && !isAcid)) { - Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), + replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), isSrcLocal); } else { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { @@ -3080,7 +3081,7 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, * @param isSrcLocal * If the source directory is LOCAL */ - protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, + protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, boolean isSrcLocal) throws HiveException { try { @@ -3114,7 +3115,7 @@ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path o // existing content might result in incorrect (extra) data. // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is // not the destf or its subdir? - oldPathDeleted = FileUtils.trashFilesUnderDir(fs2, oldPath, conf, true); + oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf); } } } catch (IOException e) { @@ -3162,6 +3163,48 @@ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path o } } + + /** + * Trashes or deletes all files under a directory. Leaves the directory as is. + * @param fs FileSystem to use + * @param f path of directory + * @param conf hive configuration + * @param forceDelete whether to force delete files if trashing does not succeed + * @return true if deletion successful + * @throws IOException + */ + private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf) + throws IOException { + FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER); + boolean result = true; + final List> futures = new LinkedList<>(); + final ExecutorService pool = Executors.newFixedThreadPool( + conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()); + final SessionState parentSession = SessionState.get(); + for (final FileStatus status : statuses) { + futures.add(pool.submit(new Callable() { + + @Override + public Boolean call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + return FileUtils.moveToTrash(fs, status.getPath(), conf); + } + })); + } + pool.shutdown(); + for (Future future : futures) { + try { + result &= future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to delete: ",e); + pool.shutdownNow(); + throw new IOException(e); + } + } + return result; + } + public static boolean isHadoop1() { return ShimLoader.getMajorVersion().startsWith("0.20"); } -- 1.7.12.4 (Apple Git-37)