diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 63b32c83db..9a699db6b3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -399,7 +399,7 @@ public void testOpenTxnEvent() throws Throwable { primary.dump(primaryDbName, bootStrapDump.lastReplicationId); long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId); - primary.testEventCounts(primaryDbName, lastReplId, null, null, 22); + primary.testEventCounts(primaryDbName, lastReplId, null, null, 20); // Test load replica.load(replicatedDbName, incrementalDump.dumpLocation) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 6281208247..9ab87c5e3c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -671,7 +671,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } } - if (transactionalListeners != null) { + if (transactionalListeners != null && TxnType.READ_ONLY != txnType) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator); } @@ -766,6 +766,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept try { Connection dbConn = null; Statement stmt = null; + ResultSet lockHandle = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -786,9 +787,10 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept txnid = targetTxnIds.get(0); } - if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { - TxnStatus status = findTxnState(txnid,stmt); - if(status == TxnStatus.ABORTED) { + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (lockHandle == null) { + TxnStatus status = findTxnState(txnid, stmt); + if (status == TxnStatus.ABORTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state ABORTED for transactions started using replication replay task"); @@ -800,12 +802,14 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept } raiseTxnUnexpectedState(status, txnid); } + TxnType txnType = TxnType.findByValue(lockHandle.getInt(2)); + abortTxns(dbConn, Collections.singletonList(txnid), true); if (rqst.isSetReplPolicy()) { deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } - if (transactionalListeners != null) { + if (transactionalListeners != null && TxnType.READ_ONLY != txnType) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator); } @@ -819,7 +823,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - close(null, stmt, dbConn); + close(lockHandle, stmt, dbConn); unlockInternal(); } } catch (RetryException e) { @@ -829,24 +833,43 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept @Override @RetrySemantics.Idempotent - public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { - List txnids = rqst.getTxn_ids(); + public void abortTxns(AbortTxnsRequest rqst) throws MetaException { + List txnIds = rqst.getTxn_ids(); try { Connection dbConn = null; + Statement stmt = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - int numAborted = abortTxns(dbConn, txnids, false); - if (numAborted != txnids.size()) { + stmt = dbConn.createStatement(); + + List queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder("select TXN_ID, TXN_TYPE from TXNS where "); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), + txnIds, "TXN_ID", false, false); + + Map txnInfo = new HashMap<>(); + for (String query : queries) { + try (ResultSet rs = stmt.executeQuery(query)) { + while (rs.next()) { + Long txnId = rs.getLong(1); + TxnType txnType = TxnType.findByValue(rs.getInt(2)); + txnInfo.put(txnId, txnType); + } + } + } + int numAborted = abortTxns(dbConn, txnIds, false); + if (numAborted != txnIds.size()) { LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + - txnids.size() + " transactions. It's possible that the other " + - (txnids.size() - numAborted) + + txnIds.size() + " transactions. It's possible that the other " + + (txnIds.size() - numAborted) + " transactions have been aborted or committed, or the transaction ids are invalid."); } - for (Long txnId : txnids) { - if (transactionalListeners != null) { + for (Long txnId : txnIds) { + if (transactionalListeners != null && TxnType.READ_ONLY != txnInfo.get(txnId)) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator); + EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator); } } LOG.debug("Going to commit"); @@ -858,6 +881,7 @@ public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaExce throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (RetryException e) { @@ -1097,7 +1121,7 @@ public void commitTxn(CommitTxnRequest rqst) if (lockHandle == null) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); - if(actualTxnStatus == TxnStatus.COMMITTED) { + if (actualTxnStatus == TxnStatus.COMMITTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state COMMITTED for transactions started using replication replay task"); @@ -1110,9 +1134,8 @@ public void commitTxn(CommitTxnRequest rqst) return; } raiseTxnUnexpectedState(actualTxnStatus, txnid); - shouldNeverHappen(txnid); - //dbConn is rolled back in finally{} } + TxnType txnType = TxnType.findByValue(lockHandle.getInt(2)); String conflictSQLSuffix = null; if (rqst.isSetReplPolicy()) { @@ -1148,7 +1171,7 @@ public void commitTxn(CommitTxnRequest rqst) * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct * even if it includes all of it's columns */ - int numCompsWritten = stmt.executeUpdate( + stmt.executeUpdate( "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); /** @@ -1319,7 +1342,7 @@ public void commitTxn(CommitTxnRequest rqst) } } - if (transactionalListeners != null) { + if (transactionalListeners != null && TxnType.READ_ONLY != txnType) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator); } @@ -2310,9 +2333,10 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) { * @throws MetaException */ private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { - String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); + String query = "select TXN_STATE, TXN_TYPE from TXNS where TXN_ID = " + txnId + + (txnState != null ? " and TXN_STATE=" + quoteChar(txnState) : ""); ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query)); - if(rs.next()) { + if (rs.next()) { return rs; } close(rs); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index fc08dbcd57..384e75148a 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.listener.NotificationEventsCountListener; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; @@ -45,6 +46,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Collections; import java.util.List; /** @@ -319,6 +321,37 @@ public void testTxnTypePersisted() throws Exception { Assert.assertEquals(TxnType.findByValue(rs.getInt(1)), TxnType.READ_ONLY); } + @Test + public void testReplEventForReadOnlyTxn() throws Exception { + long txnId = client.openTxn("me", TxnType.READ_ONLY); + NotificationEventsCountListener.testEventCounts(0, 0); + client.rollbackTxn(txnId); + NotificationEventsCountListener.testEventCounts(0, 0); + + txnId = client.openTxn("me", TxnType.DEFAULT); + NotificationEventsCountListener.testEventCounts(0, 1); + client.rollbackTxn(txnId); + NotificationEventsCountListener.testEventCounts(1, 1); + + long txnId1 = client.openTxn("me", TxnType.READ_ONLY); + long txnId2 = client.openTxn("me", TxnType.DEFAULT); + NotificationEventsCountListener.testEventCounts(2, 1); + + client.abortTxns(Collections.singletonList(txnId1)); + NotificationEventsCountListener.testEventCounts(3, 0); + client.abortTxns(Collections.singletonList(txnId2)); + NotificationEventsCountListener.testEventCounts(3, 1); + + txnId1 = client.openTxn("me", TxnType.READ_ONLY); + txnId2 = client.openTxn("me", TxnType.DEFAULT); + NotificationEventsCountListener.testEventCounts(4, 1); + + client.commitTxn(txnId1); + NotificationEventsCountListener.testEventCounts(5, 0); + client.commitTxn(txnId2); + NotificationEventsCountListener.testEventCounts(5, 1); + } + @Test public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception { thrown.expect(IllegalStateException.class); @@ -331,6 +364,11 @@ public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception { @Before public void setUp() throws Exception { conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true); + + NotificationEventsCountListener.reset(); + conf.setStrings(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname(), + NotificationEventsCountListener.class.getCanonicalName()); + MetaStoreTestUtils.setConfForStandloneMode(conf); TxnDbUtil.setConfValues(conf); TxnDbUtil.prepDb(conf); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/NotificationEventsCountListener.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/NotificationEventsCountListener.java new file mode 100644 index 0000000000..d4ca709e0d --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/listener/NotificationEventsCountListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.listener; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; +import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; +import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.junit.Assert; + +import javax.annotation.concurrent.NotThreadSafe; +import java.sql.Connection; + +/** + * NotificationEvents counter used in test context. + */ +@NotThreadSafe +public final class NotificationEventsCountListener extends TransactionalMetaStoreEventListener { + + private static long txnEvents = 0; + + public NotificationEventsCountListener(Configuration config) { + super(config); + } + + @Override + public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) { + txnEvents++; + } + + @Override + public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) { + txnEvents++; + } + + @Override + public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) { + txnEvents++; + } + + public static void reset(){ + txnEvents = 0; + } + + public static void testEventCounts(long lastReplId, long expectedCount) { + Assert.assertEquals(txnEvents - lastReplId, expectedCount); + } +} \ No newline at end of file