diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index d9a5feb..af92d61 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; @@ -2335,4 +2336,84 @@ public void testFairness2() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); } + + @Test + public void testValidWriteIdListSnapshot() throws Exception { + // Create a transactional table + dropTable(new String[] {"temp.T7"}); + CommandProcessorResponse cpr = driver.run("create database if not exists temp"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + + // Open a base txn which allocates write ID and then committed. + long baseTxnId = txnMgr.openTxn(ctx, "u0"); + long baseWriteId = txnMgr.getTableWriteId("temp", "T7"); + Assert.assertEquals(1, baseWriteId); + txnMgr.commitTxn(); // committed baseTxnId + + // Open a txn with no writes. + HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + long underHwmOpenTxnId = txnMgr1.openTxn(ctx, "u1"); + Assert.assertTrue("Invalid txn ID",underHwmOpenTxnId > baseTxnId); + + // Open a txn to be tested for ValidWriteIdList. Get the ValidTxnList during open itself. + // Verify the ValidWriteIdList with no open/aborted write txns on this table. Write ID of committed txn should be valid. + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + long testTxnId = txnMgr2.openTxn(ctx, "u2"); + Assert.assertTrue("Invalid txn ID",testTxnId > underHwmOpenTxnId); + String testValidTxns = txnMgr2.getValidTxns().toString(); + ValidWriteIdList testValidWriteIds = txnMgr2.getValidWriteIds(Collections.singletonList("temp.t7"), testValidTxns) + .getTableValidWriteIdList("temp.t7"); + Assert.assertEquals(baseWriteId, testValidWriteIds.getHighWatermark()); + Assert.assertTrue("Invalid write ID list", testValidWriteIds.isWriteIdValid(baseWriteId)); + + // Open a txn which allocate write ID and remain open state. + HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + long aboveHwmOpenTxnId = txnMgr3.openTxn(ctx, "u3"); + Assert.assertTrue("Invalid txn ID",aboveHwmOpenTxnId > testTxnId); + long aboveHwmOpenWriteId = txnMgr3.getTableWriteId("temp", "T7"); + Assert.assertEquals(2, aboveHwmOpenWriteId); + + // Allocate writeId to txn under HWM. This will get Id greater than a txn > HWM. + long underHwmOpenWriteId = txnMgr1.getTableWriteId("temp", "T7"); + Assert.assertEquals(3, underHwmOpenWriteId); + + // Verify the ValidWriteIdList with one open txn on this table. Write ID of open txn should be invalid. + testValidWriteIds = txnMgr2.getValidWriteIds(Collections.singletonList("temp.t7"), testValidTxns) + .getTableValidWriteIdList("temp.t7"); + Assert.assertEquals(underHwmOpenWriteId, testValidWriteIds.getHighWatermark()); + Assert.assertTrue("Invalid write ID list", testValidWriteIds.isWriteIdValid(baseWriteId)); + Assert.assertFalse("Invalid write ID list", testValidWriteIds.isWriteIdValid(underHwmOpenWriteId)); + Assert.assertFalse("Invalid write ID list", testValidWriteIds.isWriteIdValid(aboveHwmOpenWriteId)); + + // Commit the txn under HWM. + // Verify the writeId of this committed txn should be invalid for test txn. + txnMgr1.commitTxn(); + testValidWriteIds = txnMgr2.getValidWriteIds(Collections.singletonList("temp.t7"), testValidTxns) + .getTableValidWriteIdList("temp.t7"); + Assert.assertEquals(underHwmOpenWriteId, testValidWriteIds.getHighWatermark()); + Assert.assertTrue("Invalid write ID list", testValidWriteIds.isWriteIdValid(baseWriteId)); + Assert.assertFalse("Invalid write ID list", testValidWriteIds.isWriteIdValid(underHwmOpenWriteId)); + Assert.assertFalse("Invalid write ID list", testValidWriteIds.isWriteIdValid(aboveHwmOpenWriteId)); + + // Allocate writeId from test txn and then verify ValidWriteIdList. + // Write Ids of committed and self test txn should be valid but writeId of open txn should be invalid. + // WriteId of recently committed txn which was open when get ValidTxnList snapshot should be invalid as well. + long testWriteId = txnMgr2.getTableWriteId("temp", "T7"); + Assert.assertEquals(4, testWriteId); + + testValidWriteIds = txnMgr2.getValidWriteIds(Collections.singletonList("temp.t7"), testValidTxns) + .getTableValidWriteIdList("temp.t7"); + Assert.assertEquals(testWriteId, testValidWriteIds.getHighWatermark()); + Assert.assertTrue("Invalid write ID list", testValidWriteIds.isWriteIdValid(baseWriteId)); + Assert.assertTrue("Invalid write ID list", testValidWriteIds.isWriteIdValid(testWriteId)); + Assert.assertFalse("Invalid write ID list", testValidWriteIds.isWriteIdValid(underHwmOpenWriteId)); + Assert.assertFalse("Invalid write ID list", testValidWriteIds.isWriteIdValid(aboveHwmOpenWriteId)); + txnMgr2.commitTxn(); + txnMgr3.commitTxn(); + + cpr = driver.run("drop database if exists temp cascade"); + checkCmdOnDriver(cpr); + } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 6a74594..8986e73 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -935,20 +935,41 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT // shouldn't read any data long writeIdHwm = 0; List invalidWriteIdList = new ArrayList<>(); + long minOpenWriteId = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); long txnHwm = validTxnList.getHighWatermark(); - // The output includes all the txns which are under the high water mark. It includes + // Find the writeId high water mark based upon txnId high water mark. If found, then, need to + // traverse through all write Ids less than writeId HWM to make exceptions list. + String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm + + " and t2w_database = " + quoteString(names[0]) + + " and t2w_table = " + quoteString(names[1]); + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + writeIdHwm = rs.getLong(1); + } + + // The output of below query includes all the txns which are under the high water mark. It includes // the committed transactions as well. The results should be sorted in ascending order based // on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up // using binary search. - String s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm - + " and t2w_database = " + quoteString(names[0]) - + " and t2w_table = " + quoteString(names[1]) - + " order by t2w_writeid asc"; + if (writeIdHwm > 0) { + // If writeIdHwm is known, then query all writeIds under the writeId HWM. + // If any writeId under HWM is allocated by txn > txnId HWM, then will be added to invalid list. + s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm + + " and t2w_database = " + quoteString(names[0]) + + " and t2w_table = " + quoteString(names[1]) + + " order by t2w_writeid asc"; + } else { + // If writeIdHwm is unknown, then query all writeIds allocated by txns under txns HWM. + s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm + + " and t2w_database = " + quoteString(names[0]) + + " and t2w_table = " + quoteString(names[1]) + + " order by t2w_writeid asc"; + } LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); - long minOpenWriteId = Long.MAX_VALUE; - BitSet abortedBits = new BitSet(); while (rs.next()) { long txnId = rs.getLong(1); long writeId = rs.getLong(2); @@ -960,11 +981,10 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT // The current txn is either in open or aborted state. // Mark the write ids state as per the txn state. + invalidWriteIdList.add(writeId); if (validTxnList.isTxnAborted(txnId)) { - invalidWriteIdList.add(writeId); abortedBits.set(invalidWriteIdList.size() - 1); } else { - invalidWriteIdList.add(writeId); minOpenWriteId = Math.min(minOpenWriteId, writeId); } }