diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index b07a37a..b08722a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -102,7 +102,7 @@ private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
         deletePath = createTargetPath(targetPath, fs);
       }
-      if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) {
+      if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) {
         try {
           if (deletePath != null) {
             fs.delete(deletePath, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index a8af788..c2742dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2517,16 +2517,16 @@ private static boolean destExists(List<List<Path[]>> result, Path proposed) {
     return false;
   }

-  private static boolean isSubDir(Path srcf, Path destf, FileSystem fs, boolean isSrcLocal){
+  private static boolean isSubDir(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs, boolean isSrcLocal){
     if (srcf == null) {
       LOG.debug("The source path is null for isSubDir method.");
       return false;
     }

-    String fullF1 = getQualifiedPathWithoutSchemeAndAuthority(srcf, fs);
-    String fullF2 = getQualifiedPathWithoutSchemeAndAuthority(destf, fs);
+    String fullF1 = getQualifiedPathWithoutSchemeAndAuthority(srcf, srcFs);
+    String fullF2 = getQualifiedPathWithoutSchemeAndAuthority(destf, destFs);

-    boolean isInTest = Boolean.valueOf(HiveConf.getBoolVar(fs.getConf(), ConfVars.HIVE_IN_TEST));
+    boolean isInTest = Boolean.valueOf(HiveConf.getBoolVar(srcFs.getConf(), ConfVars.HIVE_IN_TEST));
     // In the automation, the data warehouse is the local file system based.
     LOG.debug("The source path is " + fullF1 + " and the destination path is " + fullF2);
     if (isInTest) {
@@ -2565,8 +2565,21 @@ private static String getQualifiedPathWithoutSchemeAndAuthority(Path srcf, FileS
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
   public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException {
+      boolean replace, boolean isSrcLocal) throws HiveException {
     boolean success = false;
+    FileSystem srcFs, destFs;
+    try {
+      destFs = destf.getFileSystem(conf);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw new HiveException(e.getMessage(), e);
+    }
+    try {
+      srcFs = srcf.getFileSystem(conf);
+    } catch (IOException e) {
+      LOG.error(e);
+      throw new HiveException(e.getMessage(), e);
+    }

     //needed for perm inheritance.
     boolean inheritPerms = HiveConf.getBoolVar(conf,
@@ -2581,11 +2594,11 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
     // (1) Do not delete the dest dir before doing the move operation.
     // (2) It is assumed that subdir and dir are in same encryption zone.
     // (3) Move individual files from src dir to dest dir.
-    boolean destIsSubDir = isSubDir(srcf, destf, fs, isSrcLocal);
+    boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal);
     try {
       if (inheritPerms || replace) {
         try{
-          destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
+          destStatus = shims.getFullFileStatus(conf, destFs, destf.getParent());
           //if destf is an existing directory:
           //if replace is true, delete followed by rename(mv) is equivalent to replace
           //if replace is false, rename (mv) actually move the src under dest dir
@@ -2593,33 +2606,35 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
           // to delete the file first
           if (replace && !destIsSubDir) {
             LOG.debug("The path " + destf.toString() + " is deleted");
-            fs.delete(destf, true);
+            destFs.delete(destf, true);
           }
         } catch (FileNotFoundException ignore) {
           //if dest dir does not exist, any re
           if (inheritPerms) {
-            destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
+            destStatus = shims.getFullFileStatus(conf, destFs, destf.getParent());
           }
         }
       }
-      if (!isSrcLocal) {
-        // For NOT local src file, rename the file
-        if (hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
-            && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf))
-        {
+      if (isSrcLocal) {
+        // For local src file, copy to hdfs
+        destFs.copyFromLocalFile(srcf, destf);
+        success = true;
+      } else {
+        //copy if across file system or encryption zones.
+        if (needToCopy(srcf, destf, srcFs, destFs)) {
           LOG.info("Copying source " + srcf + " to " + destf +
               " because HDFS encryption zones are different.");
-          success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
+          success = FileUtils.copy(srcFs, srcf, destFs, destf,
               true,    // delete source
               replace, // overwrite destination
               conf);
         } else {
           if (destIsSubDir) {
-            FileStatus[] srcs = fs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
+            FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
             if (srcs.length == 0) {
               success = true; // Nothing to move.
             }
             for (FileStatus status : srcs) {
-              success = FileUtils.copy(srcf.getFileSystem(conf), status.getPath(), destf.getFileSystem(conf), destf,
+              success = FileUtils.copy(srcFs, status.getPath(), destFs, destf,
                   true,     // delete source
                   replace,  // overwrite destination
                   conf);
@@ -2629,13 +2644,9 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
             }
           }
         } else {
-          success = fs.rename(srcf, destf);
+          success = destFs.rename(srcf, destf);
         }
       }
-    } else {
-      // For local src file, copy to hdfs
-      fs.copyFromLocalFile(srcf, destf);
-      success = true;
     }
     LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
@@ -2646,7 +2657,7 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,

     if (success && inheritPerms) {
       try {
-        ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf);
+        ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, destFs, destf);
       } catch (IOException e) {
         LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
       }
@@ -2655,6 +2666,25 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
   }

   /**
+   * If moving across different FileSystems or different encryption zones, need to do a File copy instead of rename.
+   * TODO- consider if need to do this for different file authority.
+   */
+  static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException, IOException {
+    //Check if different FileSystems
+    if (!srcFs.getClass().equals(destFs.getClass())) {
+      return true;
+    }
+
+    //Check if different encryption zones
+    HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs);
+    HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs);
+    return srcHdfsEncryptionShim != null
+        && destHdfsEncryptionShim != null
+        && (srcHdfsEncryptionShim.isPathEncrypted(srcf) || destHdfsEncryptionShim.isPathEncrypted(destf))
+        && !srcHdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf, destHdfsEncryptionShim);
+  }
+
+  /**
    * Copy files. This handles building the mapping for buckets and such between the source and
    * destination
    * @param conf Configuration object
@@ -2708,7 +2738,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
     try {
       for (List<Path[]> sdpairs : result) {
         for (Path[] sdpair : sdpairs) {
-          if (!moveFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+          if (!moveFile(conf, sdpair[0], sdpair[1], false, isSrcLocal)) {
             throw new IOException("Cannot move " + sdpair[0] + " to "
                 + sdpair[1]);
           }
@@ -2889,7 +2919,7 @@ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path o
             inheritFromTable(tablePath, destParent, conf, destFs);
           }
         }
-        if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
+        if (!moveFile(conf, sdpair[0], sdpair[1], true, isSrcLocal)) {
           throw new IOException("Unable to move file/directory from " + sdpair[0] +
               " to " + sdpair[1]);
         }
@@ -2908,7 +2938,7 @@ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path o
       // srcs must be a list of files -- ensured by LoadSemanticAnalyzer
       for (List<Path[]> sdpairs : result) {
         for (Path[] sdpair : sdpairs) {
-          if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true,
+          if (!moveFile(conf, sdpair[0], sdpair[1], true,
               isSrcLocal)) {
             throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
           }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a52f2f2..925de47 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1876,17 +1876,16 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException
    * @throws HiveException If an error occurs while checking for encryption
    */
   private boolean isPathEncrypted(Path path) throws HiveException {
-    HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
-
-    hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
-    if (hdfsEncryptionShim != null) {
-      try {
+    try {
+      HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(path.getFileSystem(conf));
+      if (hdfsEncryptionShim != null) {
         if (hdfsEncryptionShim.isPathEncrypted(path)) {
           return true;
         }
-      } catch (Exception e) {
-        throw new HiveException("Unable to determine if " + path + " is encrypted: " + e, e);
       }
+    } catch (Exception e) {
+      throw new HiveException("Unable to determine if " + path + " is encrypted: " + e, e);
     }

     return false;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 540bafd..4ba28b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 import java.util.UUID;

+import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -201,7 +202,7 @@
   /**
    * Gets information about HDFS encryption
    */
-  private HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+  private Map<URI, HadoopShims.HdfsEncryptionShim> hdfsEncryptionShims = Maps.newHashMap();

   /**
    * Lineage state.
@@ -414,12 +415,24 @@ public boolean isAutoCommit() {
   }

   public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
-    if (hdfsEncryptionShim == null) {
+    try {
+      return getHdfsEncryptionShim(FileSystem.get(conf));
+    }
+    catch(HiveException hiveException) {
+      throw hiveException;
+    }
+    catch(Exception exception) {
+      throw new HiveException(exception);
+    }
+  }
+
+  public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim(FileSystem fs) throws HiveException {
+    if (!hdfsEncryptionShims.containsKey(fs.getUri())) {
       try {
-        FileSystem fs = FileSystem.get(conf);
         if ("hdfs".equals(fs.getUri().getScheme())) {
-          hdfsEncryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+          hdfsEncryptionShims.put(fs.getUri(), ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf));
         } else {
+          LOG.debug("Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.");
           LOG.info("Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.");
         }
       } catch (Exception e) {
@@ -427,7 +440,7 @@ public boolean isAutoCommit() {
       }
     }

-    return hdfsEncryptionShim;
+    return hdfsEncryptionShims.get(fs.getUri());
   }

   // SessionState is not available in runtime and Hive.get().getConf() is not safe to call
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 19324b8..45574e0 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1223,11 +1223,11 @@ public boolean isPathEncrypted(Path path) throws IOException {

     @Override
     public boolean arePathsOnSameEncryptionZone(Path path1, Path path2) throws IOException {
-      EncryptionZone zone1, zone2;
-
-      zone1 = hdfsAdmin.getEncryptionZoneForPath(path1);
-      zone2 = hdfsAdmin.getEncryptionZoneForPath(path2);
+      return equivalentEncryptionZones(hdfsAdmin.getEncryptionZoneForPath(path1),
+          hdfsAdmin.getEncryptionZoneForPath(path2));
+    }
+    private boolean equivalentEncryptionZones(EncryptionZone zone1, EncryptionZone zone2) {
       if (zone1 == null && zone2 == null) {
         return true;
       } else if (zone1 == null || zone2 == null) {
@@ -1238,6 +1238,19 @@ public boolean arePathsOnSameEncryptionZone(Path path1, Path path2) throws IOExc
     }

     @Override
+    public boolean arePathsOnSameEncryptionZone(Path path1, Path path2,
+        HadoopShims.HdfsEncryptionShim encryptionShim2) throws IOException {
+      if (!(encryptionShim2 instanceof Hadoop23Shims.HdfsEncryptionShim)) {
+        LOG.warn("EncryptionShim for path2 (" + path2 + ") is of unexpected type: " + encryptionShim2.getClass()
+            + ". Assuming path2 is on the same EncryptionZone as path1 (" + path1 + ").");
+        return true;
+      }
+
+      return equivalentEncryptionZones(hdfsAdmin.getEncryptionZoneForPath(path1),
+          ((HdfsEncryptionShim)encryptionShim2).hdfsAdmin.getEncryptionZoneForPath(path2));
+    }
+
+    @Override
     public int comparePathKeyStrength(Path path1, Path path2) throws IOException {
       EncryptionZone zone1, zone2;
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index b89b4c3..1a73433 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -623,6 +623,17 @@ public void checkFileAccess(FileSystem fs, FileStatus status, FsAction action)
     public boolean arePathsOnSameEncryptionZone(Path path1, Path path2) throws IOException;

     /**
+     * Checks if two HDFS paths are in the same encrypted or unencrypted zone.
+     *
+     * @param path1 Path to HDFS file system
+     * @param path2 Path to HDFS file system
+     * @param encryptionShim2 The encryption shim corresponding to path2.
+     * @return True if both paths are in the same zone; False otherwise.
+     * @throws IOException If an error occurred attempting to get encryption information
+     */
+    public boolean arePathsOnSameEncryptionZone(Path path1, Path path2, HdfsEncryptionShim encryptionShim2) throws IOException;
+
+    /**
      * Compares two encrypted path strengths.
      *
      * @param path1 HDFS path to compare.
@@ -677,6 +688,12 @@ public boolean arePathsOnSameEncryptionZone(Path path1, Path path2) throws IOExc
     }

     @Override
+    public boolean arePathsOnSameEncryptionZone(Path path1, Path path2, HdfsEncryptionShim encryptionShim2) throws IOException {
+      // Not supported.
+      return true;
+    }
+
+    @Override
     public int comparePathKeyStrength(Path path1, Path path2) throws IOException {
       /* not supported */
       return 0;
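Reviewer note: the behavioral core of this patch is the copy-vs-rename decision now centralized in Hive.needToCopy(): rename only when source and destination use the same FileSystem implementation and no encryption-zone boundary is crossed; otherwise fall back to copy-and-delete. The standalone sketch below restates that rule for illustration only. It is not part of the patch: the class name, the Class<?> stand-ins for FileSystem implementations, and the zone-string encoding (null = not in an encryption zone, which also approximates the patch's "no encryption shim on a non-HDFS filesystem" case) are all assumptions made for the example.

// MoveDecisionSketch.java -- illustrative restatement of the patch's rule; not Hive code.
public class MoveDecisionSketch {

  // srcFsClass/destFsClass stand in for srcFs.getClass()/destFs.getClass();
  // srcZone/destZone stand in for each path's HDFS encryption zone
  // (null means the path is not inside an encryption zone).
  static boolean needToCopy(Class<?> srcFsClass, Class<?> destFsClass,
      String srcZone, String destZone) {
    // 1. Different FileSystem implementations: a rename cannot cross
    //    filesystems, so the data must be copied.
    if (!srcFsClass.equals(destFsClass)) {
      return true;
    }
    // 2. Same filesystem: rename is allowed unless an encryption-zone
    //    boundary is crossed (HDFS rejects renames across zones).
    boolean eitherEncrypted = srcZone != null || destZone != null;
    boolean sameZone = (srcZone == null) ? destZone == null : srcZone.equals(destZone);
    return eitherEncrypted && !sameZone;
  }

  public static void main(String[] args) {
    // Same FS, neither path encrypted: cheap rename (prints false = no copy).
    System.out.println(needToCopy(Object.class, Object.class, null, null));
    // Same FS, destination inside an encryption zone: must copy (true).
    System.out.println(needToCopy(Object.class, Object.class, null, "/ez"));
    // Same FS, both paths in the same zone: rename is still fine (false).
    System.out.println(needToCopy(Object.class, Object.class, "/ez", "/ez"));
    // Different FS implementations: always copy (true).
    System.out.println(needToCopy(Object.class, String.class, null, null));
  }
}

This is also why SessionState now caches one HdfsEncryptionShim per FileSystem URI and why arePathsOnSameEncryptionZone() gained a two-shim overload: with source and destination potentially on different filesystems, a single session-wide shim can no longer answer zone questions for both paths.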