diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b19c1aa..9667449 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -551,6 +551,81 @@ public void testBootstrapWithConcurrentDropPartition() throws IOException {
   }
 
   @Test
+  public void testBootstrapWithConcurrentRename() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name, driver);
+    String replDbName = dbName + "_dupe";
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    String[] ptn_data = new String[]{ "eleven", "twelve" };
+    String[] empty = new String[]{};
+    String ptn_locn = new Path(TEST_PATH, name + "_ptn").toUri().getPath();
+
+    createTestDataFile(ptn_locn, ptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+
+    BehaviourInjection<Table, Table> ptnedTableRenamer = new BehaviourInjection<Table, Table>(){
+      boolean success = false;
+
+      @Nullable
+      @Override
+      public Table apply(@Nullable Table table) {
+        if (injectionPathCalled) {
+          nonInjectedPathCalled = true;
+        } else {
+          // getTable is invoked after fetching the table names
+          injectionPathCalled = true;
+          Thread t = new Thread(new Runnable() {
+            public void run() {
+              try {
+                LOG.info("Entered new thread");
+                Driver driver2 = new Driver(hconf);
+                SessionState.start(new CliSessionState(hconf));
+                CommandProcessorResponse ret = driver2.run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)");
+                success = (ret.getException() == null);
+                assertFalse(success);
+                ret = driver2.run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed");
+                success = (ret.getException() == null);
+                assertFalse(success);
+                LOG.info("Exit new thread success - {}", success);
+              } catch (CommandNeedRetryException e) {
+                LOG.info("Hit Exception {} from new thread", e.getMessage());
+                throw new RuntimeException(e);
+              }
+            }
+          });
+          t.start();
+          LOG.info("Created new thread {}", t.getName());
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return table;
+      }
+    };
+    InjectableBehaviourObjectStore.setGetTableBehaviour(ptnedTableRenamer);
+
+    // The concurrent renames should have failed because the bootstrap dump was in progress
+    bootstrapLoadAndVerify(dbName, replDbName);
+
+    ptnedTableRenamer.assertInjectionsPerformed(true, true);
+    InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour
+
+    // The ptned table should still exist in both source and target since the renames failed
+    verifyRun("SELECT a from " + dbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driver);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror);
+
+    // Verify that renames succeed once the bootstrap dump has completed
+    run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)", driver);
+    verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClient);
+    run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
+    verifyIfTableNotExist(dbName, "ptned", metaStoreClient);
+    verifyRun("SELECT a from " + dbName + ".ptned_renamed WHERE (b=10) ORDER BY a", ptn_data, driver);
+  }
+
+  @Test
   public void testIncrementalAdds() throws IOException {
     String name = testName.getMethodName();
     String dbName = createDB(name, driver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acc2390..646bb23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -150,6 +150,7 @@
 import org.apache.hadoop.hive.ql.parse.PreInsertTableDesc;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.AbortTxnsDesc;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -1159,6 +1160,12 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th
       return 0;
     }
 
+    String names[] = Utilities.getDbTableName(tableName);
+    if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+      LOG.error("DDLTask: Rename Partition not allowed as bootstrap dump in progress");
+      throw new HiveException("Rename Partition: Not allowed as bootstrap dump in progress");
+    }
+
     Table tbl = db.getTable(tableName);
     Partition oldPart = db.getPartition(tbl, oldPartSpec, false);
     if (oldPart == null) {
@@ -3597,6 +3604,14 @@ static StringBuilder appendNonNull(StringBuilder builder, Object value, boolean
    * Throws this exception if an unexpected error occurs.
    */
   private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
+    if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
+      String names[] = Utilities.getDbTableName(alterTbl.getOldName());
+      if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+        LOG.error("DDLTask: Rename Table not allowed as bootstrap dump in progress");
+        throw new HiveException("Rename Table: Not allowed as bootstrap dump in progress");
+      }
+    }
+
     // alter the table
     Table tbl = db.getTable(alterTbl.getOldName());
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 67a67fd..eace34d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -173,8 +173,9 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws Sema
 
   private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
     // bootstrap case
-    Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
-    for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) {
+    Hive hiveDb = getHive();
+    Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       REPL_STATE_LOG
           .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP",
               dbName);
@@ -182,13 +183,16 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
       Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
       dumpFunctionMetadata(dbName, dumpRoot);
-      for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) {
+
+      String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
+      for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
         LOG.debug(
             "analyzeReplDump dumping table: " + tblName + " to db root " +
                 dbRoot.toUri());
         dumpTable(dbName, tblName, dbRoot);
       }
+      Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
     }
 
-    Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+    Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
     LOG.info("Bootstrap object dump phase took from {} to {}",
         bootDumpBeginReplId, bootDumpEndReplId);
@@ -199,7 +203,7 @@
     IMetaStoreClient.NotificationFilter evFilter =
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
     EventUtils.MSClientNotificationFetcher evFetcher =
-        new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+        new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC());
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, bootDumpBeginReplId,
         Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
@@ -218,7 +222,8 @@
     dmd.write();
 
     // Set the correct last repl id to return to the user
-    return bootDumpEndReplId;
+    // Currently returning bootDumpBeginReplId, as we don't consolidate the events after bootstrap
+    return bootDumpBeginReplId;
   }
 
   private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 22094c0..c327484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer;
@@ -236,6 +237,13 @@ public static void createDbExportDump(FileSystem fs, Path metadataPath, Database
 
     // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using
     // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
+    // Remove any bootstrap dump state entries from the db parameters before they are dumped
+    Map<String, String> parameters = dbObj.getParameters();
+    if (parameters != null) {
+      parameters.entrySet()
+          .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
+      dbObj.setParameters(parameters);
+    }
     try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
       new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index f40c703..df79fef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -31,9 +32,18 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 public class Utils {
+  public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
+
+  public enum ReplDumpState {
+    IDLE, ACTIVE
+  }
+
   public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
     DataOutputStream outStream = null;
@@ -74,4 +84,61 @@ public static void writeOutput(List<String> values, Path outputFile, HiveConf hi
       return db.getTablesByPattern(dbName, tblPattern);
     }
   }
+
+  public static String setDbBootstrapDumpState(Hive hiveDb, String dbName) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null) {
+      return null;
+    }
+
+    Map<String, String> newParams = new HashMap<>();
+    String uniqueKey = BOOTSTRAP_DUMP_STATE_KEY_PREFIX + UUID.randomUUID().toString();
+    newParams.put(uniqueKey, ReplDumpState.ACTIVE.name());
+    Map<String, String> params = database.getParameters();
+
+    // if the database already has parameters, merge the dump state marker into them
+    if (params != null) {
+      params.putAll(newParams);
+      database.setParameters(params);
+    } else {
+      // otherwise, install a fresh parameter map containing only the marker
+      database.setParameters(newParams);
+    }
+
+    hiveDb.alterDatabase(dbName, database);
+    return uniqueKey;
+  }
+
+  public static void resetDbBootstrapDumpState(Hive hiveDb, String dbName,
+      String uniqueKey) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database != null) {
+      Map<String, String> params = database.getParameters();
+      if ((params != null) && params.containsKey(uniqueKey)) {
+        params.remove(uniqueKey);
+        database.setParameters(params);
+        hiveDb.alterDatabase(dbName, database);
+      }
+    }
+  }
+
+  public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null) {
+      return false;
+    }
+
+    Map<String, String> params = database.getParameters();
+    if (params == null) {
+      return false;
+    }
+
+    for (String key : params.keySet()) {
+      if (key.startsWith(BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
+          && params.get(key).equals(ReplDumpState.ACTIVE.name())) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
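
Taken together, the patch implements an advisory guard through database parameters: ReplDumpTask tags each database with a uniquely keyed ACTIVE marker before dumping its tables, DDLTask rejects table and partition renames while any such marker is present, and EximUtil strips the markers so they never leak into the dumped metadata. Below is a minimal, self-contained Java sketch of that marker lifecycle; BootstrapDumpGuard and its method names are illustrative stand-ins for the Utils methods, not Hive API.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the guard added by this patch, operating on a plain
// parameter map instead of a metastore Database object.
public class BootstrapDumpGuard {
  // Same prefix as Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX in the patch.
  static final String PREFIX = "bootstrap.dump.state.";

  // Counterpart of Utils.setDbBootstrapDumpState: the key is unique per dump,
  // so concurrent dumps of the same db don't clobber each other's markers.
  static String setState(Map<String, String> dbParams) {
    String uniqueKey = PREFIX + UUID.randomUUID();
    dbParams.put(uniqueKey, "ACTIVE");
    return uniqueKey;
  }

  // Counterpart of Utils.resetDbBootstrapDumpState: remove only this dump's marker.
  static void resetState(Map<String, String> dbParams, String uniqueKey) {
    dbParams.remove(uniqueKey);
  }

  // Counterpart of Utils.isBootstrapDumpInProgress: any ACTIVE marker blocks a rename.
  static boolean inProgress(Map<String, String> dbParams) {
    return dbParams.entrySet().stream()
        .anyMatch(e -> e.getKey().startsWith(PREFIX) && "ACTIVE".equals(e.getValue()));
  }

  public static void main(String[] args) {
    Map<String, String> dbParams = new HashMap<>();
    String key = setState(dbParams);           // bootstrap dump begins
    System.out.println(inProgress(dbParams));  // true: DDLTask would reject RENAME here
    resetState(dbParams, key);                 // this db's tables are fully dumped
    System.out.println(inProgress(dbParams));  // false: renames are allowed again
  }
}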