diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6281208247..daa5820c57 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -671,7 +671,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
       }
     }
 
-    if (transactionalListeners != null) {
+    if (transactionalListeners != null && TxnType.READ_ONLY != txnType) {
       MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
               EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
     }
@@ -766,6 +766,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet lockHandle = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -786,9 +787,10 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
           txnid = targetTxnIds.get(0);
         }
 
-        if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
+        lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
+        if (lockHandle == null) {
           TxnStatus status = findTxnState(txnid,stmt);
-          if(status == TxnStatus.ABORTED) {
+          if (status == TxnStatus.ABORTED) {
             if (rqst.isSetReplPolicy()) {
               // in case of replication, idempotent is taken care by getTargetTxnId
               LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
@@ -800,12 +802,14 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
           }
           raiseTxnUnexpectedState(status, txnid);
         }
+        TxnType txnType = TxnType.findByValue(lockHandle.getInt(2));
+        abortTxns(dbConn, Collections.singletonList(txnid), true);
 
         if (rqst.isSetReplPolicy()) {
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
-        if (transactionalListeners != null) {
+        if (transactionalListeners != null && TxnType.READ_ONLY != txnType) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
                   EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator);
         }
@@ -819,7 +823,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        close(lockHandle, stmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -829,25 +833,44 @@
   @Override
   @RetrySemantics.Idempotent
-  public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
-    List<Long> txnids = rqst.getTxn_ids();
+  public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
+    List<Long> txnIds = rqst.getTxn_ids();
     try {
       Connection dbConn = null;
+      Statement stmt = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        int numAborted = abortTxns(dbConn, txnids, false);
-        if (numAborted != txnids.size()) {
+        stmt = dbConn.createStatement();
+
+        List<String> queries = new ArrayList<>();
+        StringBuilder prefix = new StringBuilder("select TXN_ID, TXN_TYPE from TXNS where ");
+
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+            txnIds, "TXN_ID", false, false);
+
+        Map<Long, TxnType> txnInfo = new HashMap<>();
+        for (String query : queries) {
+          try (ResultSet rs = stmt.executeQuery(query)) {
+            while (rs.next()) {
+              Long txnId = rs.getLong(1);
+              TxnType txnType = TxnType.findByValue(rs.getInt(2));
+              txnInfo.put(txnId, txnType);
+            }
+          }
+        }
+        int numAborted = abortTxns(dbConn, txnIds, false);
+        if (numAborted != txnIds.size()) {
           LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " +
-              txnids.size() + " transactions. It's possible that the other " +
-              (txnids.size() - numAborted) +
+              txnIds.size() + " transactions. It's possible that the other " +
+              (txnIds.size() - numAborted) +
               " transactions have been aborted or committed, or the transaction ids are invalid.");
         }
-        for (Long txnId : txnids) {
-          if (transactionalListeners != null) {
-            MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
-          }
+        for (Long txnId : txnIds) {
+          if (transactionalListeners != null && TxnType.READ_ONLY != txnInfo.get(txnId)) {
+            MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
+          }
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -858,6 +881,7 @@ public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaExce
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
+        closeStmt(stmt);
        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
@@ -1097,7 +1121,7 @@ public void commitTxn(CommitTxnRequest rqst)
         if (lockHandle == null) {
           //if here, txn was not found (in expected state)
           TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
-          if(actualTxnStatus == TxnStatus.COMMITTED) {
+          if (actualTxnStatus == TxnStatus.COMMITTED) {
             if (rqst.isSetReplPolicy()) {
               // in case of replication, idempotent is taken care by getTargetTxnId
               LOG.warn("Invalid state COMMITTED for transactions started using replication replay task");
@@ -1110,9 +1134,8 @@ public void commitTxn(CommitTxnRequest rqst)
             return;
           }
           raiseTxnUnexpectedState(actualTxnStatus, txnid);
-          shouldNeverHappen(txnid);
-          //dbConn is rolled back in finally{}
         }
+        TxnType txnType = TxnType.findByValue(lockHandle.getInt(2));
 
         String conflictSQLSuffix = null;
         if (rqst.isSetReplPolicy()) {
@@ -1148,7 +1171,7 @@ public void commitTxn(CommitTxnRequest rqst)
          * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct
          * even if it includes all of it's columns
          */
-        int numCompsWritten = stmt.executeUpdate(
+        stmt.executeUpdate(
           "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
           " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix);
         /**
@@ -1319,7 +1342,7 @@ public void commitTxn(CommitTxnRequest rqst)
           }
         }
 
-        if (transactionalListeners != null) {
+        if (transactionalListeners != null && TxnType.READ_ONLY != txnType) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
                   EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
         }
@@ -2310,9 +2333,10 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) {
    * @throws MetaException
    */
   private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
-    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
+    String query = "select TXN_STATE, TXN_TYPE from TXNS where TXN_ID = " + txnId +
+        (txnState != null ? " and TXN_STATE=" + quoteChar(txnState) : "");
     ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query));
-    if(rs.next()) {
+    if (rs.next()) {
       return rs;
     }
     close(rs);
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index fc08dbcd57..a9243eefbf 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.listener.EventCountNotificationListener;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.After;
@@ -45,6 +46,8 @@
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -319,6 +322,41 @@ public void testTxnTypePersisted() throws Exception {
     Assert.assertEquals(TxnType.findByValue(rs.getInt(1)), TxnType.READ_ONLY);
   }
 
+  @Test
+  public void testReplEventForReadOnlyTxn() throws Exception {
+    long txnId = client.openTxn("me", TxnType.READ_ONLY);
+    Assert.assertEquals(0, EventCountNotificationListener.getTxnEventCount());
+    client.rollbackTxn(txnId);
+    Assert.assertEquals(0, EventCountNotificationListener.getTxnEventCount());
+
+    txnId = client.openTxn("me", TxnType.DEFAULT);
+    Assert.assertEquals(1, EventCountNotificationListener.getTxnEventCount());
+    client.rollbackTxn(txnId);
+    Assert.assertEquals(2, EventCountNotificationListener.getTxnEventCount());
+
+    EventCountNotificationListener.reset();
+
+    long txnId1 = client.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = client.openTxn("me", TxnType.DEFAULT);
+    Assert.assertEquals(1, EventCountNotificationListener.getTxnEventCount());
+
+    client.abortTxns(Collections.singletonList(txnId1));
+    Assert.assertEquals(1, EventCountNotificationListener.getTxnEventCount());
+    client.abortTxns(Collections.singletonList(txnId2));
+    Assert.assertEquals(2, EventCountNotificationListener.getTxnEventCount());
+
+    EventCountNotificationListener.reset();
+
+    txnId1 = client.openTxn("me", TxnType.READ_ONLY);
+    txnId2 = client.openTxn("me", TxnType.DEFAULT);
+    Assert.assertEquals(1, EventCountNotificationListener.getTxnEventCount());
+
+    client.commitTxn(txnId1);
+    Assert.assertEquals(1, EventCountNotificationListener.getTxnEventCount());
+    client.commitTxn(txnId2);
+    Assert.assertEquals(2, EventCountNotificationListener.getTxnEventCount());
+  }
+
   @Test
   public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception {
     thrown.expect(IllegalStateException.class);
@@ -331,6 +369,11 @@ public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception {
   @Before
   public void setUp() throws Exception {
     conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true);
+
+    EventCountNotificationListener.reset();
+    conf.setStrings(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname(),
+        EventCountNotificationListener.class.getCanonicalName());
+
     MetaStoreTestUtils.setConfForStandloneMode(conf);
     TxnDbUtil.setConfValues(conf);
     TxnDbUtil.prepDb(conf);
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/EventCountNotificationListener.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/EventCountNotificationListener.java
new file mode 100644
index 0000000000..3bc76f86a0
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/EventCountNotificationListener.java
@@ -0,0 +1,44 @@
+package org.apache.hadoop.hive.metastore.listener;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
+import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.sql.Connection;
+
+@NotThreadSafe
+public final class EventCountNotificationListener extends TransactionalMetaStoreEventListener {
+
+  private static int txnEvents = 0;
+
+  public EventCountNotificationListener(Configuration config) {
+    super(config);
+  }
+
+  @Override
+  public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) {
+    txnEvents++;
+  }
+
+  @Override
+  public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) {
+    txnEvents++;
+  }
+
+  @Override
+  public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) {
+    txnEvents++;
+  }
+
+  public static int getTxnEventCount() {
+    return txnEvents;
+  }
+
+  public static void reset() {
+    txnEvents = 0;
+  }
+}
\ No newline at end of file