diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a6d56137b3..75066ce893 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -25,6 +25,7 @@ import org.apache.commons.dbcp.DriverManagerConnectionFactory; import org.apache.commons.dbcp.PoolableConnectionFactory; import org.apache.commons.lang.NotImplementedException; +import org.apache.derby.iapi.store.raw.LockingPolicy; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; @@ -48,6 +49,7 @@ import org.apache.hadoop.util.StringUtils; import javax.sql.DataSource; +import javax.swing.*; import java.io.PrintWriter; import java.nio.ByteBuffer; @@ -2446,10 +2448,6 @@ private static boolean isValidTxn(long txnId) { * * This is expected to run at READ_COMMITTED. * - * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take - * all locks for given extLockId or none. Would be more efficient to update state on all locks - * at once. Semantics are the same since this is all part of the same txn. - * * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS. * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid, * checkLock() will in the worst case keep locks in Waiting state a little longer. @@ -2656,7 +2654,6 @@ private LockResponse checkLock(Connection dbConn, long extLockId) // Look at everything in front of this lock to see if it should block // it or not. - boolean acquired = false; for (int i = index - 1; i >= 0; i--) { // Check if we're operating on the same database, if not, move on if (!locks[index].db.equals(locks[i].db)) { @@ -2708,20 +2705,16 @@ private LockResponse checkLock(Connection dbConn, long extLockId) } //fall through to ACQUIRE case ACQUIRE: - acquire(dbConn, stmt, extLockId, info); - acquired = true; break; case KEEP_LOOKING: continue; } - if (acquired) break; // We've acquired this lock component, - // so get out of the loop and look at the next component. + //if we got here, it means it's ok to acquire 'info' lock + break;// so exit the loop and check next lock } - - // If we've arrived here and we have not already acquired, it means there's nothing in the - // way of the lock, so acquire the lock. - if (!acquired) acquire(dbConn, stmt, extLockId, info); } + //if here, ther were no locks that blocked any locks in 'locksBeingChecked' - acquire them all + acquire(dbConn, stmt, locksBeingChecked); // We acquired all of the locks, so commit and return acquired. LOG.debug("Going to commit"); @@ -2735,6 +2728,48 @@ private LockResponse checkLock(Connection dbConn, long extLockId) } return response; } + private void acquire(Connection dbConn, Statement stmt, List locksBeingChecked) + throws SQLException, NoSuchLockException, MetaException { + if(locksBeingChecked == null || locksBeingChecked.isEmpty()) { + return; + } + long txnId = locksBeingChecked.get(0).txnId; + long extLockId = locksBeingChecked.get(0).extLockId; + long now = getDbTime(dbConn); + String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + + //if lock is part of txn, heartbeat info is in txn record + "hl_last_heartbeat = " + (isValidTxn(txnId) ? 0 : now) + + ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + + " where hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < locksBeingChecked.size()) { + LOG.debug("Going to rollback acquire(Connection dbConn, Statement stmt, List locksBeingChecked)"); + dbConn.rollback(); + /*select all locks for this ext ID and see which ones are missing*/ + StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":"); + ResultSet rs = stmt.executeQuery("select hl_lock_int_id from HIVE_LOCKS where hl_lock_ext_id = " + extLockId); + while(rs.next()) { + int intLockId = rs.getInt(1); + int idx = 0; + for(; idx < locksBeingChecked.size(); idx++) { + LockInfo expectedLock = locksBeingChecked.get(idx); + if(expectedLock != null && expectedLock.intLockId == intLockId) { + locksBeingChecked.set(idx, null); + break; + } + } + } + for(LockInfo expectedLock : locksBeingChecked) { + if(expectedLock != null) { + sb.append(expectedLock.intLockId).append(","); + } + } + sb.append(") ").append(JavaUtils.txnIdToString(txnId)); + close(rs); + throw new NoSuchLockException(sb.toString()); + } + } /** * the {@link #jumpTable} only deals with LockState/LockType. In some cases it's not @@ -2777,28 +2812,6 @@ private void wait(Connection dbConn, Savepoint save) throws SQLException { LOG.debug("Going to rollback to savepoint"); dbConn.rollback(save); } - - private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo) - throws SQLException, NoSuchLockException, MetaException { - long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - //if lock is part of txn, heartbeat info is in txn record - "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) + - ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + lockInfo.intLockId; - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); - if (rc < 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," + - + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId)); - } - // We update the database, but we don't commit because there may be other - // locks together with this, and we only want to acquire one if we can - // acquire all. - } - /** * Heartbeats on the lock table. This commits, so do not enter it with any state. * Should not be called on a lock that belongs to transaction.