Index: metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (revision 1604310) +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (working copy) @@ -54,8 +54,9 @@ public Set findPotentialCompactions(int maxAborted) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Set response = new HashSet(); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); // Check for completed transactions String s = "select distinct ctc_database, ctc_table, " + "ctc_partition from COMPLETED_TXN_COMPONENTS"; @@ -93,6 +94,7 @@ LOG.error("Unable to connect to transaction database " + e.getMessage()); } finally { closeDbConn(dbConn); + closeStmt(stmt); } return response; } @@ -106,8 +108,9 @@ public void setRunAs(long cq_id, String user) throws MetaException { try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { @@ -127,6 +130,7 @@ detectDeadlock(e, "setRunAs"); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { setRunAs(cq_id, user); @@ -146,8 +150,9 @@ Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); CompactionInfo info = new CompactionInfo(); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); @@ -192,6 +197,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { return findNextToCompact(workerId); @@ -208,8 +214,9 @@ public void markCompacted(CompactionInfo info) throws MetaException { try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + "cq_worker_id = null where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); @@ -232,6 +239,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { markCompacted(info); @@ -249,8 +257,9 @@ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); List rc = new ArrayList(); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); @@ -283,6 +292,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } @@ -294,8 +304,9 @@ public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { @@ -371,6 +382,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { markCleaned(info); @@ -385,8 +397,9 @@ public void cleanEmptyAbortedTxns() throws MetaException { try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select txn_id from TXNS where " + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + "txn_state = '" + TXN_ABORTED + "'"; @@ -421,6 +434,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { cleanEmptyAbortedTxns(); @@ -441,8 +455,9 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + hostname + "%'"; @@ -465,6 +480,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { revokeFromLocalWorkers(hostname); @@ -486,8 +502,9 @@ try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); long latestValidStart = getDbTime(dbConn) - timeout; + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + latestValidStart; @@ -510,6 +527,7 @@ StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + closeStmt(stmt); } } catch (DeadlockException e) { revokeTimedoutWorkers(timeout); Index: metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (revision 1604310) +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (working copy) @@ -120,8 +120,9 @@ // database we'll look at the current transaction number first. If it // subsequently shows up in the open list that's ok. Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); @@ -157,7 +158,6 @@ } txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4))); } - stmt.close(); LOG.debug("Going to rollback"); dbConn.rollback(); return new GetOpenTxnsInfoResponse(hwm, txnInfo); @@ -170,6 +170,7 @@ throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } @@ -180,9 +181,10 @@ // database we'll look at the current transaction number first. If it // subsequently shows up in the open list that's ok. Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Statement stmt = null; try { timeOutTxns(dbConn); - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); @@ -203,7 +205,6 @@ while (rs.next()) { openList.add(rs.getLong(1)); } - stmt.close(); LOG.debug("Going to rollback"); dbConn.rollback(); return new GetOpenTxnsResponse(hwm, openList); @@ -216,6 +217,7 @@ throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } @@ -235,13 +237,14 @@ int numTxns = rqst.getNum_txns(); try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { // Make sure the user has not requested an insane amount of txns. int maxTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select ntxn_next from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); @@ -279,6 +282,7 @@ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (DeadlockException e) { @@ -327,8 +331,9 @@ long txnid = rqst.getTxnid(); try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to // commit it, but it does two things. One, it makes sure the transaction is still valid. // Two, it avoids the race condition where we time out between now and when we actually @@ -369,6 +374,7 @@ throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (DeadlockException e) { @@ -468,6 +474,7 @@ throws NoSuchLockException, TxnOpenException, MetaException { try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { // Odd as it seems, we need to heartbeat first because this touches the // lock table and assures that our locks our still valid. If they are @@ -487,7 +494,7 @@ LOG.error(msg); throw new TxnOpenException(msg); } - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); @@ -508,6 +515,7 @@ throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (DeadlockException e) { @@ -521,8 +529,9 @@ Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); ShowLocksResponse rsp = new ShowLocksResponse(); List elems = new ArrayList(); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS"; @@ -561,6 +570,7 @@ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } rsp.setLocks(elems); @@ -634,8 +644,9 @@ // Put a compaction request in the queue. try { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); // Get the id for the next entry in the queue String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID"; @@ -705,6 +716,7 @@ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } } catch (DeadlockException e) { @@ -717,8 +729,9 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { ShowCompactResponse response = new ShowCompactResponse(); Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + "cq_start, cq_run_as from COMPACTION_QUEUE"; LOG.debug("Going to execute query <" + s + ">"); @@ -755,6 +768,7 @@ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(stmt); closeDbConn(dbConn); } return response; @@ -765,8 +779,9 @@ */ int numLocksInLockTable() throws SQLException, MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); String s = "select count(*) from HIVE_LOCKS"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); @@ -777,6 +792,7 @@ return rc; } finally { closeDbConn(dbConn); + closeStmt(stmt); } } @@ -819,6 +835,18 @@ LOG.warn("Failed to close db connection " + e.getMessage()); } } + + /** + * Close statement instance. + * @param stmt statement instance. + */ + protected void closeStmt(Statement stmt) { + try { + if (stmt != null) stmt.close(); + } catch (SQLException e) { + LOG.warn("Failed to close statement " + e.getMessage()); + } + } /** * Determine if an exception was a deadlock. Unfortunately there is no standard way to do @@ -850,10 +878,10 @@ * @throws org.apache.hadoop.hive.metastore.api.MetaException if the time cannot be determined */ protected long getDbTime(Connection conn) throws MetaException { + Statement stmt = null; try { - Statement stmt = conn.createStatement(); + stmt = conn.createStatement(); String s; - ResultSet rs; DatabaseProduct prod = determineDatabaseProduct(conn); switch (prod) { case DERBY: @@ -876,13 +904,15 @@ throw new MetaException(msg); } LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); + ResultSet rs = stmt.executeQuery(s); if (!rs.next()) throw new MetaException("No results from date query"); return rs.getTimestamp(1).getTime(); } catch (SQLException e) { String msg = "Unable to determine current time: " + e.getMessage(); LOG.error(msg); throw new MetaException(msg); + } finally { + closeStmt(stmt); } } @@ -1042,33 +1072,39 @@ * @throws SQLException */ private int abortTxns(Connection dbConn, List txnids) throws SQLException { - Statement stmt = dbConn.createStatement(); - - // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS - StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); - boolean first = true; - for (Long id : txnids) { - if (first) first = false; - else buf.append(','); - buf.append(id); + Statement stmt = null; + int updateCnt = 0; + try { + stmt = dbConn.createStatement(); + + // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS + StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); + boolean first = true; + for (Long id : txnids) { + if (first) first = false; + else buf.append(','); + buf.append(id); + } + buf.append(')'); + LOG.debug("Going to execute update <" + buf.toString() + ">"); + stmt.executeUpdate(buf.toString()); + + buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); + first = true; + for (Long id : txnids) { + if (first) first = false; + else buf.append(','); + buf.append(id); + } + buf.append(')'); + LOG.debug("Going to execute update <" + buf.toString() + ">"); + updateCnt = stmt.executeUpdate(buf.toString()); + + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); } - buf.append(')'); - LOG.debug("Going to execute update <" + buf.toString() + ">"); - stmt.executeUpdate(buf.toString()); - - buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); - first = true; - for (Long id : txnids) { - if (first) first = false; - else buf.append(','); - buf.append(id); - } - buf.append(')'); - LOG.debug("Going to execute update <" + buf.toString() + ">"); - int updateCnt = stmt.executeUpdate(buf.toString()); - - LOG.debug("Going to commit"); - dbConn.commit(); return updateCnt; } @@ -1102,9 +1138,9 @@ synchronized (lockLock) { // Clean up timed out locks before we attempt to acquire any. timeOutLocks(dbConn); - + Statement stmt = null; try { - Statement stmt = dbConn.createStatement(); + stmt = dbConn.createStatement(); // Get the next lock id. String s = "select nl_next from NEXT_LOCK_ID"; @@ -1183,6 +1219,8 @@ } catch (NoSuchLockException e) { // This should never happen, as we just added the lock id throw new MetaException("Couldn't find a lock we just created!"); + } finally { + closeStmt(stmt); } } } @@ -1197,7 +1235,6 @@ LOG.debug("Setting savepoint"); Savepoint save = dbConn.setSavepoint(); - Statement stmt = dbConn.createStatement(); StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type from HIVE_LOCKS where hl_db in ("); @@ -1267,98 +1304,104 @@ } LOG.debug("Going to execute query <" + query.toString() + ">"); - ResultSet rs = stmt.executeQuery(query.toString()); - SortedSet lockSet = new TreeSet(new LockInfoComparator()); - while (rs.next()) { - lockSet.add(new LockInfo(rs)); - } - // Turn the tree set into an array so we can move back and forth easily - // in it. - LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + ResultSet rs = stmt.executeQuery(query.toString()); + SortedSet lockSet = new TreeSet(new LockInfoComparator()); + while (rs.next()) { + lockSet.add(new LockInfo(rs)); + } + // Turn the tree set into an array so we can move back and forth easily + // in it. + LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]); - for (LockInfo info : locksBeingChecked) { - // Find the lock record we're checking - int index = -1; - for (int i = 0; i < locks.length; i++) { - if (locks[i].equals(info)) { - index = i; - break; + for (LockInfo info : locksBeingChecked) { + // Find the lock record we're checking + int index = -1; + for (int i = 0; i < locks.length; i++) { + if (locks[i].equals(info)) { + index = i; + break; + } } - } - // If we didn't find the lock, then it must not be in the table - if (index == -1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("How did we get here, we heartbeated our lock before we started!"); - } + // If we didn't find the lock, then it must not be in the table + if (index == -1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("How did we get here, we heartbeated our lock before we started!"); + } - // If we've found it and it's already been marked acquired, - // then just look at the other locks. - if (locks[index].state == LockState.ACQUIRED) { - continue; - } - - // Look at everything in front of this lock to see if it should block - // it or not. - boolean acquired = false; - for (int i = index - 1; i >= 0; i--) { - // Check if we're operating on the same database, if not, move on - if (!locks[index].db.equals(locks[i].db)) { + // If we've found it and it's already been marked acquired, + // then just look at the other locks. + if (locks[index].state == LockState.ACQUIRED) { continue; } - // If table is null on either of these, then they are claiming to - // lock the whole database and we need to check it. Otherwise, - // check if they are operating on the same table, if not, move on. - if (locks[index].table != null && locks[i].table != null - && !locks[index].table.equals(locks[i].table)) { - continue; - } + // Look at everything in front of this lock to see if it should block + // it or not. + boolean acquired = false; + for (int i = index - 1; i >= 0; i--) { + // Check if we're operating on the same database, if not, move on + if (!locks[index].db.equals(locks[i].db)) { + continue; + } - // If partition is null on either of these, then they are claiming to - // lock the whole table and we need to check it. Otherwise, - // check if they are operating on the same partition, if not, move on. - if (locks[index].partition != null && locks[i].partition != null - && !locks[index].partition.equals(locks[i].partition)) { - continue; - } + // If table is null on either of these, then they are claiming to + // lock the whole database and we need to check it. Otherwise, + // check if they are operating on the same table, if not, move on. + if (locks[index].table != null && locks[i].table != null + && !locks[index].table.equals(locks[i].table)) { + continue; + } - // We've found something that matches what we're trying to lock, - // so figure out if we can lock it too. - switch (jumpTable.get(locks[index].type).get(locks[i].type).get - (locks[i].state)) { - case ACQUIRE: - acquire(dbConn, stmt, extLockId, info.intLockId); - acquired = true; - break; - case WAIT: - wait(dbConn, save); - if (alwaysCommit) { - // In the case where lockNoWait has been called we don't want to commit because - // it's going to roll everything back. In every other case we want to commit here. - LOG.debug("Going to commit"); - dbConn.commit(); - } - response.setState(LockState.WAITING); - return response; - case KEEP_LOOKING: + // If partition is null on either of these, then they are claiming to + // lock the whole table and we need to check it. Otherwise, + // check if they are operating on the same partition, if not, move on. + if (locks[index].partition != null && locks[i].partition != null + && !locks[index].partition.equals(locks[i].partition)) { continue; + } + + // We've found something that matches what we're trying to lock, + // so figure out if we can lock it too. + switch (jumpTable.get(locks[index].type).get(locks[i].type).get + (locks[i].state)) { + case ACQUIRE: + acquire(dbConn, stmt, extLockId, info.intLockId); + acquired = true; + break; + case WAIT: + wait(dbConn, save); + if (alwaysCommit) { + // In the case where lockNoWait has been called we don't want to commit because + // it's going to roll everything back. In every other case we want to commit here. + LOG.debug("Going to commit"); + dbConn.commit(); + } + response.setState(LockState.WAITING); + return response; + case KEEP_LOOKING: + continue; + } + if (acquired) break; // We've acquired this lock component, + // so get out of the loop and look at the next component. } - if (acquired) break; // We've acquired this lock component, - // so get out of the loop and look at the next component. + + // If we've arrived here and we have not already acquired, it means there's nothing in the + // way of the lock, so acquire the lock. + if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId); } - // If we've arrived here and we have not already acquired, it means there's nothing in the - // way of the lock, so acquire the lock. - if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId); + // We acquired all of the locks, so commit and return acquired. + LOG.debug("Going to commit"); + dbConn.commit(); + response.setState(LockState.ACQUIRED); + } finally { + closeStmt(stmt); } - - // We acquired all of the locks, so commit and return acquired. - LOG.debug("Going to commit"); - dbConn.commit(); - response.setState(LockState.ACQUIRED); return response; } @@ -1397,20 +1440,25 @@ throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat if (extLockId == 0) return; - Statement stmt = dbConn.createStatement(); - long now = getDbTime(dbConn); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); - if (rc < 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchLockException("No such lock: " + extLockId); + String s = "update HIVE_LOCKS set hl_last_heartbeat = " + + now + " where hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < 1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new NoSuchLockException("No such lock: " + extLockId); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); } - LOG.debug("Going to commit"); - dbConn.commit(); } // Heartbeats on the txn table. This commits, so do not enter it with any state @@ -1418,68 +1466,83 @@ throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat if (txnid == 0) return; - Statement stmt = dbConn.createStatement(); - long now = getDbTime(dbConn); - // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + txnid; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchTxnException("No such transaction: " + txnid); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + long now = getDbTime(dbConn); + // We need to check whether this transaction is valid and open + String s = "select txn_state from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new NoSuchTxnException("No such transaction: " + txnid); + } + if (rs.getString(1).charAt(0) == TXN_ABORTED) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new TxnAbortedException("Transaction " + txnid + + " already aborted"); + } + s = "update TXNS set txn_last_heartbeat = " + now + + " where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new TxnAbortedException("Transaction " + txnid + - " already aborted"); - } - s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); } // NEVER call this function without first calling heartbeat(long, long) private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { - Statement stmt = dbConn.createStatement(); - String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + + extLockId; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("This should never happen! We already " + + "checked the lock existed but now we can't find it!"); + } + long txnid = rs.getLong(1); + LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); + return (rs.wasNull() ? -1 : txnid); + } finally { + closeStmt(stmt); } - long txnid = rs.getLong(1); - LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); - return (rs.wasNull() ? -1 : txnid); } // NEVER call this function without first calling heartbeat(long, long) private List getLockInfoFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { - Statement stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - boolean sawAtLeastOne = false; - List ourLockInfo = new ArrayList(); - while (rs.next()) { - ourLockInfo.add(new LockInfo(rs)); - sawAtLeastOne = true; + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + + "hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + boolean sawAtLeastOne = false; + List ourLockInfo = new ArrayList(); + while (rs.next()) { + ourLockInfo.add(new LockInfo(rs)); + sawAtLeastOne = true; + } + if (!sawAtLeastOne) { + throw new MetaException("This should never happen! We already " + + "checked the lock existed but now we can't find it!"); + } + return ourLockInfo; + } finally { + closeStmt(stmt); } - if (!sawAtLeastOne) { - throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); - } - return ourLockInfo; } // Clean time out locks from the database. This does a commit, @@ -1487,14 +1550,19 @@ // open transactions. private void timeOutLocks(Connection dbConn) throws SQLException, MetaException { long now = getDbTime(dbConn); - Statement stmt = dbConn.createStatement(); - // Remove any timed out locks from the table. - String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + // Remove any timed out locks from the table. + String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + + (now - timeout); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } finally { + closeStmt(stmt); + } } // Abort timed out transactions. This calls abortTxn(), which does a commit, @@ -1502,19 +1570,24 @@ // open transactions on the underlying database. private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { long now = getDbTime(dbConn); - Statement stmt = dbConn.createStatement(); - // Abort any timed out locks from the table. - String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - List deadTxns = new ArrayList(); - // Limit the number of timed out transactions we do in one pass to keep from generating a - // huge delete statement - for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); - // We don't care whether all of the transactions get deleted or not, - // if some didn't it most likely means someone else deleted them in the interum - if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + Statement stmt = null; + try { + stmt = dbConn.createStatement(); + // Abort any timed out locks from the table. + String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + + "' and txn_last_heartbeat < " + (now - timeout); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + List deadTxns = new ArrayList(); + // Limit the number of timed out transactions we do in one pass to keep from generating a + // huge delete statement + for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); + // We don't care whether all of the transactions get deleted or not, + // if some didn't it most likely means someone else deleted them in the interum + if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + } finally { + closeStmt(stmt); + } } private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {