diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 9a2d296..e973628 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -28,6 +28,9 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.junit.rules.TestName; import org.junit.rules.TestRule; @@ -45,6 +48,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import javax.annotation.Nullable; /** * TestReplicationScenariosAcidTables - test replication for ACID tables @@ -345,4 +349,93 @@ public void testTxnEventNonAcid() throws Throwable { .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); } + + @Test + public void testAcidBootstrapReplLoadRetryAfterFailure() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='bob') values(11)") + .run("insert into t2 partition(name='carl') values(10)") + .dump(primaryDbName, null); + + WarehouseInstance.Tuple tuple_2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Inject a behavior which makes REPL LOAD fail when it tries to load table "t2". + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equals("t1"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + List withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"); + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + callerVerifier.assertInjectionsPerformed(true, false); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("select id from t1") + .verifyResults(Arrays.asList("1")); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple_2.dumpLocation); + + // Verify that create table is not called on t1. Only table t2 should be created in the retry. 
+ callerVerifier = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equals("t2"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it completes by creating the remaining table t2. + replica.load(replicatedDbName, tuple.dumpLocation); + callerVerifier.assertInjectionsPerformed(true, false); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("1")) + .run("select name from t2 order by name") + .verifyResults(Arrays.asList("bob", "carl")); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 35437b1..070e497 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -53,11 +56,13 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Nullable; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; @@ -436,8 +441,10 @@ public void testBootStrapDumpOfWarehouse() throws Throwable { End of additional steps */ + // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load replica.run("show databases") .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')") .load("", tuple.dumpLocation) .run("show databases") .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo }) @@ -450,8 +457,9 @@ public void testBootStrapDumpOfWarehouse() throws Throwable { .run("use " + dbTwo) .run("show tables") .verifyResults(new String[] { "t1" }); + /* - Start of 
cleanup + Start of cleanup */ replica.run("drop database " + primaryDbName + " cascade"); @@ -505,8 +513,10 @@ public void testIncrementalDumpOfWarehouse() throws Throwable { End of additional steps */ + // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load replica.run("show databases") .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')") .load("", bootstrapTuple.dumpLocation) .run("show databases") .verifyResults(new String[] { "default", primaryDbName, dbOne }) @@ -772,6 +782,23 @@ public void shouldNotCreateDirectoryForNonNativeTableInDumpDirectory() throws Th assertFalse(fs.exists(cSerdesTableDumpLocation)); } + private void verifyIfCkptSet(WarehouseInstance wh, String dbName, String dumpDir) throws Exception { + Database db = wh.getDatabase(replicatedDbName); + verifyIfCkptSet(db.getParameters(), dumpDir); + + List tblNames = wh.getAllTables(dbName); + for (String tblName : tblNames) { + Table tbl = wh.getTable(dbName, tblName); + verifyIfCkptSet(tbl.getParameters(), dumpDir); + if (tbl.getPartitionKeysSize() != 0) { + List partitions = wh.getAllPartitions(dbName, tblName); + for (Partition ptn : partitions) { + verifyIfCkptSet(ptn.getParameters(), dumpDir); + } + } + } + } + private void verifyIfCkptSet(Map props, String dumpDir) { assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY)); assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir)); @@ -786,41 +813,6 @@ private void verifyIfSrcOfReplPropMissing(Map props) { } @Test - public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { - WarehouseInstance.Tuple tuple = primary - .run("use " + primaryDbName) - .run("create table t1 (id int)") - .run("insert into table t1 values (10)") - .run("create table t2 (place string) partitioned by (country string)") - .run("insert into table t2 partition(country='india') values ('bangalore')") - .run("insert into table t2 partition(country='uk') values ('london')") - .run("insert into table t2 partition(country='us') values ('sfo')") - .dump(primaryDbName, null); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("use " + replicatedDbName) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("show tables") - .verifyResults(new String[] { "t1", "t2" }) - .run("select country from t2") - .verifyResults(Arrays.asList("india", "uk", "us")); - - Database db = replica.getDatabase(replicatedDbName); - verifyIfCkptSet(db.getParameters(), tuple.dumpLocation); - Table t1 = replica.getTable(replicatedDbName, "t1"); - verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation); - Table t2 = replica.getTable(replicatedDbName, "t2"); - verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation); - Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india")); - verifyIfCkptSet(india.getParameters(), tuple.dumpLocation); - Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us")); - verifyIfCkptSet(us.getParameters(), tuple.dumpLocation); - Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk")); - verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation); - } - - @Test public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { WarehouseInstance.Tuple tuplePrimary = primary .run("use " + primaryDbName) @@ -944,4 +936,272 @@ public void testIfCkptPropIgnoredByExport() throws Throwable { 
replica.run("drop database if exists " + importDbFromReplica + " cascade"); } + + @Test + public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("india", "uk", "us")); + verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation); + + WarehouseInstance.Tuple tuple_2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple_2.dumpLocation); + + // Retry with same dump with which it was already loaded also fails. + replica.loadFailure(replicatedDbName, tuple.dumpLocation); + + // Retry from same dump when the database is empty is also not allowed. + replica.run("drop table t1") + .run("drop table t2") + .loadFailure(replicatedDbName, tuple.dumpLocation); + } + + @Test + public void testBootstrapReplLoadRetryAfterFailureForTablesAndConstraints() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1(a string, b string, primary key (a, b) disable novalidate rely)") + .run("create table t2(a string, b string, foreign key (a, b) references t1(a, b) disable novalidate)") + .run("create table t3(a string, b string not null disable, unique (a) disable)") + .dump(primaryDbName, null); + + WarehouseInstance.Tuple tuple_2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Need to drop the primary DB as metastore is shared by both primary/replica. So, constraints + // conflict when loaded. Some issue with framework which needs to be relook into later. + primary.run("drop database if exists " + primaryDbName + " cascade"); + + // Allow create table only on t1. Create should fail for rest of the tables and hence constraints + // also not loaded. + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Constraint Table: " + String.valueOf(args.constraintTblName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equals("t1"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Trigger bootstrap dump which just creates table t1 and other tables (t2, t3) and constraints not loaded. 
+ List withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"); + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1" }); + assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); + assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); + assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); + assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size()); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple_2.dumpLocation); + + // Verify if create table is not called on table t1 but called for t2 and t3. + // Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails. + callerVerifier = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return (args.tblName.equals("t2") || args.tblName.equals("t3")); + } + if (args.constraintTblName != null) { + LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName)); + return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3")); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it fails when try to load the foreign key constraints. All other constraints are loaded. + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }); + assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); + assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); + assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); + assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size()); + + // Verify if no create table/function calls. Only add foreign key constraints on table t2. 
+ callerVerifier = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Table: " + String.valueOf(args.tblName)); + return false; + } + if (args.constraintTblName != null) { + LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName)); + return args.constraintTblName.equals("t2"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it completes by adding just foreign key constraints for table t2. + replica.load(replicatedDbName, tuple.dumpLocation); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }); + assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); + assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); + assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); + assertEquals(2, replica.getForeignKeyList(replicatedDbName, "t2").size()); + } + + @Test + public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .run("CREATE FUNCTION " + primaryDbName + + ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' " + + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'") + .dump(primaryDbName, null); + + WarehouseInstance.Tuple tuple_2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk". + // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed. 
+ BehaviourInjection getPartitionStub + = new BehaviourInjection() { + @Nullable + @Override + public Partition apply(@Nullable Partition ptn) { + if (ptn.getValues().get(0).equals("india")) { + injectionPathCalled = true; + LOG.warn("####getPartition Stub called"); + return null; + } + return ptn; + } + }; + InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub); + + List withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"); + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the behaviour + getPartitionStub.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("india")) + .run("show functions like '" + replicatedDbName + "*'") + .verifyResult(replicatedDbName + ".testFunctionOne"); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple_2.dumpLocation); + + // Verify if no create table/function calls. Only add partitions. + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null) || (args.funcName != null)) { + injectionPathCalled = true; + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Table: " + String.valueOf(args.tblName) + + " Func: " + String.valueOf(args.funcName)); + return false; + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it completes by adding remaining partitions. 
+ replica.load(replicatedDbName, tuple.dumpLocation); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(false, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("india", "uk", "us")) + .run("show functions like '" + replicatedDbName + "*'") + .verifyResult(replicatedDbName + ".testFunctionOne"); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 79f145c..fc812ad 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -30,9 +30,19 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; @@ -130,6 +140,8 @@ private void initialize(String cmRoot, String warehouseRoot, if (!hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) { hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); } + hiveConf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname, + "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); @@ -257,12 +269,19 @@ WarehouseInstance status(String replicatedDbName, List withClauseOptions } WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable { - runFailure("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); - printOutput(); runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); return this; } + WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List withClauseOptions) + throws Throwable { + String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"; + if (!withClauseOptions.isEmpty()) { + replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")"; + } + return runFailure(replLoadCmd); + } + WarehouseInstance verifyResult(String 
data) throws IOException { verifyResults(data == null ? new String[] {} : new String[] { data }); return this; @@ -331,15 +350,56 @@ private void printOutput() throws IOException { } public Database getDatabase(String dbName) throws Exception { - return client.getDatabase(dbName); + try { + return client.getDatabase(dbName); + } catch (NoSuchObjectException e) { + return null; + } + } + + public List getAllTables(String dbName) throws Exception { + return client.getAllTables(dbName); } public Table getTable(String dbName, String tableName) throws Exception { - return client.getTable(dbName, tableName); + try { + return client.getTable(dbName, tableName); + } catch (NoSuchObjectException e) { + return null; + } + } + + public List getAllPartitions(String dbName, String tableName) throws Exception { + try { + return client.listPartitions(dbName, tableName, Short.MAX_VALUE); + } catch (NoSuchObjectException e) { + return null; + } } public Partition getPartition(String dbName, String tableName, List partValues) throws Exception { - return client.getPartition(dbName, tableName, partValues); + try { + return client.getPartition(dbName, tableName, partValues); + } catch (NoSuchObjectException e) { + return null; + } + } + + public List getPrimaryKeyList(String dbName, String tblName) throws Exception { + return client.getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName)); + } + + public List getForeignKeyList(String dbName, String tblName) throws Exception { + return client.getForeignKeys(new ForeignKeysRequest(null, null, dbName, tblName)); + } + + public List getUniqueConstraintList(String dbName, String tblName) throws Exception { + return client.getUniqueConstraints(new UniqueConstraintsRequest(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName)); + } + + public List getNotNullConstraintList(String dbName, String tblName) throws Exception { + return client.getNotNullConstraints( + new NotNullConstraintsRequest(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName)); } ReplicationV1CompatRule getReplivationV1CompatRule(List testsToSkip) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java index cbec3ad..da2c228 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java @@ -18,23 +18,71 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; public class ReplUtils { 
public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; + public enum ReplLoadOpType { + LOAD_NEW, LOAD_SKIP, LOAD_REPLACE + } + + public static Map> genPartSpecs( + Table table, List> partitions) throws SemanticException { + Map> partSpecs = new HashMap<>(); + int partPrefixLength = 0; + if (partitions.size() > 0) { + partPrefixLength = partitions.get(0).size(); + // pick the length of the first ptn, we expect all ptns listed to have the same number of + // key-vals. + } + List partitionDesc = new ArrayList<>(); + for (Map ptn : partitions) { + // convert each key-value-map to appropriate expression. + ExprNodeGenericFuncDesc expr = null; + for (Map.Entry kvp : ptn.entrySet()) { + String key = kvp.getKey(); + Object val = kvp.getValue(); + String type = table.getPartColByName(key).getType(); + PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); + ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); + ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( + "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); + expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); + } + if (expr != null) { + partitionDesc.add(expr); + } + } + if (partitionDesc.size() > 0) { + partSpecs.put(partPrefixLength, partitionDesc); + } + return partSpecs; + } + public static Task getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) throws SemanticException { ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType()); @@ -55,4 +103,18 @@ } return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf); } + + public static boolean replCkptStatus(String dbName, Map props, String dumpRoot) + throws InvalidOperationException { + // If ckpt property not set or empty means, bootstrap is not run on this object. 
+ if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) { + if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) { + return true; + } + throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot + + " is not allowed as the target DB: " + dbName + + " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY)); + } + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 97917f8..76fb2a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -112,8 +112,10 @@ a database ( directory ) loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); } work.updateDbEventState(dbEvent.toState()); - scope.database = true; - scope.rootTasks.addAll(dbTracker.tasks()); + if (dbTracker.hasTasks()) { + scope.rootTasks.addAll(dbTracker.tasks()); + scope.database = true; + } dbTracker.debugLog("database"); break; case Table: { @@ -129,11 +131,11 @@ a database ( directory ) LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), tableContext, loadTaskTracker); tableTracker = loadTable.tasks(); - if (!scope.database) { + setUpDependencies(dbTracker, tableTracker); + if (!scope.database && tableTracker.hasTasks()) { scope.rootTasks.addAll(tableTracker.tasks()); scope.table = true; } - setUpDependencies(dbTracker, tableTracker); /* for table replication if we reach the max number of tasks then for the next run we will try to reload the same table again, this is mainly for ease of understanding the code @@ -285,9 +287,15 @@ private void partitionsPostProcessing(BootstrapEventsIterator iterator, This sets up dependencies such that a child task is dependant on the parent to be complete. */ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) { - for (Task parentTask : parentTasks.tasks()) { + if (parentTasks.hasTasks()) { + for (Task parentTask : parentTasks.tasks()) { + for (Task childTask : childTasks.tasks()) { + parentTask.addDependentTask(childTask); + } + } + } else { for (Task childTask : childTasks.tasks()) { - parentTask.addDependentTask(childTask); + parentTasks.addTask(childTask); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 0fabf5a..d203ae4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -54,7 +54,7 @@ } public boolean shouldNotReplicate() { - ReplicationSpec spec = metadata.getReplicationSpec(); + ReplicationSpec spec = replicationSpec(); return spec.isNoop() || !spec.isInReplicationScope(); } @@ -69,7 +69,7 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { Table table = new Table(metadata.getTable()); ImportTableDesc tableDesc = new ImportTableDesc(StringUtils.isBlank(dbName) ? 
table.getDbName() : dbName, table); - tableDesc.setReplicationSpec(metadata.getReplicationSpec()); + tableDesc.setReplicationSpec(replicationSpec()); return tableDesc; } catch (Exception e) { throw new SemanticException(e); @@ -122,7 +122,7 @@ private AddPartitionDesc partitionDesc(Path fromPath, partDesc.setSortCols(partition.getSd().getSortCols()); partDesc.setLocation(new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); - partsDesc.setReplicationSpec(metadata.getReplicationSpec()); + partsDesc.setReplicationSpec(replicationSpec()); return partsDesc; } catch (Exception e) { throw new SemanticException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index de17d70..26f4892 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -17,8 +17,20 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; @@ -50,6 +62,7 @@ private final ConstraintEvent event; private final String dbNameToLoadIn; private final TaskTracker tracker; + private final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer(); public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn, TaskTracker existingTracker) { @@ -73,7 +86,7 @@ public TaskTracker tasks() throws IOException, SemanticException { String nnsString = json.getString("nns"); List> tasks = new ArrayList>(); - if (pksString != null && !pksString.isEmpty()) { + if (pksString != null && !pksString.isEmpty() && !isPrimaryKeysAlreadyLoaded(pksString)) { AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler(); DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -84,7 +97,7 @@ public TaskTracker tasks() throws IOException, SemanticException { context.hiveDb, context.nestedContext, LOG))); } - if (uksString != null && !uksString.isEmpty()) { + if (uksString != null && !uksString.isEmpty() && !isUniqueConstraintsAlreadyLoaded(uksString)) { AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler(); DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -95,7 
+108,7 @@ public TaskTracker tasks() throws IOException, SemanticException { context.hiveDb, context.nestedContext, LOG))); } - if (nnsString != null && !nnsString.isEmpty()) { + if (nnsString != null && !nnsString.isEmpty() && !isNotNullConstraintsAlreadyLoaded(nnsString)) { AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler(); DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -106,7 +119,7 @@ public TaskTracker tasks() throws IOException, SemanticException { context.hiveDb, context.nestedContext, LOG))); } - if (fksString != null && !fksString.isEmpty()) { + if (fksString != null && !fksString.isEmpty() && !isForeignKeysAlreadyLoaded(fksString)) { AddForeignKeyHandler fkHandler = new AddForeignKeyHandler(); DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -124,4 +137,72 @@ public TaskTracker tasks() throws IOException, SemanticException { } } + private boolean isPrimaryKeysAlreadyLoaded(String pksMsgString) throws Exception { + AddPrimaryKeyMessage msg = deserializer.getAddPrimaryKeyMessage(pksMsgString); + List pksInMsg = msg.getPrimaryKeys(); + if (pksInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? pksInMsg.get(0).getTable_db() : dbNameToLoadIn; + List pks; + try { + pks = context.hiveDb.getPrimaryKeyList(dbName, pksInMsg.get(0).getTable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((pks != null) && !pks.isEmpty()); + } + + private boolean isForeignKeysAlreadyLoaded(String fksMsgString) throws Exception { + AddForeignKeyMessage msg = deserializer.getAddForeignKeyMessage(fksMsgString); + List fksInMsg = msg.getForeignKeys(); + if (fksInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? fksInMsg.get(0).getFktable_db() : dbNameToLoadIn; + List fks; + try { + fks = context.hiveDb.getForeignKeyList(dbName, fksInMsg.get(0).getFktable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((fks != null) && !fks.isEmpty()); + } + + private boolean isUniqueConstraintsAlreadyLoaded(String uksMsgString) throws Exception { + AddUniqueConstraintMessage msg = deserializer.getAddUniqueConstraintMessage(uksMsgString); + List uksInMsg = msg.getUniqueConstraints(); + if (uksInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? uksInMsg.get(0).getTable_db() : dbNameToLoadIn; + List uks; + try { + uks = context.hiveDb.getUniqueConstraintList(dbName, uksInMsg.get(0).getTable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((uks != null) && !uks.isEmpty()); + } + + private boolean isNotNullConstraintsAlreadyLoaded(String nnsMsgString) throws Exception { + AddNotNullConstraintMessage msg = deserializer.getAddNotNullConstraintMessage(nnsMsgString); + List nnsInMsg = msg.getNotNullConstraints(); + if (nnsInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
nnsInMsg.get(0).getTable_db() : dbNameToLoadIn; + List nns; + try { + nns = context.hiveDb.getNotNullConstraintList(dbName, nnsInMsg.get(0).getTable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((nns != null) && !nns.isEmpty()); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index c5f2779..b536cb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; import java.io.Serializable; import java.util.HashMap; @@ -58,11 +59,21 @@ public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, public TaskTracker tasks() throws SemanticException { try { Database dbInMetadata = readDbMetadata(); - Task dbRootTask = existEmptyDb(dbInMetadata.getName()) - ? alterDbTask(dbInMetadata, context.hiveConf) - : createDbTask(dbInMetadata); - dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata)); - tracker.addTask(dbRootTask); + String dbName = dbInMetadata.getName(); + Task dbRootTask = null; + ReplLoadOpType loadDbType = getLoadDbType(dbName); + switch (loadDbType) { + case LOAD_NEW: + dbRootTask = createDbTask(dbInMetadata); + break; + case LOAD_REPLACE: + dbRootTask = alterDbTask(dbInMetadata); + break; + } + if (dbRootTask != null) { + dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata)); + tracker.addTask(dbRootTask); + } return tracker; } catch (Exception e) { throw new SemanticException(e); @@ -73,24 +84,44 @@ Database readDbMetadata() throws SemanticException { return event.dbInMetadata(dbNameToLoadIn); } + private ReplLoadOpType getLoadDbType(String dbName) throws InvalidOperationException, HiveException { + Database db = context.hiveDb.getDatabase(dbName); + if (db == null) { + return ReplLoadOpType.LOAD_NEW; + } + if (isDbAlreadyBootstrapped(db)) { + throw new InvalidOperationException("Bootstrap REPL LOAD is not allowed on Database: " + dbName + + " as it was already done."); + } + if (ReplUtils.replCkptStatus(dbName, db.getParameters(), context.dumpDirectory)) { + return ReplLoadOpType.LOAD_SKIP; + } + if (isDbEmpty(dbName)) { + return ReplLoadOpType.LOAD_REPLACE; + } + throw new InvalidOperationException("Bootstrap REPL LOAD is not allowed on Database: " + dbName + + " as it is not empty. 
One or more tables/functions exist."); + } + + private boolean isDbAlreadyBootstrapped(Database db) { + Map props = db.getParameters(); + return ((props != null) + && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()) + && !props.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()).isEmpty()); + } + + private boolean isDbEmpty(String dbName) throws HiveException { + List allTables = context.hiveDb.getAllTables(dbName); + List allFunctions = context.hiveDb.getFunctions(dbName, "*"); + return allTables.isEmpty() && allFunctions.isEmpty(); + } + private Task createDbTask(Database dbObj) { CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); createDbDesc.setName(dbObj.getName()); createDbDesc.setComment(dbObj.getDescription()); + createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory)); - /* - explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going - to run multiple times and explicit logic is in place which prevents updates to tables when db level - last repl id is set and we create a AlterDatabaseTask at the end of processing a database. - */ - Map parameters = new HashMap<>(dbObj.getParameters()); - parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - - // Add the checkpoint key to the Database binding it to current dump directory. - // So, if retry using same dump, we shall skip Database object update. - parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory); - - createDbDesc.setDatabaseProperties(parameters); // note that we do not set location - for repl load, we want that auto-created. createDbDesc.setIfNotExists(false); // If it exists, we want this to be an error condition. Repl Load is not intended to replace a @@ -100,11 +131,8 @@ Database readDbMetadata() throws SemanticException { return TaskFactory.get(work, context.hiveConf); } - private static Task alterDbTask(Database dbObj, HiveConf hiveConf) { - AlterDatabaseDesc alterDbDesc = - new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null); - DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); - return TaskFactory.get(work, hiveConf); + private Task alterDbTask(Database dbObj) { + return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory), context.hiveConf); } private Task setOwnerInfoTask(Database dbObj) { @@ -115,18 +143,27 @@ Database readDbMetadata() throws SemanticException { return TaskFactory.get(work, context.hiveConf); } - private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { - Database db = context.hiveDb.getDatabase(dbName); - if (db == null) { - return false; - } - List allTables = context.hiveDb.getAllTables(dbName); - List allFunctions = context.hiveDb.getFunctions(dbName, "*"); - if (allTables.isEmpty() && allFunctions.isEmpty()) { - return true; - } - throw new InvalidOperationException( - "Database " + db.getName() + " is not empty. One or more tables/functions exist."); + private static Map updateDbProps(Database dbObj, String dumpDirectory) { + /* + explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going + to run multiple times and explicit logic is in place which prevents updates to tables when db level + last repl id is set and we create a AlterDatabaseTask at the end of processing a database. 
+ */ + Map parameters = new HashMap<>(dbObj.getParameters()); + parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + + // Add the checkpoint key to the Database binding it to current dump directory. + // So, if retry using same dump, we shall skip Database object update. + parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, dumpDirectory); + return parameters; + } + + private static Task alterDbTask(String dbName, Map props, + HiveConf hiveConf) { + AlterDatabaseDesc alterDbDesc = + new AlterDatabaseDesc(dbName, props, null); + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); + return TaskFactory.get(work, hiveConf); } public static class AlterDatabase extends LoadDatabase { @@ -138,7 +175,8 @@ public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn @Override public TaskTracker tasks() throws SemanticException { - tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf)); + Database dbObj = readDbMetadata(); + tracker.addTask(alterDbTask(dbObj.getName(), dbObj.getParameters(), context.hiveConf)); return tracker; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index c100344..b886ff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -26,9 +30,11 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.slf4j.Logger; @@ -71,6 +77,9 @@ public TaskTracker tasks() throws IOException, SemanticException { Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); try { + if (isFunctionAlreadyLoaded(fromPath)) { + return tracker; + } CreateFunctionHandler handler = new CreateFunctionHandler(); List> tasks = handler.handle( new MessageHandler.Context( @@ -85,4 +94,21 @@ public TaskTracker tasks() throws IOException, SemanticException { } } + private boolean isFunctionAlreadyLoaded(Path funcDumpRoot) throws HiveException, IOException { + Path metadataPath = new Path(funcDumpRoot, EximUtil.METADATA_NAME); + FileSystem fs = FileSystem.get(metadataPath.toUri(), context.hiveConf); + MetaData metadata = EximUtil.readMetaData(fs, metadataPath); + Function function; + try { + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
metadata.function.getDbName() : dbNameToLoadIn; + function = context.hiveDb.getFunction(dbName, metadata.function.getFunctionName()); + } catch (HiveException e) { + if (e.getCause() instanceof NoSuchObjectException) { + return false; + } + throw e; + } + return (function != null); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 870f70a..549a252 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; @@ -37,11 +39,12 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -167,6 +170,13 @@ private ReplicationState initialReplicationState() throws SemanticException { } private TaskTracker forNewTable() throws Exception { + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + // This in-turn applicable for partitions creation as well. 
+    if ((parentDb != null) && (!event.replicationSpec().allowReplacementInto(parentDb.getParameters()))) {
+      return tracker;
+    }
+
     Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       AddPartitionDesc currentPartitionDesc = iterator.next();
@@ -174,13 +184,14 @@ private TaskTracker forNewTable() throws Exception {
        the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the
        current retrieved lastReplicatedPartition
       */
-      addPartition(iterator.hasNext(), currentPartitionDesc);
+      addPartition(iterator.hasNext(), currentPartitionDesc, null);
     }
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc) throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc));
+  private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc, Task ptnRootTask)
+          throws Exception {
+    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask));
     if (hasMorePartitions && !tracker.canAddMoreTasks()) {
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
@@ -191,29 +202,36 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
   /**
    * returns the root task for adding a partition
   */
-  private Task tasksForAddPartition(Table table,
-      AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException {
+  private Task tasksForAddPartition(Table table, AddPartitionDesc addPartitionDesc, Task ptnRootTask)
+          throws MetaException, IOException, HiveException {
+    Task addPartTask = TaskFactory.get(
+        new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
+        context.hiveConf
+    );
+    if (event.replicationSpec().isMetadataOnly()) {
+      if (ptnRootTask == null) {
+        ptnRootTask = addPartTask;
+      } else {
+        ptnRootTask.addDependentTask(addPartTask);
+      }
+      return ptnRootTask;
+    }
+
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
     Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
     partSpec.setLocation(replicaWarehousePartitionLocation.toString());
     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-        + partSpec.getLocation());
-    Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
+            + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+            + partSpec.getLocation());
+    Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     Task copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
         sourceWarehousePartitionLocation,
         tmpPath,
         context.hiveConf
     );
-
-    Task addPartTask = TaskFactory.get(
-        new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
-        context.hiveConf
-    );
-
     Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
 
     // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
@@ -225,11 +243,16 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
         context.hiveConf
     );
 
+    if (ptnRootTask == null) {
+      ptnRootTask = copyTask;
+    } else {
+      ptnRootTask.addDependentTask(copyTask);
+    }
     copyTask.addDependentTask(addPartTask);
     addPartTask.addDependentTask(movePartitionTask);
     movePartitionTask.addDependentTask(ckptTask);
 
-    return copyTask;
+    return ptnRootTask;
   }
 
   /**
@@ -271,17 +294,18 @@ private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartiti
     }
   }
 
-  private Task alterSinglePartition(AddPartitionDesc desc,
-      ReplicationSpec replicationSpec, Partition ptn) {
-    desc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
-      desc.setReplicationSpec(replicationSpec);
+  private Task dropPartitionTask(Table table, Map<String, String> partSpec) throws SemanticException {
+    Task dropPtnTask = null;
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecsExpr =
+        ReplUtils.genPartSpecs(table, Collections.singletonList(partSpec));
+    if (partSpecsExpr.size() > 0) {
+      DropTableDesc dropPtnDesc = new DropTableDesc(table.getFullyQualifiedName(),
+          partSpecsExpr, null, true, event.replicationSpec());
+      dropPtnTask = TaskFactory.get(
+          new DDLWork(new HashSet<>(), new HashSet<>(), dropPtnDesc), context.hiveConf
+      );
     }
-    desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
-    return TaskFactory.get(
-        new DDLWork(new HashSet<>(), new HashSet<>(), desc),
-        context.hiveConf
-    );
+    return dropPtnTask;
   }
 
   private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception {
@@ -293,7 +317,6 @@ private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) t
           StringUtils.mapToString(lastReplicatedPartSpec));
     }
-    ReplicationSpec replicationSpec = event.replicationSpec();
     Iterator<AddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
     while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) {
       AddPartitionDesc addPartitionDesc = partitionIterator.next();
@@ -304,33 +327,31 @@ private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) t
     while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
       AddPartitionDesc addPartitionDesc = partitionIterator.next();
       Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
-      Partition ptn = context.hiveDb.getPartition(table, partSpec, false);
-      if (ptn == null) {
-        if (!replicationSpec.isMetadataOnly()) {
-          addPartition(partitionIterator.hasNext(), addPartitionDesc);
-        }
-      } else {
-        // If replicating, then the partition already existing means we need to replace, maybe, if
-        // the destination ptn's repl.last.id is older than the replacement's.
-        if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
-          if (replicationSpec.isMetadataOnly()) {
-            tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
-            if (!tracker.canAddMoreTasks()) {
-              tracker.setReplicationState(
-                  new ReplicationState(
-                      new PartitionState(table.getTableName(), addPartitionDesc)
-                  )
-              );
-            }
-          } else {
-            addPartition(partitionIterator.hasNext(), addPartitionDesc);
-          }
-        } else {
-          // ignore this ptn, do nothing, not an error.
-        }
+      Task ptnRootTask = null;
+      ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec);
+      switch (loadPtnType) {
+        case LOAD_NEW:
+          break;
+        case LOAD_REPLACE:
+          ptnRootTask = dropPartitionTask(table, partSpec);
+          break;
+        case LOAD_SKIP:
+          continue;
       }
+      addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask);
     }
     return tracker;
   }
+
+  private ReplLoadOpType getLoadPartitionType(Map<String, String> partSpec) throws InvalidOperationException, HiveException {
+    Partition ptn = context.hiveDb.getPartition(table, partSpec, false);
+    if (ptn == null) {
+      return ReplLoadOpType.LOAD_NEW;
+    }
+    if (ReplUtils.replCkptStatus(tableContext.dbNameToLoadIn, ptn.getParameters(), context.dumpDirectory)) {
+      return ReplLoadOpType.LOAD_SKIP;
+    }
+    return ReplLoadOpType.LOAD_REPLACE;
+  }
 }
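
Note on the skip/replace decision above: getLoadPartitionType() delegates to ReplUtils.replCkptStatus(), whose body is not part of this patch. Below is a minimal, hedged sketch of the assumed contract only; the helper class, constant name, and exception type are hypothetical and written out purely for illustration.

// Hypothetical sketch, NOT the Hive implementation of ReplUtils.replCkptStatus().
import java.util.Map;

final class CkptSketch {
  // Assumption: the bootstrap load records the dump root under a checkpoint property
  // on the replicated object; the key is written out literally here for illustration.
  static final String CKPT_KEY = "hive.repl.ckpt.key";

  /**
   * Assumed semantics: true when the object was already loaded from this same dump
   * (caller maps it to LOAD_SKIP); false when no checkpoint is present (caller maps it
   * to LOAD_REPLACE); an exception when the object was checkpointed by a different dump.
   */
  static boolean ckptMatchesCurrentDump(Map<String, String> params, String dumpRoot) {
    String ckpt = (params == null) ? null : params.get(CKPT_KEY);
    if (ckpt == null || ckpt.isEmpty()) {
      return false;
    }
    if (ckpt.equals(dumpRoot)) {
      return true;
    }
    throw new IllegalStateException("Object was bootstrapped from a different dump: " + ckpt);
  }
}
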
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index f2b7fa4..037e443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -19,6 +19,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -27,17 +28,21 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -88,7 +93,6 @@ public TaskTracker tasks() throws SemanticException {
     // Executed if relevant, and used to contain all the other details about the table if not.
     ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
     Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
-    ReplicationSpec replicationSpec = event.replicationSpec();
 
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
     // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
@@ -106,25 +110,22 @@ public TaskTracker tasks() throws SemanticException {
       }
     }
 
-    if (table == null) {
-      // If table doesn't exist, allow creating a new one only if the database state is older than the update.
-      if ((parentDb != null) && (!replicationSpec
-          .allowReplacementInto(parentDb.getParameters()))) {
-        // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+    Task tblRootTask = null;
+    ReplLoadOpType loadTblType = getLoadTableType(table);
+    switch (loadTblType) {
+      case LOAD_NEW:
+        break;
+      case LOAD_REPLACE:
+        tblRootTask = dropTableTask(table);
+        break;
+      case LOAD_SKIP:
         return tracker;
-      }
-    } else {
-      if (!replicationSpec.allowReplacementInto(table.getParameters())) {
-        // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
-        return tracker;
-      }
     }
 
     if (tableDesc.getLocation() == null) {
       tableDesc.setLocation(location(tableDesc, parentDb));
     }
-
   /* Note: In the following section, Metadata-only import handling logic is
      interleaved with regular repl-import logic. The rule of thumb being
      followed here is that MD-only imports are essentially ALTERs. They do
@@ -134,11 +135,7 @@ public TaskTracker tasks() throws SemanticException {
      or in the case of an unpartitioned table. In all other cases, it should
      behave like a noop or a pure MD alter.
   */
-    if (table == null) {
-      newTableTasks(tableDesc);
-    } else {
-      existingTableTasks(tableDesc, table, replicationSpec);
-    }
+    newTableTasks(tableDesc, tblRootTask);
 
     // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
     // bootstrap, we skip current table update.
@@ -160,54 +157,48 @@
     }
   }
 
-  private void existingTableTasks(ImportTableDesc tblDesc, Table table,
-      ReplicationSpec replicationSpec) {
-    if (!table.isPartitioned()) {
-
-      LOG.debug("table non-partitioned");
-      if (!replicationSpec.allowReplacementInto(table.getParameters())) {
-        return; // silently return, table is newer than our replacement.
-      }
-
-      Task alterTableTask = alterTableTask(tblDesc, replicationSpec);
-      if (replicationSpec.isMetadataOnly()) {
-        tracker.addTask(alterTableTask);
-      } else {
-        Task loadTableTask =
-            loadTableTask(table, replicationSpec, table.getDataLocation(), event.metadataPath());
-        alterTableTask.addDependentTask(loadTableTask);
-        tracker.addTask(alterTableTask);
-      }
+  private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {
+    if (table == null) {
+      return ReplLoadOpType.LOAD_NEW;
+    }
+    if (ReplUtils.replCkptStatus(table.getDbName(), table.getParameters(), context.dumpDirectory)) {
+      return ReplLoadOpType.LOAD_SKIP;
     }
+    return ReplLoadOpType.LOAD_REPLACE;
   }
 
-  private void newTableTasks(ImportTableDesc tblDesc) throws Exception {
+  private void newTableTasks(ImportTableDesc tblDesc, Task tblRootTask) throws Exception {
     Table table = tblDesc.toTable(context.hiveConf);
-    // Either we're dropping and re-creating, or the table didn't exist, and we're creating.
+    ReplicationSpec replicationSpec = event.replicationSpec();
     Task createTableTask =
         tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
-    if (event.replicationSpec().isMetadataOnly()) {
-      tracker.addTask(createTableTask);
+    if (tblRootTask == null) {
+      tblRootTask = createTableTask;
+    } else {
+      tblRootTask.addDependentTask(createTableTask);
+    }
+    if (replicationSpec.isMetadataOnly()) {
+      tracker.addTask(tblRootTask);
       return;
     }
 
     Task parentTask = createTableTask;
-    if (event.replicationSpec().isTransactionalTableDump()) {
+    if (replicationSpec.isTransactionalTableDump()) {
       List<String> partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null;
       ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames,
-          event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
+          replicationSpec.getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
       Task replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
-      createTableTask.addDependentTask(replTxnTask);
+      parentTask.addDependentTask(replTxnTask);
       parentTask = replTxnTask;
     }
     if (!isPartitioned(tblDesc)) {
       LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
       Task loadTableTask =
-          loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()),
+          loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
               event.metadataPath());
       parentTask.addDependentTask(loadTableTask);
     }
-    tracker.addTask(createTableTask);
+    tracker.addTask(tblRootTask);
   }
 
   private String location(ImportTableDesc tblDesc, Database parentDb)
@@ -249,12 +240,10 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
     return copyTask;
   }
 
-  private Task alterTableTask(ImportTableDesc tableDesc,
-      ReplicationSpec replicationSpec) {
-    tableDesc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
-      tableDesc.setReplicationSpec(replicationSpec);
-    }
-    return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
+  private Task dropTableTask(Table table) {
+    assert(table != null);
+    DropTableDesc dropTblDesc = new DropTableDesc(table.getFullyQualifiedName(), table.getTableType(),
+        true, false, event.replicationSpec());
+    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), context.hiveConf);
   }
 }
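
Both files above thread an optional "root" task (ptnRootTask / tblRootTask) so that, in the LOAD_REPLACE case, the drop task runs ahead of the create/copy chain. A self-contained toy sketch of that chaining pattern follows; ToyTask and the method names are hypothetical and are not Hive's Task API.

// Toy illustration of the root-task threading used by the patch (hypothetical types).
import java.util.ArrayList;
import java.util.List;

final class RootTaskChainingSketch {
  static final class ToyTask {
    final String name;
    final List<ToyTask> children = new ArrayList<>();
    ToyTask(String name) { this.name = name; }
    void addDependentTask(ToyTask t) { children.add(t); }
  }

  /** If no root exists yet, the new task becomes the root; otherwise it is chained behind
   *  the existing root (for example behind a DROP TABLE task). */
  static ToyTask chain(ToyTask rootTask, ToyTask newTask) {
    if (rootTask == null) {
      return newTask;
    }
    rootTask.addDependentTask(newTask);
    return rootTask;
  }

  public static void main(String[] args) {
    ToyTask drop = new ToyTask("drop table t");     // LOAD_REPLACE case
    ToyTask create = new ToyTask("create table t");
    ToyTask root = chain(drop, create);             // drop stays the root, create runs after it
    System.out.println(root.name + " -> " + root.children.get(0).name);
    System.out.println(chain(null, create).name);   // LOAD_NEW case: create itself is the root
  }
}
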
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index d7b3104..356a8c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -352,10 +351,13 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     // import job in its place.
 
     try {
-
+      assert(path != null);
       Path loadPath = new Path(path);
       final FileSystem fs = loadPath.getFileSystem(conf);
 
+      // Make fully qualified path for further use.
+      loadPath = fs.makeQualified(loadPath);
+
       if (!fs.exists(loadPath)) {
         // supposed dump path does not exist.
         throw new FileNotFoundException(loadPath.toUri().toString());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 7281a1c..939884d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -20,21 +20,15 @@ import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -47,7 +41,7 @@
       String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
       String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
       Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
-          genPartSpecs(new Table(msg.getTableObj()),
+          ReplUtils.genPartSpecs(new Table(msg.getTableObj()),
               msg.getPartitions());
       if (partSpecs.size() > 0) {
         DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName,
@@ -70,37 +64,4 @@
           : new SemanticException("Error reading message members", e);
     }
   }
-
-  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
-      List<Map<String, String>> partitions) throws SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
-    int partPrefixLength = 0;
-    if (partitions.size() > 0) {
-      partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have the same number of
-      // key-vals.
-    }
-    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
-    for (Map<String, String> ptn : partitions) {
-      // convert each key-value-map to appropriate expression.
-      ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
-        String key = kvp.getKey();
-        Object val = kvp.getValue();
-        String type = table.getPartColByName(key).getType();
-        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
-        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
-            "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
-        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
-      }
-      if (expr != null) {
-        partitionDesc.add(expr);
-      }
-    }
-    if (partitionDesc.size() > 0) {
-      partSpecs.put(partPrefixLength, partitionDesc);
-    }
-    return partSpecs;
-  }
 }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index b3a8dd0..55bf14d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -9965,19 +9965,20 @@ private String getPrimaryKeyConstraintName(String catName, String db_name, Strin
       final String catName, final String parent_db_name_input, final String parent_tbl_name_input,
       final String foreign_db_name_input, final String foreign_tbl_name_input,
       boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException {
-    final String parent_db_name = parent_db_name_input;
-    final String parent_tbl_name = parent_tbl_name_input;
-    final String foreign_db_name = foreign_db_name_input;
-    final String foreign_tbl_name = foreign_tbl_name_input;
+    final String parent_db_name = (parent_db_name_input != null) ? normalizeIdentifier(parent_db_name_input) : null;
+    final String parent_tbl_name = (parent_tbl_name_input != null) ? normalizeIdentifier(parent_tbl_name_input) : null;
+    final String foreign_db_name = (foreign_db_name_input != null) ? normalizeIdentifier(foreign_db_name_input) : null;
+    final String foreign_tbl_name = (foreign_tbl_name_input != null)
+        ? normalizeIdentifier(foreign_tbl_name_input) : null;
     final String db_name;
     final String tbl_name;
     if (foreign_tbl_name == null) {
       // The FK table name might be null if we are retrieving the constraint from the PK side
-      db_name = parent_db_name_input;
-      tbl_name = parent_tbl_name_input;
+      db_name = parent_db_name;
+      tbl_name = parent_tbl_name;
     } else {
-      db_name = foreign_db_name_input;
-      tbl_name = foreign_tbl_name_input;
+      db_name = foreign_db_name;
+      tbl_name = foreign_tbl_name;
     }
 
     return new GetListHelper(catName, db_name, tbl_name, allowSql, allowJdo) {
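
The ObjectStore hunk above normalizes the constraint lookup keys before they are compared with stored names. A small illustration of why that matters follows, under the assumption that metastore identifier normalization amounts to trim plus lower-casing; the helper below is hypothetical and is not the real normalizeIdentifier utility.

// Hypothetical sketch of case-normalized identifier matching.
final class NormalizeSketch {
  static String normalizeIdentifier(String id) {     // assumption: trim + lower-case
    return (id == null) ? null : id.trim().toLowerCase();
  }

  public static void main(String[] args) {
    String stored = "tbl_with_fk";                   // as persisted by the metastore
    String requested = " Tbl_With_FK ";              // as passed in by a caller
    System.out.println(stored.equals(requested));                        // false: lookup would miss
    System.out.println(stored.equals(normalizeIdentifier(requested)));   // true after normalization
  }
}
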
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index fdb0dc4..d9fe042 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -19,9 +19,15 @@ package org.apache.hadoop.hive.metastore;
 
 import java.util.List;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import static org.junit.Assert.assertEquals;
@@ -51,13 +57,30 @@ public void assertInjectionsPerformed(
     }
   }
 
+  public class CallerArguments {
+    public String dbName;
+    public String tblName;
+    public String funcName;
+    public String constraintTblName;
+
+    public CallerArguments() {
+    }
+    public CallerArguments(String dbName) {
+      this.dbName = dbName;
+    }
+  }
+
   private static com.google.common.base.Function<Table, Table> getTableModifier =
       com.google.common.base.Functions.identity();
+  private static com.google.common.base.Function<Partition, Partition> getPartitionModifier =
+      com.google.common.base.Functions.identity();
   private static com.google.common.base.Function<List<String>, List<String>> listPartitionNamesModifier =
       com.google.common.base.Functions.identity();
   private static com.google.common.base.Function<NotificationEventResponse, NotificationEventResponse> getNextNotificationModifier =
      com.google.common.base.Functions.identity();
+  private static com.google.common.base.Function<CallerArguments, Boolean> callerVerifier = null;
+
   // Methods to set/reset getTable modifier
   public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){
     getTableModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
@@ -67,6 +90,15 @@ public static void resetGetTableBehaviour(){
     setGetTableBehaviour(null);
   }
 
+  // Methods to set/reset getPartition modifier
+  public static void setGetPartitionBehaviour(com.google.common.base.Function<Partition, Partition> modifier){
+    getPartitionModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
+  }
+
+  public static void resetGetPartitionBehaviour(){
+    setGetPartitionBehaviour(null);
+  }
+
   // Methods to set/reset listPartitionNames modifier
   public static void setListPartitionNamesBehaviour(com.google.common.base.Function<List<String>, List<String>> modifier){
     listPartitionNamesModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier;
@@ -86,6 +118,15 @@ public static void resetGetNextNotificationBehaviour(){
     setGetNextNotificationBehaviour(null);
   }
 
+  // Methods to set/reset caller checker
+  public static void setCallerVerifier(com.google.common.base.Function<CallerArguments, Boolean> verifier){
+    callerVerifier = verifier;
+  }
+
+  public static void resetCallerVerifier(){
+    setCallerVerifier(null);
+  }
+
   // ObjectStore methods to be overridden with injected behavior
   @Override
   public Table getTable(String catName, String dbName, String tableName) throws MetaException {
@@ -93,6 +134,12 @@ public Table getTable(String catName, String dbName, String tableName) throws Me
   }
 
   @Override
+  public Partition getPartition(String catName, String dbName, String tableName,
+                                List<String> part_vals) throws NoSuchObjectException, MetaException {
+    return getPartitionModifier.apply(super.getPartition(catName, dbName, tableName, part_vals));
+  }
+
+  @Override
   public List<String> listPartitionNames(String catName, String dbName, String tableName, short max) throws MetaException {
     return listPartitionNamesModifier.apply(super.listPartitionNames(catName, dbName, tableName, max));
   }
@@ -101,4 +148,62 @@ public Table getTable(String catName, String dbName, String tableName) throws Me
   public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
     return getNextNotificationModifier.apply(super.getNextNotification(rqst));
   }
+
+  @Override
+  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
+    if (callerVerifier != null) {
+      CallerArguments args = new CallerArguments(tbl.getDbName());
+      args.tblName = tbl.getTableName();
+      Boolean success = callerVerifier.apply(args);
+      if ((success != null) && !success) {
+        throw new MetaException("InjectableBehaviourObjectStore: Invalid Create Table operation on DB: "
+                + args.dbName + " table: " + args.tblName);
+      }
+    }
+    super.createTable(tbl);
+  }
+
+  @Override
+  public void createFunction(Function func) throws InvalidObjectException, MetaException {
+    if (callerVerifier != null) {
+      CallerArguments args = new CallerArguments(func.getDbName());
+      args.funcName = func.getFunctionName();
+      Boolean success = callerVerifier.apply(args);
+      if ((success != null) && !success) {
+        throw new MetaException("InjectableBehaviourObjectStore: Invalid Create Function operation on DB: "
+                + args.dbName + " function: " + args.funcName);
+      }
+    }
+    super.createFunction(func);
+  }
+
+  @Override
+  public List addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException,
+          MetaException {
+    if (callerVerifier != null) {
+      CallerArguments args = new CallerArguments(pks.get(0).getTable_db());
+      args.constraintTblName = pks.get(0).getTable_name();
+      Boolean success = callerVerifier.apply(args);
+      if ((success != null) && !success) {
+        throw new MetaException("InjectableBehaviourObjectStore: Invalid Add Primary Key operation on DB: "
+                + args.dbName + " table: " + args.constraintTblName);
+      }
+    }
+    return super.addPrimaryKeys(pks);
+  }
+
+  @Override
+  public List addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException,
+          MetaException {
+    if (callerVerifier != null) {
+      CallerArguments args = new CallerArguments(fks.get(0).getFktable_db());
+      args.constraintTblName = fks.get(0).getFktable_name();
+      Boolean success = callerVerifier.apply(args);
+      if ((success != null) && !success) {
+        throw new MetaException("InjectableBehaviourObjectStore: Invalid Add Foreign Key operation on DB: "
+                + args.dbName + " table: " + args.constraintTblName);
+      }
+    }
+    return super.addForeignKeys(fks);
+  }
 }
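
For reference, the createTable/createFunction/addPrimaryKeys/addForeignKeys overrides above all follow the same shape: consult an optional verifier before delegating to super, and abort with a MetaException when it vetoes the call. A stripped-down, self-contained sketch of that hook pattern follows; the class, method, and object names are hypothetical and it uses plain java.util.function instead of Guava.

// Toy sketch of a pre-operation verifier hook (hypothetical names, not the test class above).
import java.util.function.Function;

final class VerifierHookSketch {
  static volatile Function<String, Boolean> verifier = null;

  static void setVerifier(Function<String, Boolean> v) { verifier = v; }
  static void resetVerifier() { verifier = null; }

  static void createObject(String name) {
    Function<String, Boolean> v = verifier;
    if (v != null && Boolean.FALSE.equals(v.apply(name))) {
      throw new IllegalStateException("verifier rejected create of " + name);
    }
    System.out.println("created " + name);
  }

  public static void main(String[] args) {
    setVerifier(name -> !name.startsWith("forbidden"));   // veto a specific object by name
    createObject("allowed_table");
    try {
      createObject("forbidden_table");
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage());
    } finally {
      resetVerifier();                                     // always restore default behaviour
    }
  }
}
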