diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index d732004e51..ec664226f2 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -614,7 +614,8 @@ //========================== 40000 range starts here ========================// SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true), - SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true) + SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true), + REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.") ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 529b387832..ff0f865f01 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -1702,6 +1702,28 @@ public void testCheckPointingMetadataDumpFailure() throws Throwable { assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); } + @Test + public void testReplTargetOfReplication() throws Throwable { + // Bootstrap + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, primaryDbName).verifyReplTargetProperty(replicatedDbName); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true); + + //Try to do a dump on replicated db. 
It should fail + replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='1')"); + try { + replica.dump(replicatedDbName); + } catch (Exception e) { + Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage()); + } + replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')"); + + //Try to dump a different db on replica. That should succeed + replica.run("create database " + replicatedDbName + "_extra with dbproperties ('repl.source.for' = '1, 2, 3')") + .dump(replicatedDbName + "_extra"); + replica.run("drop database if exists " + replicatedDbName + "_extra cascade"); + } + private void verifyPathExist(FileSystem fs, Path filePath) throws IOException { assertTrue("Path not found:" + filePath, fs.exists(filePath)); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 498d59c359..0a7d5a0dc7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -468,6 +468,12 @@ private void verifyReplTargetProperty(Map props) { assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY)); } + public void verifyTargetOfReplProperty(String dbName) throws Exception { + Database db = getDatabase(dbName); + assertTrue(db.getParameters().containsKey(ReplUtils.TARGET_OF_REPLICATION)); + assertTrue(Boolean.parseBoolean(db.getParameters().get(ReplUtils.TARGET_OF_REPLICATION))); + } + public WarehouseInstance verifyReplTargetProperty(String dbName, List tblNames) throws Exception { for (String tblName : tblNames) { verifyReplTargetProperty(getTable(dbName, tblName).getParameters()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 85e9addcd3..1444e15041 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -156,6 +156,9 @@ private boolean isDbEmpty(String dbName) throws HiveException { // done for this database or not. If compaction is done before first incremental then duplicate check will fail as // compaction may change the directory structure. parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true"); + //This flag will be set to identify its a target of replication. Repl dump won't be allowed on a database + //which is a target of replication. + parameters.put(ReplUtils.TARGET_OF_REPLICATION, "true"); return parameters; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index bccf56af9e..eaa6690cfb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.utils.StringUtils; @@ -130,6 +131,8 @@ public static final String RANGER_HIVE_SERVICE_NAME = "ranger.plugin.hive.service.name"; public static final String RANGER_CONFIGURATION_RESOURCE_NAME = "ranger-hive-security.xml"; + + public static final String TARGET_OF_REPLICATION = "repl.target.for"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. 
*/ @@ -210,6 +213,15 @@ public static boolean replCkptStatus(String dbName, Map props, S return false; } + public static boolean isTargetOfReplication(Database db) { + assert (db != null); + Map m = db.getParameters(); + if ((m != null) && (m.containsKey(TARGET_OF_REPLICATION))) { + return !StringUtils.isEmpty(m.get(TARGET_OF_REPLICATION)); + } + return false; + } + public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat) throws SemanticException { String val = hiveConf.get(configParam); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index ed358f379a..79f752a5bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -219,6 +219,11 @@ private void initReplDump(ASTNode ast) throws HiveException { " as it is not a source of replication (repl.source.for)"); throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getMsg()); } + if (ReplUtils.isTargetOfReplication(database)) { + LOG.error("Cannot dump database " + dbNameOrPattern + + " as it is a target of replication (repl.target.for)"); + throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg()); + } } else { throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist"); }