diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 8ff2195..50d8892 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -529,8 +529,7 @@ public LockResponse checkLock(CheckLockRequest rqst)
         else {
           heartbeatLock(dbConn, extLockId);
         }
-        closeDbConn(dbConn);
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -1099,6 +1098,10 @@ protected void checkRetryable(Connection conn,
           LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
         }
       }
+      else {
+        //make sure we know we saw an error that we don't recognize
+        LOG.info("Non-retryable error: " + getMessage(e));
+      }
     }
     finally {
       /*if this method ends with anything except a retry signal, the caller should fail the operation
@@ -1577,7 +1580,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst)
       return checkLock(dbConn, extLockId);
     } catch (NoSuchLockException e) {
       // This should never happen, as we just added the lock id
-      throw new MetaException("Couldn't find a lock we just created!");
+      throw new MetaException("Couldn't find a lock we just created! " + e.getMessage());
     } finally {
       close(rs);
       closeStmt(stmt);
@@ -1706,7 +1709,7 @@ private LockResponse checkLock(Connection dbConn,
     if (index == -1) {
       LOG.debug("Going to rollback");
       dbConn.rollback();
-      throw new MetaException("How did we get here, we heartbeated our lock before we started!");
+      throw new MetaException("How did we get here, we heartbeated our lock before we started! ( " + info + ")");
     }
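
A note on the @@ -529 hunk: rather than closing the connection and opening a new SERIALIZABLE one, the patch escalates the isolation level on the connection it already holds. A minimal standalone sketch of that JDBC pattern (the helper and its name are illustrative, not from the patch):

    import java.sql.Connection;
    import java.sql.SQLException;

    class IsolationEscalation {
      // Reuse a live connection at a stricter isolation level instead of
      // paying for close() + reconnect. setTransactionIsolation() should be
      // called between transactions, not while one is in flight.
      static void escalate(Connection conn) throws SQLException {
        if (conn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) {
          conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
        }
      }
    }
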
- String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is + long maxHeartbeatTime = now - timeout; + //doing a SELECT first is less efficient but makes it easier to debug things + String s = "select distinct hl_lock_ext_id from HIVE_LOCKS where hl_last_heartbeat < " + + maxHeartbeatTime + " and hl_txnid = 0";//when txnid is <> 0, the lock is //associated with a txn and is handled by performTimeOuts() //want to avoid expiring locks for a txn w/o expiring the txn itself - LOG.debug("Going to execute update <" + s + ">"); - int deletedLocks = stmt.executeUpdate(s); + List extLockIDs = new ArrayList<>(); + rs = stmt.executeQuery(s); + while(rs.next()) { + extLockIDs.add(rs.getLong(1)); + } + rs.close(); + dbConn.commit(); + if(extLockIDs.size() <= 0) { + return; + } + int deletedLocks = 0; + //include same hl_last_heartbeat condition in case someone heartbeated since the select + s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + maxHeartbeatTime + " and hl_txnid = 0" + + " and hl_lock_ext_id IN ("; + int numWholeBatches = extLockIDs.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; + for(int i = 0; i < numWholeBatches; i++) { + StringBuilder sb = new StringBuilder(s); + for(int j = i * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j < (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j++) { + sb.append(extLockIDs.get(j)).append(","); + } + sb.setCharAt(sb.length() - 1, ')'); + LOG.debug("Removing expired locks via: " + sb.toString()); + deletedLocks += stmt.executeUpdate(sb.toString()); + dbConn.commit(); + } + StringBuilder sb = new StringBuilder(s); + for(int i = numWholeBatches * TIMED_OUT_TXN_ABORT_BATCH_SIZE; i < extLockIDs.size(); i++) { + sb.append(extLockIDs.get(i)).append(","); + } + sb.setCharAt(sb.length() - 1, ')'); + LOG.debug("Removing expired locks via: " + sb.toString()); + deletedLocks += stmt.executeUpdate(sb.toString()); if(deletedLocks > 0) { - LOG.info("Deleted " + deletedLocks + " locks from HIVE_LOCKS due to timeout"); + LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due to timeout (vs. " + + extLockIDs.size() + " found. 
List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime); } LOG.debug("Going to commit"); dbConn.commit(); @@ -1993,6 +2029,7 @@ private void timeOutLocks(Connection dbConn, long now) { catch(Exception ex) { LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex); } finally { + close(rs); closeStmt(stmt); } } @@ -2265,7 +2302,8 @@ private static boolean isRetryable(Exception ex) { //in MSSQL this means Communication Link Failure return true; } - if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState())) { + if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) || + sqlException.getMessage().contains("consistent read failure; rollback data not available")) { return true; } //see also https://issues.apache.org/jira/browse/HIVE-9938 diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 7d58622..7fa57d6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -52,6 +52,7 @@ static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private long MAX_SLEEP; + //longer term we should always have a txn id and then we won't need to track locks here at all private Set locks; private IMetaStoreClient client; private long nextSleep = 50; @@ -115,7 +116,27 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List 0) { + boolean logMsg = false; + for(DbHiveLock l : locks) { + if(l.txnId != hl.txnId) { + //locks from different transactions detected (or from transaction and read-only query in autocommit) + logMsg = true; + break; + } + else if(l.txnId == 0) { + if(!l.queryId.equals(hl.queryId)) { + //here means no open transaction, but different queries + logMsg = true; + break; + } + } + } + if(logMsg) { + LOG.warn("adding new DbHiveLock(" + hl + ") while we are already tracking locks: " + locks); + } + } locks.add(hl); if (res.getState() != LockState.ACQUIRED) { if(res.getState() == LockState.WAITING) { @@ -191,10 +212,12 @@ LockState checkLock(long extLockId) throws LockException { @Override public void unlock(HiveLock hiveLock) throws LockException { long lockId = ((DbHiveLock)hiveLock).lockId; + boolean removed = false; try { LOG.debug("Unlocking " + hiveLock); client.unlock(lockId); - boolean removed = locks.remove(hiveLock); + //important to remove after unlock() in case it fails + removed = locks.remove(hiveLock); Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { try { @@ -205,6 +228,9 @@ public void unlock(HiveLock hiveLock) throws LockException { } LOG.debug("Removed a lock " + removed); } catch (NoSuchLockException e) { + //if metastore has no record of this lock, it most likely timed out; either way + //there is no point tracking it here any longer + removed = locks.remove(hiveLock); LOG.error("Metastore could find no record of lock " + JavaUtils.lockIdToString(lockId)); throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId)); } catch (TxnOpenException e) { @@ -214,10 +240,16 @@ public void unlock(HiveLock hiveLock) throws LockException { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); } + finally { + if(removed) { + LOG.debug("Removed a lock " + hiveLock); + } + } } @Override public void releaseLocks(List hiveLocks) { + LOG.info("releaseLocks: " + hiveLocks); for (HiveLock lock : hiveLocks) { try { unlock(lock); @@ -225,6 +257,8 @@ public void releaseLocks(List 
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 7d58622..7fa57d6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -52,6 +52,7 @@
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);

   private long MAX_SLEEP;
+  //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
   private IMetaStoreClient client;
   private long nextSleep = 50;
@@ -115,7 +116,27 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List<HiveLock> acquiredLocks)
-      DbHiveLock hl = new DbHiveLock(res.getLockid());
+      DbHiveLock hl = new DbHiveLock(res.getLockid(), queryId, lock.getTxnid());
+      if(locks.size() > 0) {
+        boolean logMsg = false;
+        for(DbHiveLock l : locks) {
+          if(l.txnId != hl.txnId) {
+            //locks from different transactions detected (or from transaction and read-only query in autocommit)
+            logMsg = true;
+            break;
+          }
+          else if(l.txnId == 0) {
+            if(!l.queryId.equals(hl.queryId)) {
+              //here means no open transaction, but different queries
+              logMsg = true;
+              break;
+            }
+          }
+        }
+        if(logMsg) {
+          LOG.warn("adding new DbHiveLock(" + hl + ") while we are already tracking locks: " + locks);
+        }
+      }
       locks.add(hl);
       if (res.getState() != LockState.ACQUIRED) {
         if(res.getState() == LockState.WAITING) {
@@ -191,10 +212,12 @@ LockState checkLock(long extLockId) throws LockException {
   @Override
   public void unlock(HiveLock hiveLock) throws LockException {
     long lockId = ((DbHiveLock)hiveLock).lockId;
+    boolean removed = false;
     try {
       LOG.debug("Unlocking " + hiveLock);
       client.unlock(lockId);
-      boolean removed = locks.remove(hiveLock);
+      //important to remove after unlock() in case it fails
+      removed = locks.remove(hiveLock);
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
         try {
@@ -205,6 +228,9 @@ public void unlock(HiveLock hiveLock) throws LockException {
       }
       LOG.debug("Removed a lock " + removed);
     } catch (NoSuchLockException e) {
+      //if metastore has no record of this lock, it most likely timed out; either way
+      //there is no point tracking it here any longer
+      removed = locks.remove(hiveLock);
       LOG.error("Metastore could find no record of lock " + JavaUtils.lockIdToString(lockId));
       throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
     } catch (TxnOpenException e) {
@@ -214,10 +240,16 @@ public void unlock(HiveLock hiveLock) throws LockException {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
+    finally {
+      if(removed) {
+        LOG.debug("Removed a lock " + hiveLock);
+      }
+    }
   }

   @Override
   public void releaseLocks(List<HiveLock> hiveLocks) {
+    LOG.info("releaseLocks: " + hiveLocks);
     for (HiveLock lock : hiveLocks) {
       try {
         unlock(lock);
       } catch (LockException e) {
@@ -225,6 +257,8 @@ public void releaseLocks(List<HiveLock> hiveLocks) {
         // Not sure why this method doesn't throw any exceptions,
         // but since the interface doesn't allow it we'll just swallow them and
         // move on.
+        //This is OK-ish since releaseLocks() is only called for RO/AC queries; it
+        //would be really bad to eat exceptions here for write operations
       }
     }
   }
@@ -271,10 +305,17 @@ public void refresh() {
   static class DbHiveLock extends HiveLock {

     long lockId;
+    String queryId;
+    long txnId;

     DbHiveLock(long id) {
       lockId = id;
     }
+    DbHiveLock(long id, String queryId, long txnId) {
+      lockId = id;
+      this.queryId = queryId;
+      this.txnId = txnId;
+    }

     @Override
     public HiveLockObject getHiveLockObject() {
@@ -301,7 +342,7 @@ public int hashCode() {
     }
     @Override
     public String toString() {
-      return JavaUtils.lockIdToString(lockId);
+      return JavaUtils.lockIdToString(lockId) + " queryId=" + queryId + " " + JavaUtils.txnIdToString(txnId);
     }
   }
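
Note that equals()/hashCode() still key on lockId alone (the @@ -301 hunk leaves hashCode untouched), so the new queryId/txnId fields are diagnostic metadata rather than part of set membership. The mismatch rule the lock() loop enforces is: tracked locks should share a txnId, or, when txnId is 0 (no open transaction), share a queryId. A compact restatement of that rule as a predicate (field names mirror the patch; this is a sketch, not the patch code):

    class LockSanity {
      // Minimal stand-in for the fields DbHiveLock gains in this patch.
      static class TrackedLock {
        final long txnId;
        final String queryId;
        TrackedLock(long txnId, String queryId) { this.txnId = txnId; this.queryId = queryId; }
      }

      // True when 'incoming' is consistent with an already-tracked lock:
      // same transaction, or (txnId == 0, i.e. autocommit/read-only) same query.
      static boolean consistent(TrackedLock tracked, TrackedLock incoming) {
        if (tracked.txnId != incoming.txnId) {
          return false;
        }
        return tracked.txnId != 0 || tracked.queryId.equals(incoming.queryId);
      }
    }
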
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index b7d1d18..f82b85a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -198,9 +199,8 @@ private void runReaper() throws Exception {
   }
   @Test
   public void testExceptions() throws Exception {
-    WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
+    addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.acquireLocks(qp, ctx, "PeterI");
     txnMgr.openTxn("NicholasII");
     runReaper();
     LockException exception = null;
@@ -240,6 +240,32 @@ public void testExceptions() throws Exception {
   }

   @Test
+  public void testLockTimeout() throws Exception {
+    addPartitionInput(newTable(true));
+    QueryPlan qp = new MockQueryPlan(this);
+    //make sure it works with nothing to expire
+    expireLocks(txnMgr, 0);
+    //create a few read locks, all on the same resource
+    for(int i = 0; i < 5; i++) {
+      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+    }
+    expireLocks(txnMgr, 5);
+    //create a lot of locks
+    for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+    }
+    expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+  }
+  private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
+    DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
+    ShowLocksResponse resp = lockManager.getLocks();
+    Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
+    runReaper();
+    resp = lockManager.getLocks();
+    Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+  }
+
+  @Test
   public void testReadWrite() throws Exception {
     Table t = newTable(true);
     addPartitionInput(t);
@@ -359,6 +385,7 @@ public void concurrencyFalse() throws Exception {
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
     txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr.getLockManager();//init lock manager
     Assert.assertTrue(txnMgr instanceof DbTxnManager);
     nextInput = 1;
     readEntities = new HashSet<ReadEntity>();
@@ -376,15 +403,13 @@ public void tearDown() throws Exception {
   }

   private static class MockQueryPlan extends QueryPlan {
-    private final HashSet<ReadEntity> inputs;
-    private final HashSet<WriteEntity> outputs;
+    private final HashSet<ReadEntity> inputs = new HashSet<>();
+    private final HashSet<WriteEntity> outputs = new HashSet<>();
     private final String queryId;

     MockQueryPlan(TestDbTxnManager test) {
-      HashSet<ReadEntity> r = test.readEntities;
-      HashSet<WriteEntity> w = test.writeEntities;
-      inputs = (r == null) ? new HashSet<ReadEntity>() : r;
-      outputs = (w == null) ? new HashSet<WriteEntity>() : w;
+      inputs.addAll(test.readEntities);
+      outputs.addAll(test.writeEntities);
       queryId = makeQueryId();
     }
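
testLockTimeout exercises both delete paths in the batched timeout logic: TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17 locks produce one whole batch plus a 17-lock remainder. To also pin down the exact-multiple boundary noted earlier, one more case could look like this (a sketch reusing the helpers in the test class above; not part of the patch):

    @Test
    public void testLockTimeoutExactBatchMultiple() throws Exception {
      addPartitionInput(newTable(true));
      QueryPlan qp = new MockQueryPlan(this);
      // Exactly one full batch and an empty remainder: exercises the boundary
      // where extLockIDs.size() % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0.
      for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE; i++) {
        txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
      }
      expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE);
    }

With the trailing-batch behavior as posted, the malformed final statement is only logged, so this case should still see all locks expire via the whole-batch delete.
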