diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 4b6bc3e1e3..aa5d5d1137 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -112,6 +112,7 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List transactionalListeners; /** @@ -4021,30 +4026,35 @@ private void determineDatabaseProduct(Connection conn) { private final LockState state; private final LockType type; + LockInfo(ResultSet rs) throws MetaException, SQLException { + this(rs, ""); + } + // Assumes the result set is set to a valid row - LockInfo(ResultSet rs) throws SQLException, MetaException { - extLockId = rs.getLong("HL_LOCK_EXT_ID"); // can't be null - intLockId = rs.getLong("HL_LOCK_INT_ID"); // can't be null - db = rs.getString("HL_DB"); // can't be null - String t = rs.getString("HL_TABLE"); + LockInfo(ResultSet rs, String columnPrefix) throws SQLException, MetaException { + extLockId = rs.getLong(columnPrefix + "HL_LOCK_EXT_ID"); // can't be null + intLockId = rs.getLong(columnPrefix + "HL_LOCK_INT_ID"); // can't be null + db = rs.getString(columnPrefix + "HL_DB"); // can't be null + String t = rs.getString(columnPrefix + "HL_TABLE"); table = (rs.wasNull() ? null : t); - String p = rs.getString("HL_PARTITION"); + String p = rs.getString(columnPrefix + "HL_PARTITION"); partition = (rs.wasNull() ? null : p); - switch (rs.getString("HL_LOCK_STATE").charAt(0)) { + switch (rs.getString(columnPrefix + "HL_LOCK_STATE").charAt(0)) { case LOCK_WAITING: state = LockState.WAITING; break; case LOCK_ACQUIRED: state = LockState.ACQUIRED; break; default: - throw new MetaException("Unknown lock state " + rs.getString("HL_LOCK_STATE").charAt(0)); + throw new MetaException("Unknown lock state " + rs.getString(columnPrefix + "HL_LOCK_STATE").charAt(0)); } - switch (rs.getString("HL_LOCK_TYPE").charAt(0)) { + switch (rs.getString(columnPrefix + "HL_LOCK_TYPE").charAt(0)) { case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break; case LOCK_SHARED: type = LockType.SHARED_READ; break; case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break; default: - throw new MetaException("Unknown lock type " + rs.getString("HL_LOCK_TYPE").charAt(0)); + throw new MetaException("Unknown lock type " + rs.getString(columnPrefix + "HL_LOCK_TYPE").charAt(0)); } - txnId = rs.getLong("HL_TXNID");//returns 0 if value is NULL + txnId = rs.getLong(columnPrefix + "HL_TXNID");//returns 0 if value is NULL } + LockInfo(ShowLocksResponseElement e) { extLockId = e.getLockid(); intLockId = e.getLockIdInternal(); @@ -4260,8 +4270,8 @@ private static boolean isValidTxn(long txnId) { return txnId != 0; } /** - * hl_lock_ext_id by only checking earlier locks. * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller + * hl_lock_ext_id by only checking earlier locks. * * For any given SQL statement all locks required by it are grouped under single extLockId and are * granted all at once or all locks wait. @@ -4293,7 +4303,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); //being acquired now response.setLockid(extLockId); - //This the set of entities that the statement represented by extLockId wants to update + //This is the set of entities that the statement represented by extLockId wants to update List writeSet = new ArrayList<>(); for (LockInfo info : locksBeingChecked) { @@ -4354,10 +4364,9 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) close(rs, stmt, null); } - String queryStr = - " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" AS \"REQ_LOCK_INT_ID\" FROM (" + - " SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + - " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" + + String queryStr = " \"EX\".*, " + REQ_LOCKS_ALIASED_FIELDS + " FROM " + + "(SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + + " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" + " WHERE \"HL_LOCK_EXT_ID\" < " + extLockId + ") \"EX\"" + " INNER JOIN (" + " SELECT \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + @@ -4403,24 +4412,23 @@ is performed on that db (e.g. show tables, created table, etc). if (rs.next()) { // We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state. LockInfo blockedBy = new LockInfo(rs); - long intLockId = rs.getLong("REQ_LOCK_INT_ID"); + LockInfo blockedLock = new LockInfo(rs, "REQ_"); - String sqlText = "UPDATE \"HIVE_LOCKS\"" + + String updateBlockedByQuery = "UPDATE \"HIVE_LOCKS\"" + " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + - " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + blockedLock.intLockId; - LOG.debug("Executing sql: " + sqlText); - int updCnt = stmt.executeUpdate(sqlText); + LOG.debug("Going to execute query: <" + updateBlockedByQuery + ">"); + int updCnt = stmt.executeUpdate(updateBlockedByQuery); if (updCnt != 1) { - shouldNeverHappen(txnId, extLockId, intLockId); + shouldNeverHappen(txnId, extLockId, blockedLock.intLockId); } LOG.debug("Going to commit"); dbConn.commit(); - LOG.debug("Lock({} intLockId:{} {}) is blocked by Lock({}})", JavaUtils.lockIdToString(extLockId), - intLockId, JavaUtils.txnIdToString(txnId), blockedBy); + LOG.debug("Failed to acquire lock: {}. It has been blocked by lock: {}", blockedLock, blockedBy); response.setState(LockState.WAITING); return response; } @@ -4439,7 +4447,7 @@ is performed on that db (e.g. show tables, created table, etc). private void acquire(Connection dbConn, Statement stmt, List locksBeingChecked) throws SQLException, NoSuchLockException, MetaException { - if(locksBeingChecked == null || locksBeingChecked.isEmpty()) { + if (locksBeingChecked == null || locksBeingChecked.isEmpty()) { return; } long txnId = locksBeingChecked.get(0).txnId; @@ -4457,25 +4465,21 @@ private void acquire(Connection dbConn, Statement stmt, List locksBein dbConn.rollback(); /*select all locks for this ext ID and see which ones are missing*/ StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":"); - ResultSet rs = stmt.executeQuery("SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId); - while(rs.next()) { - int intLockId = rs.getInt(1); - int idx = 0; - for(; idx < locksBeingChecked.size(); idx++) { - LockInfo expectedLock = locksBeingChecked.get(idx); - if(expectedLock != null && expectedLock.intLockId == intLockId) { - locksBeingChecked.set(idx, null); - break; - } + try (ResultSet rs = stmt.executeQuery("SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId)) { + List missingIntIds = new ArrayList<>(); + while (rs.next()) { + int intLockId = rs.getInt(1); + locksBeingChecked.stream() + .map(lockInfo -> lockInfo.intLockId) + .filter(i -> i == intLockId) + .findFirst() + .ifPresent(missingIntIds::add); } - } - for(LockInfo expectedLock : locksBeingChecked) { - if(expectedLock != null) { - sb.append(expectedLock.intLockId).append(","); + for (Long intId : missingIntIds) { + sb.append(intId).append(","); } + sb.append(") ").append(JavaUtils.txnIdToString(txnId)); } - sb.append(") ").append(JavaUtils.txnIdToString(txnId)); - close(rs); throw new NoSuchLockException(sb.toString()); } } @@ -4735,29 +4739,23 @@ private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) } // NEVER call this function without first calling heartbeat(long, long) - private List getLockInfoFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { - Statement stmt = null; - try { - stmt = dbConn.createStatement(); - String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + + private List getLockInfoFromLockId(Connection dbConn, long extLockId) throws MetaException, SQLException { + try (Statement stmt = dbConn.createStatement()) { + String query = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + "\"HL_LOCK_EXT_ID\" = " + extLockId; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - boolean sawAtLeastOne = false; - List ourLockInfo = new ArrayList<>(); - while (rs.next()) { - ourLockInfo.add(new LockInfo(rs)); - sawAtLeastOne = true; + List locks = new ArrayList<>(); + LOG.debug("Going to execute query <" + query + ">"); + try (ResultSet rs = stmt.executeQuery(query)) { + while (rs.next()) { + locks.add(new LockInfo(rs)); + } } - if (!sawAtLeastOne) { + if (locks.isEmpty()) { throw new MetaException("This should never happen! We already " + - "checked the lock(" + JavaUtils.lockIdToString(extLockId) + ") existed but now we can't find it!"); + "checked the lock(" + JavaUtils.lockIdToString(extLockId) + ") existed but now we can't find it!"); } - return ourLockInfo; - } finally { - closeStmt(stmt); + return locks; } }