diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 182a772..35437b1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -468,7 +468,7 @@ public void testIncrementalDumpOfWarehouse() throws Throwable {
     String randomOne = RandomStringUtils.random(10, true, false);
     String randomTwo = RandomStringUtils.random(10, true, false);
     String dbOne = primaryDbName + randomOne;
-    primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
+    primary.run("alter database default set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
     WarehouseInstance.Tuple bootstrapTuple = primary
         .run("use " + primaryDbName)
         .run("create table t1 (i int, j int)")
@@ -777,6 +777,14 @@ private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
     assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
   }
 
+  private void verifyIfCkptPropMissing(Map<String, String> props) {
+    assertFalse(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+  }
+
+  private void verifyIfSrcOfReplPropMissing(Map<String, String> props) {
+    assertFalse(props.containsKey(SOURCE_OF_REPLICATION));
+  }
+
   @Test
   public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
@@ -811,4 +819,129 @@ public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
     Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk"));
     verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
   }
+
+  @Test
+  public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable {
+    WarehouseInstance.Tuple tuplePrimary = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (place string) partitioned by (country string) "
+            + " tblproperties('custom.property'='custom.value')")
+        .run("insert into table t1 partition(country='india') values ('bangalore')")
+        .dump(primaryDbName, null);
+
+    // Bootstrap Repl A -> B
+    WarehouseInstance.Tuple tupleReplica = replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuplePrimary.lastReplicationId)
+        .run("show tblproperties t1('custom.property')")
+        .verifyResults(new String[] { "custom.value\t " })
+        .dumpFailure(replicatedDbName, null)
+        .run("alter database " + replicatedDbName
+            + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')")
+        .dump(replicatedDbName, null);
+
+    // Bootstrap Repl B -> C
+    String replDbFromReplica = replicatedDbName + "_dupe";
+    replica.load(replDbFromReplica, tupleReplica.dumpLocation)
+        .run("use " + replDbFromReplica)
+        .run("repl status " + replDbFromReplica)
+        .verifyResult(tupleReplica.lastReplicationId)
+        .run("show tables")
+        .verifyResults(new String[] { "t1" })
+        .run("select country from t1")
+        .verifyResults(Arrays.asList("india"))
+        .run("show tblproperties t1('custom.property')")
+        .verifyResults(new String[] { "custom.value\t " });
+
+    // Check that the DB/table/partition in C don't have the repl.source.for property. Also, ensure the
+    // ckpt property is set to the bootstrap dump location used in C.
+    Database db = replica.getDatabase(replDbFromReplica);
+    verifyIfSrcOfReplPropMissing(db.getParameters());
+    verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
+    Table t1 = replica.getTable(replDbFromReplica, "t1");
+    verifyIfCkptSet(t1.getParameters(), tupleReplica.dumpLocation);
+    Partition india = replica.getPartition(replDbFromReplica, "t1", Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tupleReplica.dumpLocation);
+
+    // Perform alters in A for incremental replication
+    WarehouseInstance.Tuple tuplePrimaryInc = primary.run("use " + primaryDbName)
+        .run("alter database " + primaryDbName + " set dbproperties('dummy_key'='dummy_val')")
+        .run("alter table t1 set tblproperties('dummy_key'='dummy_val')")
+        .run("alter table t1 partition(country='india') set fileformat orc")
+        .dump(primaryDbName, tuplePrimary.lastReplicationId);
+
+    // Incremental Repl A -> B with alters on db/table/partition
+    WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, tuplePrimaryInc.dumpLocation)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuplePrimaryInc.lastReplicationId)
+        .dump(replicatedDbName, tupleReplica.lastReplicationId);
+
+    // Check that the DB in B has the ckpt property set to the bootstrap dump location used in B, and that it is missing for table/partition.
+    db = replica.getDatabase(replicatedDbName);
+    verifyIfCkptSet(db.getParameters(), tuplePrimary.dumpLocation);
+    t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    // Incremental Repl B -> C with alters on db/table/partition
+    replica.load(replDbFromReplica, tupleReplicaInc.dumpLocation)
+        .run("use " + replDbFromReplica)
+        .run("repl status " + replDbFromReplica)
+        .verifyResult(tupleReplicaInc.lastReplicationId)
+        .run("show tblproperties t1('custom.property')")
+        .verifyResults(new String[] { "custom.value\t " });
+
+    // Check that the DB/table/partition in C don't have the repl.source.for property. Also, ensure the ckpt
+    // property on the DB is set to the bootstrap dump location used in C, while it is missing for table/partition.
+    db = replica.getDatabase(replDbFromReplica);
+    verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
+    verifyIfSrcOfReplPropMissing(db.getParameters());
+    t1 = replica.getTable(replDbFromReplica, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(replDbFromReplica, "t1", Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    replica.run("drop database if exists " + replDbFromReplica + " cascade");
+  }
+
+  @Test
+  public void testIfCkptPropIgnoredByExport() throws Throwable {
+    WarehouseInstance.Tuple tuplePrimary = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (place string) partitioned by (country string)")
+        .run("insert into table t1 partition(country='india') values ('bangalore')")
+        .dump(primaryDbName, null);
+
+    // Bootstrap Repl A -> B and then export table t1
+    String path = "hdfs:///tmp/" + replicatedDbName + "/";
+    String exportPath = "'" + path + "1/'";
+    replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuplePrimary.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("export table t1 to " + exportPath);
+
+    // Check that the ckpt property is set on the table/partition in B after bootstrap load.
+    Table t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptSet(t1.getParameters(), tuplePrimary.dumpLocation);
+    Partition india = replica.getPartition(replicatedDbName, "t1", Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tuplePrimary.dumpLocation);
+
+    // Import table t1 to C
+    String importDbFromReplica = replicatedDbName + "_dupe";
+    replica.run("create database " + importDbFromReplica)
+        .run("use " + importDbFromReplica)
+        .run("import table t1 from " + exportPath)
+        .run("select country from t1")
+        .verifyResults(Arrays.asList("india"));
+
+    // Check that the table/partition in C don't have the ckpt property
+    t1 = replica.getTable(importDbFromReplica, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(importDbFromReplica, "t1", Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    replica.run("drop database if exists " + importDbFromReplica + " cascade");
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 17fd799..79f145c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -217,6 +217,14 @@ Tuple dump(String dbName, String lastReplicationId) throws Throwable {
     return dump(dbName, lastReplicationId, Collections.emptyList());
   }
 
+  WarehouseInstance dumpFailure(String dbName, String lastReplicationId) throws Throwable {
+    String dumpCommand =
+        "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " + lastReplicationId);
+    advanceDumpDir();
+    runFailure(dumpCommand);
+    return this;
+  }
+
   WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable {
     run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
     printOutput();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 0d2fafb..0a5ecf9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -24,11 +24,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -255,12 +257,14 @@ public static void createDbExportDump(FileSystem fs, Path metadataPath, Database
     // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using
     // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
 
-    // Remove all the entries from the parameters which are added for bootstrap dump progress
+    // Remove all the entries from the parameters which are added by repl tasks internally.
     Map<String, String> parameters = dbObj.getParameters();
     if (parameters != null) {
       Map<String, String> tmpParameters = new HashMap<>(parameters);
       tmpParameters.entrySet()
-              .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
+              .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
+                      || e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY)
+                      || e.getKey().equals(ReplChangeManager.SOURCE_OF_REPLICATION));
       dbObj.setParameters(tmpParameters);
     }
     try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index ce83523..9fdf742 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.thrift.TException;
@@ -41,6 +42,13 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
       throws SemanticException, IOException {
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
+      // Remove all the entries from the parameters which are added by repl tasks internally.
+      Map<String, String> parameters = partition.getParameters();
+      if (parameters != null) {
+        parameters.entrySet()
+                .removeIf(e -> e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+      }
+
       if (additionalPropertiesProvider.isInReplicationScope()) {
         // Current replication state must be set on the Partition object only for bootstrap dump.
         // Event replication State will be null in case of bootstrap dump.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 143808b..70f4fed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -53,7 +54,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
     }
 
     Table tTable = tableHandle.getTTable();
-    tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
+    tTable = updatePropertiesInTable(tTable, additionalPropertiesProvider);
     try {
       TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
       writer.jsonGenerator
@@ -65,7 +66,14 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
     }
   }
 
-  private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) {
+  private Table updatePropertiesInTable(Table table, ReplicationSpec additionalPropertiesProvider) {
+    // Remove all the entries from the parameters which are added by repl tasks internally.
+    Map<String, String> parameters = table.getParameters();
+    if (parameters != null) {
+      parameters.entrySet()
+              .removeIf(e -> e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+    }
+
     if (additionalPropertiesProvider.isInReplicationScope()) {
       // Current replication state must be set on the Table object only for bootstrap dump.
       // Event replication State will be null in case of bootstrap dump.
@@ -87,7 +95,7 @@ private Table addPropertiesToTable(Table table, ReplicationSpec additionalProper
       // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
       // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
       // TODO: if we want to be explicit about this dump not being a replication dump, we can
-      // uncomment this else section, but currently unnneeded. Will require a lot of golden file
+      // uncomment this else section, but currently unneeded. Will require a lot of golden file
       // regen if we do so.
     }
     return table;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 00ce977..b59cdf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -66,7 +68,9 @@
         String key = entry.getKey();
         // Ignore the keys which are local to source warehouse
         if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
-                || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())) {
+                || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+                || key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
+                || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)) {
           continue;
         }
         newDbProps.put(key, entry.getValue());