diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index b37558c063..e8612e6647 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -91,14 +91,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.HiveInterruptCallback; -import org.apache.hadoop.hive.common.HiveInterruptUtils; -import org.apache.hadoop.hive.common.HiveStatsUtils; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -1182,6 +1175,14 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException, } } + private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst, + Set filesToMove) throws IOException, HiveException { + Set filePaths = new HashSet<>(); + for (FileStatus fstatus : filesToMove) { + filePaths.add(fstatus.getPath()); + } + moveSpecifiedFiles(fs, src, dst, filePaths); + } /** * Moves files from src to dst if it is within the specified set of paths * @param fs @@ -1523,10 +1524,11 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, // 3) Rename/move the temp directory to specPath FileSystem fs = specPath.getFileSystem(hconf); + boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); if (success) { - if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath)) { + if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) { // 1) Rename tmpPath to a new directory name to prevent additional files // from being added by runaway processes. Path tmpPathOriginal = tmpPath; @@ -1561,9 +1563,11 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, // move to the file destination Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath); - if(shouldAvoidRename(conf, hconf)){ + if (shouldAvoidRename(conf, hconf)) { LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString()); conf.getFilesToFetch().addAll(filesKept); + } else if (isBlobStorage) { + Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept); } else { perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles"); Utilities.renameOrMoveFiles(fs, tmpPath, specPath);