diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 3040f6c..f074428 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -364,6 +364,85 @@ public void run() { } @Test + public void testAcidTablesBootstrapWithConcurrentDropTable() throws Throwable { + HiveConf primaryConf = primary.getConf(); + primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(1)"); + + // Perform concurrent write + drop on the acid table t1 when bootstrap dump in progress. Bootstrap + // won't dump the table but the subsequent incremental repl with new table with same name should be seen. + BehaviourInjection callerInjectedBehavior + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + if (injectionPathCalled) { + nonInjectedPathCalled = true; + } else { + // Insert another row to t1 and drop the table from another txn when bootstrap dump in progress. + injectionPathCalled = true; + Thread t = new Thread(new Runnable() { + @Override + public void run() { + LOG.info("Entered new thread"); + IDriver driver = DriverFactory.newDriver(primaryConf); + SessionState.start(new CliSessionState(primaryConf)); + CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)"); + boolean success = (ret.getException() == null); + assertTrue(success); + ret = driver.run("drop table " + primaryDbName + ".t1"); + success = (ret.getException() == null); + assertTrue(success); + LOG.info("Exit new thread success - {}", success, ret.getException()); + } + }); + t.start(); + LOG.info("Created new thread {}", t.getName()); + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return true; + } + }; + + InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior); + WarehouseInstance.Tuple bootstrapDump = null; + try { + bootstrapDump = primary.dump(primaryDbName, null); + callerInjectedBehavior.assertInjectionsPerformed(true, true); + } finally { + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + } + + // Bootstrap dump has taken latest list of tables and hence won't see table t1 as it is dropped. + replica.load(replicatedDbName, bootstrapDump.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(bootstrapDump.lastReplicationId) + .run("show tables") + .verifyResult(null); + + // Create another ACID table with same name and insert a row. It should be properly replicated. + WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into t1 values(100)") + .dump(primaryDbName, bootstrapDump.lastReplicationId); + + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("select id from t1 order by id") + .verifyResult("100"); + } + + @Test public void testOpenTxnEvent() throws Throwable { String tableName = testName.getMethodName(); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index c0725ad..aa65f35 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -1672,7 +1672,7 @@ public void testReplAllocWriteId() throws Exception { } catch (IllegalStateException e) { failed = true; } - assertTrue(failed); + assertFalse(failed); replAbortTxnForTest(srcTxnIdList, "destdb.*"); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f5e4905..695d545 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1404,7 +1404,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds if (srcTxnIds.size() != txnIds.size()) { // Idempotent case where txn was already closed but gets allocate write id event. // So, just ignore it and return empty list. - LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() + + LOG.info("Idempotent case: Target txn id is missing for source txn id : " + srcTxnIds.toString() + " and repl policy " + rqst.getReplPolicy()); return new AllocateTableWriteIdsResponse(txnToWriteIds); } @@ -1422,10 +1422,6 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds throw new RuntimeException("This should never happen for txnIds: " + txnIds); } - long writeId; - String s; - long allocatedTxnsCount = 0; - long txnId; List queries = new ArrayList<>(); StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); @@ -1440,6 +1436,10 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "t2w_txnid", false, false); + + long allocatedTxnsCount = 0; + long txnId; + long writeId = 0; for (String query : queries) { LOG.debug("Going to execute query <" + query + ">"); rs = stmt.executeQuery(query); @@ -1464,10 +1464,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); + assert((writeId == 0) && txnToWriteIds.isEmpty()); + if (rqst.isSetReplPolicy()) { + // In replication flow, we always need to allocate write ID equal to that of source. + assert(srcTxnToWriteIds != null); + writeId = srcTxnToWriteIds.get(0).getWriteId(); + } + // There are some txns in the list which does not have write id allocated and hence go ahead and do it. // Get the next write id for the given table and update it with new next write id. // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID - s = sqlGenerator.addForUpdateClause( + String s = sqlGenerator.addForUpdateClause( "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + " and nwi_table = " + quoteString(tblName)); LOG.debug("Going to execute query <" + s + ">"); @@ -1475,13 +1482,14 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds if (!rs.next()) { // First allocation of write id should add the table to the next_write_id meta table // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here - writeId = 1; + writeId = (writeId <= 0) ? 1 : writeId; s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")"; + + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")"; LOG.debug("Going to execute insert <" + s + ">"); stmt.execute(s); } else { - writeId = rs.getLong(1); + writeId = (writeId <= 0) ? rs.getLong(1) : writeId; + // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds) + " where nwi_database = " + quoteString(dbName) @@ -1500,16 +1508,6 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds writeId++; } - if (rqst.isSetReplPolicy()) { - int lastIdx = txnToWriteIds.size()-1; - if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) || - (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) { - LOG.error("Allocated write id range {} is not matching with the input write id range {}.", - txnToWriteIds, srcTxnToWriteIds); - throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds); - } - } - // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids List inserts = sqlGenerator.createInsertValuesStmt( "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);