diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 63b32c83db..9a699db6b3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -399,7 +399,7 @@ public void testOpenTxnEvent() throws Throwable { primary.dump(primaryDbName, bootStrapDump.lastReplicationId); long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId); - primary.testEventCounts(primaryDbName, lastReplId, null, null, 22); + primary.testEventCounts(primaryDbName, lastReplId, null, null, 20); // Test load replica.load(replicatedDbName, incrementalDump.dumpLocation) diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index be37b2a286..01e5b82401 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -210,7 +210,7 @@ public void testAbortTxn() throws Exception { Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); - + gotException = false; try { txnHandler.abortTxn(new AbortTxnRequest(4)); @@ -222,6 +222,19 @@ public void testAbortTxn() throws Exception { Assert.assertTrue(gotException); } + @Test + public void testAbortTxns() throws Exception { + OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); + List txnList = openedTxns.getTxn_ids(); + txnHandler.abortTxns(new AbortTxnsRequest(txnList)); + + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(3, txnsInfo.getOpen_txns().size()); + txnsInfo.getOpen_txns().forEach(txn -> + assertEquals(TxnState.ABORTED, txn.getState()) + ); + } + @Test public void testAbortInvalidTxn() throws Exception { boolean caught = false; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 6281208247..df27b48cfa 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -671,7 +671,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } } - if (transactionalListeners != null) { + if (transactionalListeners != null && txnType != TxnType.READ_ONLY) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator); } @@ -786,9 +786,10 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept txnid = targetTxnIds.get(0); } - if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { - TxnStatus status = findTxnState(txnid,stmt); - if(status == TxnStatus.ABORTED) { + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (txnRecord == null) { + TxnStatus status = findTxnState(txnid, stmt); + if (status == TxnStatus.ABORTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state ABORTED for transactions started using replication replay task"); @@ -800,12 +801,13 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept } raiseTxnUnexpectedState(status, txnid); } + abortTxns(dbConn, Collections.singletonList(txnid), true); if (rqst.isSetReplPolicy()) { deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - if (transactionalListeners != null) { + if (transactionalListeners != null && txnRecord.type != TxnType.READ_ONLY) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator); } @@ -829,22 +831,41 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept @Override @RetrySemantics.Idempotent - public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { - List txnids = rqst.getTxn_ids(); + public void abortTxns(AbortTxnsRequest rqst) throws MetaException { + List txnIds = rqst.getTxn_ids(); try { Connection dbConn = null; + Statement stmt = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - int numAborted = abortTxns(dbConn, txnids, false); - if (numAborted != txnids.size()) { + stmt = dbConn.createStatement(); + + List queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder("select TXN_ID from TXNS where TXN_STATE = ") + .append(quoteChar(TXN_OPEN)).append(" and TXN_TYPE != ").append(TxnType.READ_ONLY.getValue()) + .append(" and "); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), + txnIds, "TXN_ID", false, false); + + List nonReadOnlyTxns = new ArrayList<>(); + for (String query : queries) { + try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { + while (rs.next()) { + nonReadOnlyTxns.add(rs.getLong(1)); + } + } + } + int numAborted = abortTxns(dbConn, txnIds, false); + if (numAborted != txnIds.size()) { LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + - txnids.size() + " transactions. It's possible that the other " + - (txnids.size() - numAborted) + + txnIds.size() + " transactions. It's possible that the other " + + (txnIds.size() - numAborted) + " transactions have been aborted or committed, or the transaction ids are invalid."); } - for (Long txnId : txnids) { - if (transactionalListeners != null) { + if (transactionalListeners != null){ + for (Long txnId : nonReadOnlyTxns) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator); } @@ -858,6 +879,7 @@ public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaExce throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (RetryException e) { @@ -1061,7 +1083,6 @@ public void commitTxn(CommitTxnRequest rqst) Connection dbConn = null; Statement stmt = null; List insertPreparedStmts = null; - ResultSet lockHandle = null; ResultSet commitIdRs = null, rs; try { lockInternal(); @@ -1093,11 +1114,11 @@ public void commitTxn(CommitTxnRequest rqst) * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); - if (lockHandle == null) { + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (txnRecord == null) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); - if(actualTxnStatus == TxnStatus.COMMITTED) { + if (actualTxnStatus == TxnStatus.COMMITTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state COMMITTED for transactions started using replication replay task"); @@ -1110,8 +1131,6 @@ public void commitTxn(CommitTxnRequest rqst) return; } raiseTxnUnexpectedState(actualTxnStatus, txnid); - shouldNeverHappen(txnid); - //dbConn is rolled back in finally{} } String conflictSQLSuffix = null; @@ -1148,7 +1167,7 @@ public void commitTxn(CommitTxnRequest rqst) * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct * even if it includes all of it's columns */ - int numCompsWritten = stmt.executeUpdate( + stmt.executeUpdate( "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); /** @@ -1319,7 +1338,7 @@ public void commitTxn(CommitTxnRequest rqst) } } - if (transactionalListeners != null) { + if (transactionalListeners != null && txnRecord.type != TxnType.READ_ONLY) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator); } @@ -1339,8 +1358,7 @@ public void commitTxn(CommitTxnRequest rqst) closeStmt(pst); } } - close(commitIdRs); - close(lockHandle, stmt, dbConn); + close(commitIdRs, stmt, dbConn); unlockInternal(); } } catch (RetryException e) { @@ -2301,22 +2319,26 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) { * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but * to the same value as before thus forcing db to acquire write lock for duration of the transaction. * - * There is no real reason to return the ResultSet here other than to make sure the reference to it - * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock - * to be released. + * SELECT ... FOR UPDATE locks the row until the transaction commits or rolls back. + * Second connection using `SELECT ... FOR UPDATE` will suspend until the lock is released. * @param txnState the state this txn is expected to be in. may be null * @return null if no row was found * @throws SQLException * @throws MetaException */ - private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { - String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); - ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query)); - if(rs.next()) { - return rs; + private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + String query = "select TXN_TYPE from TXNS where TXN_ID = " + txnId + + (txnState != null ? " and TXN_STATE=" + quoteChar(txnState) : ""); + try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { + return rs.next() ? new TxnRecord(rs.getInt(1)) : null; + } + } + private static final class TxnRecord { + private final TxnType type; + + private TxnRecord(int txnType) { + this.type = TxnType.findByValue(txnType); } - close(rs); - return null; } /** @@ -2336,7 +2358,6 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc PreparedStatement pStmt = null; List insertPreparedStmts = null; ResultSet rs = null; - ResultSet lockHandle = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -2344,8 +2365,8 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc stmt = dbConn.createStatement(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state - lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); - if(lockHandle == null) { + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (txnRecord == null) { ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); } @@ -2548,7 +2569,6 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc closeStmt(pst); } } - close(lockHandle); closeStmt(pStmt); close(rs, stmt, null); if (!success) { @@ -3223,15 +3243,14 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; Statement stmt = null; - ResultSet lockHandle = null; List insertPreparedStmts = null; try { try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); - if(lockHandle == null) { + TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + if (txnRecord == null) { //ensures txn is still there and in expected state ensureValidTxn(dbConn, rqst.getTxnid(), stmt); shouldNeverHappen(rqst.getTxnid()); @@ -3275,7 +3294,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) closeStmt(pst); } } - close(lockHandle, stmt, dbConn); + close(null, stmt, dbConn); unlockInternal(); } } catch (RetryException e) { diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index fc08dbcd57..384e75148a 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.listener.NotificationEventsCountListener; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; @@ -45,6 +46,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Collections; import java.util.List; /** @@ -319,6 +321,37 @@ public void testTxnTypePersisted() throws Exception { Assert.assertEquals(TxnType.findByValue(rs.getInt(1)), TxnType.READ_ONLY); } + @Test + public void testReplEventForReadOnlyTxn() throws Exception { + long txnId = client.openTxn("me", TxnType.READ_ONLY); + NotificationEventsCountListener.testEventCounts(0, 0); + client.rollbackTxn(txnId); + NotificationEventsCountListener.testEventCounts(0, 0); + + txnId = client.openTxn("me", TxnType.DEFAULT); + NotificationEventsCountListener.testEventCounts(0, 1); + client.rollbackTxn(txnId); + NotificationEventsCountListener.testEventCounts(1, 1); + + long txnId1 = client.openTxn("me", TxnType.READ_ONLY); + long txnId2 = client.openTxn("me", TxnType.DEFAULT); + NotificationEventsCountListener.testEventCounts(2, 1); + + client.abortTxns(Collections.singletonList(txnId1)); + NotificationEventsCountListener.testEventCounts(3, 0); + client.abortTxns(Collections.singletonList(txnId2)); + NotificationEventsCountListener.testEventCounts(3, 1); + + txnId1 = client.openTxn("me", TxnType.READ_ONLY); + txnId2 = client.openTxn("me", TxnType.DEFAULT); + NotificationEventsCountListener.testEventCounts(4, 1); + + client.commitTxn(txnId1); + NotificationEventsCountListener.testEventCounts(5, 0); + client.commitTxn(txnId2); + NotificationEventsCountListener.testEventCounts(5, 1); + } + @Test public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception { thrown.expect(IllegalStateException.class); @@ -331,6 +364,11 @@ public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception { @Before public void setUp() throws Exception { conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true); + + NotificationEventsCountListener.reset(); + conf.setStrings(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname(), + NotificationEventsCountListener.class.getCanonicalName()); + MetaStoreTestUtils.setConfForStandloneMode(conf); TxnDbUtil.setConfValues(conf); TxnDbUtil.prepDb(conf); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/NotificationEventsCountListener.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/NotificationEventsCountListener.java new file mode 100644 index 0000000000..d4ca709e0d --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/NotificationEventsCountListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.listener; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; +import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; +import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.junit.Assert; + +import javax.annotation.concurrent.NotThreadSafe; +import java.sql.Connection; + +/** + * NotificationEvents counter used in test context. + */ +@NotThreadSafe +public final class NotificationEventsCountListener extends TransactionalMetaStoreEventListener { + + private static long txnEvents = 0; + + public NotificationEventsCountListener(Configuration config) { + super(config); + } + + @Override + public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) { + txnEvents++; + } + + @Override + public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) { + txnEvents++; + } + + @Override + public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) { + txnEvents++; + } + + public static void reset(){ + txnEvents = 0; + } + + public static void testEventCounts(long lastReplId, long expectedCount) { + Assert.assertEquals(txnEvents - lastReplId, expectedCount); + } +} \ No newline at end of file