diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 437266355a..1d32ba0172 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -92,6 +92,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; @@ -1101,6 +1102,43 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException, } } + /** + * Moves files from src to dst if it is within the specified set of paths + * @param fs + * @param src + * @param dst + * @param filesToMove + * @throws IOException + * @throws HiveException + */ + private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set filesToMove) + throws IOException, HiveException { + if (!fs.exists(dst)) { + fs.mkdirs(dst); + } + + FileStatus[] files = fs.listStatus(src); + for (FileStatus file : files) { + if (filesToMove.contains(file.getPath())) { + Utilities.moveFile(fs, file, dst); + } else if (file.isDir()) { + // Traverse directory contents. + // Directory nesting for dst needs to match src. + Path nestedDstPath = new Path(dst, file.getPath().getName()); + Utilities.moveSpecifiedFiles(fs, file.getPath(), nestedDstPath, filesToMove); + } + } + } + + private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst, + Set filesToMove) throws IOException, HiveException { + Set filePaths = new HashSet<>(); + for (FileStatus fstatus : filesToMove) { + filePaths.add(fstatus.getPath()); + } + moveSpecifiedFiles(fs, src, dst, filePaths); + } + private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException, HiveException { Path srcFilePath = file.getPath(); @@ -1356,7 +1394,6 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI private static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf) { // we are avoiding rename/move only if following conditions are met // * execution engine is tez - // * query cache is disabled // * if it is select query if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){ @@ -1394,10 +1431,11 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, // 3) Rename/move the temp directory to specPath FileSystem fs = specPath.getFileSystem(hconf); + boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); if (success) { - if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath)) { + if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) { // 1) Rename tmpPath to a new directory name to prevent additional files // from being added by runaway processes. Path tmpPathOriginal = tmpPath; @@ -1435,6 +1473,8 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, if(shouldAvoidRename(conf, hconf)){ LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString()); conf.getFilesToFetch().addAll(filesKept); + } else if (isBlobStorage) { + Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept); } else { perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles"); Utilities.renameOrMoveFiles(fs, tmpPath, specPath);