diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 5f9d809ab2..3a8cb39113 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -553,6 +554,9 @@ public void onInsert(InsertEvent insertEvent) throws MetaException { @Override public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException { + if (openTxnEvent.getTxnType() == TxnType.READ_ONLY) { + return; + } int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1; OpenTxnMessage msg = MessageBuilder.getInstance().buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0), @@ -571,6 +575,9 @@ public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator @Override public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException { + if (commitTxnEvent.getTxnType() == TxnType.READ_ONLY) { + return; + } CommitTxnMessage msg = MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId()); @@ -588,6 +595,9 @@ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGen @Override public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException { + if (abortTxnEvent.getTxnType() == TxnType.READ_ONLY) { + return; + } AbortTxnMessage msg = MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId()); NotificationEvent event = diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index edf861f5b3..66bdee1f48 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -25,10 +25,10 @@ import static org.junit.Assert.fail; import java.util.concurrent.TimeUnit; -import java.lang.reflect.Field; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; @@ -1057,6 +1058,88 @@ public void dropFunction() throws Exception { testEventCounts(defaultDbName, firstEventId, null, null, 3); } + @Test + public void openTxn() throws Exception { + msClient.openTxn("me", TxnType.READ_ONLY); + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(0, rsp.getEventsSize()); + + msClient.openTxn("me", TxnType.DEFAULT); + rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + + NotificationEvent event = rsp.getEvents().get(0); + assertEquals(firstEventId + 1, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.OPEN_TXN.toString(), event.getEventType()); + } + + @Test + public void abortTxn() throws Exception { + long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY); + long txnId2 = msClient.openTxn("me", TxnType.DEFAULT); + + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + + msClient.abortTxns(Collections.singletonList(txnId1)); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(0, rsp.getEventsSize()); + + msClient.abortTxns(Collections.singletonList(txnId2)); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(1, rsp.getEventsSize()); + + NotificationEvent event = rsp.getEvents().get(0); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.ABORT_TXN.toString(), event.getEventType()); + } + + @Test + public void rollbackTxn() throws Exception { + long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY); + long txnId2 = msClient.openTxn("me", TxnType.DEFAULT); + + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + + msClient.rollbackTxn(txnId1); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(0, rsp.getEventsSize()); + + msClient.rollbackTxn(txnId2); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(1, rsp.getEventsSize()); + + NotificationEvent event = rsp.getEvents().get(0); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.ABORT_TXN.toString(), event.getEventType()); + } + + @Test + public void commitTxn() throws Exception { + long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY); + long txnId2 = msClient.openTxn("me", TxnType.DEFAULT); + + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + + msClient.commitTxn(txnId1); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(0, rsp.getEventsSize()); + + msClient.commitTxn(txnId2); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(1, rsp.getEventsSize()); + + NotificationEvent event = rsp.getEvents().get(0); + assertEquals(firstEventId + 2, event.getEventId()); + assertTrue(event.getEventTime() >= startTime); + assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType()); + } + @Test public void insertTable() throws Exception { String defaultDbName = "default"; diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index be37b2a286..01e5b82401 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -210,7 +210,7 @@ public void testAbortTxn() throws Exception { Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); - + gotException = false; try { txnHandler.abortTxn(new AbortTxnRequest(4)); @@ -222,6 +222,19 @@ public void testAbortTxn() throws Exception { Assert.assertTrue(gotException); } + @Test + public void testAbortTxns() throws Exception { + OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); + List txnList = openedTxns.getTxn_ids(); + txnHandler.abortTxns(new AbortTxnsRequest(txnList)); + + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(3, txnsInfo.getOpen_txns().size()); + txnsInfo.getOpen_txns().forEach(txn -> + assertEquals(TxnState.ABORTED, txn.getState()) + ); + } + @Test public void testAbortInvalidTxn() throws Exception { boolean caught = false; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java index fe4b97432a..5e00fcc751 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.api.TxnType; /** * AbortTxnEvent @@ -31,15 +32,25 @@ public class AbortTxnEvent extends ListenerEvent { private final Long txnId; + private final TxnType txnType; + + public AbortTxnEvent(Long transactionId, IHMSHandler handler) { + this(transactionId, null, handler); + } + + public AbortTxnEvent(Long transactionId, TxnType txnType) { + this(transactionId, txnType, null); + } /** - * * @param transactionId Unique identification for the transaction that got rolledback. + * @param txnType type of transaction * @param handler handler that is firing the event */ - public AbortTxnEvent(Long transactionId, IHMSHandler handler) { + public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) { super(true, handler); - txnId = transactionId; + this.txnId = transactionId; + this.txnType = txnType; } /** @@ -48,4 +59,11 @@ public AbortTxnEvent(Long transactionId, IHMSHandler handler) { public Long getTxnId() { return txnId; } + + /** + * @return txnType + */ + public TxnType getTxnType() { + return txnType; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java index 84a9a4e811..2a719f2bcb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java @@ -36,6 +36,10 @@ private final String tableName; private final String dbName; + public AllocWriteIdEvent(List txnToWriteIdList, String dbName, String tableName) { + this(txnToWriteIdList, dbName, tableName, null); + } + public AllocWriteIdEvent(List txnToWriteIdList, String dbName, String tableName, IHMSHandler handler) { super(true, handler); this.txnToWriteIdList = txnToWriteIdList; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java index ba382cd175..b357dbb48c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.api.TxnType; /** * CommitTxnEvent @@ -31,15 +32,25 @@ public class CommitTxnEvent extends ListenerEvent { private final Long txnId; + private final TxnType txnType; + + public CommitTxnEvent(Long transactionId, IHMSHandler handler) { + this(transactionId, null, handler); + } + + public CommitTxnEvent(Long transactionId, TxnType txnType) { + this(transactionId, txnType, null); + } /** - * * @param transactionId Unique identification for the transaction just got committed. + * @param txnType type of transaction * @param handler handler that is firing the event */ - public CommitTxnEvent(Long transactionId, IHMSHandler handler) { + public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) { super(true, handler); this.txnId = transactionId; + this.txnType = txnType; } /** @@ -48,4 +59,11 @@ public CommitTxnEvent(Long transactionId, IHMSHandler handler) { public Long getTxnId() { return txnId; } + + /** + * @return txnType + */ + public TxnType getTxnType() { + return txnType; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java index d935ed1af4..289dfd0bb9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.TxnType; + import java.util.List; /** @@ -31,15 +33,27 @@ @InterfaceAudience.Public @InterfaceStability.Stable public class OpenTxnEvent extends ListenerEvent { - private List txnIds; + + private final List txnIds; + private final TxnType txnType; + + public OpenTxnEvent(List txnIds, IHMSHandler handler) { + this(txnIds, null, handler); + } + + public OpenTxnEvent(List txnIds, TxnType txnType) { + this(txnIds, txnType, null); + } /** * @param txnIds List of unique identification for the transaction just opened. + * @param txnType type of transaction * @param handler handler that is firing the event */ - public OpenTxnEvent(List txnIds, IHMSHandler handler) { + public OpenTxnEvent(List txnIds, TxnType txnType, IHMSHandler handler) { super(true, handler); this.txnIds = Lists.newArrayList(txnIds); + this.txnType = txnType; } /** @@ -48,4 +62,11 @@ public OpenTxnEvent(List txnIds, IHMSHandler handler) { public List getTxnIds() { return txnIds; } + + /** + * @return txnType + */ + public TxnType getTxnType() { + return txnType; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 5fb6d863d1..3d2641b1d8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -677,7 +677,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator); + EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, txnType), dbConn, sqlGenerator); } return txnIds; } finally { @@ -790,9 +790,10 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept txnid = targetTxnIds.get(0); } - if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { - TxnStatus status = findTxnState(txnid,stmt); - if(status == TxnStatus.ABORTED) { + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (txnRecord == null) { + TxnStatus status = findTxnState(txnid, stmt); + if (status == TxnStatus.ABORTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state ABORTED for transactions started using replication replay task"); @@ -804,6 +805,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept } raiseTxnUnexpectedState(status, txnid); } + abortTxns(dbConn, Collections.singletonList(txnid), true); if (rqst.isSetReplPolicy()) { deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); @@ -811,7 +813,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator); + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator); } LOG.debug("Going to commit"); @@ -833,24 +835,46 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept @Override @RetrySemantics.Idempotent - public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { - List txnids = rqst.getTxn_ids(); + public void abortTxns(AbortTxnsRequest rqst) throws MetaException { + List txnIds = rqst.getTxn_ids(); try { Connection dbConn = null; + Statement stmt = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - int numAborted = abortTxns(dbConn, txnids, false); - if (numAborted != txnids.size()) { + stmt = dbConn.createStatement(); + + List queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder("select TXN_ID, TXN_TYPE from TXNS where TXN_STATE = ") + .append(quoteChar(TXN_OPEN)).append(" and TXN_TYPE != ").append(TxnType.READ_ONLY.getValue()) + .append(" and "); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), + txnIds, "TXN_ID", false, false); + + Map nonReadOnlyTxns = new HashMap<>(); + for (String query : queries) { + LOG.debug("Going to execute query<" + query + ">"); + try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { + while (rs.next()) { + TxnType txnType = TxnType.findByValue(rs.getInt(2)); + nonReadOnlyTxns.put(rs.getLong(1), txnType); + } + } + } + int numAborted = abortTxns(dbConn, txnIds, false); + if (numAborted != txnIds.size()) { LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + - txnids.size() + " transactions. It's possible that the other " + - (txnids.size() - numAborted) + + txnIds.size() + " transactions. It's possible that the other " + + (txnIds.size() - numAborted) + " transactions have been aborted or committed, or the transaction ids are invalid."); } - for (Long txnId : txnids) { - if (transactionalListeners != null) { + if (transactionalListeners != null){ + for (Long txnId : txnIds) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator); + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, + nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY)), dbConn, sqlGenerator); } } LOG.debug("Going to commit"); @@ -862,6 +886,7 @@ public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaExce throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (RetryException e) { @@ -1065,7 +1090,6 @@ public void commitTxn(CommitTxnRequest rqst) Connection dbConn = null; Statement stmt = null; List insertPreparedStmts = null; - ResultSet lockHandle = null; ResultSet commitIdRs = null, rs; try { lockInternal(); @@ -1097,11 +1121,11 @@ public void commitTxn(CommitTxnRequest rqst) * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); - if (lockHandle == null) { + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (txnRecord == null) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); - if(actualTxnStatus == TxnStatus.COMMITTED) { + if (actualTxnStatus == TxnStatus.COMMITTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state COMMITTED for transactions started using replication replay task"); @@ -1114,8 +1138,6 @@ public void commitTxn(CommitTxnRequest rqst) return; } raiseTxnUnexpectedState(actualTxnStatus, txnid); - shouldNeverHappen(txnid); - //dbConn is rolled back in finally{} } String conflictSQLSuffix = null; @@ -1152,7 +1174,7 @@ public void commitTxn(CommitTxnRequest rqst) * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct * even if it includes all of it's columns */ - int numCompsWritten = stmt.executeUpdate( + stmt.executeUpdate( "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); /** @@ -1325,7 +1347,7 @@ public void commitTxn(CommitTxnRequest rqst) if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator); + EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator); } LOG.debug("Going to commit"); @@ -1343,8 +1365,7 @@ public void commitTxn(CommitTxnRequest rqst) closeStmt(pst); } } - close(commitIdRs); - close(lockHandle, stmt, dbConn); + close(commitIdRs, stmt, dbConn); unlockInternal(); } } catch (RetryException e) { @@ -1841,7 +1862,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ALLOC_WRITE_ID, - new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null), + new AllocWriteIdEvent(txnToWriteIds, dbName, tblName), dbConn, sqlGenerator); } @@ -2305,22 +2326,27 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) { * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but * to the same value as before thus forcing db to acquire write lock for duration of the transaction. * - * There is no real reason to return the ResultSet here other than to make sure the reference to it - * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock - * to be released. + * SELECT ... FOR UPDATE locks the row until the transaction commits or rolls back. + * Second connection using `SELECT ... FOR UPDATE` will suspend until the lock is released. * @param txnState the state this txn is expected to be in. may be null * @return null if no row was found * @throws SQLException * @throws MetaException */ - private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { - String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); - ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query)); - if(rs.next()) { - return rs; + private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + String query = "select TXN_TYPE from TXNS where TXN_ID = " + txnId + + (txnState != null ? " and TXN_STATE = " + quoteChar(txnState) : ""); + try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { + return rs.next() ? new TxnRecord(rs.getInt(1)) : null; + } + } + + private static final class TxnRecord { + private final TxnType type; + + private TxnRecord(int txnType) { + this.type = TxnType.findByValue(txnType); } - close(rs); - return null; } /** @@ -2340,7 +2366,6 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc PreparedStatement pStmt = null; List insertPreparedStmts = null; ResultSet rs = null; - ResultSet lockHandle = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -2348,8 +2373,8 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc stmt = dbConn.createStatement(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state - lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); - if(lockHandle == null) { + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (txnRecord == null) { ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); } @@ -2552,7 +2577,6 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc closeStmt(pst); } } - close(lockHandle); closeStmt(pStmt); close(rs, stmt, null); if (!success) { @@ -3227,15 +3251,14 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; Statement stmt = null; - ResultSet lockHandle = null; List insertPreparedStmts = null; try { try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); - if(lockHandle == null) { + TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + if (txnRecord == null) { //ensures txn is still there and in expected state ensureValidTxn(dbConn, rqst.getTxnid(), stmt); shouldNeverHappen(rqst.getTxnid()); @@ -3279,7 +3302,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) closeStmt(pst); } } - close(lockHandle, stmt, dbConn); + close(null, stmt, dbConn); unlockInternal(); } } catch (RetryException e) {