diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3deba27dd6..53ccd5a7ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -63,6 +63,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -1598,18 +1599,61 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
         fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir);
   }
 
-  private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
+  private static FileStatus[] removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
+    // listStatus() is not needed to decide whether the directory must be deleted;
+    // delete() checks that internally. We list the files because the caller uses them.
     FileStatus[] items = fs.listStatus(path);
-    // remove empty directory since DP insert should not generate empty partitions.
-    // empty directories could be generated by crashed Task/ScriptOperator
-    if (items.length != 0) {
-      return false;
+
+    // Remove the empty directory, since DP insert should not generate empty partitions.
+    // Empty directories could be generated by a crashed Task/ScriptOperator.
+    if (items.length == 0) {
+      // delete() returns false in only two cases:
+      //   1. we tried to delete the root directory, or
+      //   2. the path was not there (e.g. already deleted by another thread).
+      // So the return value of delete() is not checked.
+      fs.delete(path, true);
     }
-    if (!fs.delete(path, true)) {
-      LOG.error("Cannot delete empty directory {}", path);
-      throw new IOException("Cannot delete empty directory " + path);
+    return items;
+  }
+
+  // Returns the listings of the non-empty sub-directories and deletes the empty ones.
+  private static Map<Path, FileStatus[]> getNonEmptySubDirs(FileSystem fs, Configuration hConf, FileStatus[] parts)
+      throws IOException {
+    int threadCount = hConf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
+    final ExecutorService pool = (threadCount <= 0 ? null :
+        Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
+            "Remove-Temp-%d").build()));
+    Map<Path, FileStatus[]> partStatusMap = new ConcurrentHashMap<>();
+    List<Future<Void>> futures = new LinkedList<>();
+
+    for (FileStatus part : parts) {
+      Path path = part.getPath();
+      if (pool != null) {
+        futures.add(pool.submit(() -> {
+          FileStatus[] items = removeEmptyDpDirectory(fs, path);
+          partStatusMap.put(path, items);
+          return null;
+        }));
+      } else {
+        partStatusMap.put(path, removeEmptyDpDirectory(fs, path));
+      }
     }
-    return true;
+
+    if (pool != null) {
+      pool.shutdown();
+      try {
+        for (Future<Void> future : futures) {
+          future.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Exception in getting dir status", e);
+        for (Future<Void> future : futures) {
+          future.cancel(true);
+        }
+        throw new IOException(e);
+      }
+    }
+    return partStatusMap;
   }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
@@ -1621,33 +1665,32 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
     List<Path> result = new ArrayList<Path>();
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpLevels > 0) {
-      FileStatus[] parts = fileStats;
-      for (int i = 0; i < parts.length; ++i) {
-        assert parts[i].isDirectory() : "dynamic partition " + parts[i].getPath()
-            + " is not a directory";
-        Path path = parts[i].getPath();
-        if (removeEmptyDpDirectory(fs, path)) {
-          parts[i] = null;
+      Map<Path, FileStatus[]> partStatusMap = getNonEmptySubDirs(fs, hconf, fileStats);
+      for (int i = 0; i < fileStats.length; ++i) {
+        Path path = fileStats[i].getPath();
+        assert fileStats[i].isDirectory() : "dynamic partition " + path + " is not a directory";
+        FileStatus[] items = partStatusMap.get(path);
+        if (items.length == 0) {
+          fileStats[i] = null;
           continue;
         }
 
         if (isMmTable) {
-          Path mmDir = parts[i].getPath();
-          if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) {
-            throw new IOException("Unexpected non-MM directory name " + mmDir);
+          if (!path.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) {
+            throw new IOException("Unexpected non-MM directory name " + path);
           }
 
-          Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir);
+          Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", path);
 
           if (!StringUtils.isEmpty(unionSuffix)) {
-            path = new Path(path, unionSuffix);
-            if (!fs.exists(path)) {
+            try {
+              items = fs.listStatus(new Path(path, unionSuffix));
+            } catch (FileNotFoundException e) {
               continue;
             }
           }
         }
 
-        FileStatus[] items = fs.listStatus(path);
         taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
@@ -1656,11 +1699,10 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
         addBucketFileToResults(taskIDToFile, numBuckets, hconf, result);
       }
     } else if (isMmTable && !StringUtils.isEmpty(unionSuffix)) {
-      FileStatus[] items = fileStats;
       if (fileStats.length == 0) {
         return result;
       }
-      Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir);
+      Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
       taskIDToFile = removeTempOrDuplicateFilesNonMm(
           fs.listStatus(new Path(mmDir, unionSuffix)), fs);
       if (filesKept != null && taskIDToFile != null) {
@@ -1668,17 +1710,16 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
       addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result);
     } else {
-      FileStatus[] items = fileStats;
-      if (items.length == 0) {
+      if (fileStats.length == 0) {
         return result;
       }
       if (!isMmTable) {
-        taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
+        taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
         }
       } else {
-        Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir);
+        Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
         taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
@@ -1755,7 +1796,8 @@ private static void addBucketFileIfMissing(List<Path> result,
       Path onePath = one.getPath();
       Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles deleting {}", onePath);
       if (!fs.delete(onePath, true)) {
-        throw new IOException("Unable to delete tmp file: " + onePath);
+        // If the file was already deleted by some other task, just ignore the failure with a warning.
+        LOG.warn("Unable to delete tmp file: {}", onePath);
       }
     } else {
       // This would be a single file. See if we need to remove it.
@@ -1850,7 +1892,7 @@ private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
           size += getFileSizeRecursively(fs, file);
         }
       } catch (IOException e) {
-        throw new IOException("Unable to fetch files in directory " + src.getPath());
+        throw new IOException("Unable to fetch files in directory " + src.getPath(), e);
      }
     } else {
       size = src.getLen();
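
Note for reviewers: the core of this change is the new getNonEmptySubDirs(), which fans the per-partition listStatus()/delete() calls out over a fixed-size daemon thread pool (HIVE_MOVE_FILES_THREAD_COUNT, default 15, with a sequential fallback when the count is <= 0) and collects the listings in a ConcurrentHashMap keyed by partition path. Below is a minimal, self-contained sketch of that pattern; it uses java.nio in place of Hadoop's FileSystem API, and the names PartitionScan, scanPartitions and listAndMaybeDelete are illustrative, not part of the patch.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionScan {

  // Fan the per-directory scan out over a fixed-size daemon pool and
  // collect each directory's listing in a concurrent map, as the patch
  // does for dynamic partition directories.
  static Map<Path, List<Path>> scanPartitions(List<Path> parts, int threadCount)
      throws IOException {
    ExecutorService pool = threadCount <= 0 ? null
        : Executors.newFixedThreadPool(threadCount, r -> {
            Thread t = new Thread(r, "remove-temp");
            t.setDaemon(true); // do not keep the JVM alive for cleanup work
            return t;
          });
    Map<Path, List<Path>> listings = new ConcurrentHashMap<>();
    List<Future<Void>> futures = new ArrayList<>();

    for (Path part : parts) {
      if (pool != null) {
        futures.add(pool.submit(() -> { // Callable<Void>: may throw IOException
          listings.put(part, listAndMaybeDelete(part));
          return null;
        }));
      } else {
        listings.put(part, listAndMaybeDelete(part)); // sequential fallback
      }
    }

    if (pool != null) {
      pool.shutdown();
      try {
        for (Future<Void> future : futures) {
          future.get(); // surface the first failure
        }
      } catch (InterruptedException | ExecutionException e) {
        for (Future<Void> future : futures) {
          future.cancel(true); // best effort: stop the remaining scans
        }
        throw new IOException(e);
      }
    }
    return listings;
  }

  // List a directory; if it turned out to be empty, delete it and
  // return the empty listing (mirrors removeEmptyDpDirectory()).
  static List<Path> listAndMaybeDelete(Path dir) throws IOException {
    List<Path> items = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      stream.forEach(items::add);
    }
    if (items.isEmpty()) {
      Files.deleteIfExists(dir); // a concurrent delete is fine, as in the patch
    }
    return items;
  }
}

As in the patch, the first failed future cancels the remaining ones and is rethrown as an IOException; shutdown() alone would still let queued tasks run, so the cancel(true) loop is what actually stops outstanding work.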
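The MM/union branch also replaces the fs.exists() probe followed by fs.listStatus() with a single listStatus() that catches FileNotFoundException. That saves one NameNode round trip per partition and closes the race window in which the directory could vanish between the two calls. A minimal sketch of the idiom, with java.nio's NoSuchFileException standing in for the FileNotFoundException thrown by Hadoop's FileSystem.listStatus(), and countEntries an illustrative name:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.stream.Stream;

public class ListOrSkip {
  // List once and treat "not found" as empty, instead of calling
  // exists() first and listing afterwards (two calls, plus a window
  // in which the directory can disappear in between).
  static long countEntries(Path dir) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.count();
    } catch (NoSuchFileException e) {
      return 0; // directory vanished or never existed; nothing to do
    }
  }
}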