diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b19c1aa..9667449 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -551,6 +551,84 @@ public void testBootstrapWithConcurrentDropPartition() throws IOException {
}
@Test
+ public void testBootstrapWithConcurrentRename() throws IOException {
+ String name = testName.getMethodName();
+ String dbName = createDB(name, driver);
+ String replDbName = dbName + "_dupe";
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+ String[] ptn_data = new String[]{ "eleven" , "twelve" };
+ String[] empty = new String[]{};
+ String ptn_locn = new Path(TEST_PATH, name + "_ptn").toUri().getPath();
+
+ createTestDataFile(ptn_locn, ptn_data);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+
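+    // Inject a getTable behaviour that, once the bootstrap dump has fetched this table,
+    // fires concurrent partition and table renames from another thread; both renames
+    // are expected to be rejected while the dump is in progress.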
+    BehaviourInjection<Table, Table> ptnedTableRenamer = new BehaviourInjection<Table, Table>() {
+ boolean success = false;
+
+ @Nullable
+ @Override
+ public Table apply(@Nullable Table table) {
+ if (injectionPathCalled) {
+ nonInjectedPathCalled = true;
+ } else {
+ // getTable is invoked after fetching the table names
+ injectionPathCalled = true;
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ LOG.info("Entered new thread");
+ Driver driver2 = new Driver(hconf);
+ SessionState.start(new CliSessionState(hconf));
+ CommandProcessorResponse ret = driver2.run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)");
+ success = (ret.getException() == null);
+ assertFalse(success);
+ ret = driver2.run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed");
+ success = (ret.getException() == null);
+ assertFalse(success);
+ LOG.info("Exit new thread success - {}", success);
+ } catch (CommandNeedRetryException e) {
+ LOG.info("Hit Exception {} from new thread", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ t.start();
+ LOG.info("Created new thread {}", t.getName());
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return table;
+ }
+ };
+ InjectableBehaviourObjectStore.setGetTableBehaviour(ptnedTableRenamer);
+
+    // The intermediate renames attempted during the dump should fail, since a bootstrap dump is in progress
+ bootstrapLoadAndVerify(dbName, replDbName);
+
+    ptnedTableRenamer.assertInjectionsPerformed(true, true);
+ InjectableBehaviourObjectStore.resetGetTableBehaviour(); // reset the behaviour
+
+    // The ptned table should exist in both source and target, since the renames did not succeed
+ verifyRun("SELECT a from " + dbName + ".ptned WHERE (b=1) ORDER BY a", ptn_data, driver);
+ verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE (b=1) ORDER BY a", ptn_data, driverMirror);
+
+    // Verify that renames succeed once the bootstrap dump has completed
+ run("ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=10)", driver);
+ verifyIfPartitionNotExist(dbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClient);
+ run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
+ verifyIfTableNotExist(dbName, "ptned", metaStoreClient);
+ verifyRun("SELECT a from " + dbName + ".ptned_renamed WHERE (b=10) ORDER BY a", ptn_data, driver);
+ }
+
+ @Test
public void testIncrementalAdds() throws IOException {
String name = testName.getMethodName();
String dbName = createDB(name, driver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index acc2390..646bb23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -150,6 +150,7 @@
import org.apache.hadoop.hive.ql.parse.PreInsertTableDesc;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.AbortTxnsDesc;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -1159,6 +1160,14 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th
return 0;
}
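+    // Block partition renames while a bootstrap dump of this database is in progress;
+    // the dump may already have captured the partition under its old name.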
+    String[] names = Utilities.getDbTableName(tableName);
+    if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+      LOG.error("DDLTask: Rename Partition is not allowed as a bootstrap dump is in progress");
+      throw new HiveException("Rename Partition: Not allowed as a bootstrap dump is in progress");
+    }
+
Table tbl = db.getTable(tableName);
Partition oldPart = db.getPartition(tbl, oldPartSpec, false);
if (oldPart == null) {
@@ -3597,6 +3606,16 @@ static StringBuilder appendNonNull(StringBuilder builder, Object value, boolean
* Throws this exception if an unexpected error occurs.
*/
private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
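+    // Likewise, block table renames while a bootstrap dump of the table's database is
+    // in progress.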
+ if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
+      String[] names = Utilities.getDbTableName(alterTbl.getOldName());
+      if (Utils.isBootstrapDumpInProgress(db, names[0])) {
+        LOG.error("DDLTask: Rename Table is not allowed as a bootstrap dump is in progress");
+        throw new HiveException("Rename Table: Not allowed as a bootstrap dump is in progress");
+ }
+ }
+
// alter the table
Table tbl = db.getTable(alterTbl.getOldName());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3ebd3cc..14d8618 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -176,9 +176,9 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws Sema
private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
// bootstrap case
- Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
-
- for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) {
+ Hive hiveDb = getHive();
+ Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
Utils.getAllTables(getHive(), dbName).size(),
@@ -186,14 +186,19 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
replLogger.startLog();
Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
dumpFunctionMetadata(dbName, dumpRoot);
- for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) {
+
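+      // Mark the database as having a bootstrap dump in progress; DDLTask consults
+      // this state to reject renames until it is reset after all tables are dumped.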
+ String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
+ for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
dumpTable(dbName, tblName, dbRoot);
}
+ Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
replLogger.endLog(bootDumpBeginReplId.toString());
}
- Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+ Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
bootDumpEndReplId);
@@ -204,7 +209,7 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
IMetaStoreClient.NotificationFilter evFilter =
new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
EventUtils.MSClientNotificationFetcher evFetcher =
- new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+ new EventUtils.MSClientNotificationFetcher(hiveDb.getMSC());
EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
evFetcher, bootDumpBeginReplId,
Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
@@ -223,7 +228,8 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
dmd.write();
// Set the correct last repl id to return to the user
- return bootDumpEndReplId;
+    // Return bootDumpBeginReplId for now, as events are not consolidated after a bootstrap dump
+ return bootDumpBeginReplId;
}
private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 40c34bf..2026054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer;
@@ -237,6 +238,13 @@ public static void createDbExportDump(FileSystem fs, Path metadataPath, Database
// If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using
// Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
+    // Remove the parameter entries that were added to track bootstrap dump progress
+    Map<String, String> parameters = dbObj.getParameters();
+    if (parameters != null) {
+      parameters.entrySet()
+          .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
+      dbObj.setParameters(parameters);
+    }
try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index a48a17e..a1da629 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -32,9 +33,20 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
public class Utils {
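+  // Prefix of the database parameter used to mark an in-progress bootstrap dump; a
+  // random UUID suffix keeps markers from concurrent dumps distinct.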
+ public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
+
+ public enum ReplDumpState {
+ IDLE, ACTIVE
+ }
+
public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
throws SemanticException {
DataOutputStream outStream = null;
@@ -79,4 +91,64 @@ public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase());
});
}
+
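+  // Adds a uniquely keyed ACTIVE marker to the database parameters and returns the
+  // key, so the caller can later clear exactly this marker once its dump completes.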
+ public static String setDbBootstrapDumpState(Hive hiveDb, String dbName) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database == null) {
+ return null;
+ }
+
+    Map<String, String> newParams = new HashMap<>();
+ String uniqueKey = BOOTSTRAP_DUMP_STATE_KEY_PREFIX + UUID.randomUUID().toString();
+ newParams.put(uniqueKey, ReplDumpState.ACTIVE.name());
+    Map<String, String> params = database.getParameters();
+
+    // merge the new key into the existing params, if present
+    if (params != null) {
+      params.putAll(newParams);
+      database.setParameters(params);
+    } else {
+      // otherwise, use the newly created params map
+      database.setParameters(newParams);
+    }
+
+ hiveDb.alterDatabase(dbName, database);
+ return uniqueKey;
+ }
+
+ public static void resetDbBootstrapDumpState(Hive hiveDb, String dbName,
+ String uniqueKey) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database != null) {
+      Map<String, String> params = database.getParameters();
+ if ((params != null) && params.containsKey(uniqueKey)) {
+ params.remove(uniqueKey);
+ database.setParameters(params);
+ hiveDb.alterDatabase(dbName, database);
+ }
+ }
+ }
+
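+  // Returns true if the database carries any marker of a still-ACTIVE bootstrap dump.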
+ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database == null) {
+ return false;
+ }
+
+    Map<String, String> params = database.getParameters();
+ if (params == null) {
+ return false;
+ }
+
+ for (String key : params.keySet()) {
+ if (key.startsWith(BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
+ && params.get(key).equals(ReplDumpState.ACTIVE.name())) {
+ return true;
+ }
+ }
+ return false;
+ }
}