diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 5fb6d863d1..59f1ad0c4a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1606,13 +1606,10 @@ private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fu quoteString(names[0]), quoteString(names[1])); rs = pst.executeQuery(); if (rs.next()) { - long maxWriteId = rs.getLong(1); - if (maxWriteId > 0) { - writeIdHwm = (writeIdHwm > 0) ? Math.min(maxWriteId, writeIdHwm) : maxWriteId; - } + writeIdHwm = rs.getLong(1); } } - + boolean foundValidUncompactedWrite = false; // As writeIdHwm is known, query all writeIds under the writeId HWM. // If any writeId under HWM is allocated by txn > txnId HWM or belongs to open/aborted txns, // then will be added to invalid list. The results should be sorted in ascending order based @@ -1630,9 +1627,9 @@ private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fu long writeId = rs.getLong(2); if (validTxnList.isTxnValid(txnId)) { // Skip if the transaction under evaluation is already committed. + foundValidUncompactedWrite = true; continue; } - // The current txn is either in open or aborted state. // Mark the write ids state as per the txn state. invalidWriteIdList.add(writeId); @@ -1642,7 +1639,21 @@ private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fu minOpenWriteId = Math.min(minOpenWriteId, writeId); } } + // If we have compacted writes and some invalid writes on the table, + // return the lowest invalid write as a writeIdHwm and set it as invalid. + if (!foundValidUncompactedWrite) { + long writeId = invalidWriteIdList.isEmpty() ? -1 : invalidWriteIdList.get(0); + invalidWriteIdList = new ArrayList<>(); + abortedBits = new BitSet(); + if (writeId != -1) { + invalidWriteIdList.add(writeId); + writeIdHwm = writeId; + if (writeId != minOpenWriteId) { + abortedBits.set(0); + } + } + } ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); TableValidWriteIds owi = new TableValidWriteIds(fullTableName, writeIdHwm, invalidWriteIdList, byteBuffer); if (minOpenWriteId < Long.MAX_VALUE) { @@ -1658,7 +1669,7 @@ private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fu @Override @RetrySemantics.Idempotent public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws MetaException { List txnIds; String dbName = rqst.getDbName().toLowerCase(); String tblName = rqst.getTableName().toLowerCase(); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index baa6247d2c..1dfc105958 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; @@ -26,13 +27,17 @@ import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,6 +50,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Collections; import java.util.List; /** @@ -341,6 +347,50 @@ public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception { client.allocateTableWriteId(txnId, "db", "tbl"); } + @Test + public void testGetValidWriteIds() throws TException { + List tids = client.openTxns("me", 3).getTxn_ids(); + client.allocateTableWriteIdsBatch(tids, "db", "tbl"); + client.rollbackTxn(tids.get(0)); + + ValidTxnList validTxnList = client.getValidTxns(); + String fullTableName = TxnUtils.getFullTableName("db", "tbl"); + + List tableValidWriteIds = client.getValidWriteIds( + Collections.singletonList(fullTableName), validTxnList.writeToString()); + + Assert.assertEquals(tableValidWriteIds.size(), 1); + TableValidWriteIds writeIds = tableValidWriteIds.get(0); + Assert.assertNotNull(writeIds); + + ValidReaderWriteIdList writeIdList = TxnCommonUtils.createValidReaderWriteIdList(writeIds); + Assert.assertNotNull(writeIdList); + + Assert.assertEquals(writeIdList.getInvalidWriteIds().length, 1); + Assert.assertTrue(validTxnList.isTxnAborted(tids.get(0))); + Assert.assertEquals(writeIdList.getHighWatermark(), 1); + Assert.assertEquals(writeIdList.getMinOpenWriteId().longValue(), 2); + + client.commitTxn(tids.get(2)); + validTxnList = client.getValidTxns(); + + tableValidWriteIds = client.getValidWriteIds( + Collections.singletonList(fullTableName), validTxnList.writeToString()); + + Assert.assertEquals(tableValidWriteIds.size(), 1); + writeIds = tableValidWriteIds.get(0); + Assert.assertNotNull(writeIds); + + writeIdList = TxnCommonUtils.createValidReaderWriteIdList(writeIds); + Assert.assertNotNull(writeIdList); + + Assert.assertEquals(writeIdList.getInvalidWriteIds().length, 2); + Assert.assertTrue(validTxnList.isTxnAborted(tids.get(0))); + Assert.assertFalse(validTxnList.isTxnValid(tids.get(1))); + Assert.assertEquals(writeIdList.getHighWatermark(), 3); + Assert.assertEquals(writeIdList.getMinOpenWriteId().longValue(), 2); + } + @Before public void setUp() throws Exception { conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true);