diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 4b6bc3e1e3..fb5a306ac4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -112,6 +112,8 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List transactionalListeners; @@ -2409,7 +2417,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc success = true; return new ConnectionLockIdPair(dbConn, extLockId); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("enqueueLock failed for request: {}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + @@ -2425,6 +2433,8 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc } } catch(RetryException e) { + LOG.debug("Going to retry enqueueLock for request: {}, after catching RetryException with message: {}", + rqst, e.getMessage()); return enqueueLockWithRetry(rqst); } } @@ -2434,7 +2444,7 @@ private long getNextLockIdForUpdate(Connection dbConn, Statement stmt) throws SQ LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs = stmt.executeQuery(s)) { if (!rs.next()) { - LOG.debug("Going to rollback"); + LOG.error("Failure to get next lock ID for update! SELECT query returned empty ResultSet."); dbConn.rollback(); throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_lock_id"); @@ -2603,12 +2613,12 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn pstmt.addBatch(); if (intLockId % maxBatchSize == 0) { - LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxBatchSize); + LOG.debug("Executing a batch of <" + insertLocksQuery + "> queries. Batch size: " + maxBatchSize); pstmt.executeBatch(); } } if (intLockId % maxBatchSize != 0) { - LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxBatchSize); + LOG.debug("Executing a batch of <" + insertLocksQuery + "> queries. Batch size: " + intLockId % maxBatchSize); pstmt.executeBatch(); } } @@ -2629,7 +2639,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long } return checkLock(dbConn, extLockId, txnId); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("checkLock failed for extLockId={}/txnId={}. Exception msg: {}", extLockId, txnId, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); throw new MetaException("Unable to update transaction database " + @@ -2640,6 +2650,8 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long } } catch(RetryException e) { + LOG.debug("Going to retry checkLock for extLockId={}/txnId={} after catching RetryException with message: {}", + extLockId, txnId, e.getMessage()); return checkLockWithRetry(dbConn, extLockId, txnId); } } @@ -2674,12 +2686,10 @@ public LockResponse checkLock(CheckLockRequest rqst) // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks // are associated with a transaction then heartbeat on that as well. - LockInfo info = getTxnIdFromLockId(dbConn, extLockId); - if(info == null) { - throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); - } - if (info.txnId > 0) { - heartbeatTxn(dbConn, info.txnId); + LockInfo lockInfo = getLockFromLockId(dbConn, extLockId) + .orElseThrow(() -> new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId))); + if (lockInfo.txnId > 0) { + heartbeatTxn(dbConn, lockInfo.txnId); } else { heartbeatLock(dbConn, extLockId); @@ -2687,9 +2697,9 @@ public LockResponse checkLock(CheckLockRequest rqst) //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired //extra heartbeat is logically harmless, but ... - return checkLock(dbConn, extLockId, info.txnId); + return checkLock(dbConn, extLockId, lockInfo.txnId); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("checkLock failed for request={}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); throw new MetaException("Unable to update transaction database " + @@ -2699,6 +2709,8 @@ public LockResponse checkLock(CheckLockRequest rqst) unlockInternal(); } } catch (RetryException e) { + LOG.debug("Going to retry checkLock for request={} after catching RetryException with message: {}", + rqst, e.getMessage()); return checkLock(rqst); } @@ -2712,8 +2724,7 @@ public LockResponse checkLock(CheckLockRequest rqst) * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed */ @RetrySemantics.Idempotent - public void unlock(UnlockRequest rqst) - throws NoSuchLockException, TxnOpenException, MetaException { + public void unlock(UnlockRequest rqst) throws TxnOpenException, MetaException { try { Connection dbConn = null; Statement stmt = null; @@ -2740,10 +2751,10 @@ public void unlock(UnlockRequest rqst) LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { - LOG.debug("Going to rollback"); + LOG.info("Failure to unlock any locks with extLockId={}.", extLockId); dbConn.rollback(); - LockInfo info = getTxnIdFromLockId(dbConn, extLockId); - if(info == null) { + Optional optLockInfo = getLockFromLockId(dbConn, extLockId); + if (!optLockInfo.isPresent()) { //didn't find any lock with extLockId but at ReadCommitted there is a possibility that //it existed when above delete ran but it didn't have the expected state. LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" + @@ -2751,25 +2762,25 @@ public void unlock(UnlockRequest rqst) //bail here to make the operation idempotent return; } - if(info.txnId != 0) { - String msg = "Unlocking locks associated with transaction not permitted. " + info; + LockInfo lockInfo = optLockInfo.get(); + if (isValidTxn(lockInfo.txnId)) { + String msg = "Unlocking locks associated with transaction not permitted. " + lockInfo; //if a lock is associated with a txn we can only "unlock" if if it's in WAITING state // which really means that the caller wants to give up waiting for the lock LOG.error(msg); throw new TxnOpenException(msg); - } - if(info.txnId == 0) { + } else { //we didn't see this lock when running DELETE stmt above but now it showed up //so should "should never happen" happened... - String msg = "Found lock in unexpected state " + info; + String msg = "Found lock in unexpected state " + lockInfo; LOG.error(msg); throw new MetaException(msg); } } - LOG.debug("Going to commit"); + LOG.debug("Successfully unlocked at least 1 lock with extLockId={}", extLockId); dbConn.commit(); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("Unlock failed for request={}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "unlock(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + @@ -4045,8 +4056,9 @@ private void determineDatabaseProduct(Connection conn) { char lockChar = rs.getString("HL_LOCK_TYPE").charAt(0); type = LockTypeUtil.getLockTypeFromEncoding(lockChar) .orElseThrow(() -> new MetaException("Unknown lock type: " + lockChar)); - txnId = rs.getLong("HL_TXNID");//returns 0 if value is NULL + txnId = rs.getLong("HL_TXNID"); //returns 0 if value is NULL } + LockInfo(ShowLocksResponseElement e) { extLockId = e.getLockid(); intLockId = e.getLockIdInternal(); @@ -4223,8 +4235,8 @@ private static boolean isValidTxn(long txnId) { return txnId != 0; } /** - * hl_lock_ext_id by only checking earlier locks. * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller + * hl_lock_ext_id by only checking earlier locks. * * For any given SQL statement all locks required by it are grouped under single extLockId and are * granted all at once or all locks wait. @@ -4253,10 +4265,10 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) */ boolean isPartOfDynamicPartitionInsert = true; try { - List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); //being acquired now + List locksBeingChecked = getLocksFromLockId(dbConn, extLockId); //being acquired now response.setLockid(extLockId); - //This the set of entities that the statement represented by extLockId wants to update + //This is the set of entities that the statement represented by extLockId wants to update List writeSet = new ArrayList<>(); for (LockInfo info : locksBeingChecked) { @@ -4372,23 +4384,24 @@ is performed on that db (e.g. show tables, created table, etc). // We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state. LockInfo blockedBy = new LockInfo(rs); long intLockId = rs.getLong("REQ_LOCK_INT_ID"); + LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by ({}})", JavaUtils.lockIdToString(extLockId), + intLockId, JavaUtils.txnIdToString(txnId), blockedBy); - String sqlText = "UPDATE \"HIVE_LOCKS\"" + - " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + - ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + - " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; + String updateBlockedByQuery = "UPDATE \"HIVE_LOCKS\"" + + " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + + ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; - LOG.debug("Executing sql: " + sqlText); - int updCnt = stmt.executeUpdate(sqlText); + LOG.debug("Going to execute query: <" + updateBlockedByQuery + ">"); + int updCnt = stmt.executeUpdate(updateBlockedByQuery); if (updCnt != 1) { + LOG.error("Failure to update blocked lock (extLockId={}, intLockId={}) with the blocking lock's IDs (extLockId={}, intLockId={})", + extLockId, intLockId, blockedBy.extLockId, blockedBy.intLockId); shouldNeverHappen(txnId, extLockId, intLockId); } - LOG.debug("Going to commit"); dbConn.commit(); - LOG.debug("Lock({} intLockId:{} {}) is blocked by Lock({}})", JavaUtils.lockIdToString(extLockId), - intLockId, JavaUtils.txnIdToString(txnId), blockedBy); response.setState(LockState.WAITING); return response; } @@ -4396,7 +4409,7 @@ is performed on that db (e.g. show tables, created table, etc). acquire(dbConn, stmt, locksBeingChecked); // We acquired all the locks, so commit and return acquired. - LOG.debug("Going to commit"); + LOG.debug("Successfully acquired locks: " + locksBeingChecked); dbConn.commit(); response.setState(LockState.ACQUIRED); } finally { @@ -4407,7 +4420,7 @@ is performed on that db (e.g. show tables, created table, etc). private void acquire(Connection dbConn, Statement stmt, List locksBeingChecked) throws SQLException, NoSuchLockException, MetaException { - if(locksBeingChecked == null || locksBeingChecked.isEmpty()) { + if (locksBeingChecked == null || locksBeingChecked.isEmpty()) { return; } long txnId = locksBeingChecked.get(0).txnId; @@ -4421,30 +4434,23 @@ private void acquire(Connection dbConn, Statement stmt, List locksBein LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < locksBeingChecked.size()) { - LOG.debug("Going to rollback acquire(Connection dbConn, Statement stmt, List locksBeingChecked)"); + LOG.error("Failure to acquire all locks (acquired: {}, total needed: {}).", rc, locksBeingChecked.size()); dbConn.rollback(); /*select all locks for this ext ID and see which ones are missing*/ - StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":"); - ResultSet rs = stmt.executeQuery("SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId); - while(rs.next()) { - int intLockId = rs.getInt(1); - int idx = 0; - for(; idx < locksBeingChecked.size(); idx++) { - LockInfo expectedLock = locksBeingChecked.get(idx); - if(expectedLock != null && expectedLock.intLockId == intLockId) { - locksBeingChecked.set(idx, null); - break; - } - } - } - for(LockInfo expectedLock : locksBeingChecked) { - if(expectedLock != null) { - sb.append(expectedLock.intLockId).append(","); + String errorMsgTemplate = "No such lock(s): (%s: %s) %s"; + Set notFoundIds = locksBeingChecked.stream() + .map(lockInfo -> Long.toString(lockInfo.intLockId)) + .collect(Collectors.toSet()); + String getIntIdsQuery = "SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; + LOG.debug("Going to execute query: <" + getIntIdsQuery + ">"); + try (ResultSet rs = stmt.executeQuery(getIntIdsQuery)) { + while (rs.next()) { + notFoundIds.remove(rs.getString(1)); } } - sb.append(") ").append(JavaUtils.txnIdToString(txnId)); - close(rs); - throw new NoSuchLockException(sb.toString()); + String errorMsg = String.format(errorMsgTemplate, + JavaUtils.lockIdToString(extLockId), String.join(", ", notFoundIds), JavaUtils.txnIdToString(txnId)); + throw new NoSuchLockException(errorMsg); } } @@ -4455,24 +4461,21 @@ private void acquire(Connection dbConn, Statement stmt, List locksBein private void heartbeatLock(Connection dbConn, long extLockId) throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat - if (extLockId == 0) return; - Statement stmt = null; - try { - stmt = dbConn.createStatement(); - - String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + + if (extLockId == 0) { + return; + } + try (Statement stmt = dbConn.createStatement()) { + String updateHeartbeatQuery = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); + LOG.debug("Going to execute update <" + updateHeartbeatQuery + ">"); + int rc = stmt.executeUpdate(updateHeartbeatQuery); if (rc < 1) { - LOG.debug("Going to rollback"); + LOG.error("Failure to update last heartbeat for extLockId={}.", extLockId); dbConn.rollback(); throw new NoSuchLockException("No such lock: " + JavaUtils.lockIdToString(extLockId)); } - LOG.debug("Going to commit"); + LOG.debug("Successfully heartbeated for extLockId={}", extLockId); dbConn.commit(); - } finally { - closeStmt(stmt); } } @@ -4480,24 +4483,22 @@ private void heartbeatLock(Connection dbConn, long extLockId) private void heartbeatTxn(Connection dbConn, long txnid) throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat - if (txnid == 0) return; - Statement stmt = null; - try { - stmt = dbConn.createStatement(); + if (txnid == 0) { + return; + } + try (Statement stmt = dbConn.createStatement()) { String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { ensureValidTxn(dbConn, txnid, stmt); // This should now throw some useful exception. - LOG.warn("Can neither heartbeat txn nor confirm it as invalid."); + LOG.error("Can neither heartbeat txn (txnId={}) nor confirm it as invalid.", txnid); dbConn.rollback(); throw new NoSuchTxnException("No such txn: " + txnid); } - LOG.debug("Going to commit"); + LOG.debug("Successfully heartbeated for txnId={}", txnid); dbConn.commit(); - } finally { - closeStmt(stmt); } } @@ -4679,53 +4680,38 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt } } - private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { - Statement stmt = null; - ResultSet rs = null; - try { - stmt = dbConn.createStatement(); - String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + - "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + - "\"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - return null; + private Optional getLockFromLockId(Connection dbConn, long extLockId) throws MetaException, SQLException { + try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_LOCKS_FOR_LOCK_ID_QUERY)) { + pstmt.setLong(1, extLockId); + LOG.debug("Going to execute query <" + SELECT_LOCKS_FOR_LOCK_ID_QUERY + "> for extLockId={}", extLockId); + try (ResultSet rs = pstmt.executeQuery()) { + if (!rs.next()) { + return Optional.empty(); + } + LockInfo info = new LockInfo(rs); + LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(info.txnId)); + return Optional.of(info); } - LockInfo info = new LockInfo(rs); - LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(info.txnId)); - return info; - } finally { - close(rs); - closeStmt(stmt); } } // NEVER call this function without first calling heartbeat(long, long) - private List getLockInfoFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { - Statement stmt = null; - try { - stmt = dbConn.createStatement(); - String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + - "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + - "\"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - boolean sawAtLeastOne = false; - List ourLockInfo = new ArrayList<>(); - while (rs.next()) { - ourLockInfo.add(new LockInfo(rs)); - sawAtLeastOne = true; + private List getLocksFromLockId(Connection dbConn, long extLockId) throws MetaException, SQLException { + try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_LOCKS_FOR_LOCK_ID_QUERY)) { + List locks = new ArrayList<>(); + pstmt.setLong(1, extLockId); + LOG.debug("Going to execute query <" + SELECT_LOCKS_FOR_LOCK_ID_QUERY + "> for extLockId={}", extLockId); + try (ResultSet rs = pstmt.executeQuery()) { + while (rs.next()) { + locks.add(new LockInfo(rs)); + } } - if (!sawAtLeastOne) { + if (locks.isEmpty()) { throw new MetaException("This should never happen! We already " + "checked the lock(" + JavaUtils.lockIdToString(extLockId) + ") existed but now we can't find it!"); } - return ourLockInfo; - } finally { - closeStmt(stmt); + LOG.debug("Found {} locks for extLockId={}. Locks: {}", locks.size(), extLockId, locks); + return locks; } } @@ -4734,28 +4720,25 @@ private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) // and thus should be done before any calls to heartbeat that will leave // open transactions. private void timeOutLocks(Connection dbConn) { - Statement stmt = null; - ResultSet rs = null; - try { - stmt = dbConn.createStatement(); - //doing a SELECT first is less efficient but makes it easier to debug things - String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " + - TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"HL_TXNID\" = 0"; - //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts() - //want to avoid expiring locks for a txn w/o expiring the txn itself - List extLockIDs = new ArrayList<>(); - rs = stmt.executeQuery(s); - while(rs.next()) { - extLockIDs.add(rs.getLong(1)); + Set timedOutLockIds = new TreeSet<>(); + //doing a SELECT first is less efficient but makes it easier to debug things + //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts() + //want to avoid expiring locks for a txn w/o expiring the txn itself + try (PreparedStatement pstmt = dbConn.prepareStatement( + String.format(SELECT_TIMED_OUT_LOCKS_QUERY, TxnDbUtil.getEpochFn(dbProduct)))) { + pstmt.setLong(1, timeout); + LOG.debug("Going to execute query: <" + SELECT_TIMED_OUT_LOCKS_QUERY + ">"); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + timedOutLockIds.add(rs.getLong(1)); } - rs.close(); dbConn.commit(); - if(extLockIDs.size() <= 0) { + if (timedOutLockIds.isEmpty()) { + LOG.debug("Did not find any timed-out locks, therefore retuning."); return; } List queries = new ArrayList<>(); - StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); @@ -4764,29 +4747,25 @@ private void timeOutLocks(Connection dbConn) { prefix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout); prefix.append(" AND \"HL_TXNID\" = 0 AND "); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false); - - int deletedLocks = 0; - for (String query : queries) { - LOG.debug("Removing expired locks via: " + query); - deletedLocks += stmt.executeUpdate(query); - } - if(deletedLocks > 0) { - Collections.sort(extLockIDs);//easier to read logs - LOG.info("Deleted " + deletedLocks + " int locks from HIVE_LOCKS due to timeout (" + - "HL_LOCK_EXT_ID list: " + extLockIDs + ")"); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, timedOutLockIds, + "\"HL_LOCK_EXT_ID\"", true, false); + try (Statement stmt = dbConn.createStatement()) { + int deletedLocks = 0; + for (String query : queries) { + LOG.debug("Going to execute update: <" + query + ">"); + deletedLocks += stmt.executeUpdate(query); + } + if (deletedLocks > 0) { + LOG.info("Deleted {} locks due to timed-out. Lock ids: {}", deletedLocks, timedOutLockIds); + } + dbConn.commit(); } - LOG.debug("Going to commit"); - dbConn.commit(); } - catch(SQLException ex) { - LOG.error("Failed to purge timedout locks due to: " + getMessage(ex), ex); + catch (SQLException ex) { + LOG.error("Failed to purge timed-out locks: " + getMessage(ex), ex); } - catch(Exception ex) { - LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex); - } finally { - close(rs); - closeStmt(stmt); + catch (Exception ex) { + LOG.error("Failed to purge timed-out locks: " + ex.getMessage(), ex); } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index b3a1f826bb..4ee1a45aae 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -35,8 +35,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class TxnUtils { private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); @@ -172,9 +174,9 @@ public static String getFullTableName(String dbName, String tableName) { * Build a query (or queries if one query is too big but only for the case of 'IN' * composite clause. For the case of 'NOT IN' clauses, multiple queries change * the semantics of the intended query. - * E.g., Let's assume that input "inList" parameter has [5, 6] and that + * E.g., Let's assume that input "inValues" parameter has [5, 6] and that * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause, - * Then having two delete statements changes the semantics of the inteneded SQL statement. + * Then having two delete statements changes the semantics of the intended SQL statement. * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence * is not equal to 'delete from T where a not in (5, 6)'.) * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters. @@ -192,7 +194,7 @@ public static String getFullTableName(String dbName, String tableName) { * @param queries OUT: Array of query strings * @param prefix IN: Part of the query that comes before IN list * @param suffix IN: Part of the query that comes after IN list - * @param inList IN: the list with IN list values + * @param inValues IN: Collection containing IN clause values * @param inColumn IN: single column name of IN list operator * @param addParens IN: add a pair of parenthesis outside the IN lists * e.g. "(id in (1,2,3) OR id in (4,5,6))" @@ -203,17 +205,16 @@ public static String getFullTableName(String dbName, String tableName) { List queries, StringBuilder prefix, StringBuilder suffix, - List inList, + Collection inValues, String inColumn, boolean addParens, boolean notIn) { - List inListStrings = new ArrayList<>(inList.size()); - for (Long aLong : inList) { - inListStrings.add(aLong.toString()); - } - return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, - inListStrings, inColumn, addParens, notIn); + List inValueStrings = inValues.stream() + .map(Object::toString) + .collect(Collectors.toList()); + return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, + inValueStrings, inColumn, addParens, notIn); } /** * Build a query (or queries if one query is too big but only for the case of 'IN'