commit 5c9d9d085a8b7012a19cbc2137c0ec78370d4239 Author: Alan Gates Date: Tue Apr 28 14:36:00 2015 -0700 HIVE-10521 Fixed timeOutTxns to timeout all transactions instead of only the first 20 diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f9a742d..a4125d4 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -74,6 +74,7 @@ static final protected char LOCK_SEMI_SHARED = 'w'; static final private int ALLOWED_REPEATED_DEADLOCKS = 5; + static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100; static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private DataSource connPool; @@ -128,7 +129,8 @@ public TxnHandler(HiveConf conf) { timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); deadlockCnt = 0; buildJumpTable(); - retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS); + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, + TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); } @@ -1814,10 +1816,15 @@ private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { List deadTxns = new ArrayList(); // Limit the number of timed out transactions we do in one pass to keep from generating a // huge delete statement - for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); - // We don't care whether all of the transactions get deleted or not, - // if some didn't it most likely means someone else deleted them in the interum - if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + do { + deadTxns.clear(); + for (int i = 0; i < TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) { + deadTxns.add(rs.getLong(1)); + } + // We don't care whether all of the transactions get deleted or not, + // if some didn't it most likely means someone else deleted them in the interum + if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + } while (deadTxns.size() > 0); } finally { closeStmt(stmt); } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index e88ce02..1406191 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore.txn; +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -937,16 +938,16 @@ public void heartbeatTxnRangeOneAborted() throws Exception { @Test public void testLockTimeout() throws Exception { long timeout = txnHandler.setTimeout(1); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - Thread.currentThread().sleep(10); try { + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + Thread.currentThread().sleep(10); txnHandler.checkLock(new CheckLockRequest(res.getLockid())); fail("Told there was a lock, when it should have timed out."); } catch (NoSuchLockException e) { @@ -956,6 +957,27 @@ public void testLockTimeout() throws Exception { } @Test + public void testRecoverManyTimeouts() throws Exception { + long timeout = txnHandler.setTimeout(1); + try { + txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); + Thread.currentThread().sleep(10); + txnHandler.getOpenTxns(); + GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); + int numAborted = 0; + for (TxnInfo txnInfo : rsp.getOpen_txns()) { + assertEquals(TxnState.ABORTED, txnInfo.getState()); + numAborted++; + } + assertEquals(503, numAborted); + } finally { + txnHandler.setTimeout(timeout); + } + + + } + + @Test public void testHeartbeatNoLock() throws Exception { HeartbeatRequest h = new HeartbeatRequest(); h.setLockid(29389839L);