diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index dfb7043..8974f49 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1386,6 +1386,7 @@ public class HRegion implements HeapSize { // , Writable{ private long doMiniBatchPut(BatchOperationInProgress> batchOp) throws IOException { long now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); + boolean locked = false; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); @@ -1443,6 +1444,10 @@ public class HRegion implements HeapSize { // , Writable{ byteNow); } + + this.updatesLock.readLock().lock(); + locked = true; + // ------------------------------------ // STEP 3. Write to WAL // ---------------------------------- @@ -1474,6 +1479,9 @@ public class HRegion implements HeapSize { // , Writable{ success = true; return addedSize; } finally { + if (locked) + this.updatesLock.readLock().unlock(); + for (Integer toRelease : acquiredLocks) { releaseRowLock(toRelease); } @@ -2986,6 +2994,7 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(); try { Integer lid = getLock(lockid, row, true); + this.updatesLock.readLock().lock(); try { // Process each family for (Map.Entry> family : @@ -3041,6 +3050,7 @@ public class HRegion implements HeapSize { // , Writable{ size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { + this.updatesLock.readLock().unlock(); releaseRowLock(lid); } } finally { @@ -3075,6 +3085,7 @@ public class HRegion implements HeapSize { // , Writable{ startRegionOperation(); try { Integer lid = obtainRowLock(row); + this.updatesLock.readLock().lock(); try { Store store = stores.get(family); @@ -3113,6 +3124,7 @@ public class HRegion implements HeapSize { // , Writable{ size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { + this.updatesLock.readLock().unlock(); releaseRowLock(lid); } } finally {