diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 182a772..c66c6ec 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -29,7 +29,10 @@ import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.After; @@ -53,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Nullable; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -795,6 +799,8 @@ public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { .verifyResult(tuple.lastReplicationId) .run("show tables") .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) .run("select country from t2") .verifyResults(Arrays.asList("india", "uk", "us")); @@ -811,4 +817,218 @@ public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk")); verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation); } + + @Test + public void testIfBootstrapReplLoadFailWhenRetryWithAnotherDump() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + + WarehouseInstance.Tuple tuple_2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Retry with different dump should fail on non-empty DB but shouldn't impact the state of it. 
+ replica.loadFailure(replicatedDbName, tuple_2.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + + // Retry with same dump with which it was already loaded should be noop internally and return same state. + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + + // Retry from another dump when the database is empty is allowed. + replica.run("drop table t1") + .run("drop table t2") + .load(replicatedDbName, tuple_2.dumpLocation) + .run("repl status " + replicatedDbName) + .verifyResult(tuple_2.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + + // Check if ckpt key-value is updated to new dump location in all the objects created. + Database db = replica.getDatabase(replicatedDbName); + verifyIfCkptSet(db.getParameters(), tuple_2.dumpLocation); + Table t1 = replica.getTable(replicatedDbName, "t1"); + verifyIfCkptSet(t1.getParameters(), tuple_2.dumpLocation); + Table t2 = replica.getTable(replicatedDbName, "t2"); + verifyIfCkptSet(t2.getParameters(), tuple_2.dumpLocation); + Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india")); + verifyIfCkptSet(india.getParameters(), tuple_2.dumpLocation); + Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us")); + verifyIfCkptSet(us.getParameters(), tuple_2.dumpLocation); + Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk")); + verifyIfCkptSet(uk.getParameters(), tuple_2.dumpLocation); + }
+ + @Test + public void testBootstrapReplLoadRetryAfterFailure() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .dump(primaryDbName, null); + + // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk". + // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed. 
+ BehaviourInjection getPartitionStub + = new BehaviourInjection(){ + @Nullable + @Override + public Partition apply(@Nullable Partition ptn) { + if (ptn.getValues().get(0).equals("india")) { + injectionPathCalled = true; + LOG.warn("####getPartition Stub called"); + return null; + } + return ptn; + } + }; + InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub); + + replica.loadFailure(replicatedDbName, tuple.dumpLocation); + getPartitionStub.assertInjectionsPerformed(true, false); + InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the behaviour + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2") + .verifyResults(Arrays.asList("india")); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 17fd799..2def0c3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -130,6 +131,8 @@ private void initialize(String cmRoot, String warehouseRoot, if (!hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) { hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); } + hiveConf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname, + "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); @@ -249,8 +252,6 @@ WarehouseInstance status(String replicatedDbName, List withClauseOptions } WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable { - runFailure("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); - printOutput(); runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); return this; } @@ -323,15 +324,27 @@ private void printOutput() throws IOException { } public Database getDatabase(String dbName) throws Exception { - return client.getDatabase(dbName); + try { + return client.getDatabase(dbName); + } catch (NoSuchObjectException e) { + return null; + } } public Table getTable(String dbName, String tableName) throws Exception { - return client.getTable(dbName, tableName); + 
try { + return client.getTable(dbName, tableName); + } catch (NoSuchObjectException e) { + return null; + } } public Partition getPartition(String dbName, String tableName, List partValues) throws Exception { - return client.getPartition(dbName, tableName, partValues); + try { + return client.getPartition(dbName, tableName, partValues); + } catch (NoSuchObjectException e) { + return null; + } } ReplicationV1CompatRule getReplivationV1CompatRule(List testsToSkip) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java index cbec3ad..3e379e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java @@ -20,21 +20,68 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; public class ReplUtils { public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; + public enum LoadOpType { + LOAD_NEW, LOAD_SKIP, LOAD_REPLACE, LOAD_INVALID + } + + public static Map> genPartSpecs( + Table table, List> partitions) throws SemanticException { + Map> partSpecs = new HashMap<>(); + int partPrefixLength = 0; + if (partitions.size() > 0) { + partPrefixLength = partitions.get(0).size(); + // pick the length of the first ptn, we expect all ptns listed to have the same number of + // key-vals. + } + List partitionDesc = new ArrayList<>(); + for (Map ptn : partitions) { + // convert each key-value-map to appropriate expression. + ExprNodeGenericFuncDesc expr = null; + for (Map.Entry kvp : ptn.entrySet()) { + String key = kvp.getKey(); + Object val = kvp.getValue(); + String type = table.getPartColByName(key).getType(); + PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); + ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); + ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( + "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); + expr = (expr == null) ? 
op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); + } + if (expr != null) { + partitionDesc.add(expr); + } + } + if (partitionDesc.size() > 0) { + partSpecs.put(partPrefixLength, partitionDesc); + } + return partSpecs; + } + public static Task getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) throws SemanticException { ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType()); @@ -55,4 +102,11 @@ } return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf); } + + public static LoadOpType getLoadOpType(Map props, String dumpRoot) { + if (props.containsKey(REPL_CHECKPOINT_KEY)) { + return props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot) ? LoadOpType.LOAD_SKIP : LoadOpType.LOAD_INVALID; + } + return LoadOpType.LOAD_REPLACE; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 97917f8..76fb2a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -112,8 +112,10 @@ a database ( directory ) loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); } work.updateDbEventState(dbEvent.toState()); - scope.database = true; - scope.rootTasks.addAll(dbTracker.tasks()); + if (dbTracker.hasTasks()) { + scope.rootTasks.addAll(dbTracker.tasks()); + scope.database = true; + } dbTracker.debugLog("database"); break; case Table: { @@ -129,11 +131,11 @@ a database ( directory ) LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), tableContext, loadTaskTracker); tableTracker = loadTable.tasks(); - if (!scope.database) { + setUpDependencies(dbTracker, tableTracker); + if (!scope.database && tableTracker.hasTasks()) { scope.rootTasks.addAll(tableTracker.tasks()); scope.table = true; } - setUpDependencies(dbTracker, tableTracker); /* for table replication if we reach the max number of tasks then for the next run we will try to reload the same table again, this is mainly for ease of understanding the code @@ -285,9 +287,15 @@ private void partitionsPostProcessing(BootstrapEventsIterator iterator, This sets up dependencies such that a child task is dependant on the parent to be complete. 
*/ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) { - for (Task parentTask : parentTasks.tasks()) { + if (parentTasks.hasTasks()) { + for (Task parentTask : parentTasks.tasks()) { + for (Task childTask : childTasks.tasks()) { + parentTask.addDependentTask(childTask); + } + } + } else { for (Task childTask : childTasks.tasks()) { - parentTask.addDependentTask(childTask); + parentTasks.addTask(childTask); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 0fabf5a..c13fd61 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -54,7 +54,7 @@ } public boolean shouldNotReplicate() { - ReplicationSpec spec = metadata.getReplicationSpec(); + ReplicationSpec spec = replicationSpec(); return spec.isNoop() || !spec.isInReplicationScope(); } @@ -69,7 +69,7 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { Table table = new Table(metadata.getTable()); ImportTableDesc tableDesc = new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table); - tableDesc.setReplicationSpec(metadata.getReplicationSpec()); + tableDesc.setReplicationSpec(replicationSpec()); return tableDesc; } catch (Exception e) { throw new SemanticException(e); @@ -122,7 +122,7 @@ private AddPartitionDesc partitionDesc(Path fromPath, partDesc.setSortCols(partition.getSd().getSortCols()); partDesc.setLocation(new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); - partsDesc.setReplicationSpec(metadata.getReplicationSpec()); + partsDesc.setReplicationSpec(replicationSpec()); return partsDesc; } catch (Exception e) { throw new SemanticException(e); @@ -131,7 +131,9 @@ private AddPartitionDesc partitionDesc(Path fromPath, @Override public ReplicationSpec replicationSpec() { - return metadata.getReplicationSpec(); + ReplicationSpec replSpec = metadata.getReplicationSpec(); + replSpec.setReplSpecType(ReplicationSpec.Type.BOOTSTRAP_LOAD); + return replSpec; } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index c5f2779..d846ffa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.LoadOpType; import java.io.Serializable; import java.util.HashMap; @@ -58,9 +59,27 @@ public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, public TaskTracker tasks() throws SemanticException { try { Database dbInMetadata = readDbMetadata(); - Task dbRootTask = existEmptyDb(dbInMetadata.getName()) - ? 
alterDbTask(dbInMetadata, context.hiveConf) - : createDbTask(dbInMetadata); + String dbName = dbInMetadata.getName(); + Task dbRootTask = null; + LoadOpType opType = getLoadOpType(dbName); + switch (opType) { + case LOAD_NEW: + dbRootTask = createDbTask(dbInMetadata); + break; + case LOAD_REPLACE: + case LOAD_INVALID: + // LOAD_INVALID: Even if the Db is check-pointed with different dumpDir and if it is empty, + // then should allow load. This is needed for "default" database. + if (existEmptyDb(dbName)) { + dbRootTask = alterDbTask(dbInMetadata); + } else { + throw new InvalidOperationException( + "Database " + dbName + " is not empty. One or more tables/functions exist."); + } + break; + case LOAD_SKIP: + return tracker; + } dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata)); tracker.addTask(dbRootTask); return tracker; @@ -73,24 +92,26 @@ Database readDbMetadata() throws SemanticException { return event.dbInMetadata(dbNameToLoadIn); } + private LoadOpType getLoadOpType(String dbName) throws InvalidOperationException, HiveException { + Database db = context.hiveDb.getDatabase(dbName); + if (db == null) { + return LoadOpType.LOAD_NEW; + } + return ReplUtils.getLoadOpType(db.getParameters(), context.dumpDirectory); + } + + private boolean existEmptyDb(String dbName) throws HiveException { + List allTables = context.hiveDb.getAllTables(dbName); + List allFunctions = context.hiveDb.getFunctions(dbName, "*"); + return allTables.isEmpty() && allFunctions.isEmpty(); + } + private Task createDbTask(Database dbObj) { CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); createDbDesc.setName(dbObj.getName()); createDbDesc.setComment(dbObj.getDescription()); + createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory)); - /* - explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going - to run multiple times and explicit logic is in place which prevents updates to tables when db level - last repl id is set and we create a AlterDatabaseTask at the end of processing a database. - */ - Map parameters = new HashMap<>(dbObj.getParameters()); - parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - - // Add the checkpoint key to the Database binding it to current dump directory. - // So, if retry using same dump, we shall skip Database object update. - parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory); - - createDbDesc.setDatabaseProperties(parameters); // note that we do not set location - for repl load, we want that auto-created. createDbDesc.setIfNotExists(false); // If it exists, we want this to be an error condition. 
Repl Load is not intended to replace a @@ -100,11 +121,8 @@ Database readDbMetadata() throws SemanticException { return TaskFactory.get(work, context.hiveConf); } - private static Task alterDbTask(Database dbObj, HiveConf hiveConf) { - AlterDatabaseDesc alterDbDesc = - new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null); - DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); - return TaskFactory.get(work, hiveConf); + private Task alterDbTask(Database dbObj) { + return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory), context.hiveConf); } private Task setOwnerInfoTask(Database dbObj) { @@ -115,18 +133,27 @@ Database readDbMetadata() throws SemanticException { return TaskFactory.get(work, context.hiveConf); } - private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { - Database db = context.hiveDb.getDatabase(dbName); - if (db == null) { - return false; - } - List allTables = context.hiveDb.getAllTables(dbName); - List allFunctions = context.hiveDb.getFunctions(dbName, "*"); - if (allTables.isEmpty() && allFunctions.isEmpty()) { - return true; - } - throw new InvalidOperationException( - "Database " + db.getName() + " is not empty. One or more tables/functions exist."); + private static Map updateDbProps(Database dbObj, String dumpDirectory) { + /* + explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going + to run multiple times and explicit logic is in place which prevents updates to tables when db level + last repl id is set and we create a AlterDatabaseTask at the end of processing a database. + */ + Map parameters = new HashMap<>(dbObj.getParameters()); + parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + + // Add the checkpoint key to the Database binding it to current dump directory. + // So, if retry using same dump, we shall skip Database object update. 
+ parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, dumpDirectory); + return parameters; + } + + private static Task alterDbTask(String dbName, Map props, + HiveConf hiveConf) { + AlterDatabaseDesc alterDbDesc = + new AlterDatabaseDesc(dbName, props, null); + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); + return TaskFactory.get(work, hiveConf); } public static class AlterDatabase extends LoadDatabase { @@ -138,7 +165,8 @@ public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn @Override public TaskTracker tasks() throws SemanticException { - tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf)); + Database dbObj = readDbMetadata(); + tracker.addTask(alterDbTask(dbObj.getName(), dbObj.getParameters(), context.hiveConf)); return tracker; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 870f70a..1e9f365 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.LoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; @@ -37,11 +39,12 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -167,6 +170,13 @@ private ReplicationState initialReplicationState() throws SemanticException { } private TaskTracker forNewTable() throws Exception { + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + // This in-turn applicable for partitions creation as well. 
+ if ((parentDb != null) && (!event.replicationSpec().allowReplacementInto(parentDb.getParameters()))) { + return tracker; + } + Iterator iterator = event.partitionDescriptions(tableDesc).iterator(); while (iterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc currentPartitionDesc = iterator.next(); @@ -174,13 +184,14 @@ private TaskTracker forNewTable() throws Exception { the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the current retrieved lastReplicatedPartition */ - addPartition(iterator.hasNext(), currentPartitionDesc); + addPartition(iterator.hasNext(), currentPartitionDesc, null); } return tracker; } - private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc) throws Exception { - tracker.addTask(tasksForAddPartition(table, addPartitionDesc)); + private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc, Task ptnRootTask) + throws Exception { + tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask)); if (hasMorePartitions && !tracker.canAddMoreTasks()) { ReplicationState currentReplicationState = new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); @@ -191,29 +202,36 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti /** * returns the root task for adding a partition */ - private Task tasksForAddPartition(Table table, - AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException { + private Task tasksForAddPartition(Table table, AddPartitionDesc addPartitionDesc, Task ptnRootTask) + throws MetaException, IOException, HiveException { + Task addPartTask = TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc), + context.hiveConf + ); + if (event.replicationSpec().isMetadataOnly()) { + if (ptnRootTask == null) { + ptnRootTask = addPartTask; + } else { + ptnRootTask.addDependentTask(addPartTask); + } + return ptnRootTask; + } + AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); partSpec.setLocation(replicaWarehousePartitionLocation.toString()); LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " - + partSpecToString(partSpec.getPartSpec()) + " with source location: " - + partSpec.getLocation()); - Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + + partSpecToString(partSpec.getPartSpec()) + " with source location: " + + partSpec.getLocation()); + Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), sourceWarehousePartitionLocation, tmpPath, context.hiveConf ); - - Task addPartTask = TaskFactory.get( - new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc), - context.hiveConf - ); - Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath); // Set Checkpoint task as dependant to add partition tasks. 
So, if same dump is retried for @@ -225,11 +243,16 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti context.hiveConf ); + if (ptnRootTask == null) { + ptnRootTask = copyTask; + } else { + ptnRootTask.addDependentTask(copyTask); + } copyTask.addDependentTask(addPartTask); addPartTask.addDependentTask(movePartitionTask); movePartitionTask.addDependentTask(ckptTask); - return copyTask; + return ptnRootTask; } /** @@ -271,17 +294,18 @@ private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartiti } } - private Task alterSinglePartition(AddPartitionDesc desc, - ReplicationSpec replicationSpec, Partition ptn) { - desc.setReplaceMode(true); - if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { - desc.setReplicationSpec(replicationSpec); + private Task dropPartitionTask(Table table, Map partSpec) throws SemanticException { + Task dropPtnTask = null; + Map> partSpecsExpr = + ReplUtils.genPartSpecs(table, Collections.singletonList(partSpec)); + if (partSpecsExpr.size() > 0) { + DropTableDesc dropPtnDesc = new DropTableDesc(table.getFullyQualifiedName(), + partSpecsExpr, null, true, event.replicationSpec()); + dropPtnTask = TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), dropPtnDesc), context.hiveConf + ); } - desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location - return TaskFactory.get( - new DDLWork(new HashSet<>(), new HashSet<>(), desc), - context.hiveConf - ); + return dropPtnTask; } private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception { @@ -293,7 +317,6 @@ private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) t StringUtils.mapToString(lastReplicatedPartSpec)); } - ReplicationSpec replicationSpec = event.replicationSpec(); Iterator partitionIterator = event.partitionDescriptions(tableDesc).iterator(); while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) { AddPartitionDesc addPartitionDesc = partitionIterator.next(); @@ -304,33 +327,40 @@ private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) t while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc addPartitionDesc = partitionIterator.next(); Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); - Partition ptn = context.hiveDb.getPartition(table, partSpec, false); - if (ptn == null) { - if (!replicationSpec.isMetadataOnly()) { - addPartition(partitionIterator.hasNext(), addPartitionDesc); - } - } else { - // If replicating, then the partition already existing means we need to replace, maybe, if - // the destination ptn's repl.last.id is older than the replacement's. - if (replicationSpec.allowReplacementInto(ptn.getParameters())) { - if (replicationSpec.isMetadataOnly()) { - tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn)); - if (!tracker.canAddMoreTasks()) { - tracker.setReplicationState( - new ReplicationState( - new PartitionState(table.getTableName(), addPartitionDesc) - ) - ); - } - } else { - addPartition(partitionIterator.hasNext(), addPartitionDesc); - } - } else { - // ignore this ptn, do nothing, not an error. 
- } + Task ptnRootTask = null; + LoadOpType opType = getLoadOpType(partSpec); + switch (opType) { + case LOAD_NEW: + break; + case LOAD_REPLACE: + ptnRootTask = dropPartitionTask(table, partSpec); + break; + case LOAD_INVALID: + throw new InvalidOperationException("Load not allowed on table " + table.getFullyQualifiedName() + + " for partition " + partSpec + + " as it was already bootstrap loaded from another dump."); + case LOAD_SKIP: + continue; } + addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask); } return tracker; } + + private LoadOpType getLoadOpType(Map partSpec) throws HiveException { + Partition ptn = context.hiveDb.getPartition(table, partSpec, false); + if (ptn == null) { + // If partition doesn't exist, allow creating a new one only if the table state is older than the update. + assert(table != null); + return event.replicationSpec().allowReplacementInto(table.getParameters()) + ? LoadOpType.LOAD_NEW : LoadOpType.LOAD_SKIP; + } + LoadOpType opType = ReplUtils.getLoadOpType(ptn.getParameters(), context.dumpDirectory); + if (opType == LoadOpType.LOAD_REPLACE) { + return event.replicationSpec().allowReplacementInto(ptn.getParameters()) + ? LoadOpType.LOAD_REPLACE : LoadOpType.LOAD_SKIP; + } + return opType; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index f2b7fa4..b40e7e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -19,6 +19,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -27,17 +28,21 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.LoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -88,7 +93,6 @@ public TaskTracker tasks() throws SemanticException { // Executed if relevant, and used to contain all the other details about the table if not. 
ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName)); Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); - ReplicationSpec replicationSpec = event.replicationSpec(); // Normally, on import, trying to create a table or a partition in a db that does not yet exist // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying @@ -106,25 +110,25 @@ public TaskTracker tasks() throws SemanticException { } } - if (table == null) { - // If table doesn't exist, allow creating a new one only if the database state is older than the update. - if ((parentDb != null) && (!replicationSpec - .allowReplacementInto(parentDb.getParameters()))) { - // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + Task tblRootTask = null; + ReplUtils.LoadOpType opType = getLoadOpType(table, parentDb); + switch (opType) { + case LOAD_NEW: + break; + case LOAD_REPLACE: // Table exist and need replace + tblRootTask = dropTableTask(table); + break; + case LOAD_INVALID: + throw new InvalidOperationException("Load not allowed on table " + table.getFullyQualifiedName() + + " as it was already bootstrap loaded from another dump."); + case LOAD_SKIP: return tracker; - } - } else { - if (!replicationSpec.allowReplacementInto(table.getParameters())) { - // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. - return tracker; - } } if (tableDesc.getLocation() == null) { tableDesc.setLocation(location(tableDesc, parentDb)); } - /* Note: In the following section, Metadata-only import handling logic is interleaved with regular repl-import logic. The rule of thumb being followed here is that MD-only imports are essentially ALTERs. They do @@ -134,11 +138,7 @@ public TaskTracker tasks() throws SemanticException { or in the case of an unpartitioned table. In all other cases, it should behave like a noop or a pure MD alter. */ - if (table == null) { - newTableTasks(tableDesc); - } else { - existingTableTasks(tableDesc, table, replicationSpec); - } + newTableTasks(tableDesc, tblRootTask); // Set Checkpoint task as dependant to create table task. So, if same dump is retried for // bootstrap, we skip current table update. @@ -160,54 +160,52 @@ public TaskTracker tasks() throws SemanticException { } } - private void existingTableTasks(ImportTableDesc tblDesc, Table table, - ReplicationSpec replicationSpec) { - if (!table.isPartitioned()) { - - LOG.debug("table non-partitioned"); - if (!replicationSpec.allowReplacementInto(table.getParameters())) { - return; // silently return, table is newer than our replacement. - } - - Task alterTableTask = alterTableTask(tblDesc, replicationSpec); - if (replicationSpec.isMetadataOnly()) { - tracker.addTask(alterTableTask); - } else { - Task loadTableTask = - loadTableTask(table, replicationSpec, table.getDataLocation(), event.metadataPath()); - alterTableTask.addDependentTask(loadTableTask); - tracker.addTask(alterTableTask); - } + private LoadOpType getLoadOpType(Table table, Database parentDb) throws HiveException { + if (table == null) { + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + return ((parentDb == null) || event.replicationSpec().allowReplacementInto(parentDb.getParameters())) + ? 
LoadOpType.LOAD_NEW : LoadOpType.LOAD_SKIP; + } + LoadOpType opType = ReplUtils.getLoadOpType(table.getParameters(), context.dumpDirectory); + if (opType == LoadOpType.LOAD_REPLACE) { + return event.replicationSpec().allowReplacementInto(table.getParameters()) + ? LoadOpType.LOAD_REPLACE : LoadOpType.LOAD_SKIP; } + return opType; } - private void newTableTasks(ImportTableDesc tblDesc) throws Exception { + private void newTableTasks(ImportTableDesc tblDesc, Task tblRootTask) throws Exception { Table table = tblDesc.toTable(context.hiveConf); - // Either we're dropping and re-creating, or the table didn't exist, and we're creating. + ReplicationSpec replicationSpec = event.replicationSpec(); Task createTableTask = tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); - if (event.replicationSpec().isMetadataOnly()) { - tracker.addTask(createTableTask); + if (tblRootTask == null) { + tblRootTask = createTableTask; + } else { + tblRootTask.addDependentTask(createTableTask); + } + if (replicationSpec.isMetadataOnly()) { + tracker.addTask(tblRootTask); return; } Task parentTask = createTableTask; - if (event.replicationSpec().isTransactionalTableDump()) { + if (replicationSpec.isTransactionalTableDump()) { List partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null; ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames, - event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); + replicationSpec.getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); Task replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); - createTableTask.addDependentTask(replTxnTask); + parentTask.addDependentTask(replTxnTask); parentTask = replTxnTask; } if (!isPartitioned(tblDesc)) { LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table"); Task loadTableTask = - loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()), + loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()), event.metadataPath()); parentTask.addDependentTask(loadTableTask); } - tracker.addTask(createTableTask); + tracker.addTask(tblRootTask); } private String location(ImportTableDesc tblDesc, Database parentDb) @@ -249,12 +247,10 @@ private String location(ImportTableDesc tblDesc, Database parentDb) return copyTask; } - private Task alterTableTask(ImportTableDesc tableDesc, - ReplicationSpec replicationSpec) { - tableDesc.setReplaceMode(true); - if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { - tableDesc.setReplicationSpec(replicationSpec); - } - return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); + private Task dropTableTask(Table table) { + assert(table != null); + DropTableDesc dropTblDesc = new DropTableDesc(table.getFullyQualifiedName(), table.getTableType(), + true, false, event.replicationSpec()); + return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), context.hiveConf); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 7d901f9..ea6429b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -70,7 +70,7 @@ public String toString(){ public enum SCOPE { NO_REPL, MD_ONLY, REPL } - public enum Type { DEFAULT, INCREMENTAL_DUMP, IMPORT } + public enum Type { DEFAULT, 
INCREMENTAL_DUMP, BOOTSTRAP_LOAD, IMPORT } /** * Constructor to construct spec based on either the ASTNode that @@ -174,11 +174,11 @@ public boolean allowReplacement(String currReplState, String replacementReplStat long replacementReplStateLong = Long.parseLong(replacementReplState.replaceAll("\\D","")); // Failure handling of IMPORT command and REPL LOAD commands are different. - // IMPORT will set the last repl ID before copying data files and hence need to allow + // IMPORT/BOOTSTRAP_LOAD will set the last repl ID before copying data files and hence need to allow // replacement if loaded from same dump twice after failing to copy in previous attempt. // But, REPL LOAD will set the last repl ID only after the successful copy of data files and // hence need not allow if same event is applied twice. - if (specType == Type.IMPORT) { + if ((specType == Type.IMPORT) || (specType == Type.BOOTSTRAP_LOAD)) { return (currReplStateLong <= replacementReplStateLong); } else { return (currReplStateLong < replacementReplStateLong); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 7281a1c..939884d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -20,21 +20,15 @@ import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DropTableDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +41,7 @@ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; Map> partSpecs = - genPartSpecs(new Table(msg.getTableObj()), + ReplUtils.genPartSpecs(new Table(msg.getTableObj()), msg.getPartitions()); if (partSpecs.size() > 0) { DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName, @@ -70,37 +64,4 @@ : new SemanticException("Error reading message members", e); } } - - private Map> genPartSpecs(Table table, - List> partitions) throws SemanticException { - Map> partSpecs = new HashMap<>(); - int partPrefixLength = 0; - if (partitions.size() > 0) { - partPrefixLength = partitions.get(0).size(); - // pick the length of the first ptn, we expect all ptns listed to have the same number of - // key-vals. - } - List partitionDesc = new ArrayList<>(); - for (Map ptn : partitions) { - // convert each key-value-map to appropriate expression. 
- ExprNodeGenericFuncDesc expr = null; - for (Map.Entry kvp : ptn.entrySet()) { - String key = kvp.getKey(); - Object val = kvp.getValue(); - String type = table.getPartColByName(key).getType(); - PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); - ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); - ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( - "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); - expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); - } - if (expr != null) { - partitionDesc.add(expr); - } - } - if (partitionDesc.size() > 0) { - partSpecs.put(partPrefixLength, partitionDesc); - } - return partSpecs; - } } diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java index fdb0dc4..1134f3e 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import static org.junit.Assert.assertEquals; @@ -53,6 +55,8 @@ public void assertInjectionsPerformed( private static com.google.common.base.Function getTableModifier = com.google.common.base.Functions.identity(); + private static com.google.common.base.Function getPartitionModifier = + com.google.common.base.Functions.identity(); private static com.google.common.base.Function, List> listPartitionNamesModifier = com.google.common.base.Functions.identity(); private static com.google.common.base.Function @@ -67,6 +71,15 @@ public static void resetGetTableBehaviour(){ setGetTableBehaviour(null); } + // Methods to set/reset getPartition modifier + public static void setGetPartitionBehaviour(com.google.common.base.Function modifier){ + getPartitionModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier; + } + + public static void resetGetPartitionBehaviour(){ + setGetPartitionBehaviour(null); + } + // Methods to set/reset listPartitionNames modifier public static void setListPartitionNamesBehaviour(com.google.common.base.Function, List> modifier){ listPartitionNamesModifier = (modifier == null)? com.google.common.base.Functions.identity() : modifier; @@ -93,6 +106,12 @@ public Table getTable(String catName, String dbName, String tableName) throws Me } @Override + public Partition getPartition(String catName, String dbName, String tableName, + List part_vals) throws NoSuchObjectException, MetaException { + return getPartitionModifier.apply(super.getPartition(catName, dbName, tableName, part_vals)); + } + + @Override public List listPartitionNames(String catName, String dbName, String tableName, short max) throws MetaException { return listPartitionNamesModifier.apply(super.listPartitionNames(catName, dbName, tableName, max)); }