diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b020351..5e6bea6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -562,6 +562,82 @@ public void testBootstrapWithConcurrentDropPartition() throws IOException {
}
@Test
+ public void testBootstrapWithConcurrentRename() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name, driver);
+ String replDbName = dbName + "_dupe";
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+ String[] ptn_data = new String[]{ "eleven" , "twelve" };
+ String[] empty = new String[]{};
+ String ptn_locn = new Path(TEST_PATH, name + "_ptn").toUri().getPath();
+
+ createTestDataFile(ptn_locn, ptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+
+    BehaviourInjection<Table, Table> ptnedTableRenamer = new BehaviourInjection<Table, Table>(){
+ boolean success = false;
+
+ @Nullable
+ @Override
+ public Table apply(@Nullable Table table) {
+ if (injectionPathCalled) {
+ nonInjectedPathCalled = true;
+ } else {
+ // getTable is invoked after fetching the table names
+ injectionPathCalled = true;
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ LOG.info("Entered new thread");
+ Driver driver2 = new Driver(hconf);
+ SessionState.start(new CliSessionState(hconf));
+ CommandProcessorResponse ret = driver2.run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)");
+ success |= (ret.getException() == null);
+ assertFalse(success);
+ ret = driver2.run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed");
+ success |= (ret.getException() == null);
+ LOG.info("Exit new thread success - {}", success);
+ } catch (CommandNeedRetryException e) {
+ LOG.info("Hit Exception {} from new thread", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ t.start();
+ LOG.info("Created new thread {}", t.getName());
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ // After exit from thread, both rename operations should be unsuccessful
+ assertFalse(success);
+ }
+ return table;
+ }
+ };
+ InjectableBehaviourObjectStore.setGetTableBehaviour(ptnedTableRenamer);
+
+ // The intermediate rename would've failed as bootstrap dump in progress
+ bootstrapLoadAndVerify(dbName, replDbName);
+
+ ptnedTableRenamer.assertInjectionsPerformed(true,true);
+ InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour
+
+ // The ptned table should be there in both source and target as rename was not successful
+ verifyRun("SELECT a from " + dbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driver);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror);
+
+ // Verify if Rename after bootstrap is successful
+ run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)", driver);
+ verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClient);
+ run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
+ verifyIfTableNotExist(dbName, "ptned", metaStoreClient);
+ verifyRun("SELECT a from " + dbName + ".ptned_renamed WHERE (b=10) ORDER BY a", ptn_data, driver);
+ }
+
+ @Test
public void testIncrementalAdds() throws IOException {
String name = testName.getMethodName();
String dbName = createDB(name, driver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 185ac1d..17e26f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -150,6 +150,7 @@
import org.apache.hadoop.hive.ql.parse.PreInsertTableDesc;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.AbortTxnsDesc;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -1139,6 +1140,12 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th
return 0;
}
+ String names[] = Utilities.getDbTableName(tableName);
+ if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+ LOG.error("DDLTask: Rename Partition not allowed as bootstrap dump in progress");
+ throw new HiveException("Rename Partition: Not allowed as bootstrap dump in progress");
+ }
+
Table tbl = db.getTable(tableName);
Partition oldPart = db.getPartition(tbl, oldPartSpec, false);
if (oldPart == null) {
@@ -3577,6 +3584,14 @@ static StringBuilder appendNonNull(StringBuilder builder, Object value, boolean
* Throws this exception if an unexpected error occurs.
*/
private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
+ if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
+ String names[] = Utilities.getDbTableName(alterTbl.getOldName());
+ if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+ LOG.error("DDLTask: Rename Table not allowed as bootstrap dump in progress");
+ throw new HiveException("Rename Table: Not allowed as bootstrap dump in progress");
+ }
+ }
+
// alter the table
Table tbl = db.getTable(alterTbl.getOldName());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 34b6737..c1d2956 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -171,7 +171,8 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
// bootstrap case
- Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+ Hive hiveDb = getHive();
+ Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) {
REPL_STATE_LOG
.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP",
@@ -180,11 +181,14 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
dumpFunctionMetadata(dbName, dumpRoot);
+
+ Utils.setDbBootstrapDumpState(hiveDb, dbName, Utils.ReplDumpState.ACTIVE.toString());
for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
dumpTable(dbName, tblName, dbRoot);
}
+ Utils.setDbBootstrapDumpState(hiveDb, dbName, Utils.ReplDumpState.IDLE.toString());
}
Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
@@ -216,7 +220,8 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
dmd.write();
// Set the correct last repl id to return to the user
- return bootDumpEndReplId;
+ // Currently returned bootDumpBeginReplId as we don't consolidate the events after bootstrap
+ return bootDumpBeginReplId;
}
private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index f40c703..f67fd6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -31,9 +32,29 @@
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class Utils {
+ public static final String BOOTSTRAP_DUMP_STATE_KEY = "bootstrap.dump.state";
+
+ public enum ReplDumpState {
+ IDLE("idle"),
+ ACTIVE("active")
+ ;
+ private final String stateName;
+
+ ReplDumpState(String s) {
+ this.stateName = s;
+ }
+
+ @Override
+ public String toString(){
+ return stateName;
+ }
+ }
+
   public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
throws SemanticException {
DataOutputStream outStream = null;
@@ -74,4 +95,43 @@ public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
return db.getTablesByPattern(dbName, tblPattern);
}
}
+
+ public static void setDbBootstrapDumpState(Hive hiveDb, String dbName, String dumpState) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database == null) {
+ return;
+ }
+
+    Map<String, String> newParams = new HashMap<>();
+ newParams.put(BOOTSTRAP_DUMP_STATE_KEY, dumpState);
+    Map<String, String> params = database.getParameters();
+
+ // if both old params are not null, merge them
+ if (params != null) {
+ params.putAll(newParams);
+ database.setParameters(params);
+ } else {
+ // if one of them is null, replace the old params with the new one
+ database.setParameters(newParams);
+ }
+
+ hiveDb.alterDatabase(dbName, database);
+ return;
+ }
+
+  private static String getDbBootstrapDumpStateFromParameters(Map<String, String> params) {
+ if ((params != null) && (params.containsKey(BOOTSTRAP_DUMP_STATE_KEY))){
+ return params.get(BOOTSTRAP_DUMP_STATE_KEY);
+ }
+ return null;
+ }
+
+ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database == null) {
+ return false;
+ }
+ String dumpState = getDbBootstrapDumpStateFromParameters(database.getParameters());
+ return ((dumpState != null) && dumpState.equals(ReplDumpState.ACTIVE.toString()));
+ }
}