diff --git common/src/java/org/apache/hadoop/hive/common/FileUtils.java common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 23fcc8a..f6aea2b 100644
--- common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -536,7 +536,11 @@ public static boolean mkdir(FileSystem fs, Path f, boolean inheritPerms, Configu
     } else {
       try {
         //set on the entire subtree
-        HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent, true);
+        if (inheritPerms) {
+          HdfsUtils.setFullFileStatus(conf,
+              new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs,
+              firstNonExistentParent, true);
+        }
       } catch (Exception e) {
         LOG.warn("Error setting permissions of " + firstNonExistentParent, e);
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index ec5d693..89893eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -245,7 +245,9 @@ private Path getStagingDir(Path inputPath, boolean mkdir) {
 
     if (mkdir) {
       try {
-        if (!FileUtils.mkdir(fs, dir, true, conf)) {
+        boolean inheritPerms = HiveConf.getBoolVar(conf,
+            HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+        if (!FileUtils.mkdir(fs, dir, inheritPerms, conf)) {
           throw new IllegalStateException("Cannot create staging directory '" + dir.toString() + "'");
         }
 
@@ -380,10 +382,9 @@ public void removeMaterializedCTEs() {
       Path location = materializedTable.getDataLocation();
       try {
         FileSystem fs = location.getFileSystem(conf);
-        if (fs.exists(location)) {
-          fs.delete(location, true);
-          LOG.info("Removed " + location + " for materialized " + materializedTable.getTableName());
-        }
+        boolean status = fs.delete(location, true);
+        LOG.info("Removed " + location + " for materialized "
+            + materializedTable.getTableName() + ", status=" + status);
       } catch (IOException e) {
         // ignore
         LOG.warn("Error removing " + location + " for materialized " + materializedTable.getTableName() +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index cadda8f..fd25978 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1388,10 +1388,11 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
-      if (fs.exists(tmpPath)) {
+      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+          tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+      if(statuses != null && statuses.length > 0) {
         // remove any tmp file or double-committed output files
-        List<Path> emptyBuckets =
-            Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
+        List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
@@ -1462,21 +1463,31 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     removeTempOrDuplicateFiles(fs, path, null,null,null);
   }
 
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+    if (path == null) {
+      return null;
+    }
+    FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
+        ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+  }
+
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
-    if (path == null) {
+    if (fileStats == null) {
       return null;
     }
 
     List<Path> result = new ArrayList<Path>();
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
-      FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+      FileStatus parts[] = fileStats;
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -1512,7 +1523,10 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         }
       }
     } else {
-      FileStatus[] items = fs.listStatus(path);
+      FileStatus[] items = fileStats;
+      if (items.length == 0) {
+        return result;
+      }
       taskIDToFile = removeTempOrDuplicateFiles(items, fs);
       if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null
           && (conf.getTable().getNumBuckets() > taskIDToFile.size()) && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
@@ -2253,12 +2267,13 @@ public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
 
   public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
     FileSystem inpFs = dirPath.getFileSystem(job);
-
-    if (inpFs.exists(dirPath)) {
+    try {
       FileStatus[] fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
       if (fStats.length > 0) {
         return false;
       }
+    } catch(FileNotFoundException fnf) {
+      return true;
     }
     return true;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9d927bd..4fa9fbb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3193,18 +3193,16 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
     boolean isOldPathUnderDestf = false;
     try {
       FileSystem fs2 = oldPath.getFileSystem(conf);
-      if (fs2.exists(oldPath)) {
-        // Do not delete oldPath if:
-        //  - destf is subdir of oldPath
-        //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
-        isOldPathUnderDestf = FileUtils.isSubDir(oldPath, destf, fs2);
-        if (isOldPathUnderDestf) {
-          // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
-          // existing content might result in incorrect (extra) data.
-          // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
-          // not the destf or its subdir?
-          oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf);
-        }
-      }
+      // Do not delete oldPath if:
+      //  - destf is subdir of oldPath
+      //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
+      isOldPathUnderDestf = FileUtils.isSubDir(oldPath, destf, fs2);
+      if (isOldPathUnderDestf) {
+        // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
+        // existing content might result in incorrect (extra) data.
+        // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
+        // not the destf or its subdir?
+        oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf);
+      }
     } catch (IOException e) {
       if (isOldPathUnderDestf) {
@@ -3223,7 +3221,9 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
 
       // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
       // destf with inherited permissions
-      boolean destfExist = FileUtils.mkdir(destFs, destf, true, conf);
+      boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars
+          .HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+      boolean destfExist = FileUtils.mkdir(destFs, destf, inheritPerms, conf);
       if(!destfExist) {
         throw new IOException("Directory " + destf.toString()
             + " does not exist and could not be created.");
@@ -3255,16 +3255,18 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
   /**
    * Trashes or deletes all files under a directory. Leaves the directory as is.
    * @param fs FileSystem to use
-   * @param f path of directory
+   * @param statuses fileStatuses of files to be deleted
    * @param conf hive configuration
-   * @param forceDelete whether to force delete files if trashing does not succeed
    * @return true if deletion successful
    * @throws IOException
    */
-  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
+  private boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
       throws IOException {
-    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
     boolean result = true;
+
+    if (statuses == null || statuses.length == 0) {
+      return false;
+    }
     final List<Future<Boolean>> futures = new LinkedList<>();
     final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
         Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
@@ -3298,6 +3300,20 @@ public Boolean call() throws Exception {
     return result;
   }
 
+  /**
+   * Trashes or deletes all files under a directory. Leaves the directory as is.
+   * @param fs FileSystem to use
+   * @param f path of directory
+   * @param conf hive configuration
+   * @return true if deletion successful
+   * @throws IOException
+   */
+  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
+      throws IOException {
+    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
+    return trashFiles(fs, statuses, conf);
+  }
+
   public static boolean isHadoop1() {
     return ShimLoader.getMajorVersion().startsWith("0.20");
   }
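
Note (not part of the patch): the recurring change above is to drop the separate fs.exists() round trip and instead call listStatus() directly, treating FileNotFoundException as "path absent", as in the isEmptyPath hunk. A minimal standalone sketch of that pattern, assuming only the standard Hadoop FileSystem API; the class and helper name (ExistsFreeCheck.isEmptyDir) are illustrative and do not appear in the patch:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class ExistsFreeCheck {
      // Returns true when the directory is missing or has no entries.
      // A single listStatus() call replaces the exists() + listStatus() pair,
      // saving one NameNode RPC on the common case where the path exists.
      static boolean isEmptyDir(FileSystem fs, Path dir) throws IOException {
        try {
          FileStatus[] entries = fs.listStatus(dir);
          return entries.length == 0;
        } catch (FileNotFoundException fnf) {
          return true; // a missing directory counts as empty
        }
      }
    }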