diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index 6ade76d0c2..c86f1f4fd4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -175,19 +175,19 @@ public void testRecyclePartTable() throws Exception { // verify cm.recycle(db, table, part) api moves file to cmroot dir int ret = cm.recycle(part1Path, RecycleType.MOVE, false); Assert.assertEquals(ret, 1); - Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, part1Path.getName(), path1Chksum); + Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, part1Path.getName(), path1Chksum, cmroot.toString()); assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path)); // Verify dropPartition recycle part files client.dropPartition(dbName, tblName, Arrays.asList("20160102")); assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path)); - Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, part2Path.getName(), path2Chksum); + Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, part2Path.getName(), path2Chksum, cmroot.toString()); assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path)); // Verify dropTable recycle partition files client.dropTable(dbName, tblName); assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path)); - Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, part3Path.getName(), path3Chksum); + Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, part3Path.getName(), path3Chksum, cmroot.toString()); assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path)); client.dropDatabase(dbName, true, true); @@ -246,17 +246,17 @@ public void testRecycleNonPartTable() throws Exception { cm.recycle(filePath1, RecycleType.MOVE, false); 
assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1)); - Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, filePath1.getName(), fileChksum1); + Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, filePath1.getName(), fileChksum1, cmroot.toString()); assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1)); // Verify dropTable recycle table files client.dropTable(dbName, tblName); - Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, filePath2.getName(), fileChksum2); + Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, filePath2.getName(), fileChksum2, cmroot.toString()); assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2)); assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2)); - Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, filePath3.getName(), fileChksum3); + Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, filePath3.getName(), fileChksum3, cmroot.toString()); assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3)); assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3)); @@ -298,17 +298,21 @@ public void testClearer() throws Exception { ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, RecycleType.MOVE, false); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, RecycleType.MOVE, true); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31))); - assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32))); - - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11), now - 86400*1000*2, now - 86400*1000*2); - 
fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32), now - 86400*1000*2, now - 86400*1000*2); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString()))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()))); + + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString()), + now - 86400 * 1000 * 2, now - 86400 * 1000 * 2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString()), + now - 86400 * 1000 * 2, now - 86400 * 1000 * 2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString()), + now - 86400 * 1000 * 2, now - 86400 * 1000 * 2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()), + now - 86400 * 1000 * 2, now - 86400 * 1000 * 2); ReplChangeManager.scheduleCMClearer(hiveConf); @@ -321,12 +325,12 @@ public void testClearer() throws Exception { if (end - start > 5000) { Assert.fail("timeout, cmroot has not been cleared"); } - if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11)) && - 
fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12)) && - !fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21)) && - fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22)) && - !fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31)) && - !fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32))) { + if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, part11.getName(), fileChksum11, cmroot.toString())) && + fs.exists(ReplChangeManager.getCMPath(hiveConf, part12.getName(), fileChksum12, cmroot.toString())) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, part21.getName(), fileChksum21, cmroot.toString())) && + fs.exists(ReplChangeManager.getCMPath(hiveConf, part22.getName(), fileChksum22, cmroot.toString())) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, part31.getName(), fileChksum31, cmroot.toString())) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, part32.getName(), fileChksum32, cmroot.toString()))) { cleared = true; } } while (!cleared); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index de270cfcdb..f30bc9ac08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -81,9 +81,9 @@ protected int execute(DriverContext driverContext) { // This should only be true for copy tasks created from functions, otherwise there should never // be a CM uri in the from path. 
if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { - String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); - ReplChangeManager.FileInfo sourceInfo = ReplChangeManager - .getFileInfo(new Path(result[0]), result[1], conf); + String[] result = ReplChangeManager.getFileWithChksumAndCMRootURIFromURI(fromPath.toString()); + ReplChangeManager.FileInfo sourceInfo = + ReplChangeManager.getFileInfo(new Path(result[0]), result[1], result[2], conf); if (FileUtils.copy( sourceInfo.getSrcFs(), sourceInfo.getSourcePath(), dstFs, toPath, false, false, conf)) { @@ -187,14 +187,14 @@ protected int execute(DriverContext driverContext) { while ((line = br.readLine()) != null) { LOG.debug("ReplCopyTask :_filesReadLine: {}", line); - String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); + String[] fileWithChksumAndCMRootURI = ReplChangeManager.getFileWithChksumAndCMRootURIFromURI(line); try { - ReplChangeManager.FileInfo f = ReplChangeManager - .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf); + ReplChangeManager.FileInfo f = ReplChangeManager.getFileInfo(new Path(fileWithChksumAndCMRootURI[0]), + fileWithChksumAndCMRootURI[1], fileWithChksumAndCMRootURI[2], conf); filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception - LOG.warn("Cannot find {} in source repo or cmroot", fileWithChksum[0]); + LOG.warn("Cannot find {} in source repo or cmroot", fileWithChksumAndCMRootURI[0]); throw new IOException(e.getMessage()); } // Note - we need srcFs rather than fs, because it is possible that the _files lists files diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index f7c90409b7..2f9dd76bd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -188,7 +188,7 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri) .addDescendant(destinationDbName.toLowerCase()) .addDescendant(metadata.function.getFunctionName().toLowerCase()) .addDescendant(String.valueOf(System.nanoTime())) - .addDescendant(ReplChangeManager.getFileWithChksumFromURI(split[split.length - 1])[0]) + .addDescendant(ReplChangeManager.getFileWithChksumAndCMRootURIFromURI(split[split.length - 1])[0]) .build(), FileSystem.get(context.hiveConf) ); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 7c1d5f5cca..b1d19c1982 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -57,6 +57,7 @@ private static final String ORIG_LOC_TAG = "user.original-loc"; static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; private static final String URI_FRAGMENT_SEPARATOR = "#"; + private static final String URI_CMROOTURI_SEPARATOR = ";"; public enum RecycleType { MOVE, @@ -178,7 +179,7 @@ public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOExcept } } else { String fileCheckSum = checksumFor(path, fs); - Path cmPath = getCMPath(conf, path.getName(), fileCheckSum); + Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString()); // set timestamp before moving to cmroot, so we can // avoid race condition CM remove the file before setting @@ -281,19 +282,23 @@ static public void setCmRoot(Path cmRoot) { * with the original location plus checksum. 
* @param conf * @param name original filename * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)} + * @param srcCMRootURI full uri of the CM root on source, used as the parent of the returned path * @return Path */ - static Path getCMPath(Configuration conf, String name, String checkSum) { + static Path getCMPath(Configuration conf, String name, String checkSum, String srcCMRootURI) { + String newFileName = getCMFileName(conf, name, checkSum); + return new Path(srcCMRootURI, newFileName); + } + + static String getCMFileName(Configuration conf, String name, String checkSum) { String newFileName = name + "_" + checkSum; int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT); - if (newFileName.length() > maxLength) { - newFileName = newFileName.substring(0, maxLength-1); + newFileName = newFileName.substring(0, maxLength - 1); } - - return new Path(cmroot, newFileName); + return newFileName; } /*** @@ -304,15 +309,15 @@ static Path getCMPath(Configuration conf, String name, String checkSum) { * @param conf * @return Corresponding FileInfo object */ - public static FileInfo getFileInfo(Path src, String checksumString, Configuration conf) - throws MetaException { + public static FileInfo getFileInfo(Path src, String checksumString, String srcCMRootURI, Configuration conf) + throws MetaException { try { FileSystem srcFs = src.getFileSystem(conf); if (checksumString == null) { return new FileInfo(srcFs, src); } - Path cmPath = getCMPath(conf, src.getName(), checksumString); + Path cmPath = getCMPath(conf, src.getName(), checksumString, srcCMRootURI); if (!srcFs.exists(src)) { return new FileInfo(srcFs, src, cmPath, checksumString, false); } @@ -335,38 +340,44 @@ public static FileInfo getFileInfo(Path src, String checksumString, Configuratio } /*** - * Concatenate filename and checksum with "#" + * Concatenate filename, checksum and source cmroot uri * @param fileUriStr Filename
string * @param fileChecksum Checksum string * @return Concatenated Uri string */ // TODO: this needs to be enhanced once change management based filesystem is implemented - // Currently using fileuri#checksum as the format + // Currently using fileuri#checksum;cmroot_uri as the format + // cmroot_uri is used at the target cluster to create the appropriate filesystem object, + // for the CM directory at the source static public String encodeFileUri(String fileUriStr, String fileChecksum) { if (fileChecksum != null) { - return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum; + return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum + URI_CMROOTURI_SEPARATOR + cmroot.toString(); } else { return fileUriStr; } } - + /*** - * Split uri with fragment into file uri and checksum - * @param fileURIStr uri with fragment - * @return array of file name and checksum - */ - static public String[] getFileWithChksumFromURI(String fileURIStr) { + * Split uri with fragment into file uri, checksum and source cmroot uri + * @param fileURIStr uri with fragment + * @return array of file name, checksum and cm root uri on the source + */ + public static String[] getFileWithChksumAndCMRootURIFromURI(String fileURIStr) { String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR); - String[] result = new String[2]; + String[] result = new String[3]; result[0] = uriAndFragment[0]; - if (uriAndFragment.length>1) { - result[1] = uriAndFragment[1]; + if (uriAndFragment.length > 1) { + String[] checksumAndCMRootUri = uriAndFragment[1].split(URI_CMROOTURI_SEPARATOR); + result[1] = checksumAndCMRootUri[0]; + if (checksumAndCMRootUri.length > 1) { + result[2] = checksumAndCMRootUri[1]; + } } return result; } public static boolean isCMFileUri(Path fromPath, FileSystem srcFs) { - String[] result = getFileWithChksumFromURI(fromPath.toString()); + String[] result = getFileWithChksumAndCMRootURIFromURI(fromPath.toString()); return result[1] != null; }