diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index c4a9581..acfc07a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -127,7 +127,7 @@ public void setRunAs(long cq_id, String user) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "setRunAs"); + detectDeadlock(dbConn, e, "setRunAs"); } finally { closeDbConn(dbConn); closeStmt(stmt); @@ -192,7 +192,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "findNextToCompact"); + detectDeadlock(dbConn, e, "findNextToCompact"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -234,7 +234,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "markCompacted"); + detectDeadlock(dbConn, e, "markCompacted"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -377,7 +377,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "markCleaned"); + detectDeadlock(dbConn, e, "markCleaned"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -429,7 +429,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "cleanEmptyAbortedTxns"); + detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -475,7 +475,7 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "revokeFromLocalWorkers"); + detectDeadlock(dbConn, e, "revokeFromLocalWorkers"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { @@ -522,7 +522,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "revokeTimedoutWorkers"); + detectDeadlock(dbConn, e, "revokeTimedoutWorkers"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 567e2e6..50f58d0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -65,13 +65,13 @@ static final protected char TXN_OPEN = 'o'; // Lock states - static final private char LOCK_ACQUIRED = 'a'; - static final private char LOCK_WAITING = 'w'; + static final protected char LOCK_ACQUIRED = 'a'; + static final protected char LOCK_WAITING = 'w'; // Lock types - static final private char LOCK_EXCLUSIVE = 'e'; - static final private char LOCK_SHARED = 'r'; - static final private char LOCK_SEMI_SHARED = 'w'; + static final protected char LOCK_EXCLUSIVE = 'e'; + static final protected char LOCK_SHARED = 'r'; + static final protected char LOCK_SEMI_SHARED = 'w'; static final private int ALLOWED_REPEATED_DEADLOCKS = 5; static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); @@ -301,7 +301,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "openTxns"); + detectDeadlock(dbConn, e, "openTxns"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -336,7 +336,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "abortTxn"); + detectDeadlock(dbConn, e, "abortTxn"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -393,7 +393,7 @@ public void commitTxn(CommitTxnRequest rqst) dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "commitTxn"); + detectDeadlock(dbConn, e, "commitTxn"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -419,7 +419,7 @@ public LockResponse lock(LockRequest rqst) dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "lock"); + detectDeadlock(dbConn, e, "lock"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -444,7 +444,7 @@ public LockResponse lockNoWait(LockRequest rqst) dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "lockNoWait"); + detectDeadlock(dbConn, e, "lockNoWait"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -479,7 +479,7 @@ public LockResponse checkLock(CheckLockRequest rqst) dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "checkLock"); + detectDeadlock(dbConn, e, "checkLock"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -534,7 +534,7 @@ public void unlock(UnlockRequest rqst) dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "unlock"); + detectDeadlock(dbConn, e, "unlock"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -613,7 +613,7 @@ public void heartbeat(HeartbeatRequest ids) dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "heartbeat"); + detectDeadlock(dbConn, e, "heartbeat"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -652,7 +652,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "heartbeatTxnRange"); + detectDeadlock(dbConn, e, "heartbeatTxnRange"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -735,7 +735,7 @@ public void compact(CompactionRequest rqst) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - detectDeadlock(e, "compact"); + detectDeadlock(dbConn, e, "compact"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -898,15 +898,30 @@ void close(ResultSet rs, Statement stmt, Connection dbConn) { * Determine if an exception was a deadlock. Unfortunately there is no standard way to do * this, so we have to inspect the error messages and catch the telltale signs for each * different database. + * @param conn database connection * @param e exception that was thrown. * @param caller name of the method calling this * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock * detected and retry count has not been exceeded. */ - protected void detectDeadlock(SQLException e, String caller) throws DeadlockException { - final String mysqlDeadlock = - "Deadlock found when trying to get lock; try restarting transaction"; - if (e.getMessage().contains(mysqlDeadlock) || e instanceof SQLTransactionRollbackException) { + protected void detectDeadlock(Connection conn, + SQLException e, + String caller) throws DeadlockException, MetaException { + + // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected() + // to test these changes. + // MySQL and MSSQL use 40001 as the state code for rollback. Postgres uses 40001 and 40P01. + // Oracle seems to return different SQLStates each time, but the message always contains + // "deadlock detected", so I've used that instead. + // Derby and newer MySQL driver use the new SQLTransactionRollbackException + if (dbProduct == null) { + determineDatabaseProduct(conn); + } + if (e instanceof SQLTransactionRollbackException || + ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || + dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || + (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || + (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")))) { if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { LOG.warn("Deadlock detected in " + caller + ", trying again."); throw new DeadlockException(); diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 787952e..446b174 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -20,13 +20,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreThread; import org.apache.hadoop.hive.metastore.api.*; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -1082,6 +1087,115 @@ public void showLocks() throws Exception { for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]); } + @Test + @Ignore + public void deadlockDetected() throws Exception { + Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = conn.createStatement(); + long now = txnHandler.getDbTime(conn); + stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + + "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + + "'scooby.com')"); + stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + + "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + + "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + + txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + + "'scooby.com')"); + conn.commit(); + txnHandler.closeDbConn(conn); + + final MetaStoreThread.BooleanPointer sawDeadlock = new MetaStoreThread.BooleanPointer(); + + final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + try { + + for (int i = 0; i < 5; i++) { + Thread t1 = new Thread() { + @Override + public void run() { + try { + try { + updateTxns(conn1); + updateLocks(conn1); + Thread.sleep(1000); + conn1.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + txnHandler.detectDeadlock(conn1, e, "thread t1"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.DeadlockException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.boolVal = true; + } + } + conn1.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + Thread t2 = new Thread() { + @Override + public void run() { + try { + try { + updateLocks(conn2); + updateTxns(conn2); + Thread.sleep(1000); + conn2.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + txnHandler.detectDeadlock(conn2, e, "thread t2"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.DeadlockException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.boolVal = true; + } + } + conn2.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + if (sawDeadlock.boolVal) break; + } + assertTrue(sawDeadlock.boolVal); + } finally { + conn1.rollback(); + txnHandler.closeDbConn(conn1); + conn2.rollback(); + txnHandler.closeDbConn(conn2); + } + } + + private void updateTxns(Connection conn) throws SQLException { + Statement stmt = conn.createStatement(); + stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1"); + } + + private void updateLocks(Connection conn) throws SQLException { + Statement stmt = conn.createStatement(); + stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1"); + } + @Before public void setUp() throws Exception { TxnDbUtil.prepDb();