diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 567e2e6..c70faca 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -65,13 +65,13 @@
   static final protected char TXN_OPEN = 'o';
 
   // Lock states
-  static final private char LOCK_ACQUIRED = 'a';
-  static final private char LOCK_WAITING = 'w';
+  static final protected char LOCK_ACQUIRED = 'a';
+  static final protected char LOCK_WAITING = 'w';
 
   // Lock types
-  static final private char LOCK_EXCLUSIVE = 'e';
-  static final private char LOCK_SHARED = 'r';
-  static final private char LOCK_SEMI_SHARED = 'w';
+  static final protected char LOCK_EXCLUSIVE = 'e';
+  static final protected char LOCK_SHARED = 'r';
+  static final protected char LOCK_SEMI_SHARED = 'w';
 
   static final private int ALLOWED_REPEATED_DEADLOCKS = 5;
   static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
@@ -904,9 +904,16 @@ void close(ResultSet rs, Statement stmt, Connection dbConn) {
    * detected and retry count has not been exceeded.
    */
   protected void detectDeadlock(SQLException e, String caller) throws DeadlockException {
-    final String mysqlDeadlock =
-        "Deadlock found when trying to get lock; try restarting transaction";
-    if (e.getMessage().contains(mysqlDeadlock) || e instanceof SQLTransactionRollbackException) {
+
+    // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
+    // to test these changes.
+    // MySQL and MSSQL use 40001 as the state code for rollback.  Postgres uses 40001 and 40P01.
+    // Oracle uses 72000
+    // Derby uses the new SQLTransactionRollbackException
+    // getSQLState() may return null, so the constants go on the left of equals().
+    final String sqlState = e.getSQLState();
+    if ("40001".equals(sqlState) || "40P01".equals(sqlState) ||
+        "72000".equals(sqlState) || e instanceof SQLTransactionRollbackException) {
       if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
         LOG.warn("Deadlock detected in " + caller + ", trying again.");
         throw new DeadlockException();
diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 787952e..c02792f 100644
--- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -20,13 +20,18 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -1082,6 +1087,158 @@ public void showLocks() throws Exception {
     for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
   }
 
+  /**
+   * TxnHandler subclass that tries to force a database deadlock: two threads update the TXNS
+   * and HIVE_LOCKS tables in opposite order on separate connections.
+   */
+  private static class DeadlockCreator extends TxnHandler {
+
+    public DeadlockCreator(HiveConf conf) {
+      super(conf);
+    }
+
+    /**
+     * Seeds one row into TXNS and HIVE_LOCKS, then makes up to five attempts to provoke a
+     * deadlock between two connections.
+     * @return true as soon as detectDeadlock() reports a deadlock in either thread, false if
+     * none of the attempts produced one.
+     */
+    boolean sawDeadlock() throws Exception {
+      seedTables();
+
+      final MetaStoreThread.BooleanPointer sawDeadlock = new MetaStoreThread.BooleanPointer();
+
+      // Nested try/finally so that each connection is released even if opening or cleaning up
+      // the other one fails.
+      final Connection conn1 = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      try {
+        final Connection conn2 = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        try {
+          for (int i = 0; i < 5; i++) {
+            // The threads touch the tables in opposite order so that each can end up waiting
+            // on a lock the other one holds.
+            Thread t1 = new Thread() {
+              @Override
+              public void run() {
+                updateAndCommit(conn1, true, "thread t1", sawDeadlock);
+              }
+            };
+
+            Thread t2 = new Thread() {
+              @Override
+              public void run() {
+                updateAndCommit(conn2, false, "thread t2", sawDeadlock);
+              }
+            };
+
+            t1.start();
+            t2.start();
+            t1.join();
+            t2.join();
+            if (sawDeadlock.boolVal) return true;
+          }
+          return false;
+        } finally {
+          rollbackAndClose(conn2);
+        }
+      } finally {
+        rollbackAndClose(conn1);
+      }
+    }
+
+    /** Inserts the single TXNS row and HIVE_LOCKS row that the competing threads update. */
+    private void seedTables() throws Exception {
+      Connection conn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      try {
+        long now = getDbTime(conn);
+        runUpdate(conn, "insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+            "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+            "'scooby.com')");
+        runUpdate(conn, "insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+            "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+            "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+            LOCK_WAITING + "', '" + LOCK_EXCLUSIVE + "', " + now + ", 'fred', 'scooby.com')");
+        conn.commit();
+      } finally {
+        rollbackAndClose(conn);
+      }
+    }
+
+    /**
+     * Runs the two heartbeat updates on conn in the requested order, pauses so the other thread
+     * has time to collide with the open transaction, then commits.  A SQLException is passed to
+     * detectDeadlock(); if that throws DeadlockException the shared flag is set.
+     */
+    private void updateAndCommit(Connection conn, boolean txnsFirst, String caller,
+                                 MetaStoreThread.BooleanPointer sawDeadlock) {
+      try {
+        if (txnsFirst) {
+          updateTxns(conn);
+          updateLocks(conn);
+        } else {
+          updateLocks(conn);
+          updateTxns(conn);
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          // Still attempt the commit, but do not lose the interrupt.
+          Thread.currentThread().interrupt();
+        }
+        conn.commit();
+        LOG.debug("no exception, no deadlock");
+      } catch (SQLException e) {
+        try {
+          detectDeadlock(e, caller);
+          LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+              e.getSQLState() + " class of exception is " + e.getClass().getName() +
+              " msg is <" + e.getMessage() + ">");
+        } catch (DeadlockException de) {
+          LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+              "exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">");
+          sawDeadlock.boolVal = true;
+        }
+      }
+    }
+
+    private void updateTxns(Connection conn) throws SQLException {
+      runUpdate(conn, "update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
+    }
+
+    private void updateLocks(Connection conn) throws SQLException {
+      runUpdate(conn, "update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1");
+    }
+
+    /** Executes one update statement and closes the Statement afterwards. */
+    private void runUpdate(Connection conn, String sql) throws SQLException {
+      Statement stmt = conn.createStatement();
+      try {
+        stmt.executeUpdate(sql);
+      } finally {
+        stmt.close();
+      }
+    }
+
+    /** Rolls back whatever is still pending on conn and closes it, even if rollback fails. */
+    private void rollbackAndClose(Connection conn) throws SQLException {
+      try {
+        conn.rollback();
+      } finally {
+        conn.close();
+      }
+    }
+  }
+
+  // Marking this test as ignore since it's highly time dependent and will be flaky in general.
+  // But leaving it here because it's very useful when you want to test how the code is handling
+  // deadlocks.
+  @Test
+  @Ignore
+  public void deadlockIsDetected() throws Exception {
+    DeadlockCreator deadlock = new DeadlockCreator(conf);
+    assertTrue(deadlock.sawDeadlock());
+  }
+
   @Before
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();