Index: metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (revision 1603195) +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (working copy) @@ -120,12 +120,13 @@ // database we'll look at the current transaction number first. If it // subsequently shows up in the open list that's ok. Connection dbConn = getDbConn(); + Statement stmt = null; + ResultSet rs = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); LOG.debug("Going to execute query "); - ResultSet rs = - stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); + rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); if (!rs.next()) { throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); @@ -203,7 +206,6 @@ while (rs.next()) { openList.add(rs.getLong(1)); } - stmt.close(); LOG.debug("Going to rollback"); dbConn.rollback(); return new GetOpenTxnsResponse(hwm, openList); @@ -216,6 +218,8 @@ throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { + closeRs(rs); + closeStmt(stmt); closeDbConn(dbConn); } } @@ -235,17 +239,18 @@ int numTxns = rqst.getNum_txns(); try { Connection dbConn = getDbConn(); + Statement stmt = null; + ResultSet rs = null; try { // Make sure the user has not requested an insane amount of txns. int maxTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); LOG.debug("Going to execute query "); - ResultSet rs = stmt.executeQuery("select nl_next from NEXT_LOCK_ID " + - "for update"); + rs = stmt.executeQuery("select nl_next from NEXT_LOCK_ID for update"); if (!rs.next()) { LOG.debug("Going to rollback"); dbConn.rollback(); @@ -1179,6 +1239,9 @@ } catch (NoSuchLockException e) { // This should never happen, as we just added the lock id throw new MetaException("Couldn't find a lock we just created!"); + } finally { + closeRs(rs); + closeStmt(stmt); } } } @@ -1191,42 +1254,21 @@ LockResponse response = new LockResponse(); response.setLockid(extLockId); - LOG.debug("Setting savepoint"); - Savepoint save = dbConn.setSavepoint(); - Statement stmt = dbConn.createStatement(); - StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); - - Set strings = new HashSet(locksBeingChecked.size()); - for (LockInfo info : locksBeingChecked) { - strings.add(info.db); - } - boolean first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); - query.append('\''); - query.append(s); - query.append('\''); - } - query.append(")"); - - // If any of the table requests are null, then I need to pull all the - // table locks for this db. - boolean sawNull = false; - strings.clear(); - for (LockInfo info : locksBeingChecked) { - if (info.table == null) { - sawNull = true; - break; - } else { - strings.add(info.table); + Statement stmt = null; + ResultSet rs = null; + try { + LOG.debug("Setting savepoint"); + Savepoint save = dbConn.setSavepoint(); + stmt = dbConn.createStatement(); + StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type from HIVE_LOCKS where hl_db in ("); + + Set strings = new HashSet(locksBeingChecked.size()); + for (LockInfo info : locksBeingChecked) { + strings.add(info.db); } - } - if (!sawNull) { - query.append(" and (hl_table is null or hl_table in("); - first = true; + boolean first = true; for (String s : strings) { if (first) first = false; else query.append(", "); @@ -1234,22 +1276,22 @@ query.append(s); query.append('\''); } - query.append("))"); - - // If any of the partition requests are null, then I need to pull all - // partition locks for this table. - sawNull = false; + query.append(")"); + + // If any of the table requests are null, then I need to pull all the + // table locks for this db. + boolean sawNull = false; strings.clear(); for (LockInfo info : locksBeingChecked) { - if (info.partition == null) { + if (info.table == null) { sawNull = true; break; } else { - strings.add(info.partition); + strings.add(info.table); } } if (!sawNull) { - query.append(" and (hl_partition is null or hl_partition in("); + query.append(" and (hl_table is null or hl_table in("); first = true; for (String s : strings) { if (first) first = false; @@ -1259,103 +1301,131 @@ query.append('\''); } query.append("))"); - } - } - query.append(" for update"); - - LOG.debug("Going to execute query <" + query.toString() + ">"); - ResultSet rs = stmt.executeQuery(query.toString()); - SortedSet lockSet = new TreeSet(new LockInfoComparator()); - while (rs.next()) { - lockSet.add(new LockInfo(rs)); - } - // Turn the tree set into an array so we can move back and forth easily - // in it. - LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); - - for (LockInfo info : locksBeingChecked) { - // Find the lock record we're checking - int index = -1; - for (int i = 0; i < locks.length; i++) { - if (locks[i].equals(info)) { - index = i; - break; + + // If any of the partition requests are null, then I need to pull all + // partition locks for this table. + sawNull = false; + strings.clear(); + for (LockInfo info : locksBeingChecked) { + if (info.partition == null) { + sawNull = true; + break; + } else { + strings.add(info.partition); + } } - } - - // If we didn't find the lock, then it must not be in the table - if (index == -1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("How did we get here, we heartbeated our lock before we started!"); + if (!sawNull) { + query.append(" and (hl_partition is null or hl_partition in("); + first = true; + for (String s : strings) { + if (first) first = false; + else query.append(", "); + query.append('\''); + query.append(s); + query.append('\''); + } + query.append("))"); + } } - - - // If we've found it and it's already been marked acquired, - // then just look at the other locks. - if (locks[index].state == LockState.ACQUIRED) { - continue; + query.append(" for update"); + + LOG.debug("Going to execute query <" + query.toString() + ">"); + rs = stmt.executeQuery(query.toString()); + SortedSet lockSet = new TreeSet(new LockInfoComparator()); + while (rs.next()) { + lockSet.add(new LockInfo(rs)); } - - // Look at everything in front of this lock to see if it should block - // it or not. - boolean acquired = false; - for (int i = index - 1; i >= 0; i--) { - // Check if we're operating on the same database, if not, move on - if (!locks[index].db.equals(locks[i].db)) { - continue; + // Turn the tree set into an array so we can move back and forth easily + // in it. + LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); + + for (LockInfo info : locksBeingChecked) { + // Find the lock record we're checking + int index = -1; + for (int i = 0; i < locks.length; i++) { + if (locks[i].equals(info)) { + index = i; + break; + } } - - // If table is null on either of these, then they are claiming to - // lock the whole database and we need to check it. Otherwise, - // check if they are operating on the same table, if not, move on. - if (locks[index].table != null && locks[i].table != null - && !locks[index].table.equals(locks[i].table)) { - continue; + + // If we didn't find the lock, then it must not be in the table + if (index == -1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("How did we get here, we heartbeated our lock before we started!"); } - - // If partition is null on either of these, then they are claiming to - // lock the whole table and we need to check it. Otherwise, - // check if they are operating on the same partition, if not, move on. - if (locks[index].partition != null && locks[i].partition != null - && !locks[index].partition.equals(locks[i].partition)) { + + + // If we've found it and it's already been marked acquired, + // then just look at the other locks. + if (locks[index].state == LockState.ACQUIRED) { continue; } - - // We've found something that matches what we're trying to lock, - // so figure out if we can lock it too. - switch (jumpTable.get(locks[index].type).get(locks[i].type).get - (locks[i].state)) { - case ACQUIRE: - acquire(dbConn, stmt, extLockId, info.intLockId); - acquired = true; - break; - case WAIT: - wait(dbConn, save); - if (alwaysCommit) { - // In the case where lockNoWait has been called we don't want to commit because - // it's going to roll everything back. In every other case we want to commit here. - LOG.debug("Going to commit"); - dbConn.commit(); - } - response.setState(LockState.WAITING); - return response; - case KEEP_LOOKING: + + // Look at everything in front of this lock to see if it should block + // it or not. + boolean acquired = false; + for (int i = index - 1; i >= 0; i--) { + // Check if we're operating on the same database, if not, move on + if (!locks[index].db.equals(locks[i].db)) { continue; + } + + // If table is null on either of these, then they are claiming to + // lock the whole database and we need to check it. Otherwise, + // check if they are operating on the same table, if not, move on. + if (locks[index].table != null && locks[i].table != null + && !locks[index].table.equals(locks[i].table)) { + continue; + } + + // If partition is null on either of these, then they are claiming to + // lock the whole table and we need to check it. Otherwise, + // check if they are operating on the same partition, if not, move on. + if (locks[index].partition != null && locks[i].partition != null + && !locks[index].partition.equals(locks[i].partition)) { + continue; + } + + // We've found something that matches what we're trying to lock, + // so figure out if we can lock it too. + switch (jumpTable.get(locks[index].type).get(locks[i].type).get + (locks[i].state)) { + case ACQUIRE: + acquire(dbConn, stmt, extLockId, info.intLockId); + acquired = true; + break; + case WAIT: + wait(dbConn, save); + if (alwaysCommit) { + // In the case where lockNoWait has been called we don't want to commit because + // it's going to roll everything back. In every other case we want to commit here. + LOG.debug("Going to commit"); + dbConn.commit(); + } + response.setState(LockState.WAITING); + return response; + case KEEP_LOOKING: + continue; + } + if (acquired) break; // We've acquired this lock component, + // so get out of the loop and look at the next component. } - if (acquired) break; // We've acquired this lock component, - // so get out of the loop and look at the next component. + + // If we've arrived here and we have not already acquired, it means there's nothing in the + // way of the lock, so acquire the lock. + if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId); } - - // If we've arrived here and we have not already acquired, it means there's nothing in the - // way of the lock, so acquire the lock. - if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId); + + // We acquired all of the locks, so commit and return acquired. + LOG.debug("Going to commit"); + dbConn.commit(); + response.setState(LockState.ACQUIRED); + } finally { + closeRs(rs); + closeStmt(stmt); } - - // We acquired all of the locks, so commit and return acquired. - LOG.debug("Going to commit"); - dbConn.commit(); - response.setState(LockState.ACQUIRED); return response; } @@ -1394,20 +1464,26 @@ throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat if (extLockId == 0) return; - Statement stmt = dbConn.createStatement(); - long now = getDbTime(dbConn); - - String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); - if (rc < 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchLockException("No such lock: " + extLockId); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + + long now = getDbTime(dbConn); + + String s = "update HIVE_LOCKS set hl_last_heartbeat = " + + now + " where hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < 1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new NoSuchLockException("No such lock: " + extLockId); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); } - LOG.debug("Going to commit"); - dbConn.commit(); } // Heartbeats on the txn table. This commits, so do not enter it with any state @@ -1415,104 +1491,135 @@ throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat if (txnid == 0) return; - Statement stmt = dbConn.createStatement(); - long now = getDbTime(dbConn); - // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + - txnid + " for update"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchTxnException("No such transaction: " + txnid); - } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new TxnAbortedException("Transaction " + txnid + - " already aborted"); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + long now = getDbTime(dbConn); + // We need to check whether this transaction is valid and open + String s = "select txn_state from TXNS where txn_id = " + + txnid + " for update"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new NoSuchTxnException("No such transaction: " + txnid); + } + if (rs.getString(1).charAt(0) == TXN_ABORTED) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new TxnAbortedException("Transaction " + txnid + + " already aborted"); + } + s = "update TXNS set txn_last_heartbeat = " + now + + " where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); } - s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); } // NEVER call this function without first calling heartbeat(long, long) private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { - Statement stmt = dbConn.createStatement(); - String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + Statement stmt = null; + ResultSet rs = null; + try { + stmt = dbConn.createStatement(); + String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + + extLockId; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("This should never happen! We already " + + "checked the lock existed but now we can't find it!"); + } + long txnid = rs.getLong(1); + LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); + return (rs.wasNull() ? -1 : txnid); + } finally { + closeRs(rs); + closeStmt(stmt); } - long txnid = rs.getLong(1); - LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); - return (rs.wasNull() ? -1 : txnid); } // NEVER call this function without first calling heartbeat(long, long) private List getLockInfoFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { - Statement stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - boolean sawAtLeastOne = false; - List ourLockInfo = new ArrayList(); - while (rs.next()) { - ourLockInfo.add(new LockInfo(rs)); - sawAtLeastOne = true; - } - if (!sawAtLeastOne) { - throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + Statement stmt = null; + ResultSet rs = null; + try { + stmt = dbConn.createStatement(); + String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + + "hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + boolean sawAtLeastOne = false; + List ourLockInfo = new ArrayList(); + while (rs.next()) { + ourLockInfo.add(new LockInfo(rs)); + sawAtLeastOne = true; + } + if (!sawAtLeastOne) { + throw new MetaException("This should never happen! We already " + + "checked the lock existed but now we can't find it!"); + } + return ourLockInfo; + } finally { + closeRs(rs); + closeStmt(stmt); } - return ourLockInfo; } // Clean time out locks from the database. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions. private void timeOutLocks(Connection dbConn) throws SQLException, MetaException { - long now = getDbTime(dbConn); - Statement stmt = dbConn.createStatement(); - // Remove any timed out locks from the table. - String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); + Statement stmt = null; + try { + long now = getDbTime(dbConn); + stmt = dbConn.createStatement(); + // Remove any timed out locks from the table. + String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + + (now - timeout); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); + } } // Abort timed out transactions. This calls abortTxn(), which does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions on the underlying database. - private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { - long now = getDbTime(dbConn); - Statement stmt = dbConn.createStatement(); - // Abort any timed out locks from the table. - String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - List deadTxns = new ArrayList(); - // Limit the number of timed out transactions we do in one pass to keep from generating a - // huge delete statement - for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); - // We don't care whether all of the transactions get deleted or not, - // if some didn't it most likely means someone else deleted them in the interum - if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { + Statement stmt = null; + ResultSet rs = null; + try { + long now = getDbTime(dbConn); + stmt = dbConn.createStatement(); + // Abort any timed out locks from the table. + String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + + "' and txn_last_heartbeat < " + (now - timeout); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + List deadTxns = new ArrayList(); + // Limit the number of timed out transactions we do in one pass to keep from generating a + // huge delete statement + for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); + // We don't care whether all of the transactions get deleted or not, + // if some didn't it most likely means someone else deleted them in the interum + if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + } finally { + closeRs(rs); + closeStmt(stmt); + } } private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {