diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f9a742d..33afa7a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -74,6 +74,7 @@ static final protected char LOCK_SEMI_SHARED = 'w'; static final private int ALLOWED_REPEATED_DEADLOCKS = 5; + static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100; static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private DataSource connPool; @@ -128,7 +129,8 @@ public TxnHandler(HiveConf conf) { timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); deadlockCnt = 0; buildJumpTable(); - retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS); + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, + TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); } @@ -330,9 +332,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept Connection dbConn = null; try { dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - List txnids = new ArrayList(1); - txnids.add(txnid); - if (abortTxns(dbConn, txnids) != 1) { + if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) { LOG.debug("Going to rollback"); dbConn.rollback(); throw new NoSuchTxnException("No such transaction: " + txnid); @@ -1308,8 +1308,6 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException LOG.debug("Going to execute update <" + buf.toString() + ">"); updateCnt = stmt.executeUpdate(buf.toString()); - LOG.debug("Going to commit"); - dbConn.commit(); } finally { closeStmt(stmt); } @@ -1798,10 +1796,10 @@ private void timeOutLocks(Connection dbConn) throws SQLException, MetaException } } - // Abort 
timed out transactions. This calls abortTxn(), which does a commit, + // Abort timed out transactions. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions on the underlying database. - private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { + private void timeOutTxns(Connection dbConn) throws SQLException, MetaException, RetryException { long now = getDbTime(dbConn); Statement stmt = null; try { @@ -1814,10 +1812,23 @@ private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { List deadTxns = new ArrayList(); // Limit the number of timed out transactions we do in one pass to keep from generating a // huge delete statement - for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); - // We don't care whether all of the transactions get deleted or not, - // if some didn't it most likely means someone else deleted them in the interum - if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + do { + deadTxns.clear(); + for (int i = 0; i < TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) { + deadTxns.add(rs.getLong(1)); + } + // We don't care whether all of the transactions get deleted or not, + // if some didn't it most likely means someone else deleted them in the interim + if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + } while (deadTxns.size() > 0); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index e88ce02..5f67555 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++
metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -937,16 +937,16 @@ public void heartbeatTxnRangeOneAborted() throws Exception { @Test public void testLockTimeout() throws Exception { long timeout = txnHandler.setTimeout(1); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - Thread.currentThread().sleep(10); try { + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + Thread.currentThread().sleep(10); txnHandler.checkLock(new CheckLockRequest(res.getLockid())); fail("Told there was a lock, when it should have timed out."); } catch (NoSuchLockException e) { @@ -956,6 +956,27 @@ public void testLockTimeout() throws Exception { } @Test + public void testRecoverManyTimeouts() throws Exception { + long timeout = txnHandler.setTimeout(1); + try { + txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); + Thread.currentThread().sleep(10); + txnHandler.getOpenTxns(); + GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); + int numAborted = 0; + for (TxnInfo txnInfo : rsp.getOpen_txns()) { + assertEquals(TxnState.ABORTED, txnInfo.getState()); + numAborted++; + } + assertEquals(503, numAborted); + } finally { + txnHandler.setTimeout(timeout); + } + + + } + + @Test public void testHeartbeatNoLock() throws Exception { HeartbeatRequest h = new HeartbeatRequest(); 
h.setLockid(29389839L);