diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index e63250c9fb..235bd11107 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; @@ -119,6 +120,7 @@ public void testRecyclePartTable() throws Exception { client.dropDatabase(dbName, true, true); Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3"); db.setName(dbName); client.createDatabase(db); @@ -204,6 +206,7 @@ public void testRecycleNonPartTable() throws Exception { client.dropDatabase(dbName, true, true); Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1, 2, 3"); db.setName(dbName); client.createDatabase(db); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java index f14b4300da..0e0a5cc43f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestCopyUtils { @Rule @@ -110,7 +111,8 @@ public void setup() throws Throwable { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } /** diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java index 73102a7dd3..7557280d2d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java @@ -39,6 +39,7 @@ import java.util.HashMap; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationOnHDFSEncryptedZones { private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks"; @@ -83,7 +84,8 @@ public static void classLevelTearDown() throws IOException { public void setup() throws Throwable { primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - 
primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @Test diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 8b33b78548..f4cdf02c97 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -89,6 +89,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationScenarios { @@ -379,7 +380,7 @@ public void testBootstrapLoadOnExistingDb() throws IOException { verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data, driver); // Create an empty database to load - run("CREATE DATABASE " + dbName + "_empty", driverMirror); + createDB(dbName + "_empty", driverMirror); // Load to an empty database Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, dbName + "_empty"); @@ -389,7 +390,7 @@ public void testBootstrapLoadOnExistingDb() throws IOException { String[] nullReplId = new String[]{ "NULL" }; // Create a database with a table - run("CREATE DATABASE " + dbName + "_withtable", driverMirror); + createDB(dbName + "_withtable", driverMirror); run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE", driverMirror); // Load using same dump to a DB with table. It should fail as DB is not empty. verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'", driverMirror); @@ -398,7 +399,7 @@ public void testBootstrapLoadOnExistingDb() throws IOException { verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId, driverMirror); // Create a database with a view - run("CREATE DATABASE " + dbName + "_withview", driverMirror); + createDB(dbName + "_withview", driverMirror); run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE", driverMirror); run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned", driverMirror); // Load using same dump to a DB with view. It should fail as DB is not empty. 
@@ -1893,8 +1894,8 @@ public void testRenameTableAcrossDatabases() throws IOException { String replDbName1 = dbName1 + "_dupe"; String replDbName2 = dbName2 + "_dupe"; - run("CREATE DATABASE " + dbName1, driver); - run("CREATE DATABASE " + dbName2, driver); + createDB(dbName1, driver); + createDB(dbName2, driver); run("CREATE TABLE " + dbName1 + ".unptned(a string) STORED AS TEXTFILE", driver); String[] unptn_data = new String[] { "ten", "twenty" }; @@ -1909,14 +1910,14 @@ public void testRenameTableAcrossDatabases() throws IOException { verifyRun("SELECT a from " + replDbName1 + ".unptned ORDER BY a", unptn_data, driverMirror); verifyIfTableNotExist(replDbName2, "unptned", metaStoreClientMirror); - run("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver); + verifyFail("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver); incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1); incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2); - verifyIfTableNotExist(replDbName1, "unptned", metaStoreClientMirror); verifyIfTableNotExist(replDbName1, "unptned_renamed", metaStoreClientMirror); - verifyRun("SELECT a from " + replDbName2 + ".unptned_renamed ORDER BY a", unptn_data, driverMirror); + verifyIfTableNotExist(replDbName2, "unptned_renamed", metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName1 + ".unptned ORDER BY a", unptn_data, driverMirror); } @Test @@ -1928,8 +1929,8 @@ public void testRenamePartitionedTableAcrossDatabases() throws IOException { String replDbName1 = dbName1 + "_dupe"; String replDbName2 = dbName2 + "_dupe"; - run("CREATE DATABASE " + dbName1, driver); - run("CREATE DATABASE " + dbName2, driver); + createDB(dbName1, driver); + createDB(dbName2, driver); run("CREATE TABLE " + dbName1 + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); String[] ptn_data = new String[] { "fifteen", "fourteen" }; @@ -1944,14 +1945,14 @@ public void testRenamePartitionedTableAcrossDatabases() throws IOException { verifyRun("SELECT a from " + replDbName1 + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror); verifyIfTableNotExist(replDbName2, "ptned", metaStoreClientMirror); - run("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver); + verifyFail("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver); incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1); incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2); - verifyIfTableNotExist(replDbName1, "ptned", metaStoreClientMirror); verifyIfTableNotExist(replDbName1, "ptned_renamed", metaStoreClientMirror); - verifyRun("SELECT a from " + replDbName2 + ".ptned_renamed where (b=1) ORDER BY a", ptn_data, driverMirror); + verifyIfTableNotExist(replDbName2, "ptned_renamed", metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName1 + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror); } @Test @@ -3076,7 +3077,7 @@ public void testAuthForNotificationAPIs() throws Exception { // Setup long firstEventId = metaStoreClient.getCurrentNotificationEventId().getEventId(); String dbName = "testAuthForNotificationAPIs"; - driver.run("create database " + dbName); + createDB(dbName, driver); NotificationEventResponse rsp = metaStoreClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); // Test various scenarios @@ -3156,7 +3157,68 @@ public void testRecycleFileDropTempTable() throws 
IOException { assertTrue(fileCount == fileCountAfter); } + @Test + public void testDumpNonReplDatabase() throws IOException { + String dbName = createDBNonRepl(testName.getMethodName(), driver); + verifyFail("REPL DUMP " + dbName, driver); + verifyFail("REPL DUMP " + dbName + " from 1 ", driver); + run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver); + assertTrue(run("REPL DUMP " + dbName, true, driver)); + assertTrue(run("REPL DUMP " + dbName + " from 1 ", true, driver)); + dbName = createDBNonRepl(testName.getMethodName() + "_case", driver); + run("alter database " + dbName + " set dbproperties ('repl.SOURCE.for' = '1, 2, 3')", driver); + assertTrue(run("REPL DUMP " + dbName, true, driver)); + assertTrue(run("REPL DUMP " + dbName + " from 1 ", true, driver)); + } + + @Test + public void testRecycleFileNonReplDatabase() throws IOException { + String dbName = createDBNonRepl(testName.getMethodName(), driver); + + String cmDir = hconf.getVar(HiveConf.ConfVars.REPLCMDIR); + Path path = new Path(cmDir); + FileSystem fs = path.getFileSystem(hconf); + ContentSummary cs = fs.getContentSummary(path); + long fileCount = cs.getFileCount(); + + run("CREATE TABLE " + dbName + ".normal(a int)", driver); + run("INSERT INTO " + dbName + ".normal values (1)", driver); + + cs = fs.getContentSummary(path); + long fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal values (3)", driver); + run("TRUNCATE TABLE " + dbName + ".normal", driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal values (4)", driver); + run("ALTER TABLE " + dbName + ".normal RENAME to " + dbName + ".normal1", driver); + verifyRun("SELECT count(*) from " + dbName + ".normal1", new String[]{"1"}, driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal1 values (5)", driver); + run("DROP TABLE " + dbName + ".normal1", driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + } + private static String createDB(String name, IDriver myDriver) { + LOG.info("Testing " + name); + run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')", myDriver); + return name; + } + + private static String createDBNonRepl(String name, IDriver myDriver) { LOG.info("Testing " + name); String dbName = name + "_" + tid; run("CREATE DATABASE " + dbName, myDriver); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 8ad507f5ae..9a2d296c05 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.Utils; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.slf4j.Logger; @@ -97,7 +98,8 @@ public void setup() throws 
Throwable { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 8caa55c5d5..9af072d8ac 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -54,6 +54,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertFalse; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationScenariosAcrossInstances { @Rule @@ -92,7 +93,8 @@ public void setup() throws Throwable { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After @@ -396,17 +398,20 @@ public void testBootStrapDumpOfWarehouse() throws Throwable { String randomTwo = RandomStringUtils.random(10, true, false); String dbOne = primaryDbName + randomOne; String dbTwo = primaryDbName + randomTwo; + primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')"); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create table t1 (i int, j int)") - .run("create database " + dbOne) + .run("create database " + dbOne + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbOne) // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables. // This used to work by accident, now this works due a test flag. The test needs to be fixed. // Also applies for a couple more tests. 
.run("create table t1 (i int, j int) partitioned by (load_date date) " + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ") - .run("create database " + dbTwo) + .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbTwo) .run("create table t1 (i int, j int)") .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'", @@ -457,10 +462,12 @@ public void testIncrementalDumpOfWarehouse() throws Throwable { String randomOne = RandomStringUtils.random(10, true, false); String randomTwo = RandomStringUtils.random(10, true, false); String dbOne = primaryDbName + randomOne; + primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')"); WarehouseInstance.Tuple bootstrapTuple = primary .run("use " + primaryDbName) .run("create table t1 (i int, j int)") - .run("create database " + dbOne) + .run("create database " + dbOne + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbOne) .run("create table t1 (i int, j int) partitioned by (load_date date) " + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ") @@ -469,7 +476,8 @@ public void testIncrementalDumpOfWarehouse() throws Throwable { String dbTwo = primaryDbName + randomTwo; WarehouseInstance.Tuple incrementalTuple = primary - .run("create database " + dbTwo) + .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbTwo) .run("create table t1 (i int, j int)") .run("use " + dbOne) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java index a3cdd6e1db..e3c83d2426 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java @@ -53,6 +53,7 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; /** * Test HiveAuthorizer api invocation @@ -104,7 +105,8 @@ public static void beforeTest() throws Exception { runCmd("create table " + tableName + " (i int, j int, k string) partitioned by (city string, `date` string) "); runCmd("create view " + viewName + " as select * from " + tableName); - runCmd("create database " + dbName); + runCmd("create database " + dbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); runCmd("create table " + fullInTableName + "(i int)"); // Need a separate table for ACID testing since it has to be bucketed and it has to be Acid runCmd("create table " + acidTableName + " (i int, j int, k int) clustered by (k) into 2 buckets " + diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java index 597544f021..14d389464f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java @@ -29,6 
+29,8 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.Table; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -52,6 +54,7 @@ private Path cmRootDirectory; private static FileSystem fs; private static MiniDFSCluster miniDFSCluster; + private final String dbName = "TestCleanerWithReplication"; @Before public void setup() throws Exception { @@ -68,6 +71,10 @@ public void setup() throws Exception { fs.mkdirs(cmRootDirectory); } tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString()); + Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3"); + db.setName(dbName); + ms.createDatabase(db); } @BeforeClass @@ -81,9 +88,10 @@ public static void classLevelSetup() throws LoginException, IOException { } @After - public void tearDown() throws IOException { + public void tearDown() throws Exception { fs.delete(cmRootDirectory, true); compactorTestCleanup(); + ms.dropDatabase(dbName, true, true, true); } @AfterClass @@ -93,16 +101,16 @@ public static void tearDownClass() { @Test public void cleanupAfterMajorTableCompaction() throws Exception { - Table t = newTable("default", "camtc", false); + Table t = newTable(dbName, "camtc", false); addBaseFile(t, null, 20L, 20); addDeltaFile(t, null, 21L, 22L, 2); addDeltaFile(t, null, 23L, 24L, 2); addBaseFile(t, null, 25L, 25); - burnThroughTransactions("default", "camtc", 25); + burnThroughTransactions(dbName, "camtc", 25); - CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); + CompactionRequest rqst = new CompactionRequest(dbName, "camtc", CompactionType.MAJOR); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); txnHandler.markCompacted(ci); @@ -113,7 +121,7 @@ public void cleanupAfterMajorTableCompaction() throws Exception { @Test public void cleanupAfterMajorPartitionCompaction() throws Exception { - Table t = newTable("default", "campc", true); + Table t = newTable(dbName, "campc", true); Partition p = newPartition(t, "today"); addBaseFile(t, p, 20L, 20); @@ -121,9 +129,9 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { addDeltaFile(t, p, 23L, 24L, 2); addBaseFile(t, p, 25L, 25); - burnThroughTransactions("default", "campc", 25); + burnThroughTransactions(dbName, "campc", 25); - CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR); + CompactionRequest rqst = new CompactionRequest(dbName, "campc", CompactionType.MAJOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); @@ -135,16 +143,16 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { @Test public void cleanupAfterMinorTableCompaction() throws Exception { - Table t = newTable("default", "camitc", false); + Table t = newTable(dbName, "camitc", false); addBaseFile(t, null, 20L, 20); addDeltaFile(t, null, 21L, 22L, 2); addDeltaFile(t, null, 23L, 24L, 2); addDeltaFile(t, null, 21L, 24L, 4); - burnThroughTransactions("default", "camitc", 25); + burnThroughTransactions(dbName, "camitc", 25); - CompactionRequest rqst = new CompactionRequest("default", 
"camitc", CompactionType.MINOR); + CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); txnHandler.markCompacted(ci); @@ -155,7 +163,7 @@ public void cleanupAfterMinorTableCompaction() throws Exception { @Test public void cleanupAfterMinorPartitionCompaction() throws Exception { - Table t = newTable("default", "camipc", true); + Table t = newTable(dbName, "camipc", true); Partition p = newPartition(t, "today"); addBaseFile(t, p, 20L, 20); @@ -163,9 +171,9 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { addDeltaFile(t, p, 23L, 24L, 2); addDeltaFile(t, p, 21L, 24L, 4); - burnThroughTransactions("default", "camipc", 25); + burnThroughTransactions(dbName, "camipc", 25); - CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR); + CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index f52338a24b..3ada8030c2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -83,6 +83,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestJdbcWithMiniHS2 { private static MiniHS2 miniHS2 = null; @@ -119,7 +120,8 @@ public static void setupBeforeClass() throws Exception { } Statement stmt = conDefault.createStatement(); stmt.execute("drop database if exists " + testDbName + " cascade"); - stmt.execute("create database " + testDbName); + stmt.execute("create database " + testDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); stmt.close(); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 16d08543ba..e06949928d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1587,10 +1587,10 @@ private void moveDir(FileSystem fs, Path from, Path to) throws HiveException { } } - private void deleteDir(Path dir) throws HiveException { + private void deleteDir(Path dir, Database db) throws HiveException { try { Warehouse wh = new Warehouse(conf); - wh.deleteDir(dir, true); + wh.deleteDir(dir, true, db); } catch (MetaException e) { throw new HiveException(e); } @@ -1845,7 +1845,7 @@ private int archive(Hive db, AlterTableSimpleDesc simpleDesc, // If a failure occurs here, the directory containing the original files // will not be deleted. The user will run ARCHIVE again to clear this up if(pathExists(intermediateOriginalDir)) { - deleteDir(intermediateOriginalDir); + deleteDir(intermediateOriginalDir, db.getDatabase(tbl.getDbName())); } if(recovery) { @@ -2051,7 +2051,7 @@ private int unarchive(Hive db, AlterTableSimpleDesc simpleDesc) // If a failure happens here, the intermediate archive files won't be // deleted. The user will need to call unarchive again to clear those up. 
if(pathExists(intermediateArchivedDir)) { - deleteDir(intermediateArchivedDir); + deleteDir(intermediateArchivedDir, db.getDatabase(tbl.getDbName())); } if(recovery) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e8554f9ce5..107d032eb7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest; @@ -1737,8 +1738,10 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new // base_x. (there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + boolean needRecycle = !tbl.isTemporary() + && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName())); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, - isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, @@ -2304,8 +2307,10 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) { //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + boolean needRecycle = !tbl.isTemporary() + && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName())); replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge, - newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java index 8fccf369f8..b6d8a28495 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java @@ -90,11 +90,9 @@ private void export_meta_data(PreDropTableEvent tableEvent) throws MetaException EximUtil.createExportDump(fs, outFile, mTbl, null, null, new HiveConf(conf, MetaDataExportListener.class)); if (moveMetadataToTrash == true) { - wh.deleteDir(metaPath, true); + wh.deleteDir(metaPath, true, false, false); } - } catch (IOException e) { - throw new MetaException(e.getMessage()); - } catch (SemanticException e) { + } catch (IOException | SemanticException e) { throw new MetaException(e.getMessage()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 
562f497842..d7b3104cf1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -105,7 +107,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { switch (ast.getToken().getType()) { case TOK_REPL_DUMP: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump"); - initReplDump(ast); + try { + initReplDump(ast); + } catch (HiveException e) { + throw new SemanticException("repl dump failed " + e.getMessage()); + } analyzeReplDump(ast); break; } @@ -127,9 +133,22 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } } - private void initReplDump(ASTNode ast) { + private void initReplDump(ASTNode ast) throws HiveException { int numChildren = ast.getChildCount(); dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + + for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { + Database database = db.getDatabase(dbName); + if (database != null) { + if (!ReplChangeManager.isSourceOfReplication(database)) { + throw new SemanticException("Cannot dump database " + dbNameOrPattern + + " as it is not a source of replication"); + } + } else { + throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist"); + } + } + // skip the first node, which is always required int currNode = 1; while (currNode < numChildren) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index fe6d2d663d..3565616171 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -20,6 +20,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -35,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -320,7 +323,7 @@ private static String idWatermark(CompactionInfo ci) { return " id=" + ci.id; } private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) - throws IOException { + throws IOException, HiveException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); List obsoleteDirs = dir.getObsolete(); @@ -346,10 +349,13 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti } FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + Database db = 
Hive.get().getDatabase(ci.dbname); for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); - replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); + if (ReplChangeManager.isSourceOfReplication(db)) { + replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); + } fs.delete(dead, true); } } diff --git a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q index cd9080c26b..9d712ca7c3 100644 --- a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q +++ b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q @@ -16,7 +16,7 @@ drop database if exists test_repldump_adminpriv cascade; set user.name=ruser1; show role grant user ruser1; -create database test_repldump_adminpriv; +create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3'); create table test_repldump_adminpriv.dummy_tbl(a int) partitioned by (b string); show tables test_repldump_adminpriv; diff --git a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q index 68a132d9f7..0b1b12b5c4 100644 --- a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q +++ b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q @@ -18,7 +18,7 @@ drop database if exists test_replload_adminpriv_tgt2 cascade; set user.name=ruser1; show role grant user ruser1; -create database test_replload_adminpriv_src; +create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3'); create table test_replload_adminpriv_src.dummy_tbl(a int) partitioned by (b string); show tables test_replload_adminpriv_src; diff --git a/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out index ac5103e34c..272b8b8dc1 100644 --- a/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out +++ b/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out @@ -12,10 +12,10 @@ PREHOOK: type: SHOW_ROLE_GRANT POSTHOOK: query: show role grant user ruser1 POSTHOOK: type: SHOW_ROLE_GRANT public false -1 -PREHOOK: query: create database test_repldump_adminpriv +PREHOOK: query: create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3') PREHOOK: type: CREATEDATABASE PREHOOK: Output: database:test_repldump_adminpriv -POSTHOOK: query: create database test_repldump_adminpriv +POSTHOOK: query: create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3') POSTHOOK: type: CREATEDATABASE POSTHOOK: Output: database:test_repldump_adminpriv PREHOOK: query: create table test_repldump_adminpriv.dummy_tbl(a int) partitioned by (b string) diff --git a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out index 01b57a7631..1499c39464 100644 --- a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out +++ b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out @@ -20,10 +20,10 @@ PREHOOK: type: SHOW_ROLE_GRANT POSTHOOK: query: show role grant user ruser1 POSTHOOK: type: SHOW_ROLE_GRANT public false -1 -PREHOOK: query: create database test_replload_adminpriv_src +PREHOOK: query: create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3') PREHOOK: type: CREATEDATABASE PREHOOK: Output: database:test_replload_adminpriv_src -POSTHOOK: query: 
create database test_replload_adminpriv_src +POSTHOOK: query: create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3') POSTHOOK: type: CREATEDATABASE POSTHOOK: Output: database:test_replload_adminpriv_src PREHOOK: query: create table test_replload_adminpriv_src.dummy_tbl(a int) partitioned by (b string) diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 9ab9e85742..be05838ed7 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -242,7 +242,8 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam " already exists : " + destPath); } // check that src exists and also checks permissions necessary, rename src to dest - if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, true)) { + if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, + ReplChangeManager.isSourceOfReplication(olddb))) { dataWasMoved = true; } } catch (IOException | MetaException e) { @@ -559,6 +560,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String FileSystem srcFs; FileSystem destFs = null; boolean dataWasMoved = false; + Database db; try { msdb.openTransaction(); Table tbl = msdb.getTable(DEFAULT_CATALOG_NAME, dbname, name); @@ -593,9 +595,11 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String // 3) rename the partition directory if it is not an external table if (!tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { try { + db = msdb.getDatabase(catName, dbname); + // if tbl location is available use it // else derive the tbl location from database location - destPath = wh.getPartitionPath(msdb.getDatabase(catName, dbname), tbl, new_part.getValues()); + destPath = wh.getPartitionPath(db, tbl, new_part.getValues()); destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation())); } catch (NoSuchObjectException e) { LOG.debug("Didn't find object in metastore ", e); @@ -633,7 +637,7 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } //rename the data directory - wh.renameDir(srcPath, destPath, true); + wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db)); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } @@ -645,7 +649,6 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me); throw me; } - new_part.getSd().setLocation(newPartLoc); } } else { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 92d2e3f368..c1d25db796 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; import static org.apache.commons.lang.StringUtils.join; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT; 
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; @@ -1019,7 +1020,7 @@ public void create_catalog(CreateCatalogRequest rqst) // Create a default database inside the catalog Database db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " + - catalog.getName(), catalog.getLocationUri(), Collections.emptyMap()); + catalog.getName(), catalog.getLocationUri(), Collections.emptyMap()); db.setCatalogName(catalog.getName()); create_database_core(ms, db); @@ -1035,7 +1036,7 @@ public void create_catalog(CreateCatalogRequest rqst) if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(catPath, true); + wh.deleteDir(catPath, true, false, false); } } @@ -1165,7 +1166,7 @@ private void dropCatalogCore(String catName) success = ms.commitTransaction(); } finally { if (success) { - wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false); + wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false, false, false); } else { ms.rollbackTransaction(); } @@ -1227,7 +1228,7 @@ private void create_database_core(RawStore ms, final Database db) if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(dbPath, true); + wh.deleteDir(dbPath, true, db); } } @@ -1385,6 +1386,10 @@ private void drop_database_core(RawStore ms, String catName, ms.openTransaction(); db = ms.getDatabase(catName, name); + if (!isInTest && ReplChangeManager.isSourceOfReplication(db)) { + throw new InvalidOperationException("can not drop a database which is a source of replication"); + } + firePreEvent(new PreDropDatabaseEvent(db, this)); String catPrependedName = MetaStoreUtils.prependCatalogToDbName(catName, name, conf); @@ -1516,14 +1521,14 @@ private void drop_database_core(RawStore ms, String catName, ms.rollbackTransaction(); } else if (deleteData) { // Delete the data in the partitions which have other locations - deletePartitionData(partitionPaths); + deletePartitionData(partitionPaths, false, db); // Delete the data in the tables which have other locations for (Path tablePath : tablePaths) { - deleteTableData(tablePath); + deleteTableData(tablePath, false, db); } // Delete the data in the database try { - wh.deleteDir(new Path(db.getLocationUri()), true); + wh.deleteDir(new Path(db.getLocationUri()), true, db); } catch (Exception e) { LOG.error("Failed to delete database directory: " + db.getLocationUri() + " " + e.getMessage()); @@ -1753,6 +1758,7 @@ private void create_table_core(final RawStore ms, final Table tbl, Map transactionalListenerResponses = Collections.emptyMap(); Path tblPath = null; boolean success = false, madeDir = false; + Database db = null; try { if (!tbl.isSetCatName()) { tbl.setCatName(getDefaultCatalog(conf)); @@ -1761,11 +1767,7 @@ private void create_table_core(final RawStore ms, final Table tbl, ms.openTransaction(); - Database db = ms.getDatabase(tbl.getCatName(), tbl.getDbName()); - if (db == null) { - throw new NoSuchObjectException("The database " + - Warehouse.getCatalogQualifiedDbName(tbl.getCatName(), tbl.getDbName()) + " does not exist"); - } + db = ms.getDatabase(tbl.getCatName(), tbl.getDbName()); // get_table checks whether database exists, it should be moved here if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) { @@ -1776,8 +1778,7 @@ private void create_table_core(final RawStore ms, final Table tbl, if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) { if 
(tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { - tblPath = wh.getDefaultTablePath( - ms.getDatabase(tbl.getCatName(), tbl.getDbName()), tbl.getTableName()); + tblPath = wh.getDefaultTablePath(db, tbl.getTableName()); } else { if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { LOG.warn("Location: " + tbl.getSd().getLocation() @@ -1900,7 +1901,7 @@ private void create_table_core(final RawStore ms, final Table tbl, if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(tblPath, true); + wh.deleteDir(tblPath, true, db); } } @@ -2344,8 +2345,11 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S Table tbl = null; boolean ifPurge = false; Map transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); + db = ms.getDatabase(catName, dbname); + // drop any partitions tbl = get_table_core(catName, dbname, name); if (tbl == null) { @@ -2396,9 +2400,9 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S } else if (deleteData && !isExternal) { // Data needs deletion. Check if trash may be skipped. // Delete the data in the partitions which have other locations - deletePartitionData(partPaths, ifPurge); + deletePartitionData(partPaths, ifPurge, db); // Delete the data in the table - deleteTableData(tblPath, ifPurge); + deleteTableData(tblPath, ifPurge, db); // ok even if the data is not deleted } @@ -2413,27 +2417,19 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S return success; } - /** - * Deletes the data in a table's location, if it fails logs an error - * - * @param tablePath - */ - private void deleteTableData(Path tablePath) { - deleteTableData(tablePath, false); - } - /** * Deletes the data in a table's location, if it fails logs an error * * @param tablePath * @param ifPurge completely purge the table (skipping trash) while removing * data from warehouse + * @param db database the table belongs to */ - private void deleteTableData(Path tablePath, boolean ifPurge) { + private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { if (tablePath != null) { try { - wh.deleteDir(tablePath, true, ifPurge); + wh.deleteDir(tablePath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete table directory: " + tablePath + " " + e.getMessage()); @@ -2441,16 +2437,6 @@ private void deleteTableData(Path tablePath, boolean ifPurge) { } } - /** - * Give a list of partitions' locations, tries to delete each one - * and for each that fails logs an error. - * - * @param partPaths - */ - private void deletePartitionData(List partPaths) { - deletePartitionData(partPaths, false); - } - /** * Give a list of partitions' locations, tries to delete each one * and for each that fails logs an error. 
@@ -2458,12 +2444,13 @@ private void deletePartitionData(List partPaths) { * @param partPaths * @param ifPurge completely purge the partition (skipping trash) while * removing data from warehouse + * @param db database the partition belongs to */ - private void deletePartitionData(List partPaths, boolean ifPurge) { + private void deletePartitionData(List partPaths, boolean ifPurge, Database db) { if (partPaths != null && !partPaths.isEmpty()) { for (Path partPath : partPaths) { try { - wh.deleteDir(partPath, true, ifPurge); + wh.deleteDir(partPath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete partition directory: " + partPath + " " + e.getMessage()); @@ -2695,6 +2682,7 @@ public void truncate_table(final String dbName, final String tableName, List part_vals, EnvironmentContext envContext) - throws InvalidObjectException, AlreadyExistsException, MetaException { + throws InvalidObjectException, AlreadyExistsException, MetaException, NoSuchObjectException { Partition part = new Partition(); boolean success = false, madeDir = false; Path partLocation = null; Table tbl = null; Map transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); part.setCatName(catName); @@ -2999,6 +2988,8 @@ private Partition append_partition_common(RawStore ms, String catName, String db "Cannot append a partition to a view"); } + db = get_database_core(catName, dbName); + firePreEvent(new PreAddPartitionEvent(tbl, part, this)); part.setSd(tbl.getSd().deepCopy()); @@ -3051,7 +3042,7 @@ private Partition append_partition_common(RawStore ms, String catName, String db if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(partLocation, true); + wh.deleteDir(partLocation, true, db); } } @@ -3177,6 +3168,7 @@ public boolean equals(Object obj) { final List existingParts = new ArrayList<>(); Table tbl = null; Map transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); @@ -3187,6 +3179,8 @@ public boolean equals(Object obj) { " does not exist"); } + db = ms.getDatabase(catName, dbName); + if (!parts.isEmpty()) { firePreEvent(new PreAddPartitionEvent(tbl, parts, this)); } @@ -3310,7 +3304,7 @@ public boolean equals(Object obj) { for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().location), true, db); } } @@ -3446,6 +3440,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName .getPartitionIterator(); Table tbl = null; Map transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); tbl = ms.getTable(catName, dbName, tblName); @@ -3453,7 +3448,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName throw new InvalidObjectException("Unable to add partitions because " + "database or table " + dbName + "." 
+ tblName + " does not exist"); } - + db = ms.getDatabase(catName, dbName); firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); Set partsToAdd = new HashSet<>(partitionSpecProxy.size()); List partitionsToAdd = new ArrayList<>(partitionSpecProxy.size()); @@ -3569,7 +3564,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().location), true, db); } } } @@ -3721,7 +3716,8 @@ private Partition add_partition_core(final RawStore ms, success = ms.addPartition(part); } finally { if (!success && madeDir) { - wh.deleteDir(new Path(part.getSd().getLocation()), true); + wh.deleteDir(new Path(part.getSd().getLocation()), true, + ms.getDatabase(tbl.getCatName(), tbl.getDbName())); } } @@ -3799,6 +3795,19 @@ public Partition exchange_partition(Map partitionSpecs, return new Partition(); } + private boolean isRenameAllowed(String catalogName, String srcDBName, String destDBName) + throws MetaException, NoSuchObjectException { + RawStore ms = getMS(); + if (!srcDBName.equalsIgnoreCase(destDBName)) { + Database destDB = ms.getDatabase(catalogName, destDBName); + Database srcDB = ms.getDatabase(catalogName, srcDBName); + if (ReplChangeManager.isSourceOfReplication(srcDB) || ReplChangeManager.isSourceOfReplication(destDB)) { + return false; + } + } + return true; + } + @Override public List exchange_partitions(Map partitionSpecs, String sourceDbName, String sourceTableName, String destDbName, @@ -3834,6 +3843,7 @@ public Partition exchange_partition(Map partitionSpecs, getCatalogQualifiedTableName(parsedSourceDbName[CAT_NAME], parsedSourceDbName[DB_NAME], sourceTableName) + " not found"); } + List partVals = MetaStoreUtils.getPvals(sourceTable.getPartitionKeys(), partitionSpecs); List partValsPresent = new ArrayList<> (); @@ -3886,6 +3896,11 @@ public Partition exchange_partition(Map partitionSpecs, } } + if (!isRenameAllowed(parsedDestDbName[CAT_NAME], parsedSourceDbName[DB_NAME], parsedDestDbName[DB_NAME])) { + throw new MetaException("Exchange partition not allowed for " + + getCatalogQualifiedTableName(parsedSourceDbName[CAT_NAME], + parsedSourceDbName[DB_NAME], sourceTableName) + " Dest db : " + destDbName); + } try { for (Partition partition: partitionsToExchange) { Partition destPartition = new Partition(partition); @@ -3981,6 +3996,7 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam Path archiveParentDir = null; boolean mustPurge = false; boolean isExternalTbl = false; + boolean isSourceOfReplication = false; Map transactionalListenerResponses = Collections.emptyMap(); if (db_name == null) { @@ -4028,6 +4044,7 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam new DropPartitionEvent(tbl, part, true, deleteData, this), envContext); } + isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name)); success = ms.commitTransaction(); } } finally { @@ -4043,13 +4060,14 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam } // Archived partitions have har:/to_har_file as their location. 
// The original directory was saved in params + if (isArchived) { assert (archiveParentDir != null); - wh.deleteDir(archiveParentDir, true, mustPurge); + wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication); } else { assert (partPath != null); - wh.deleteDir(partPath, true, mustPurge); - deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge); + wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication); + deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication); } // ok even if the data is not deleted } @@ -4077,12 +4095,13 @@ private static boolean isMustPurge(EnvironmentContext envContext, Table tbl) { || (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge"))); } - private void deleteParentRecursive(Path parent, int depth, boolean mustPurge) throws IOException, MetaException { + private void deleteParentRecursive(Path parent, int depth, boolean mustPurge, boolean needRecycle) + throws IOException, MetaException { if (depth > 0 && parent != null && wh.isWritable(parent)) { if (wh.isDir(parent) && wh.isEmpty(parent)) { - wh.deleteDir(parent, true, mustPurge); + wh.deleteDir(parent, true, mustPurge, needRecycle); } - deleteParentRecursive(parent.getParent(), depth - 1, mustPurge); + deleteParentRecursive(parent.getParent(), depth - 1, mustPurge, needRecycle); } } @@ -4124,6 +4143,7 @@ public DropPartitionsResult drop_partitions_req( List parts = null; boolean mustPurge = false; List> transactionalListenerResponses = Lists.newArrayList(); + boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName)); try { // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. @@ -4231,12 +4251,12 @@ public DropPartitionsResult drop_partitions_req( // Archived partitions have har:/to_har_file as their location. // The original directory was saved in params for (Path path : archToDelete) { - wh.deleteDir(path, true, mustPurge); + wh.deleteDir(path, true, mustPurge, isSourceOfReplication); } for (PathAndPartValSize p : dirsToDelete) { - wh.deleteDir(p.path, true, mustPurge); + wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication); try { - deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge); + deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication); } catch (IOException ex) { LOG.warn("Error from deleteParentRecursive", ex); throw new MetaException("Failed to delete parent: " + ex.getMessage()); @@ -4860,6 +4880,7 @@ private void alter_table_core(final String catName, final String dbname, final S throws InvalidOperationException, MetaException { startFunction("alter_table", ": " + getCatalogQualifiedTableName(catName, dbname, name) + " newtbl=" + newTable.getTableName()); + // Update the time if it hasn't been specified. 
if (newTable.getParameters() == null || newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { @@ -4884,6 +4905,11 @@ private void alter_table_core(final String catName, final String dbname, final S Exception ex = null; try { Table oldt = get_table_core(catName, dbname, name); + if (!isRenameAllowed(catName, dbname, newTable.getDbName())) { + throw new MetaException("Alter table not allowed for table " + + getCatalogQualifiedTableName(catName, dbname, name) + + " new table = " + getCatalogQualifiedTableName(newTable)); + } firePreEvent(new PreAlterTableEvent(oldt, newTable, this)); alterHandler.alterTable(getMS(), wh, catName, dbname, name, newTable, envContext, this); @@ -6850,12 +6876,15 @@ public void drop_function(String dbName, String funcName) if (func == null) { throw new NoSuchObjectException("Function " + funcName + " does not exist"); } + Boolean isSourceOfReplication = + ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME])); + // if copy of jar to change management fails we fail the metastore transaction, since the // user might delete the jars on HDFS externally after dropping the function, hence having // a copy is required to allow incremental replication to work correctly. if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) { for (ResourceUri uri : func.getResourceUris()) { - if (uri.getUri().toLowerCase().startsWith("hdfs:")) { + if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) { wh.addToChangeManagement(new Path(uri.getUri())); } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 79ba7ff35b..f7018c2d0b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -56,6 +58,7 @@ private static final String ORIG_LOC_TAG = "user.original-loc"; static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; private static final String URI_FRAGMENT_SEPARATOR = "#"; + public static final String SOURCE_OF_REPLICATION = "repl.source.for"; public enum RecycleType { MOVE, @@ -467,4 +470,24 @@ static void scheduleCMClearer(Configuration conf) { 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); } } + + public static boolean isSourceOfReplication(Database db) { + // Can not judge, so assuming replication is not enabled. 
+    assert (db != null);
+    String replPolicyIds = getReplPolicyIdString(db);
+    return !StringUtils.isEmpty(replPolicyIds);
+  }
+
+  public static String getReplPolicyIdString(Database db) {
+    if (db != null) {
+      Map<String, String> m = db.getParameters();
+      if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) {
+        String replPolicyId = m.get(SOURCE_OF_REPLICATION);
+        LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId);
+        return replPolicyId;
+      }
+      LOG.debug("Repl policy is not set for database {}", db.getName());
+    }
+    return null;
+  }
 }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 88cbfcdc4b..e31935ebf5 100755
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -301,18 +301,16 @@ void addToChangeManagement(Path file) throws MetaException {
     }
   }
 
-  public boolean deleteDir(Path f, boolean recursive) throws MetaException {
-    return deleteDir(f, recursive, false);
+  public boolean deleteDir(Path f, boolean recursive, Database db) throws MetaException {
+    return deleteDir(f, recursive, false, db);
   }
 
-  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException {
-    return deleteDir(f, recursive, ifPurge, true);
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, Database db) throws MetaException {
+    return deleteDir(f, recursive, ifPurge, ReplChangeManager.isSourceOfReplication(db));
   }
 
   public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
-    // no need to create the CM recycle file for temporary tables
     if (needCmRecycle) {
-
       try {
         cm.recycle(f, RecycleType.MOVE, ifPurge);
       } catch (IOException e) {
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
index fa30330e78..815b39c483 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java
@@ -21,7 +21,12 @@
  */
 package org.apache.hadoop.hive.metastore.model;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Storage Class representing the Hive MDatabase in a rdbms
@@ -109,7 +114,21 @@ public void setDescription(String description) {
    * @param parameters the parameters mapping.
    */
   public void setParameters(Map<String, String> parameters) {
-    this.parameters = parameters;
+    if (parameters == null) {
+      this.parameters = null;
+      return;
+    }
+    this.parameters = new HashMap<>();
+    Set<String> keys = new HashSet<>(parameters.keySet());
+    for(String key : keys) {
+      // Normalize the case for source of replication parameter
+      if (ReplChangeManager.SOURCE_OF_REPLICATION.equalsIgnoreCase(key)) {
+        // TODO : Some extra validation can also be added as this is a user provided parameter.
+        this.parameters.put(ReplChangeManager.SOURCE_OF_REPLICATION, parameters.get(key));
+      } else {
+        this.parameters.put(key, parameters.get(key));
+      }
+    }
   }
 
   public String getOwnerName() {
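
The sketch below is an illustrative aside, not part of the patch: it shows how the repl.source.for property exercised above drives the new ReplChangeManager helpers. The Database setters, the SOURCE_OF_REPLICATION constant, and the two static helpers come straight from this diff; the wrapper class and the database name are invented for the example.

// Illustrative only; assumes the ReplChangeManager/Database APIs shown in the diff above.
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;

public class ReplSourceCheckExample {
  public static void main(String[] args) {
    Database db = new Database();
    db.setName("sales_db"); // made-up database name

    // Without repl.source.for the database is not a replication source:
    // REPL DUMP is rejected and Warehouse.deleteDir() skips the CM recycle step.
    System.out.println(ReplChangeManager.isSourceOfReplication(db)); // false

    // Marking the database as a replication source, as the updated tests do.
    db.putToParameters(ReplChangeManager.SOURCE_OF_REPLICATION, "1,2,3");
    System.out.println(ReplChangeManager.isSourceOfReplication(db)); // true
    System.out.println(ReplChangeManager.getReplPolicyIdString(db)); // 1,2,3
  }
}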