diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 83f38fa6a9..d9f4af50b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -22,6 +22,9 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -37,6 +40,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,9 +48,11 @@
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -534,6 +540,116 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
         .verifyResults(Arrays.asList("10", "20"));
   }
 
+  @Test
+  public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable {
+    List<String> loadWithClause = new ArrayList<>();
+    loadWithClause.addAll(externalTableBasePathWithClause());
+
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+
+    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('bangalore')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("create table t3 as select * from t1")
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause)
+        .status(replicatedDbName)
+        .verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResult("t3")
+        .run("select id from t3")
+        .verifyResult("1");
+
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("drop table t1")
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (10)")
+        .run("create table t5 as select * from t4")
+        .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+
+    // Fail setting the ckpt property for table t4, but let it succeed for t2.
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+        = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (args.tblName.equalsIgnoreCase("t4") && args.dbName.equalsIgnoreCase(replicatedDbName)) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
+          return false;
+        }
+        return true;
+      }
+    };
+
+    // Fail REPL LOAD before the ckpt property is set for t4 and after it is set for t2.
+    // In the retry, these half-baked tables should be dropped and the bootstrap should be successful.
+    InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
+    try {
+      replica.loadFailure(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetAlterTableModifier();
+    }
+
+    // Insert into the existing external table and then drop it, add a managed table with the same name,
+    // and take another bootstrap dump for external tables.
+    WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("drop table t2")
+        .run("create table t2 as select * from t4")
+        .run("insert into table t4 values (20)")
+        .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause);
+
+    // Set an incorrect bootstrap dump to clean tables. Here, the full bootstrap dump is used, which is
+    // invalid, so REPL LOAD fails.
+    loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+        + tupleBootstrapWithoutExternal.dumpLocation + "'");
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+    loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+        + tupleBootstrapWithoutExternal.dumpLocation + "'");
+
+    // Set the previously failed bootstrap dump for clean-up. Now, the new bootstrap should overwrite
+    // the old one.
+    loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+        + tupleIncWithExternalBootstrap.dumpLocation + "'");
+
+    // Verify that bootstrapping with the same dump is idempotent and returns the same result.
+    for (int i = 0; i < 2; i++) {
+      replica.load(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause)
+          .run("use " + replicatedDbName)
+          .run("show tables like 't1'")
+          .verifyFailure(new String[]{"t1"})
+          .run("select id from t2")
+          .verifyResult("10")
+          .run("select id from t4")
+          .verifyResults(Arrays.asList("10", "20"))
+          .run("select id from t5")
+          .verifyResult("10");
+
+      // Once the REPL LOAD is successful, this config should be unset; otherwise, the subsequent
+      // REPL LOAD would also drop those tables, causing data loss.
+      loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+          + tupleIncWithExternalBootstrap.dumpLocation + "'");
+    }
+
+    // Re-bootstrapping from a different bootstrap dump should fail.
+    tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause);
+    loadWithClause.clear();
+    loadWithClause.addAll(externalTableBasePathWithClause());
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+  }
+
   private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
     Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
     DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
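The mid-load failure in the test above is manufactured with the metastore test hook InjectableBehaviourObjectStore: the metastore invokes the supplied callback on alter-table and fails the call when the callback returns false. A minimal sketch of the pattern, reusing only names that appear in the test (the REPL LOAD invocation itself is elided):

    // Force the metastore alter-table path to fail for one table, then restore the default.
    BehaviourInjection<CallerArguments, Boolean> failAlterOnT4 =
        new BehaviourInjection<CallerArguments, Boolean>() {
          @Nullable
          @Override
          public Boolean apply(@Nullable CallerArguments args) {
            if (args.tblName.equalsIgnoreCase("t4")) {
              injectionPathCalled = true; // lets assertInjectionsPerformed() verify the hook fired
              return false;               // false fails the metastore alter-table call for t4 only
            }
            return true;                  // every other table proceeds normally
          }
        };
    InjectableBehaviourObjectStore.setAlterTableModifier(failAlterOnT4);
    try {
      // ... run the REPL LOAD that is expected to fail here ...
    } finally {
      InjectableBehaviourObjectStore.resetAlterTableModifier(); // never leak the hook into other tests
    }

Resetting the modifier in a finally block matters: a leaked hook would fail unrelated tests that alter tables.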
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7062eda98d..624dea224d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
@@ -35,18 +41,25 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
-import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -279,6 +292,73 @@ a database ( directory )
     return 0;
   }
 
+  /**
+   * Cleans up (drops) tables from the given database which were bootstrapped by the input dump directory.
+   * @throws HiveException Failed to drop the tables.
+   * @throws IOException File operations failure.
+   * @throws InvalidInputException Invalid input dump directory.
+   */
+  private void cleanTablesFromBootstrap() throws HiveException, IOException, InvalidInputException {
+    Path bootstrapDirectory = new PathBuilder(work.bootstrapDumpToCleanTables)
+        .addDescendant(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME).build();
+    FileSystem fs = bootstrapDirectory.getFileSystem(conf);
+
+    if (!fs.exists(bootstrapDirectory)) {
+      throw new InvalidInputException("Input bootstrap dump directory to clean tables is invalid: "
+          + bootstrapDirectory);
+    }
+
+    FileStatus[] fileStatuses = fs.listStatus(bootstrapDirectory, EximUtil.getDirectoryFilter(fs));
+    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+      throw new InvalidInputException("Input bootstrap dump directory to clean tables is empty: "
+          + bootstrapDirectory);
+    }
+
+    if (StringUtils.isNotBlank(work.dbNameToLoadIn) && (fileStatuses.length > 1)) {
+      throw new InvalidInputException("Input bootstrap dump directory to clean tables has multiple"
+          + " DB dirs in the dump: " + bootstrapDirectory
+          + " which is not allowed on single target DB: " + work.dbNameToLoadIn);
+    }
+
+    for (FileStatus dbDir : fileStatuses) {
+      Path dbLevelPath = dbDir.getPath();
+      String dbNameInDump = dbLevelPath.getName();
+
+      List<String> tableNames = new ArrayList<>();
+      RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(dbLevelPath, true);
+      while (filesIterator.hasNext()) {
+        Path nextFile = filesIterator.next().getPath();
+        String filePath = nextFile.toString();
+        if (filePath.endsWith(EximUtil.METADATA_NAME)) {
+          // Remove dbLevelPath from the current path to check if this _metadata file is under the DB
+          // or a table level directory.
+          String replacedString = filePath.replace(dbLevelPath.toString(), "");
+          if (!replacedString.equalsIgnoreCase(EximUtil.METADATA_NAME)) {
+            tableNames.add(nextFile.getParent().getName());
+          }
+        }
+      }
+
+      // No tables listed in the DB level directory to be dropped.
+      if (tableNames.isEmpty()) {
+        LOG.info("No tables are listed to be dropped for Database: {} in bootstrap dump: {}",
+            dbNameInDump, bootstrapDirectory);
+        continue;
+      }
+
+      // Drop all tables bootstrapped by the previous dump. If the user specified a DB name as input
+      // in the REPL LOAD command, then drop them from that target DB; otherwise, use the DB name
+      // recorded in the dump.
+      String dbName = (StringUtils.isNotBlank(work.dbNameToLoadIn) ? work.dbNameToLoadIn : dbNameInDump);
+
+      Hive db = getHive();
+      for (String table : tableNames) {
+        db.dropTable(dbName + "." + table, true);
+      }
+      LOG.info("Database: {} is cleaned for the bootstrap dump: {}", dbName, bootstrapDirectory);
+    }
+  }
+
   private void createEndReplLogTask(Context context, Scope scope, ReplLogger replLogger)
       throws SemanticException {
     Map<String, String> dbProps;
@@ -366,6 +446,12 @@ private void createBuilderTask(List<Task<? extends Serializable>> rootTasks) {
 
   private int executeIncrementalLoad(DriverContext driverContext) {
     try {
+      // If the user has requested cleanup of any bootstrap dump, then do it before the incremental load.
+      if (work.needCleanTablesFromBootstrap) {
+        cleanTablesFromBootstrap();
+        work.needCleanTablesFromBootstrap = false;
+      }
+
       IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
       // If incremental events are already applied, then check and perform if need to bootstrap any tables.
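The directory walk in cleanTablesFromBootstrap() relies on the bootstrap section of a dump keeping one directory per database under the ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME root, with one subdirectory per bootstrapped table, each holding a metadata file. A worked example of the prefix-stripping check, with hypothetical paths and assuming EximUtil.METADATA_NAME is the literal "_metadata":

    // Worked example (hypothetical paths) of the prefix-stripping in cleanTablesFromBootstrap().
    public class MetadataPathCheckExample {
      public static void main(String[] args) {
        String dbLevelPath = "hdfs://nn:8020/repldump/_bootstrap/srcdb"; // a DB dir under the dump root
        String tableMeta = dbLevelPath + "/t4/_metadata";                // table-level metadata file
        String dbMeta = dbLevelPath + "/_metadata";                      // DB-level metadata file

        // After stripping the DB prefix, the remainder is compared against EximUtil.METADATA_NAME
        // to skip the database's own metadata file; a remainder that still carries a directory
        // component belongs to a table, whose name is the parent directory ("t4" here).
        System.out.println(tableMeta.replace(dbLevelPath, "")); // prints "/t4/_metadata"
        System.out.println(dbMeta.replace(dbLevelPath, ""));    // prints "/_metadata"
      }
    }

Every _metadata file sitting one directory below the DB level thus contributes its parent directory name to the list of tables to drop.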
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7539281f1f..ad167ac27c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,6 +44,9 @@
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
   final String dumpDirectory;
+  final String bootstrapDumpToCleanTables;
+  boolean needCleanTablesFromBootstrap;
+
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
@@ -65,6 +69,9 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.bootstrapDumpToCleanTables = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
+    this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
+
     rootTask = null;
     if (isIncrementalDump) {
       incrementalLoadTasksBuilder =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index cb81dd2130..a5ed840879 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -66,6 +66,10 @@
   // tasks.
   public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";
 
+  // Configuration to be received via WITH clause of REPL LOAD to clean tables from any previously failed
+  // bootstrap load.
+  public static final String REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG = "hive.repl.clean.tables.from.bootstrap";
+
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
 
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
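As the ReplUtils comment above notes, the new key is consumed from the WITH clause of REPL LOAD, and ReplLoadWork arms needCleanTablesFromBootstrap only when its value is non-blank. A hedged sketch of the retry flow in the test harness's fluent style; failedDump and newDump are hypothetical tuples for the failed and the fresh bootstrap dump:

    // Retry an external-table bootstrap from a new dump; first drop whatever tables the
    // earlier, failed dump half-created on the replica.
    List<String> withClause = new ArrayList<>(externalTableBasePathWithClause());
    withClause.add("'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
        + failedDump.dumpLocation + "'");
    replica.load(replicatedDbName, newDump.dumpLocation, withClause);

    // Unset the key right after a successful load: left in place, the next REPL LOAD would
    // drop the same tables again and lose data.
    withClause.remove("'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
        + failedDump.dumpLocation + "'");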