diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 83f38fa6a9..b5f14c8ac6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -22,6 +22,9 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -37,6 +40,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,9 +48,11 @@
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -534,6 +540,116 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
         .verifyResults(Arrays.asList("10", "20"));
   }
 
+  @Test
+  public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable {
+    List<String> loadWithClause = new ArrayList<>();
+    loadWithClause.addAll(externalTableBasePathWithClause());
+
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+
+    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('bangalore')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("create table t3 as select * from t1")
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause)
+        .status(replicatedDbName)
+        .verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResult("t3")
+        .run("select id from t3")
+        .verifyResult("1");
+
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("drop table t1")
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (10)")
values (10)") + .run("create table t5 as select * from t4") + .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause); + + // Fail setting ckpt property for table t4 but success for t2. + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + if (args.tblName.equalsIgnoreCase("t4") && args.dbName.equalsIgnoreCase(replicatedDbName)) { + injectionPathCalled = true; + LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName); + return false; + } + return true; + } + }; + + // Fail repl load before the ckpt property is set for t4 and after it is set for t2. + // In the retry, these half baked tables should be dropped and bootstrap should be successful. + InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier); + try { + replica.loadFailure(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause); + callerVerifier.assertInjectionsPerformed(true, false); + } finally { + InjectableBehaviourObjectStore.resetAlterTableModifier(); + } + + // Insert into existing external table and then Drop it, add another managed table with same name + // and dump another bootstrap dump for external tables. + WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName) + .run("insert into table t2 partition(country='india') values ('chennai')") + .run("drop table t2") + .run("create table t2 as select * from t4") + .run("insert into table t4 values (20)") + .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause); + + // Set incorrect bootstrap dump to clean tables. Here, used the full bootstrap dump which is invalid. + // So, REPL LOAD fails. + loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" + + tupleBootstrapWithoutExternal.dumpLocation + "'"); + replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause); + loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" + + tupleBootstrapWithoutExternal.dumpLocation + "'"); + + // Set previously failed bootstrap dump to clean-up. Now, new bootstrap should overwrite the old one. + loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" + + tupleIncWithExternalBootstrap.dumpLocation + "'"); + + // Verify if bootstrapping with same dump is idempotent and return same result + for (int i = 0; i < 2; i++) { + replica.load(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[]{"t1"}) + .run("select id from t2") + .verifyResult("10") + .run("select id from t4") + .verifyResults(Arrays.asList("10", "20")) + .run("select id from t5") + .verifyResult("10"); + + // Once the REPL LOAD is successful, the this config should be unset or else, the subsequent REPL LOAD + // will also drop those tables which will cause data loss. + loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='" + + tupleIncWithExternalBootstrap.dumpLocation + "'"); + } + + // Re-bootstrapping from different bootstrap dump should fail. 
+    tupleNewIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, tupleIncWithExternalBootstrap.lastReplicationId, dumpWithClause);
+    loadWithClause.clear();
+    loadWithClause.addAll(externalTableBasePathWithClause());
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+  }
+
   private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
     Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
     DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7062eda98d..b5e39b8592 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
@@ -32,21 +36,29 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.FSTableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
-import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -279,6 +291,77 @@ a database ( directory )
     return 0;
   }
 
+  /**
+   * Clean up (drop) the tables in the given database that were bootstrapped by the input dump directory.
+   * @throws HiveException if dropping a table fails.
+   * @throws IOException on file-system operation failure.
+   * @throws InvalidInputException if the input dump directory is invalid.
+   */
+  private void cleanTablesFromBootstrap() throws HiveException, IOException, InvalidInputException {
+    Path bootstrapDirectory = new PathBuilder(work.bootstrapDumpToCleanTables)
+        .addDescendant(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME).build();
+    FileSystem fs = bootstrapDirectory.getFileSystem(conf);
+
+    if (!fs.exists(bootstrapDirectory)) {
+      throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from is invalid: "
+          + bootstrapDirectory);
+    }
+
+    FileStatus[] fileStatuses = fs.listStatus(bootstrapDirectory, EximUtil.getDirectoryFilter(fs));
+    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+      throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from is empty: "
+          + bootstrapDirectory);
+    }
+
+    if (StringUtils.isNotBlank(work.dbNameToLoadIn) && (fileStatuses.length > 1)) {
+      throw new InvalidInputException("Input bootstrap dump directory specified to clean tables from has multiple"
+          + " DB dirs in the dump: " + bootstrapDirectory
+          + " which is not allowed on single target DB: " + work.dbNameToLoadIn);
+    }
+
+    // Iterate over the DBs and tables listed in the input bootstrap dump directory to clean tables from.
+    BootstrapEventsIterator bootstrapEventsIterator
+        = new BootstrapEventsIterator(bootstrapDirectory.toString(), work.dbNameToLoadIn, false, conf);
+
+    // This map will have only one entry if the target database is renamed using the input DB name from REPL LOAD.
+    // For the multiple-DBs case, this map maintains the list of table names against each DB.
+    Map<String, List<String>> dbToTblsListMap = new HashMap<>();
+    while (bootstrapEventsIterator.hasNext()) {
+      BootstrapEvent event = bootstrapEventsIterator.next();
+      if (event.eventType().equals(BootstrapEvent.EventType.Table)) {
+        FSTableEvent tableEvent = (FSTableEvent) event;
+        String dbName = (StringUtils.isBlank(work.dbNameToLoadIn) ? tableEvent.getDbName() : work.dbNameToLoadIn);
+        List<String> tableNames;
+        if (dbToTblsListMap.containsKey(dbName)) {
+          tableNames = dbToTblsListMap.get(dbName);
+        } else {
+          tableNames = new ArrayList<>();
+          dbToTblsListMap.put(dbName, tableNames);
+        }
+        tableNames.add(tableEvent.getTableName());
+      }
+    }
+
+    // No tables are listed in the given bootstrap dump directory specified to clean tables from.
+    if (dbToTblsListMap.isEmpty()) {
+      LOG.info("No DB/tables are listed in the bootstrap dump: {} specified to clean tables.",
+          bootstrapDirectory);
+      return;
+    }
+
+    Hive db = getHive();
+    for (Map.Entry<String, List<String>> dbEntry : dbToTblsListMap.entrySet()) {
+      String dbName = dbEntry.getKey();
+      List<String> tableNames = dbEntry.getValue();
+
+      for (String table : tableNames) {
+        db.dropTable(dbName + "." + table, true);
+      }
+      LOG.info("Tables listed in the database: {} in the bootstrap dump: {} are cleaned up",
+          dbName, bootstrapDirectory);
+    }
+  }
+
   private void createEndReplLogTask(Context context, Scope scope, ReplLogger replLogger)
       throws SemanticException {
     Map<String, String> dbProps;
@@ -366,6 +449,12 @@ private void createBuilderTask(List<Task<? extends Serializable>> rootTasks) {
 
   private int executeIncrementalLoad(DriverContext driverContext) {
     try {
+      // If the user has requested clean-up of any bootstrap dump, do it before the incremental load.
+      if (work.needCleanTablesFromBootstrap) {
+        cleanTablesFromBootstrap();
+        work.needCleanTablesFromBootstrap = false;
+      }
+
       IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
       // If incremental events are already applied, then check and perform if need to bootstrap any tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7539281f1f..c5e083142a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,6 +44,9 @@
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
   final String dumpDirectory;
+  final String bootstrapDumpToCleanTables;
+  boolean needCleanTablesFromBootstrap;
+
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
@@ -65,6 +69,9 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.bootstrapDumpToCleanTables = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
+    this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
+
     rootTask = null;
     if (isIncrementalDump) {
       incrementalLoadTasksBuilder =
@@ -78,14 +85,15 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad
       Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
       FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
       if (fs.exists(incBootstrapDir)) {
-        this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn, hiveConf);
+        this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn,
+            true, hiveConf);
         this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       } else {
         this.bootstrapIterator = null;
         this.constraintsIterator = null;
       }
     } else {
-      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
+      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, true, hiveConf);
       this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       incrementalLoadTasksBuilder = null;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index ef6e31f2a6..a1580f82bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -75,9 +75,10 @@
   private final String dumpDirectory;
   private final String dbNameToLoadIn;
   private final HiveConf hiveConf;
+  private final boolean needLogger;
   private ReplLogger replLogger;
 
-  public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, HiveConf hiveConf)
+  public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, boolean needLogger, HiveConf hiveConf)
       throws IOException {
     Path path = new Path(dumpDirectory);
     FileSystem fileSystem = path.getFileSystem(hiveConf);
@@ -107,6 +108,7 @@ public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, Hive
 
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.needLogger = needLogger;
     this.hiveConf = hiveConf;
   }
 
@@ -116,7 +118,9 @@ public boolean hasNext() {
     if (currentDatabaseIterator == null) {
       if (dbEventsIterator.hasNext()) {
         currentDatabaseIterator = dbEventsIterator.next();
-        initReplLogger();
+        if (needLogger) {
+          initReplLogger();
+        }
       } else {
         return false;
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index 874edb960d..ae2e1db959 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -106,7 +106,7 @@ private void getSortedFileList(Path eventPath, List<FileStatus> fileStatu
     }
   }
 
-  public Path dbLevelPath() {
+  Path dbLevelPath() {
     return this.dbLevelPath;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 4b382f2762..22b6e98d5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -63,6 +63,13 @@
     }
   }
 
+  public String getDbName() {
+    return metadata.getTable().getDbName();
+  }
+  public String getTableName() {
+    return metadata.getTable().getTableName();
+  }
+
   public boolean shouldNotReplicate() {
     ReplicationSpec spec = replicationSpec();
     return spec.isNoop() || !spec.isInReplicationScope();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index cb81dd2130..a5ed840879 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -66,6 +66,10 @@
   // tasks.
   public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";
 
+  // Configuration received via the WITH clause of REPL LOAD, pointing to a previously failed
+  // bootstrap dump whose tables should be cleaned up before retrying the bootstrap.
+  public static final String REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG = "hive.repl.clean.tables.from.bootstrap";
+
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
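Usage note (editor's addition, not part of the patch): the recovery flow this patch enables is driven
entirely through the WITH clause of REPL LOAD. The minimal JDBC sketch below shows how a retry might
look; the connection URL, database name, and dump paths are illustrative placeholders, while the config
key hive.repl.clean.tables.from.bootstrap and the REPL LOAD syntax come from the patch and Hive's
existing replication commands.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class RetryBootstrapLoadExample {
      public static void main(String[] args) throws Exception {
        String failedDump = "/user/hive/repl/dump_101"; // hypothetical dump whose REPL LOAD failed midway
        String freshDump = "/user/hive/repl/dump_102";  // hypothetical fresh bootstrap dump to retry with

        try (Connection conn = DriverManager.getConnection("jdbc:hive2://replica-host:10000/default");
             Statement stmt = conn.createStatement()) {
          // Pointing the clean-up config at the previously failed dump makes
          // ReplLoadTask.cleanTablesFromBootstrap() walk that dump's incremental-bootstrap
          // directory and drop the half-loaded tables it lists before the fresh bootstrap is
          // applied. Any other dump location is rejected with InvalidInputException, as the
          // new test exercises.
          stmt.execute("REPL LOAD replicated_db FROM '" + freshDump + "'"
              + " WITH ('hive.repl.clean.tables.from.bootstrap'='" + failedDump + "')");
        }
      }
    }

As the test's comments stress, the config must be passed only on the retry: if it is carried into a
subsequent REPL LOAD, the same tables would be dropped again after they had been replicated
successfully, causing data loss on the target.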