diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index ebbb0b6..3a027fa 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -172,7 +172,7 @@ public void testRecyclePartTable() throws Exception { ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf); // verify cm.recycle(db, table, part) api moves file to cmroot dir - int ret = cm.recycle(part1Path, false); + int ret = cm.recycle(part1Path, true, false); Assert.assertEquals(ret, 1); Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, path1Chksum); assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path)); @@ -242,7 +242,7 @@ public void testRecycleNonPartTable() throws Exception { ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf); // verify cm.recycle(Path) api moves file to cmroot dir - cm.recycle(filePath1, false); + cm.recycle(filePath1, true, false); assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1)); Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, fileChksum1); @@ -293,9 +293,9 @@ public void testClearer() throws Exception { createFile(part32, "testClearer32"); String fileChksum32 = ReplChangeManager.checksumFor(part32, fs); - ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false); - ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false); - ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, true); + ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, true, false); + ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, true, false); + ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, true, true); assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum11))); 
assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum12))); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index ec41537..1743651 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -1655,6 +1655,131 @@ public void testInsertOverwriteOnPartitionedTableWithCM() throws IOException { } @Test + public void testRenameTableWithCM() throws IOException { + String testName = "renameTableWithCM"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] unptn_data = new String[] { "ten", "twenty" }; + String[] ptn_data_1 = new String[] { "fifteen", "fourteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "seventeen" }; + + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')"); + + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')"); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')"); 
+ run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')"); + + // Get the last repl ID corresponding to all insert events except RENAME. + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String lastDumpIdWithoutRename = getResult(0, 1); + + run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_renamed"); + run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed"); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutRename); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyFail("SELECT a from " + dbName + "_dupe.unptned ORDER BY a"); + verifyFail("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a"); + verifyRun("SELECT a from " + dbName + "_dupe.unptned_renamed ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_renamed where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_renamed where (b=2) ORDER BY a", ptn_data_2); 
+ } + + @Test + public void testRenamePartitionWithCM() throws IOException { + String testName = "renamePartitionWithCM"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] empty = new String[] {}; + String[] ptn_data_1 = new String[] { "fifteen", "fourteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "seventeen" }; + + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[1] + "')"); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[1] + "')"); + + // Get the last repl ID corresponding to all insert events except RENAME. 
+ advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String lastDumpIdWithoutRename = getResult(0, 1); + + run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=10)"); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + lastDumpIdWithoutRename); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=10) ORDER BY a", empty); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=10) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", empty); + } + + @Test public void testViewsReplication() throws IOException { String testName = "viewsReplication"; String dbName = createDB(testName); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 7c1be8c..eafce34 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java 
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -220,7 +220,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, + newDbName + "." + newTblName + " already exists : " + destPath); } // check that src exists and also checks permissions necessary, rename src to dest - if (srcFs.exists(srcPath) && srcFs.rename(srcPath, destPath)) { + if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, true)) { dataWasMoved = true; } } catch (IOException e) { @@ -515,7 +515,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } //rename the data directory - wh.renameDir(srcPath, destPath); + wh.renameDir(srcPath, destPath, true); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } @@ -569,7 +569,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String LOG.error("Revert the data move in renaming a partition."); try { if (destFs.exists(destPath)) { - wh.renameDir(destPath, srcPath); + wh.renameDir(destPath, srcPath, false); } } catch (MetaException me) { LOG.error("Failed to restore partition data from " + destPath + " to " + srcPath diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 4938fef..9765ec2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3173,7 +3173,7 @@ public Partition exchange_partition(Map partitionSpecs, * TODO: Use the hard link feature of hdfs * once https://issues.apache.org/jira/browse/HDFS-3370 is done */ - pathCreated = wh.renameDir(sourcePath, destPath); + pathCreated = wh.renameDir(sourcePath, destPath, false); // Setting success to false to make sure that if the listener fails, rollback happens. 
success = false; @@ -3200,7 +3200,7 @@ public Partition exchange_partition(Map partitionSpecs, if (!success || !pathCreated) { ms.rollbackTransaction(); if (pathCreated) { - wh.renameDir(destPath, sourcePath); + wh.renameDir(destPath, sourcePath, false); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 6f17d23..e518b4a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -133,11 +133,13 @@ void addFile(Path path) throws MetaException { * Move a path into cmroot. If the path is a directory (of a partition, or table if nonpartitioned), * recursively move files inside directory to cmroot. Note the table must be managed table * @param path a single file or directory - * @param ifPurge if the file should skip Trash when delete + * @param isMove whether the files are to be copied or moved to cmpath. Copy if false and move if true. + * @param ifPurge if the file should skip Trash when deleted. + * This is honored only if isMove is true.
 * @return int * @throws MetaException */ - int recycle(Path path, boolean ifPurge) throws MetaException { + int recycle(Path path, boolean isMove, boolean ifPurge) throws MetaException { if (!enabled) { return 0; } @@ -148,25 +150,37 @@ int recycle(Path path, boolean ifPurge) throws MetaException { if (fs.isDirectory(path)) { FileStatus[] files = fs.listStatus(path, hiddenFileFilter); for (FileStatus file : files) { - count += recycle(file.getPath(), ifPurge); + count += recycle(file.getPath(), isMove, ifPurge); } } else { Path cmPath = getCMPath(hiveConf, checksumFor(path, fs)); - if (LOG.isDebugEnabled()) { - LOG.debug("Moving " + path.toString() + " to " + cmPath.toString()); - } - // set timestamp before moving to cmroot, so we can // avoid race condition CM remove the file before setting // timestamp long now = System.currentTimeMillis(); fs.setTimes(path, now, now); - boolean succ = fs.rename(path, cmPath); + boolean success = false; + if (isMove) { + if (LOG.isDebugEnabled()) { + LOG.debug("Moving " + path.toString() + " to " + cmPath.toString()); + } + success = fs.rename(path, cmPath); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Copying " + path.toString() + " to " + cmPath.toString()); + } + // Copy the file only if cmroot does not already have the same file. If copy is invoked with + // an existing destination file, it throws an IOException. + if (!fs.exists(cmPath)) { + success = FileUtils.copy(fs, path, fs, cmPath, false, false, hiveConf); + } + } + + // Ignore if a file with same content already exist in cmroot // We might want to setXAttr for the new location in the future - if (!succ) { + if (!success) { if (LOG.isDebugEnabled()) { LOG.debug("A file with the same content of " + path.toString() + " already exists, ignore"); } @@ -192,7 +206,7 @@ int recycle(Path path, boolean ifPurge) throws MetaException { // Tag if we want to remain in trash after deletion. 
// If multiple files share the same content, then // any file claim remain in trash would be granted - if (!ifPurge) { + if (isMove && !ifPurge) { try { fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0}); } catch (UnsupportedOperationException e) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java index fcbcf62..bd0c87f 100755 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -196,8 +196,13 @@ public boolean mkdirs(Path f) throws MetaException { return false; } - public boolean renameDir(Path sourcePath, Path destPath) throws MetaException { + public boolean renameDir(Path sourcePath, Path destPath, boolean needCmRecycle) throws MetaException { try { + if (needCmRecycle) { + // Recycle the source files to cmroot. As we just rename the source path, we should copy the + // files to cmroot instead of moving it. + cm.recycle(sourcePath, false, false); + } FileSystem fs = getFs(sourcePath); return FileUtils.rename(fs, sourcePath, destPath, conf); } catch (Exception ex) { @@ -215,13 +220,13 @@ public boolean deleteDir(Path f, boolean recursive) throws MetaException { } public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException { - cm.recycle(f, ifPurge); + cm.recycle(f, true, ifPurge); FileSystem fs = getFs(f); return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf); } public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException { - cm.recycle(f, ifPurge); + cm.recycle(f, true, ifPurge); return; }