diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07a10df..f922b9c 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -198,8 +198,8 @@ public class HRegion implements HeapSize { // , Writable{ // Members ////////////////////////////////////////////////////////////////////////////// - private final ConcurrentHashMap lockedRows = - new ConcurrentHashMap(); + private final ConcurrentHashMap lockedRows = + new ConcurrentHashMap(); private final ConcurrentHashMap lockIds = new ConcurrentHashMap(); private final AtomicInteger lockIdGenerator = new AtomicInteger(1); @@ -2300,7 +2300,7 @@ public class HRegion implements HeapSize { // , Writable{ Integer acquiredLockId = null; try { acquiredLockId = getLock(providedLockId, mutation.getRow(), - shouldBlock); + shouldBlock, true); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); @@ -3406,6 +3406,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Obtain a lock on the given row. Blocks until success. + * Does not support reentrant row locks. * * I know it's strange to have two mappings: *
@@ -3432,7 +3433,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.writeRequestsCount.increment();
     this.opMetrics.setWriteRequestCountMetrics( this.writeRequestsCount.get());
     try {
-      return internalObtainRowLock(row, true);
+      return internalObtainRowLock(row, true, false);
     } finally {
       closeRegionOperation();
     }
@@ -3443,27 +3444,34 @@ public class HRegion implements HeapSize { // , Writable{
    * @param waitForLock if true, will block until the lock is available.
    *        Otherwise, just tries to obtain the lock and returns
    *        null if unavailable.
+   * @param allowReentrancy if true, subsequent calls for the same row from the same thread will
+   *        succeed and return the same lock id
    */
-  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
+  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock, boolean allowReentrancy)
       throws IOException {
     checkRow(row, "row lock");
     startRegionOperation();
     try {
       HashedBytes rowKey = new HashedBytes(row);
       CountDownLatch rowLatch = new CountDownLatch(1);
+      RowLockContext rowLockContext = new RowLockContext(rowLatch, allowReentrancy ? Thread.currentThread() : null);
 
       // loop until we acquire the row lock (unless !waitForLock)
       while (true) {
-        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
-        if (existingLatch == null) {
+        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+        if (existingContext == null) {
+          // row not locked
           break;
+        } else if (allowReentrancy && existingContext.thread == Thread.currentThread()) {
+          // The row is already locked by the same thread
+          return existingContext.lockId;
         } else {
           // row already locked
           if (!waitForLock) {
             return null;
           }
           try {
-            if (!existingLatch.await(this.rowLockWaitDuration,
+            if (!existingContext.latch.await(this.rowLockWaitDuration,
                             TimeUnit.MILLISECONDS)) {
               throw new IOException("Timed out on getting lock for row="
                   + Bytes.toStringBinary(row));
@@ -3479,6 +3487,7 @@ public class HRegion implements HeapSize { // , Writable{
         Integer lockId = lockIdGenerator.incrementAndGet();
         HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
         if (existingRowKey == null) {
+          rowLockContext.lockId = lockId;
           return lockId;
         } else {
           // lockId already in use, jump generator to a new spot
@@ -3501,23 +3510,23 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Release the row lock!
+   * Release the row lock (if the given lockId was held, otherwise do nothing.)
    * @param lockId  The lock ID to release.
    */
   public void releaseRowLock(final Integer lockId) {
     if (lockId == null) return; // null lock id, do nothing
     HashedBytes rowKey = lockIds.remove(lockId);
     if (rowKey == null) {
-      LOG.warn("Release unknown lockId: " + lockId);
+      // Row not locked.  Likely released by the same thread already.
       return;
     }
-    CountDownLatch rowLatch = lockedRows.remove(rowKey);
-    if (rowLatch == null) {
+    RowLockContext rowLockContext = lockedRows.remove(rowKey);
+    if (rowLockContext == null) {
       LOG.error("Releases row not locked, lockId: " + lockId + " row: "
           + rowKey);
       return;
     }
-    rowLatch.countDown();
+    rowLockContext.latch.countDown();
   }
 
   /**
@@ -3536,13 +3545,15 @@ public class HRegion implements HeapSize { // , Writable{
    * @param row the row to lock
    * @param waitForLock if true, will block until the lock is available, otherwise will
    * simply return null if it could not acquire the lock.
+   * @param allowReentrancy if true, subsequent calls for the same row from the same thread will
+   *        succeed and return the same lock id 
    * @return lockid or null if waitForLock is false and the lock was unavailable.
    */
-  public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
+  public Integer getLock(Integer lockid, byte [] row, boolean waitForLock, boolean allowReentrancy)
   throws IOException {
     Integer lid = null;
     if (lockid == null) {
-      lid = internalObtainRowLock(row, waitForLock);
+      lid = internalObtainRowLock(row, waitForLock, allowReentrancy);
     } else {
       if (!isRowLocked(lockid)) {
         throw new IOException("Invalid row lock");
@@ -3551,6 +3562,20 @@ public class HRegion implements HeapSize { // , Writable{
     }
     return lid;
   }
+  
+  /**
+   * Returns existing row lock if found, otherwise
+   * obtains a new row lock and returns it.
+   * Does not allow reentrant row locks.
+   * @param lockid requested by the user, or null if the user didn't already hold lock
+   * @param row the row to lock
+   * @param waitForLock if true, will block until the lock is available, otherwise will
+   * simply return null if it could not acquire the lock.
+   * @return lockid or null if waitForLock is false and the lock was unavailable.
+   */  public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
+      throws IOException {
+    return getLock(lockid, row, waitForLock, false);
+  }
 
   /**
    * Determines whether multiple column families are present
@@ -4874,7 +4899,7 @@ public class HRegion implements HeapSize { // , Writable{
       acquiredLocks = new ArrayList(rowsToLock.size());
       for (byte[] row : rowsToLock) {
         // attempt to lock all involved rows, fail if one lock times out
-        Integer lid = getLock(null, row, true);
+        Integer lid = getLock(null, row, true, true);
         if (lid == null) {
           throw new IOException("Failed to acquire lock on "
               + Bytes.toStringBinary(row));
@@ -6013,4 +6038,16 @@ public class HRegion implements HeapSize { // , Writable{
     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
 
   }
+  
+  private static class RowLockContext {
+    
+    private CountDownLatch latch;
+    private Thread thread;
+    private Integer lockId;
+    
+    public RowLockContext(CountDownLatch latch, Thread thread) {
+      this.latch = latch;
+      this.thread = thread;
+    }
+  }
 }