diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 0a466e4..50c4a96 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -102,7 +102,7 @@ private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir) if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) { deletePath = createTargetPath(targetPath, fs); } - if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) { + if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) { try { if (deletePath != null) { fs.delete(deletePath, true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 5840802..396c070 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2517,16 +2517,16 @@ private static boolean destExists(List> result, Path proposed) { return false; } - private static boolean isSubDir(Path srcf, Path destf, FileSystem fs, boolean isSrcLocal){ + private static boolean isSubDir(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs, boolean isSrcLocal) { if (srcf == null) { LOG.debug("The source path is null for isSubDir method."); return false; } - String fullF1 = getQualifiedPathWithoutSchemeAndAuthority(srcf, fs); - String fullF2 = getQualifiedPathWithoutSchemeAndAuthority(destf, fs); + String fullF1 = getQualifiedPathWithoutSchemeAndAuthority(srcf, srcFs); + String fullF2 = getQualifiedPathWithoutSchemeAndAuthority(destf, destFs); - boolean isInTest = Boolean.valueOf(HiveConf.getBoolVar(fs.getConf(), ConfVars.HIVE_IN_TEST)); + boolean isInTest = Boolean.valueOf(HiveConf.getBoolVar(srcFs.getConf(), ConfVars.HIVE_IN_TEST)); // In the automation, the data warehouse is the local file system based. LOG.debug("The source path is " + fullF1 + " and the destination path is " + fullF2); if (isInTest) { @@ -2565,15 +2565,27 @@ private static String getQualifiedPathWithoutSchemeAndAuthority(Path srcf, FileS //from mv command if the destf is a directory, it replaces the destf instead of moving under //the destf. in this case, the replaced destf still preserves the original destf's permission public static boolean moveFile(HiveConf conf, Path srcf, Path destf, - FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException { + boolean replace, boolean isSrcLocal) throws HiveException { boolean success = false; + FileSystem srcFs, destFs; + try { + destFs = destf.getFileSystem(conf); + } catch (IOException e) { + LOG.error(e); + throw new HiveException(e.getMessage(), e); + } + try { + srcFs = srcf.getFileSystem(conf); + } catch (IOException e) { + LOG.error(e); + throw new HiveException(e.getMessage(), e); + } //needed for perm inheritance. boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); HadoopShims shims = ShimLoader.getHadoopShims(); HadoopShims.HdfsFileStatus destStatus = null; - HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(); // If source path is a subdirectory of the destination path: // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300; @@ -2581,11 +2593,11 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf, // (1) Do not delete the dest dir before doing the move operation. // (2) It is assumed that subdir and dir are in same encryption zone. // (3) Move individual files from scr dir to dest dir. - boolean destIsSubDir = isSubDir(srcf, destf, fs, isSrcLocal); + boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal); try { if (inheritPerms || replace) { try{ - destStatus = shims.getFullFileStatus(conf, fs, destf.getParent()); + destStatus = shims.getFullFileStatus(conf, destFs, destf.getParent()); //if destf is an existing directory: //if replace is true, delete followed by rename(mv) is equivalent to replace //if replace is false, rename (mv) actually move the src under dest dir @@ -2593,20 +2605,22 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf, // to delete the file first if (replace && !destIsSubDir) { LOG.debug("The path " + destf.toString() + " is deleted"); - fs.delete(destf, true); + destFs.delete(destf, true); } } catch (FileNotFoundException ignore) { //if dest dir does not exist, any re if (inheritPerms) { - destStatus = shims.getFullFileStatus(conf, fs, destf.getParent()); + destStatus = shims.getFullFileStatus(conf, destFs, destf.getParent()); } } } - if (!isSrcLocal) { - // For NOT local src file, rename the file - if (hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf)) - && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf)) - { + if (isSrcLocal) { + // For local src file, copy to hdfs + destFs.copyFromLocalFile(srcf, destf); + success = true; + } else { + if (needToCopy(srcf, destf, srcFs, destFs)) { + //copy if across file system or encryption zones. LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, true, // delete source @@ -2614,7 +2628,7 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf, conf); } else { if (destIsSubDir) { - FileStatus[] srcs = fs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); + FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); if (srcs.length == 0) { success = true; // Nothing to move. } @@ -2629,13 +2643,9 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf, } } } else { - success = fs.rename(srcf, destf); + success = destFs.rename(srcf, destf); } } - } else { - // For local src file, copy to hdfs - fs.copyFromLocalFile(srcf, destf); - success = true; } LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString() @@ -2646,7 +2656,7 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf, if (success && inheritPerms) { try { - ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf); + ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, destFs, destf); } catch (IOException e) { LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e); } @@ -2655,6 +2665,23 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf, } /** + * If moving across different FileSystems or differnent encryption zone, need to do a File copy instead of rename. + * TODO- consider if need to do this for different file authority. + */ + static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException, IOException { + //Check if different FileSystems + if (!srcFs.getClass().equals(destFs.getClass())) { + return true; + } + + //Check if different encryption zones + HadoopShims.HdfsFileStatus destStatus = null; + HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(); + return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf)) + && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf); + } + + /** * Copy files. This handles building the mapping for buckets and such between the source and * destination * @param conf Configuration object @@ -2708,7 +2735,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, try { for (List sdpairs : result) { for (Path[] sdpair : sdpairs) { - if (!moveFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) { + if (!moveFile(conf, sdpair[0], sdpair[1], false, isSrcLocal)) { throw new IOException("Cannot move " + sdpair[0] + " to " + sdpair[1]); } @@ -2889,7 +2916,7 @@ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path o inheritFromTable(tablePath, destParent, conf, destFs); } } - if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) { + if (!moveFile(conf, sdpair[0], sdpair[1], true, isSrcLocal)) { throw new IOException("Unable to move file/directory from " + sdpair[0] + " to " + sdpair[1]); } @@ -2908,7 +2935,7 @@ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path o // srcs must be a list of files -- ensured by LoadSemanticAnalyzer for (List sdpairs : result) { for (Path[] sdpair : sdpairs) { - if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, + if (!moveFile(conf, sdpair[0], sdpair[1], true, isSrcLocal)) { throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]); }