diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index d732004e51..ec664226f2 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -614,7 +614,8 @@ //========================== 40000 range starts here ========================// SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true), - SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true) + SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true), + REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.") ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 529b387832..ff0f865f01 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -1702,6 +1702,28 @@ public void testCheckPointingMetadataDumpFailure() throws Throwable { assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); } + @Test + public void testReplTargetOfReplication() throws Throwable { + // Bootstrap + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, primaryDbName).verifyReplTargetProperty(replicatedDbName); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true); + + //Try to do a dump on replicated db. 
It should fail + replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='1')"); + try { + replica.dump(replicatedDbName); Assert.fail("Expected dump of a replication target database to fail"); + } catch (Exception e) { + Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage()); + } + replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')"); + + //Try to dump a different db on replica. That should succeed + replica.run("create database " + replicatedDbName + "_extra with dbproperties ('repl.source.for' = '1, 2, 3')") + .dump(replicatedDbName + "_extra"); + replica.run("drop database if exists " + replicatedDbName + "_extra cascade"); + } + private void verifyPathExist(FileSystem fs, Path filePath) throws IOException { assertTrue("Path not found:" + filePath, fs.exists(filePath)); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index d7b360cd93..59116165eb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -944,96 +944,6 @@ public void testIncrementalDumpMultiIteration() throws Throwable { Assert.assertEquals(IncrementalLoadTasksBuilder.getNumIteration(), numEvents); } - @Test - public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { - WarehouseInstance.Tuple tuplePrimary = primary - .run("use " + primaryDbName) - .run("create table t1 (place string) partitioned by (country string) " - + " tblproperties('custom.property'='custom.value')") - .run("insert into table t1 partition(country='india') values ('bangalore')") - .dump(primaryDbName); - - // Bootstrap Repl A -> B - replica.load(replicatedDbName, primaryDbName) - .run("repl status " + 
replicatedDbName) - .verifyResult(tuplePrimary.lastReplicationId) - .run("show tblproperties t1('custom.property')") - .verifyResults(new String[] { "custom.property\tcustom.value" }) - .dumpFailure(replicatedDbName) - .run("alter database " + replicatedDbName - + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')") - .dumpFailure(replicatedDbName); //can not dump the db before first successful incremental load is done. - - // do a empty incremental load to allow dump of replicatedDbName - WarehouseInstance.Tuple temp = primary.dump(primaryDbName, Collections.emptyList()); - replica.load(replicatedDbName, primaryDbName); // first successful incremental load. - - // Bootstrap Repl B -> C - WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName); - String replDbFromReplica = replicatedDbName + "_dupe"; - replica.load(replDbFromReplica, replicatedDbName) - .run("use " + replDbFromReplica) - .run("repl status " + replDbFromReplica) - .verifyResult(tupleReplica.lastReplicationId) - .run("show tables") - .verifyResults(new String[] { "t1" }) - .run("select country from t1") - .verifyResults(Arrays.asList("india")) - .run("show tblproperties t1('custom.property')") - .verifyResults(new String[] { "custom.property\tcustom.value" }); - - // Check if DB/table/partition in C doesn't have repl.source.for props. Also ensure, ckpt property - // is set to bootstrap dump location used in C. 
- Database db = replica.getDatabase(replDbFromReplica); - verifyIfSrcOfReplPropMissing(db.getParameters()); - verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation); - Table t1 = replica.getTable(replDbFromReplica, "t1"); - verifyIfCkptSet(t1.getParameters(), tupleReplica.dumpLocation); - Partition india = replica.getPartition(replDbFromReplica, "t1", Collections.singletonList("india")); - verifyIfCkptSet(india.getParameters(), tupleReplica.dumpLocation); - - // Perform alters in A for incremental replication - WarehouseInstance.Tuple tuplePrimaryInc = primary.run("use " + primaryDbName) - .run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')") - .run("alter table t1 set tblproperties('dummy_key'='dummy_val')") - .run("alter table t1 partition(country='india') set fileformat orc") - .dump(primaryDbName, Collections.emptyList()); - - // Incremental Repl A -> B with alters on db/table/partition - WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, primaryDbName) - .run("repl status " + replicatedDbName) - .verifyResult(tuplePrimaryInc.lastReplicationId) - .dump(replicatedDbName, Collections.emptyList()); - - // Check if DB in B have ckpt property is set to bootstrap dump location used in B and missing for table/partition. 
- db = replica.getDatabase(replicatedDbName); - verifyIfCkptSet(db.getParameters(), tuplePrimary.dumpLocation); - t1 = replica.getTable(replicatedDbName, "t1"); - verifyIfCkptPropMissing(t1.getParameters()); - india = replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india")); - verifyIfCkptPropMissing(india.getParameters()); - - // Incremental Repl B -> C with alters on db/table/partition - replica.load(replDbFromReplica, replicatedDbName) - .run("use " + replDbFromReplica) - .run("repl status " + replDbFromReplica) - .verifyResult(tupleReplicaInc.lastReplicationId) - .run("show tblproperties t1('custom.property')") - .verifyResults(new String[] { "custom.property\tcustom.value" }); - - // Check if DB/table/partition in C doesn't have repl.source.for props. Also ensure, ckpt property - // in DB is set to bootstrap dump location used in C but for table/partition, it is missing. - db = replica.getDatabase(replDbFromReplica); - verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation); - verifyIfSrcOfReplPropMissing(db.getParameters()); - t1 = replica.getTable(replDbFromReplica, "t1"); - verifyIfCkptPropMissing(t1.getParameters()); - india = replica.getPartition(replDbFromReplica, "t1", Collections.singletonList("india")); - verifyIfCkptPropMissing(india.getParameters()); - - replica.run("drop database if exists " + replDbFromReplica + " cascade"); - } - @Test public void testIfCkptPropIgnoredByExport() throws Throwable { WarehouseInstance.Tuple tuplePrimary = primary diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 498d59c359..0a7d5a0dc7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -468,6 +468,12 @@ private void verifyReplTargetProperty(Map props) { 
assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY)); } + public void verifyTargetOfReplProperty(String dbName) throws Exception { + Database db = getDatabase(dbName); + assertTrue(db.getParameters().containsKey(ReplUtils.TARGET_OF_REPLICATION)); + assertTrue(Boolean.parseBoolean(db.getParameters().get(ReplUtils.TARGET_OF_REPLICATION))); + } + public WarehouseInstance verifyReplTargetProperty(String dbName, List tblNames) throws Exception { for (String tblName : tblNames) { verifyReplTargetProperty(getTable(dbName, tblName).getParameters()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 85e9addcd3..1444e15041 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -156,6 +156,9 @@ private boolean isDbEmpty(String dbName) throws HiveException { // done for this database or not. If compaction is done before first incremental then duplicate check will fail as // compaction may change the directory structure. parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true"); + //This flag will be set to identify that it is a target of replication. Repl dump won't be allowed on a database + //which is a target of replication. 
+ parameters.put(ReplUtils.TARGET_OF_REPLICATION, "true"); return parameters; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index bccf56af9e..eaa6690cfb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.utils.StringUtils; @@ -130,6 +131,8 @@ public static final String RANGER_HIVE_SERVICE_NAME = "ranger.plugin.hive.service.name"; public static final String RANGER_CONFIGURATION_RESOURCE_NAME = "ranger-hive-security.xml"; + + public static final String TARGET_OF_REPLICATION = "repl.target.for"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. 
*/ @@ -210,6 +213,15 @@ public static boolean replCkptStatus(String dbName, Map props, S return false; } + public static boolean isTargetOfReplication(Database db) { + assert (db != null); + Map m = db.getParameters(); + if ((m != null) && (m.containsKey(TARGET_OF_REPLICATION))) { + return !StringUtils.isEmpty(m.get(TARGET_OF_REPLICATION)); + } + return false; + } + public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat) throws SemanticException { String val = hiveConf.get(configParam); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index ed358f379a..79f752a5bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -219,6 +219,11 @@ private void initReplDump(ASTNode ast) throws HiveException { " as it is not a source of replication (repl.source.for)"); throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getMsg()); } + if (ReplUtils.isTargetOfReplication(database)) { + LOG.error("Cannot dump database " + dbNameOrPattern + + " as it is a target of replication (repl.target.for)"); + throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg()); + } } else { throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist"); }