Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1499669) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -142,6 +142,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.MutableClassToInstanceMap; /** @@ -2241,6 +2242,8 @@ /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); + Set rowsAlreadyLocked = Sets.newHashSet(); + // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -2297,15 +2300,24 @@ // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; + boolean failedToAcquire = false; Integer acquiredLockId = null; + HashedBytes currentRow = new HashedBytes(mutation.getRow()); try { - acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); + if (providedLockId != null || !rowsAlreadyLocked.contains(currentRow)) { + acquiredLockId = getLock(providedLockId, currentRow, shouldBlock); + if (acquiredLockId == null) { + failedToAcquire = true; + } + if (providedLockId == null) { + rowsAlreadyLocked.add(currentRow); + } + } } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + LOG.warn("Failed getting lock in batch put, row=" + currentRow, ioe); + failedToAcquire = true; } - if (acquiredLockId == null) { + if (failedToAcquire) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; break; // stop acquiring more rows for this batch @@ -2452,6 +2464,7 @@ releaseRowLock(toRelease); } acquiredLocks = null; + rowsAlreadyLocked = null; } // ------------------------- // STEP 7. Sync wal. @@ -3432,7 +3445,7 @@ this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics( this.writeRequestsCount.get()); try { - return internalObtainRowLock(row, true); + return internalObtainRowLock(new HashedBytes(row), true); } finally { closeRegionOperation(); } @@ -3444,12 +3457,11 @@ * Otherwise, just tries to obtain the lock and returns * null if unavailable. */ - private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) + private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock) throws IOException { - checkRow(row, "row lock"); + checkRow(rowKey.getBytes(), "row lock"); startRegionOperation(); try { - HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) @@ -3465,8 +3477,7 @@ try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { - throw new IOException("Timed out on getting lock for row=" - + Bytes.toStringBinary(row)); + throw new IOException("Timed out on getting lock for row=" + rowKey); } } catch (InterruptedException ie) { // Empty @@ -3540,12 +3551,27 @@ */ public Integer getLock(Integer lockid, byte [] row, boolean waitForLock) throws IOException { - Integer lid = null; + return getLock(lockid, new HashedBytes(row), waitForLock); + } + + /** + * Returns existing row lock if found, otherwise + * obtains a new row lock and returns it. + * @param lockid requested by the user, or null if the user didn't already hold lock + * @param row the row to lock + * @param waitForLock if true, will block until the lock is available, otherwise will + * simply return null if it could not acquire the lock. + * @return lockid or null if waitForLock is false and the lock was unavailable. + */ + protected Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) + throws IOException { + Integer lid; if (lockid == null) { lid = internalObtainRowLock(row, waitForLock); } else { - if (!isRowLocked(lockid)) { - throw new IOException("Invalid row lock"); + HashedBytes rowFromLock = lockIds.get(lockid); + if (!row.equals(rowFromLock)) { + throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + rowFromLock + " but wanted lock for row: " + row); } lid = lockid; }