diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index be399db..838015a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.util.StringUtils; import java.sql.*; import java.util.*; @@ -34,6 +35,9 @@ static final private String CLASS_NAME = CompactionTxnHandler.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); + // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS + // See TxnHandler for notes on how to deal with deadlocks. Follow those notes. + public CompactionTxnHandler(HiveConf conf) { super(conf); } @@ -64,6 +68,7 @@ public CompactionTxnHandler(HiveConf conf) { response.add(info); } + // Check for aborted txns s = "select tc_database, tc_table, tc_partition " + "from TXNS, TXN_COMPONENTS " + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + @@ -98,27 +103,34 @@ public CompactionTxnHandler(HiveConf conf) { * @param user user to run the jobs as */ public void setRunAs(long cq_id, String user) throws MetaException { - Connection dbConn = getDbConn(); - try { - Statement stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to update compaction queue, " + e.getMessage()); + try { + Connection dbConn = getDbConn(); try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } + Statement stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) != 1) { + LOG.error("Unable to update compaction record"); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to update compaction queue, " + e.getMessage()); + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "setRunAs"); + } finally { + closeDbConn(dbConn); + } + } catch (DeadlockException e) { + setRunAs(cq_id, user); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -129,53 +141,61 @@ public void setRunAs(long cq_id, String user) throws MetaException { * @return an info element for this compaction request, or null if there is no work to do now. 
*/ public CompactionInfo findNextToCompact(String workerId) throws MetaException { - Connection dbConn = getDbConn(); - CompactionInfo info = new CompactionInfo(); - try { - Statement stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "' for update"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("No compactions found ready to compact"); - dbConn.rollback(); - return null; - } - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); - } + Connection dbConn = getDbConn(); + CompactionInfo info = new CompactionInfo(); - // Now, update this record as being worked on by this worker. - long now = System.currentTimeMillis(); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - return info; - } catch (SQLException e) { - LOG.error("Unable to select next element for compaction, " + e.getMessage()); try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "' for update"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("No compactions found ready to compact"); + dbConn.rollback(); + return null; + } + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; + case MINOR_TYPE: info.type = CompactionType.MINOR; break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + + // Now, update this record as being worked on by this worker. 
+ long now = System.currentTimeMillis(); + s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) != 1) { + LOG.error("Unable to update compaction record"); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + return info; + } catch (SQLException e) { + LOG.error("Unable to select next element for compaction, " + e.getMessage()); + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "findNextToCompact"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } - return null; + } catch (DeadlockException e) { + return findNextToCompact(workerId); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -185,28 +205,37 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { * @param info info on the compaciton entry to mark as compacted. */ public void markCompacted(CompactionInfo info) throws MetaException { - Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { + Connection dbConn = getDbConn(); try { - LOG.error("Unable to update compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + + "cq_worker_id = null where cq_id = " + info.id; + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) != 1) { + LOG.error("Unable to update compaction record"); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { + LOG.error("Unable to update compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "markCompacted"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } + } catch (DeadlockException e) { + markCompacted(info); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -222,8 +251,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { try { Statement stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'" + - " for update"; + "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -250,7 +278,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { dbConn.rollback(); } catch (SQLException e1) { } - return null; + throw new 
MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } @@ -262,81 +291,90 @@ public void markCompacted(CompactionInfo info) throws MetaException { * @param info info on the compaction entry to remove */ public void markCleaned(CompactionInfo info) throws MetaException { - Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); - String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to delete compaction record"); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } + Connection dbConn = getDbConn(); + try { + Statement stmt = dbConn.createStatement(); + String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) != 1) { + LOG.error("Unable to delete compaction record"); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } - // Remove entries from completed_txn_components as well, so we don't start looking there - // again. - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; - if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; - } - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { - LOG.error("Expected to remove at least one row from completed_txn_components when " + - "marking compaction entry as clean!"); - } + // Remove entries from completed_txn_components as well, so we don't start looking there + // again. + s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + + "ctc_table = '" + info.tableName + "'"; + if (info.partName != null) { + s += " and ctc_partition = '" + info.partName + "'"; + } + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) < 1) { + LOG.error("Expected to remove at least one row from completed_txn_components when " + + "marking compaction entry as clean!"); + } - s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'"; - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; - LOG.debug("Going to execute update <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - Set txnids = new HashSet(); - while (rs.next()) txnids.add(rs.getLong(1)); - if (txnids.size() > 0) { + s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + + TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + + info.tableName + "'"; + if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; + LOG.debug("Going to execute update <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + Set txnids = new HashSet(); + while (rs.next()) txnids.add(rs.getLong(1)); + if (txnids.size() > 0) { - // Remove entries from txn_components, as there may be aborted txn components - StringBuffer buf = new StringBuffer(); - buf.append("delete from TXN_COMPONENTS where tc_txnid in ("); - boolean first = true; - for (long id : txnids) { - if (first) first = false; - else buf.append(", "); - buf.append(id); - } + // Remove entries from txn_components, as there may be aborted txn components + StringBuffer buf = new StringBuffer(); + buf.append("delete 
from TXN_COMPONENTS where tc_txnid in ("); + boolean first = true; + for (long id : txnids) { + if (first) first = false; + else buf.append(", "); + buf.append(id); + } - buf.append(") and tc_database = '"); - buf.append(info.dbname); - buf.append("' and tc_table = '"); - buf.append(info.tableName); - buf.append("'"); - if (info.partName != null) { - buf.append(" and tc_partition = '"); - buf.append(info.partName); + buf.append(") and tc_database = '"); + buf.append(info.dbname); + buf.append("' and tc_table = '"); + buf.append(info.tableName); buf.append("'"); - } - LOG.debug("Going to execute update <" + buf.toString() + ">"); - int rc = stmt.executeUpdate(buf.toString()); - LOG.debug("Removed " + rc + " records from txn_components"); + if (info.partName != null) { + buf.append(" and tc_partition = '"); + buf.append(info.partName); + buf.append("'"); + } + LOG.debug("Going to execute update <" + buf.toString() + ">"); + int rc = stmt.executeUpdate(buf.toString()); + LOG.debug("Removed " + rc + " records from txn_components"); - // Don't bother cleaning from the txns table. A separate call will do that. We don't - // know here which txns still have components from other tables or partitions in the - // table, so we don't know which ones we can and cannot clean. - } + // Don't bother cleaning from the txns table. A separate call will do that. We don't + // know here which txns still have components from other tables or partitions in the + // table, so we don't know which ones we can and cannot clean. + } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - try { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { + LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "markCleaned"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } + } catch (DeadlockException e) { + markCleaned(info); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -344,40 +382,49 @@ public void markCleaned(CompactionInfo info) throws MetaException { * Clean up aborted transactions from txns that have no components in txn_components. 
*/ public void cleanEmptyAbortedTxns() throws MetaException { - Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); - String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - Set<Long> txnids = new HashSet<Long>(); - while (rs.next()) txnids.add(rs.getLong(1)); - if (txnids.size() > 0) { - StringBuffer buf = new StringBuffer("delete from TXNS where txn_id in ("); - boolean first = true; - for (long tid : txnids) { - if (first) first = false; - else buf.append(", "); - buf.append(tid); - } - buf.append(")"); - LOG.debug("Going to execute update <" + buf.toString() + ">"); - int rc = stmt.executeUpdate(buf.toString()); - LOG.debug("Removed " + rc + " records from txns"); - LOG.debug("Going to commit"); - dbConn.commit(); - } - } catch (SQLException e) { - LOG.error("Unable to delete from txns table " + e.getMessage()); - LOG.debug("Going to rollback"); + Connection dbConn = getDbConn(); try { - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + String s = "select txn_id from TXNS where " + + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + + "txn_state = '" + TXN_ABORTED + "'"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + Set<Long> txnids = new HashSet<Long>(); + while (rs.next()) txnids.add(rs.getLong(1)); + if (txnids.size() > 0) { + StringBuffer buf = new StringBuffer("delete from TXNS where txn_id in ("); + boolean first = true; + for (long tid : txnids) { + if (first) first = false; + else buf.append(", "); + buf.append(tid); + } + buf.append(")"); + LOG.debug("Going to execute update <" + buf.toString() + ">"); + int rc = stmt.executeUpdate(buf.toString()); + LOG.debug("Removed " + rc + " records from txns"); + LOG.debug("Going to commit"); + dbConn.commit(); + } + } catch (SQLException e) { + LOG.error("Unable to delete from txns table " + e.getMessage()); + LOG.debug("Going to rollback"); + try { + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "cleanEmptyAbortedTxns"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } + } catch (DeadlockException e) { + cleanEmptyAbortedTxns(); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -391,28 +438,37 @@ public void cleanEmptyAbortedTxns() { * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, * so that like hostname% will match the worker id. */ public void revokeFromLocalWorkers(String hostname) throws MetaException { - Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" - + hostname + "%'"; - LOG.debug("Going to execute update <" + s + ">"); - // It isn't an error if the following returns no rows, as the local workers could have died - // with nothing assigned to them.
- stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { + Connection dbConn = getDbConn(); try { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + + hostname + "%'"; + LOG.debug("Going to execute update <" + s + ">"); + // It isn't an error if the following returns no rows, as the local workers could have died + // with nothing assigned to them. + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "revokeFromLocalWorkers"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } + } catch (DeadlockException e) { + revokeFromLocalWorkers(hostname); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -426,29 +482,38 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { * declared dead. */ public void revokeTimedoutWorkers(long timeout) throws MetaException { - Connection dbConn = getDbConn(); - long latestValidStart = System.currentTimeMillis() - timeout; try { - Statement stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " - + latestValidStart; - LOG.debug("Going to execute update <" + s + ">"); - // It isn't an error if the following returns no rows, as the local workers could have died - // with nothing assigned to them. - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { + Connection dbConn = getDbConn(); + long latestValidStart = System.currentTimeMillis() - timeout; try { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + + latestValidStart; + LOG.debug("Going to execute update <" + s + ">"); + // It isn't an error if the following returns no rows, as the local workers could have died + // with nothing assigned to them. 
+ stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "revokeTimedoutWorkers"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } + } catch (DeadlockException e) { + revokeTimedoutWorkers(timeout); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 27561d6..2cc1157 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -62,14 +62,37 @@ static final private char LOCK_SHARED = 'r'; static final private char LOCK_SEMI_SHARED = 'w'; - static final private String CLASS_NAME = TxnHandler.class.getName(); - static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private int ALLOWED_REPEATED_DEADLOCKS = 5; + static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private BoneCP connPool; + private static final Object lockLock = new Object(); // Dedicated object to synchronize on in + // the lock method + + /** + * Number of consecutive deadlocks we have seen + */ + protected int deadlockCnt; + protected HiveConf conf; // Transaction timeout, in milliseconds. private long timeout; - protected HiveConf conf; + + // DEADLOCK DETECTION AND HANDLING + // A note to developers of this class. ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock + // between simultaneous accesses. ALWAYS access TXN_COMPONENTS before HIVE_LOCKS. + // + // Private methods should never catch SQLException and then throw MetaException. The public + // methods depend on SQLException coming back so they can detect and handle deadlocks. Private + // methods should only throw MetaException when they explicitly know there's a logic error and + // they want to throw past the public methods. + // + // All public methods that write to the database have to check for deadlocks when a SQLException + // comes back and handle it if they see one. This has to be done with the connection pooling + // in mind. To do this they should call detectDeadlock AFTER rolling back the db transaction, + // and then in an outer loop they should catch DeadlockException and simply call themselves + // again; detectDeadlock has already incremented the deadlock counter. See commitTxn for an + // example. Note that by the time DeadlockException is caught, the connection has already been + // closed and returned to the pool, so each retry starts on a fresh connection.
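The retry contract spelled out in the note above is easiest to see distilled in one place. The sketch below is illustrative only: `RetrySketch`, `exampleWriter`, and `doWork` are hypothetical stand-ins, while the nested try/catch shape, `detectDeadlock`, `DeadlockException`, and `deadlockCnt` mirror what this patch adds (compare commitTxn).

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;

// Stand-in for TxnHandler showing only the deadlock-retry discipline.
public class RetrySketch {
  private static final int ALLOWED_REPEATED_DEADLOCKS = 5;
  private int deadlockCnt = 0;

  private static class DeadlockException extends Exception {}

  public void exampleWriter() throws Exception {
    try {
      Connection dbConn = getDbConn();
      try {
        doWork(dbConn);                      // issue updates, then dbConn.commit()
      } catch (SQLException e) {
        try {
          dbConn.rollback();                 // roll back BEFORE checking for deadlock
        } catch (SQLException e1) {
        }
        detectDeadlock(e, "exampleWriter");  // throws DeadlockException while retries remain
        throw new Exception("Unable to update transaction database", e);
      } finally {
        closeDbConn(dbConn);                 // connection is returned before any retry runs
      }
    } catch (DeadlockException e) {
      exampleWriter();                       // retry on a fresh connection from the pool
    } finally {
      deadlockCnt = 0;                       // reset only as the retry chain unwinds
    }
  }

  private void detectDeadlock(SQLException e, String caller) throws DeadlockException {
    // Simplified detection; the real method also matches MySQL's deadlock message text.
    if (e instanceof SQLTransactionRollbackException
        && deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
      throw new DeadlockException();
    }
  }

  // Stubs so the sketch compiles; the real class draws connections from BoneCP.
  private Connection getDbConn() throws SQLException { return null; }
  private void closeDbConn(Connection dbConn) {}
  private void doWork(Connection dbConn) throws SQLException {}
}
```

The subtlety the note calls out is visible here: the inner finally has already returned the connection to the pool before the outer catch fires, which is why each retry must re-enter through the public method and pick up a fresh connection rather than looping inside the inner try.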
public TxnHandler(HiveConf conf) { this.conf = conf; @@ -78,7 +101,7 @@ public TxnHandler(HiveConf conf) { // Set up the JDBC connection pool try { - setupJdbcConnectionPool(); + setupJdbcConnectionPool(conf); } catch (SQLException e) { String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); LOG.error(msg); @@ -86,6 +109,7 @@ public TxnHandler(HiveConf conf) { } timeout = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 1000; + deadlockCnt = 0; buildJumpTable(); } @@ -149,7 +173,7 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { } } - public GetOpenTxnsResponse getOpenTxns() throws MetaException{ + public GetOpenTxnsResponse getOpenTxns() throws MetaException { // We need to figure out the current transaction number and the list of // open transactions. To avoid needing a transaction on the underlying // database we'll look at the current transaction number first. If it @@ -207,208 +231,261 @@ public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns) { public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { int numTxns = rqst.getNum_txns(); - Connection dbConn = getDbConn(); try { - // Make sure the user has not requested an insane amount of txns. - int maxTxns = HiveConf.getIntVar(conf, - HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); - if (numTxns > maxTxns) numTxns = maxTxns; + Connection dbConn = getDbConn(); + try { + // Make sure the user has not requested an insane amount of txns. + int maxTxns = HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); + if (numTxns > maxTxns) numTxns = maxTxns; - Statement stmt = dbConn.createStatement(); - LOG.debug("Going to execute query "); + ResultSet rs = + stmt.executeQuery("select ntxn_next from NEXT_TXN_ID for update"); + if (!rs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + long first = rs.getLong(1); + String s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + long now = System.currentTimeMillis(); + s = "insert into TXNS (txn_id, txn_state, txn_started, " + + "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + LOG.debug("Going to prepare statement <" + s + ">"); + PreparedStatement ps = dbConn.prepareStatement(s); + List txnIds = new ArrayList(numTxns); + for (long i = first; i < first + numTxns; i++) { + ps.setLong(1, i); + ps.executeUpdate(); + txnIds.add(i); + } - LOG.debug("Going to commit"); - dbConn.commit(); - return new OpenTxnsResponse(txnIds); - } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + LOG.debug("Going to commit"); + dbConn.commit(); + return new OpenTxnsResponse(txnIds); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "openTxns"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } - throw new MetaException("Unable to select from transaction database " - + StringUtils.stringifyException(e)); + } catch (DeadlockException e) { + return openTxns(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, 
MetaException { long txnid = rqst.getTxnid(); - Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); - long now = System.currentTimeMillis(); - String s = "update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - int updateCnt = stmt.executeUpdate(s); - if (updateCnt != 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchTxnException("No such transaction: " + txnid); - } - s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { + Connection dbConn = getDbConn(); try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + + // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS + String s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + s = "update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + int updateCnt = stmt.executeUpdate(s); + if (updateCnt != 1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new NoSuchTxnException("No such transaction: " + txnid); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "abortTxn"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } - throw new MetaException("Unable to update transaction database " - + StringUtils.stringifyException(e)); + } catch (DeadlockException e) { + abortTxn(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); - Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); - // Make sure the transaction has not already been aborted. This costs - // us an extra select against the database, but it is necessary to - // avoid the situation where the transaction has timed out but the - // client isn't aware of it and tries to commit it. The client needs - // to be informed that the commit failed and it should abort on its - // side. - // - // The select has to be done as a select for update to avoid a race - // condition where this client is committing it while some other - // client is declaring it timed out and aborting it. - String s = "select txn_state from TXNS where txn_id = " + - txnid + " for update"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchTxnException("No such transaction: " + txnid); - } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new TxnAbortedException("Transaction " + txnid + - " already aborted"); - } - - // Move the record from txn_components into completed_txn_components so that the compactor - // knows where to look to compact. 
- s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; - LOG.debug("Going to execute insert <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { - LOG.error("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn!"); - } - - s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - s = "delete from TXNS where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { + Connection dbConn = getDbConn(); try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + Statement stmt = dbConn.createStatement(); + // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to + // commit it, but it does two things. One, it makes sure the transaction is still valid. + // Two, it avoids the race condition where we time out between now and when we actually + // commit the transaction below. And it does this all in a dead-lock safe way by + // committing the heartbeat back to the database. + heartbeatTxn(dbConn, txnid); + + // Move the record from txn_components into completed_txn_components so that the compactor + // knows where to look to compact. + String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute insert <" + s + ">"); + if (stmt.executeUpdate(s) < 1) { + LOG.warn("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn!"); + } + + // Always access TXN_COMPONENTS before HIVE_LOCKS; + s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + // Always access HIVE_LOCKS before TXNS + s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + s = "delete from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "commitTxn"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } - throw new MetaException("Unable to update transaction database " - + StringUtils.stringifyException(e)); + } catch (DeadlockException e) { + commitTxn(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { - Connection dbConn = getDbConn(); try { - return lock(dbConn, rqst, true); + Connection dbConn = getDbConn(); + try { + return lock(dbConn, rqst, true); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "lock"); + throw new MetaException("Unable to update transaction database " + + 
StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (DeadlockException e) { + return lock(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } public LockResponse lockNoWait(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { - Connection dbConn = getDbConn(); try { - return lock(dbConn, rqst, false); + Connection dbConn = getDbConn(); + try { + return lock(dbConn, rqst, false); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "lockNoWait"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (DeadlockException e) { + return lockNoWait(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } public LockResponse checkLock(CheckLockRequest rqst) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { - Connection dbConn = getDbConn(); try { - long extLockId = rqst.getLockid(); - // Clean up timed out locks - timeOutLocks(dbConn); - - // Heartbeat on the lockid first, to assure that our lock is still valid. - // Then look up the lock info (hopefully in the cache). If these locks - // are associated with a transaction then heartbeat on that as well. - heartbeatLock(dbConn, extLockId); - long txnid = getTxnIdFromLockId(dbConn, extLockId); - if (txnid > 0) heartbeatTxn(dbConn, txnid); - return checkLock(dbConn, extLockId, txnid, true); + Connection dbConn = getDbConn(); + try { + long extLockId = rqst.getLockid(); + // Clean up timed out locks + timeOutLocks(dbConn); + + // Heartbeat on the lockid first, to assure that our lock is still valid. + // Then look up the lock info (hopefully in the cache). If these locks + // are associated with a transaction then heartbeat on that as well. + heartbeatLock(dbConn, extLockId); + long txnid = getTxnIdFromLockId(dbConn, extLockId); + if (txnid > 0) heartbeatTxn(dbConn, txnid); + return checkLock(dbConn, extLockId, txnid, true); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "checkLock"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (DeadlockException e) { + return checkLock(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } + } public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { - Connection dbConn = getDbConn(); try { - // Odd as it seems, we need to heartbeat first because this touches the - // lock table and assures that our locks our still valid. If they are - // not, this will throw an exception and the heartbeat will fail. - long extLockId = rqst.getLockid(); - heartbeatLock(dbConn, extLockId); - long txnid = getTxnIdFromLockId(dbConn, extLockId); - // If there is a valid txnid, throw an exception, - // as locks associated with transactions should be unlocked only when the - // transaction is committed or aborted. - if (txnid > 0) { - try { + Connection dbConn = getDbConn(); + try { + // Odd as it seems, we need to heartbeat first because this touches the + // lock table and assures that our locks are still valid. If they are + // not, this will throw an exception and the heartbeat will fail.
+ long extLockId = rqst.getLockid(); + heartbeatLock(dbConn, extLockId); + long txnid = getTxnIdFromLockId(dbConn, extLockId); + // If there is a valid txnid, throw an exception, + // as locks associated with transactions should be unlocked only when the + // transaction is committed or aborted. + if (txnid > 0) { LOG.debug("Going to rollback"); dbConn.rollback(); String msg = "Unlocking locks associated with transaction" + @@ -416,11 +493,7 @@ public void unlock(UnlockRequest rqst) "transaction " + txnid; LOG.error(msg); throw new TxnOpenException(msg); - } catch (SQLException e1) { - throw new MetaException("Unable to rollback " + StringUtils.stringifyException(e1)); } - } - try { Statement stmt = dbConn.createStatement(); String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); @@ -438,11 +511,16 @@ public void unlock(UnlockRequest rqst) dbConn.rollback(); } catch (SQLException e1) { } + detectDeadlock(e, "unlock"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } + } catch (DeadlockException e) { + unlock(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -498,93 +576,111 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { public void heartbeat(HeartbeatRequest ids) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { - Connection dbConn = getDbConn(); try { - heartbeatLock(dbConn, ids.getLockid()); - heartbeatTxn(dbConn, ids.getTxnid()); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Failed to commit: " + e.getMessage()); + Connection dbConn = getDbConn(); + try { + heartbeatLock(dbConn, ids.getLockid()); + heartbeatTxn(dbConn, ids.getTxnid()); + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "heartbeat"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (DeadlockException e) { + heartbeat(ids); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } public void compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. 
- Connection dbConn = getDbConn(); try { - Statement stmt = dbConn.createStatement(); + Connection dbConn = getDbConn(); + try { + Statement stmt = dbConn.createStatement(); - // Get the id for the next entry in the queue - String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID for update"; - LOG.debug("going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly initiated, " + - "no record found in next_compaction_queue_id"); - } - long id = rs.getLong(1); - s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - - StringBuffer buf = new StringBuffer("insert into COMPACTION_QUEUE (cq_id, cq_database, " + - "cq_table, "); - String partName = rqst.getPartitionname(); - if (partName != null) buf.append("cq_partition, "); - buf.append("cq_state, cq_type"); - if (rqst.getRunas() != null) buf.append(", cq_run_as"); - buf.append(") values ("); - buf.append(id); - buf.append(", '"); - buf.append(rqst.getDbname()); - buf.append("', '"); - buf.append(rqst.getTablename()); - buf.append("', '"); - if (partName != null) { - buf.append(partName); + // Get the id for the next entry in the queue + String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID for update"; + LOG.debug("going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly initiated, " + + "no record found in next_compaction_queue_id"); + } + long id = rs.getLong(1); + s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + + "cq_table, "); + String partName = rqst.getPartitionname(); + if (partName != null) buf.append("cq_partition, "); + buf.append("cq_state, cq_type"); + if (rqst.getRunas() != null) buf.append(", cq_run_as"); + buf.append(") values ("); + buf.append(id); + buf.append(", '"); + buf.append(rqst.getDbname()); buf.append("', '"); - } - buf.append(INITIATED_STATE); - buf.append("', '"); - switch (rqst.getType()) { - case MAJOR: - buf.append(MAJOR_TYPE); - break; + buf.append(rqst.getTablename()); + buf.append("', '"); + if (partName != null) { + buf.append(partName); + buf.append("', '"); + } + buf.append(INITIATED_STATE); + buf.append("', '"); + switch (rqst.getType()) { + case MAJOR: + buf.append(MAJOR_TYPE); + break; - case MINOR: - buf.append(MINOR_TYPE); - break; + case MINOR: + buf.append(MINOR_TYPE); + break; - default: + default: + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Unexpected compaction type " + rqst.getType().toString()); + } + if (rqst.getRunas() != null) { + buf.append("', '"); + buf.append(rqst.getRunas()); + } + buf.append("')"); + s = buf.toString(); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + try { LOG.debug("Going to rollback"); dbConn.rollback(); - throw new MetaException("Unexpected compaction type " + rqst.getType().toString()); - } - if (rqst.getRunas() != null) { - buf.append("', '"); - buf.append(rqst.getRunas()); - } - buf.append("')"); - s = buf.toString(); - LOG.debug("Going 
to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { + } catch (SQLException e1) { + } + detectDeadlock(e, "compact"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); } - throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + } catch (DeadlockException e) { + compact(rqst); } finally { - closeDbConn(dbConn); + deadlockCnt = 0; } } @@ -663,6 +759,10 @@ long setTimeout(long milliseconds) { return previous_timeout; } + protected class DeadlockException extends Exception { + + } + protected Connection getDbConn() throws MetaException { try { Connection dbConn = connPool.getConnection(); @@ -683,6 +783,29 @@ protected void closeDbConn(Connection dbConn) { } } + /** + * Determine if an exception was a deadlock. Unfortunately there is no standard way to do + * this, so we have to inspect the error messages and catch the telltale signs for each + * different database. + * @param e exception that was thrown. + * @param caller name of the method calling this + * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock + * detected and retry count has not been exceeded. + */ + protected void detectDeadlock(SQLException e, String caller) throws DeadlockException { + final String mysqlDeadlock = + "Deadlock found when trying to get lock; try restarting transaction"; + if ((e.getMessage() != null && e.getMessage().contains(mysqlDeadlock)) + || e instanceof SQLTransactionRollbackException) { + if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { + LOG.warn("Deadlock detected in " + caller + ", trying again."); + throw new DeadlockException(); + } else { + LOG.error("Too many repeated deadlocks in " + caller + ", giving up."); + deadlockCnt = 0; + } + } + } + private static class LockInfo { long extLockId; long intLockId; @@ -773,7 +896,6 @@ public int compare(LockInfo info1, LockInfo info2) { private static Map<LockType, Map<LockType, Map<LockState, LockAction>>> jumpTable; private void checkQFileTestHack() { - LOG.debug("In txnHandler conf object is " + conf.toString()); boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST); if (hackOn) { LOG.info("Hacking in canned values for transaction manager"); @@ -808,107 +930,103 @@ private void checkQFileTestHack() { * @throws TxnAbortedException */ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) - throws NoSuchTxnException, TxnAbortedException, MetaException { - // Clean up timed out locks before we attempt to acquire any. - timeOutLocks(dbConn); - try { - Statement stmt = dbConn.createStatement(); - long txnid = rqst.getTxnid(); - if (txnid > 0) { - // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + - txnid + " for update"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + // We want to minimize the number of concurrent lock requests being issued. If we do not we + // get a large number of deadlocks in the database, since this method has to both clean + // timed-out locks and insert new locks. This synchronization barrier will not eliminate all + // deadlocks, and the code is still resilient in the face of a database deadlock.
But it + // will reduce the number. This could have been done via a lock table command in the + // underlying database, but was not for two reasons. One, different databases have different + // syntax for lock table, making it harder to use. Two, that would lock the HIVE_LOCKS table + // and prevent other operations (such as committing transactions, showing locks, + // etc.) that should not interfere with this one. + synchronized (lockLock) { + // Clean up timed out locks before we attempt to acquire any. + timeOutLocks(dbConn); + + try { + Statement stmt = dbConn.createStatement(); + + // Get the next lock id. We have to do this as select for update so no + // one else reads it and updates it under us. + LOG.debug("Going to execute query "); - ResultSet rs = stmt.executeQuery("select nl_next from NEXT_LOCK_ID " + - "for update"); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); - } - long extLockId = rs.getLong(1); - String s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - long intLockId = 0; - for (LockComponent lc : rqst.getComponent()) { - intLockId++; - String dbName = lc.getDbname(); - String tblName = lc.getTablename(); - String partName = lc.getPartitionname(); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: lockChar = LOCK_EXCLUSIVE; break; - case SHARED_READ: lockChar = LOCK_SHARED; break; - case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break; + LockResponse rsp = checkLock(dbConn, extLockId, txnid, wait); + if (!wait && rsp.getState() != LockState.ACQUIRED) { + LOG.debug("Lock not acquired, going to rollback"); + dbConn.rollback(); + rsp = new LockResponse(); + rsp.setState(LockState.NOT_ACQUIRED); } - long now = System.currentTimeMillis(); - s = "insert into HIVE_LOCKS " + - " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + - + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) - + ", " + (partName == null ? 
"null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" + - rqst.getUser() + "', '" + rqst.getHostname() + "')"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - } - LockResponse rsp = checkLock(dbConn, extLockId, txnid, wait); - if (!wait && rsp.getState() != LockState.ACQUIRED) { - LOG.debug("Lock not acquired, going to rollback"); - dbConn.rollback(); - rsp = new LockResponse(); - rsp.setState(LockState.NOT_ACQUIRED); + return rsp; + } catch (NoSuchLockException e) { + // This should never happen, as we just added the lock id + throw new MetaException("Couldn't find a lock we just created!"); } - return rsp; - } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to connect to transaction database " - + StringUtils.stringifyException(e)); - } catch (NoSuchLockException e) { - // This should never happen, as we just added the lock id - throw new MetaException("Couldn't find a lock we just created!"); } } @@ -916,25 +1034,48 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnid, boolean alwaysCommit) - throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); LockResponse response = new LockResponse(); response.setLockid(extLockId); long now = System.currentTimeMillis(); - try { - LOG.debug("Setting savepoint"); - Savepoint save = dbConn.setSavepoint(); - Statement stmt = dbConn.createStatement(); - StringBuffer query = new StringBuffer("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); - - Set strings = new HashSet(locksBeingChecked.size()); - for (LockInfo info : locksBeingChecked) { - strings.add(info.db); + LOG.debug("Setting savepoint"); + Savepoint save = dbConn.setSavepoint(); + Statement stmt = dbConn.createStatement(); + StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type from HIVE_LOCKS where hl_db in ("); + + Set strings = new HashSet(locksBeingChecked.size()); + for (LockInfo info : locksBeingChecked) { + strings.add(info.db); + } + boolean first = true; + for (String s : strings) { + if (first) first = false; + else query.append(", "); + query.append('\''); + query.append(s); + query.append('\''); + } + query.append(")"); + + // If any of the table requests are null, then I need to pull all the + // table locks for this db. + boolean sawNull = false; + strings.clear(); + for (LockInfo info : locksBeingChecked) { + if (info.table == null) { + sawNull = true; + break; + } else { + strings.add(info.table); } - boolean first = true; + } + if (!sawNull) { + query.append(" and (hl_table is null or hl_table in("); + first = true; for (String s : strings) { if (first) first = false; else query.append(", "); @@ -942,22 +1083,22 @@ private LockResponse checkLock(Connection dbConn, query.append(s); query.append('\''); } - query.append(")"); + query.append("))"); - // If any of the table requests are null, then I need to pull all the - // table locks for this db. - boolean sawNull = false; + // If any of the partition requests are null, then I need to pull all + // partition locks for this table. 
+ sawNull = false; strings.clear(); for (LockInfo info : locksBeingChecked) { - if (info.table == null) { + if (info.partition == null) { sawNull = true; break; } else { - strings.add(info.table); + strings.add(info.partition); } } if (!sawNull) { - query.append(" and (hl_table is null or hl_table in("); + query.append(" and (hl_partition is null or hl_partition in("); first = true; for (String s : strings) { if (first) first = false; @@ -967,134 +1108,100 @@ private LockResponse checkLock(Connection dbConn, query.append('\''); } query.append("))"); + } + } + query.append(" for update"); - // If any of the partition requests are null, then I need to pull all - // partition locks for this table. - sawNull = false; - strings.clear(); - for (LockInfo info : locksBeingChecked) { - if (info.partition == null) { - sawNull = true; - break; - } else { - strings.add(info.partition); - } - } - if (!sawNull) { - query.append(" and (hl_partition is null or hl_partition in("); - first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); - query.append('\''); - query.append(s); - query.append('\''); - } - query.append("))"); + LOG.debug("Going to execute query <" + query.toString() + ">"); + ResultSet rs = stmt.executeQuery(query.toString()); + SortedSet lockSet = new TreeSet(new LockInfoComparator()); + while (rs.next()) { + lockSet.add(new LockInfo(rs)); + } + // Turn the tree set into an array so we can move back and forth easily + // in it. + LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); + + for (LockInfo info : locksBeingChecked) { + // Find the lock record we're checking + int index = -1; + for (int i = 0; i < locks.length; i++) { + if (locks[i].equals(info)) { + index = i; + break; } } - query.append(" for update"); - LOG.debug("Going to execute query <" + query.toString() + ">"); - ResultSet rs = stmt.executeQuery(query.toString()); - SortedSet lockSet = new TreeSet(new LockInfoComparator()); - while (rs.next()) { - lockSet.add(new LockInfo(rs)); + // If we didn't find the lock, then it must not be in the table + if (index == -1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("How did we get here, we heartbeated our lock before we started!"); } - // Turn the tree set into an array so we can move back and forth easily - // in it. - LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); - for (LockInfo info : locksBeingChecked) { - // Find the lock record we're checking - int index = -1; - for (int i = 0; i < locks.length; i++) { - if (locks[i].equals(info)) { - index = i; - break; - } - } - - // If we didn't find the lock, then it must not be in the table - if (index == -1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("How did we get here, " + - "we heartbeated our lock before we started!"); - } + // If we've found it and it's already been marked acquired, + // then just look at the other locks. + if (locks[index].state == LockState.ACQUIRED) { + continue; + } - // If we've found it and it's already been marked acquired, - // then just look at the other locks. - if (locks[index].state == LockState.ACQUIRED) { + // Look at everything in front of this lock to see if it should block + // it or not. + for (int i = index - 1; i >= 0; i--) { + // Check if we're operating on the same database, if not, move on + if (!locks[index].db.equals(locks[i].db)) { continue; } - // Look at everything in front of this lock to see if it should block - // it or not. 
-        for (int i = index - 1; i >= 0; i--) {
-          // Check if we're operating on the same database, if not, move on
-          if (!locks[index].db.equals(locks[i].db)) {
-            continue;
-          }
+        // If table is null on either of these, then they are claiming to
+        // lock the whole database and we need to check it.  Otherwise,
+        // check if they are operating on the same table, if not, move on.
+        if (locks[index].table != null && locks[i].table != null
+            && !locks[index].table.equals(locks[i].table)) {
+          continue;
+        }

-          // If table is null on either of these, then they are claiming to
-          // lock the whole database and we need to check it.  Otherwise,
-          // check if they are operating on the same table, if not, move on.
-          if (locks[index].table != null && locks[i].table != null
-              && !locks[index].table.equals(locks[i].table)) {
-            continue;
-          }
+        // If partition is null on either of these, then they are claiming to
+        // lock the whole table and we need to check it.  Otherwise,
+        // check if they are operating on the same partition, if not, move on.
+        if (locks[index].partition != null && locks[i].partition != null
+            && !locks[index].partition.equals(locks[i].partition)) {
+          continue;
+        }

-          // If partition is null on either of these, then they are claiming to
-          // lock the whole table and we need to check it.  Otherwise,
-          // check if they are operating on the same partition, if not, move on.
-          if (locks[index].partition != null && locks[i].partition != null
-              && !locks[index].partition.equals(locks[i].partition)) {
-            continue;
-          }
+        // We've found something that matches what we're trying to lock,
+        // so figure out if we can lock it too.
+        switch (jumpTable.get(locks[index].type).get(locks[i].type).get
+            (locks[i].state)) {
+          case ACQUIRE:
+            acquire(dbConn, stmt, extLockId, info.intLockId);
+            break;
+          case WAIT:
+            wait(dbConn, save);
+            if (alwaysCommit) {
+              // In the case where lockNoWait has been called we don't want to commit because
+              // it's going to roll everything back.  In every other case we want to commit here.
+              LOG.debug("Going to commit");
+              dbConn.commit();
+            }
+            response.setState(LockState.WAITING);
+            return response;
+          case KEEP_LOOKING:
+            continue;
+        }

-          // We've found something that matches what we're trying to lock,
-          // so figure out if we can lock it too.
-          switch (jumpTable.get(locks[index].type).get(locks[i].type).get
-              (locks[i].state)) {
-            case ACQUIRE:
-              acquire(dbConn, stmt, extLockId, info.intLockId);
-              break;
-            case WAIT:
-              wait(dbConn, save);
-              if (alwaysCommit) {
-                // In the case where lockNoWait has been called we don't want to commit because
-                // it's going to roll everything back.  In every other case we want to commit here.
-                LOG.debug("Going to commit");
-                dbConn.commit();
-              }
-              response.setState(LockState.WAITING);
-              return response;
-            case KEEP_LOOKING:
-              continue;
-          }
-        }
-
-        // If we've arrived here it means there's nothing in the way of the
-        // lock, so acquire the lock.
-        acquire(dbConn, stmt, extLockId, info.intLockId);
       }

-      // We acquired all of the locks, so commit and return acquired.
-      LOG.debug("Going to commit");
-      dbConn.commit();
-      response.setState(LockState.ACQUIRED);
-      return response;
-    } catch (SQLException e) {
-      try {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to connect to transaction database "
-          + StringUtils.stringifyException(e));
+      // If we've arrived here it means there's nothing in the way of the
+      // lock, so acquire the lock.
+      acquire(dbConn, stmt, extLockId, info.intLockId);
+    }
+
+    // We acquired all of the locks, so commit and return acquired.
+    LOG.debug("Going to commit");
+    dbConn.commit();
+    response.setState(LockState.ACQUIRED);
+    return response;
   }
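The db/table/partition comparisons above encode the lock hierarchy: a null hl_table claims the whole database and a null hl_partition claims the whole table, so a null on either side of a comparison means the two scopes overlap. A minimal standalone sketch of that rule, hedged — the overlaps helper below is illustrative only and not part of this patch, though the LockInfo fields match this file:

// Hypothetical helper restating the scope rule used in checkLock above:
// a null table/partition acts as a wildcard for that level of the hierarchy.
private static boolean overlaps(LockInfo a, LockInfo b) {
  if (!a.db.equals(b.db)) return false;               // different databases never conflict
  if (a.table != null && b.table != null
      && !a.table.equals(b.table)) return false;      // both table-scoped, disjoint tables
  if (a.partition != null && b.partition != null
      && !a.partition.equals(b.partition)) return false; // both partition-scoped, disjoint partitions
  return true;  // same scope, or one side locks a strictly wider scope
}

Only when two locks overlap in this sense does the jump table get consulted to choose between ACQUIRE, WAIT, and KEEP_LOOKING.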

   private void wait(Connection dbConn, Savepoint save) throws SQLException {

@@ -1127,152 +1234,114 @@ private void acquire(Connection dbConn, Statement stmt, long extLockId, long int
     // acquire all.
   }

-  // Heartbeats on the lock table.  This does not call commit as it assumes
-  // someone downstream will.  However, it does lock rows in the lock table.
+  // Heartbeats on the lock table.  This commits, so do not call it while other
+  // uncommitted work is pending on the connection.
   private void heartbeatLock(Connection dbConn, long extLockId)
-      throws NoSuchLockException, MetaException {
+      throws NoSuchLockException, SQLException {
     // If the lock id is 0, then there are no locks in this heartbeat
     if (extLockId == 0) return;
-    try {
-      Statement stmt = dbConn.createStatement();
-      long now = System.currentTimeMillis();
+    Statement stmt = dbConn.createStatement();
+    long now = System.currentTimeMillis();

-      String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
-        now + " where hl_lock_ext_id = " + extLockId;
-      LOG.debug("Going to execute update <" + s + ">");
-      int rc = stmt.executeUpdate(s);
-      if (rc < 1) {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        throw new NoSuchLockException("No such lock: " + extLockId);
-      }
-    } catch (SQLException e) {
-      try {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to connect to transaction database" +
-        StringUtils.stringifyException(e));
+    String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
+      now + " where hl_lock_ext_id = " + extLockId;
+    LOG.debug("Going to execute update <" + s + ">");
+    int rc = stmt.executeUpdate(s);
+    if (rc < 1) {
+      LOG.debug("Going to rollback");
+      dbConn.rollback();
+      throw new NoSuchLockException("No such lock: " + extLockId);
     }
+    LOG.debug("Going to commit");
+    dbConn.commit();
   }
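The unchanged wait() above rolls the connection back to the savepoint taken at the top of checkLock, which discards any acquire() updates made during this pass while keeping the transaction — and with it the lock rows inserted by the enclosing lock() call — alive. A hedged sketch of that standard JDBC pattern, with hypothetical table and column names:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;

class SavepointSketch {
  // Illustrative only: undo work done after a savepoint without ending the
  // transaction, as checkLock/wait do with tentative acquire() updates.
  static void backOff(Connection conn) throws SQLException {
    conn.setAutoCommit(false);
    Savepoint save = conn.setSavepoint();
    Statement stmt = conn.createStatement();
    stmt.executeUpdate("update LOCKS set state = 'a' where id = 7"); // tentative work
    conn.rollback(save);  // discards only the update above
    conn.commit();        // work done before the savepoint still commits
  }
}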

-  // Heartbeats on the txn table.  This does not call commit as it assumes
-  // someone downstream will.  However, it does lock rows in the txn table.
+  // Heartbeats on the txn table.  This commits, so do not call it while other
+  // uncommitted work is pending on the connection.
   private void heartbeatTxn(Connection dbConn, long txnid)
-      throws NoSuchTxnException, TxnAbortedException, MetaException {
+      throws NoSuchTxnException, TxnAbortedException, SQLException {
     // If the txnid is 0, then there are no transactions in this heartbeat
     if (txnid == 0) return;
-    try {
-      Statement stmt = dbConn.createStatement();
-      long now = System.currentTimeMillis();
-      // We need to check whether this transaction is valid and open
-      String s = "select txn_state from TXNS where txn_id = " +
-        txnid + " for update";
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        throw new NoSuchTxnException("No such transaction: " + txnid);
-      }
-      if (rs.getString(1).charAt(0) == TXN_ABORTED) {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        throw new TxnAbortedException("Transaction " + txnid +
-          " already aborted");
-      }
-      s = "update TXNS set txn_last_heartbeat = " + now +
-        " where txn_id = " + txnid;
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
-
-    } catch (SQLException e) {
-      try {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to connect to transaction database "
-          + StringUtils.stringifyException(e));
+    Statement stmt = dbConn.createStatement();
+    long now = System.currentTimeMillis();
+    // We need to check whether this transaction is valid and open
+    String s = "select txn_state from TXNS where txn_id = " +
+      txnid + " for update";
+    LOG.debug("Going to execute query <" + s + ">");
+    ResultSet rs = stmt.executeQuery(s);
+    if (!rs.next()) {
+      LOG.debug("Going to rollback");
+      dbConn.rollback();
+      throw new NoSuchTxnException("No such transaction: " + txnid);
+    }
+    if (rs.getString(1).charAt(0) == TXN_ABORTED) {
+      LOG.debug("Going to rollback");
+      dbConn.rollback();
+      throw new TxnAbortedException("Transaction " + txnid +
+        " already aborted");
     }
+    s = "update TXNS set txn_last_heartbeat = " + now +
+      " where txn_id = " + txnid;
+    LOG.debug("Going to execute update <" + s + ">");
+    stmt.executeUpdate(s);
+    LOG.debug("Going to commit");
+    dbConn.commit();
   }
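Both heartbeat methods now own their transaction: heartbeatTxn takes a row lock with select ... for update, so the state check and the heartbeat update are atomic with respect to a concurrent abort, and the final commit releases the row lock. A reduced sketch of that check-then-update idiom — table T and its columns are hypothetical, not part of this patch:

import java.sql.*;

class ForUpdateSketch {
  // Illustrative only: "for update" holds the row lock until commit/rollback,
  // so nothing can change the row's state between the read and the write.
  static boolean touch(Connection conn, long id, long now) throws SQLException {
    conn.setAutoCommit(false);
    Statement stmt = conn.createStatement();
    ResultSet rs = stmt.executeQuery(
        "select state from T where id = " + id + " for update");
    if (!rs.next()) {
      conn.rollback();   // no such row; give up the lock attempt
      return false;
    }
    stmt.executeUpdate("update T set last_beat = " + now + " where id = " + id);
    conn.commit();       // releases the row lock
    return true;
  }
}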
We already " + + "checked the lock existed but now we can't find it!"); } + long txnid = rs.getLong(1); + LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); + return (rs.wasNull() ? -1 : txnid); } // NEVER call this function without first calling heartbeat(long, long) private List getLockInfoFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException { - try { - Statement stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - boolean sawAtLeastOne = false; - List ourLockInfo = new ArrayList(); - while (rs.next()) { - ourLockInfo.add(new LockInfo(rs)); - sawAtLeastOne = true; - } - if (!sawAtLeastOne) { - throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); - } - return ourLockInfo; - } catch (SQLException e) { - throw new MetaException("Unable to connect to transaction database " - + StringUtils.stringifyException(e)); + throws NoSuchLockException, MetaException, SQLException { + Statement stmt = dbConn.createStatement(); + String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + + "hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + boolean sawAtLeastOne = false; + List ourLockInfo = new ArrayList(); + while (rs.next()) { + ourLockInfo.add(new LockInfo(rs)); + sawAtLeastOne = true; + } + if (!sawAtLeastOne) { + throw new MetaException("This should never happen! We already " + + "checked the lock existed but now we can't find it!"); } + return ourLockInfo; } // Clean time out locks from the database. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions. - private void timeOutLocks(Connection dbConn) throws MetaException { - try { - long now = System.currentTimeMillis(); - Statement stmt = dbConn.createStatement(); - // Remove any timed out locks from the table. - String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to connect to transaction database " - + StringUtils.stringifyException(e)); - } + private void timeOutLocks(Connection dbConn) throws SQLException { + long now = System.currentTimeMillis(); + Statement stmt = dbConn.createStatement(); + // Remove any timed out locks from the table. 
+ String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + + (now - timeout); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + return; } - private synchronized void setupJdbcConnectionPool() throws SQLException { + private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException { if (connPool != null) return; String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY); @@ -1288,7 +1357,7 @@ private synchronized void setupJdbcConnectionPool() throws SQLException { connPool = new BoneCP(config); } - private synchronized void buildJumpTable() { + private static synchronized void buildJumpTable() { if (jumpTable != null) return; jumpTable =