diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 83f38fa6a9..3fd734331b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -37,6 +37,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +48,7 @@
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_ROLLBACK_BOOTSTRAP_LOAD_CONFIG;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -534,6 +536,90 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
         .verifyResults(Arrays.asList("10", "20"));
   }
 
+  @Test
+  public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable {
+    List<String> loadWithClause = new ArrayList<>();
+    loadWithClause.addAll(externalTableBasePathWithClause());
+
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+
+    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('bangalore')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("create table t3 as select * from t1")
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause)
+        .status(replicatedDbName)
+        .verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResult("t3")
+        .run("select id from t3")
+        .verifyResult("1");
+
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("drop table t1")
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (10)")
+        .run("create table t5 as select * from t4")
+        .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+
+    // Verify that bootstrap load with the same dump is idempotent and returns the same result.
+    for (int i = 0; i < 2; i++) {
+      replica.load(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause)
+          .status(replicatedDbName)
+          .verifyResult(tupleIncWithExternalBootstrap.lastReplicationId)
+          .run("use " + replicatedDbName)
+          .run("show tables like 't1'")
+          .verifyFailure(new String[]{"t1"})
+          .run("select place from t2 where country = 'us'")
+          .verifyResult("austin")
+          .run("select id from t4")
+          .verifyResult("10")
+          .run("select id from t5")
+          .verifyResult("10");
+    }
+
+    // Drop an external table, add a managed table with the same name, insert into the existing
+    // external table, and take another bootstrap dump of the external tables.
+    WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("insert into table t2 partition(country='india') values ('chennai')")
+        .run("drop table t2")
+        .run("create table t2 as select * from t4")
+        .run("insert into table t4 values (20)")
+        .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause);
+
+    // Set the previous dump as the bootstrap to be rolled back. The new bootstrap should then
+    // overwrite the old one.
+    loadWithClause.add("'" + REPL_ROLLBACK_BOOTSTRAP_LOAD_CONFIG + "'='"
+        + tupleIncWithExternalBootstrap.dumpLocation + "'");
+    replica.load(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyFailure(new String[]{"t1"})
+        .run("select id from t2")
+        .verifyResult("10")
+        .run("select id from t4")
+        .verifyResults(Arrays.asList("10", "20"))
+        .run("select id from t5")
+        .verifyResult("10");
+
+    // Re-bootstrapping from a different dump should fail.
+    tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause);
+    loadWithClause.clear();
+    loadWithClause.addAll(externalTableBasePathWithClause());
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+  }
+
   private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
     Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
     DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7062eda98d..7acf7e2c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
@@ -35,18 +41,25 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
-import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -279,6 +292,72 @@ a database ( directory )
     return 0;
   }
 
+  /**
+   * Clean up (drop) the tables in the given database that were bootstrapped by the input dump directory.
+   * @throws HiveException Failed to drop the tables.
+   * @throws IOException File operations failure.
+   * @throws InvalidInputException Invalid input dump directory.
+   */
+  private void bootstrapRollbackTask() throws HiveException, IOException, InvalidInputException {
+    Path bootstrapDirectory = new PathBuilder(work.bootstrapDumpToRollback)
+        .addDescendant(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME).build();
+    FileSystem fs = bootstrapDirectory.getFileSystem(conf);
+
+    if (!fs.exists(bootstrapDirectory)) {
+      throw new InvalidInputException("Input bootstrap dump directory to rollback doesn't exist: "
+          + bootstrapDirectory);
+    }
+
+    FileStatus[] fileStatuses = fs.listStatus(bootstrapDirectory, EximUtil.getDirectoryFilter(fs));
+    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+      throw new InvalidInputException("Input bootstrap dump directory to rollback is empty: "
+          + bootstrapDirectory);
+    }
+
+    if (StringUtils.isNotBlank(work.dbNameToLoadIn) && (fileStatuses.length > 1)) {
+      throw new InvalidInputException("Multiple DB dirs in the dump: " + bootstrapDirectory
+          + " is not allowed to load to single target DB: " + work.dbNameToLoadIn);
+    }
+
+    for (FileStatus dbDir : fileStatuses) {
+      Path dbLevelPath = dbDir.getPath();
+      String dbNameInDump = dbLevelPath.getName();
+
+      List<String> tableNames = new ArrayList<>();
+      RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(dbLevelPath, true);
+      while (filesIterator.hasNext()) {
+        Path nextFile = filesIterator.next().getPath();
+        String filePath = nextFile.toString();
+        if (filePath.endsWith(EximUtil.METADATA_NAME)) {
+          // Remove dbLevelPath from the current path to check if this _metadata file is under the
+          // DB level or the table level directory.
+          String replacedString = filePath.replace(dbLevelPath.toString(), "");
+          if (!replacedString.equalsIgnoreCase(EximUtil.METADATA_NAME)) {
+            tableNames.add(nextFile.getParent().getName());
+          }
+        }
+      }
+
+      // No tables listed under the DB level directory, so nothing to drop.
+      if (tableNames.isEmpty()) {
+        LOG.info("No tables are listed to be dropped for Database: {} in bootstrap dump: {}",
+            dbNameInDump, bootstrapDirectory);
+        continue;
+      }
+
+      // Drop all tables bootstrapped from the previous dump.
+      // Get the target DB in which the previously bootstrapped tables are to be dropped. If the user
+      // specified a DB name as input in the REPL LOAD command, then use it.
+      String dbName = (StringUtils.isNotBlank(work.dbNameToLoadIn) ? work.dbNameToLoadIn : dbNameInDump);
+
+      Hive db = getHive();
+      for (String table : tableNames) {
+        db.dropTable(dbName + "." + table, true);
+      }
+      LOG.info("Database: {} is cleaned for the bootstrap dump: {}", dbName, bootstrapDirectory);
+    }
+  }
+
   private void createEndReplLogTask(Context context, Scope scope,
                                     ReplLogger replLogger) throws SemanticException {
     Map<String, String> dbProps;
@@ -366,6 +445,12 @@ private void createBuilderTask(List<Task<? extends Serializable>> rootTasks) {
 
   private int executeIncrementalLoad(DriverContext driverContext) {
     try {
+      // If the user has requested to clean up any bootstrap dump, do it before the incremental load.
+      if (work.isNeedBootstrapRollback) {
+        bootstrapRollbackTask();
+        work.isNeedBootstrapRollback = false;
+      }
+
       IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
       // If incremental events are already applied, then check and perform if need to bootstrap any tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7539281f1f..d2d46fb849 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,6 +44,9 @@
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
   final String dumpDirectory;
+  final String bootstrapDumpToRollback;
+  boolean isNeedBootstrapRollback;
+
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
@@ -65,6 +69,9 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.bootstrapDumpToRollback = hiveConf.get(ReplUtils.REPL_ROLLBACK_BOOTSTRAP_LOAD_CONFIG);
+    this.isNeedBootstrapRollback = StringUtils.isNotBlank(this.bootstrapDumpToRollback);
+
     rootTask = null;
     if (isIncrementalDump) {
       incrementalLoadTasksBuilder =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index cb81dd2130..2874b1e988 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -66,6 +66,10 @@
   // tasks.
   public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";
 
+  // Configuration received via the WITH clause of REPL LOAD, used to roll back any previously
+  // failed bootstrap load.
+  public static final String REPL_ROLLBACK_BOOTSTRAP_LOAD_CONFIG = "hive.repl.rollback.bootstrap.load";
+
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
 
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
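
Usage note (not part of the patch): the new hive.repl.rollback.bootstrap.load property is read from the WITH clause of REPL LOAD (see ReplLoadWork above), so the retry flow exercised by retryBootstrapExternalTablesFromDifferentDump corresponds to a statement along the lines of the sketch below. The database name and dump paths are placeholders, not values taken from this patch:

  -- Roll back the partially loaded bootstrap dump, then apply the fresh one.
  REPL LOAD replicated_db FROM '<new_dump_location>'
  WITH ('hive.repl.rollback.bootstrap.load'='<previous_failed_dump_location>');

During such a load, ReplLoadTask.bootstrapRollbackTask() drops every table recorded under the INC_BOOTSTRAP_ROOT_DIR_NAME subdirectory of the dump named by the property before the incremental load is applied, so the new bootstrap can cleanly overwrite the old one. Re-bootstrapping from a different dump without requesting this rollback is expected to fail, as asserted by the loadFailure call at the end of the test.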