diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 92e8ef7..671b385 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3176,6 +3176,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
+    RowLock prevRowLock = null;
     boolean success = false;
     int noOfPuts = 0, noOfDeletes = 0;
     WALKey walKey = null;
@@ -3257,7 +3258,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean shouldBlock = numReadyToWrite == 0;
       RowLock rowLock = null;
       try {
-        rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock);
+        rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock, prevRowLock);
       } catch (TimeoutIOException e) {
         // We will retry when other exceptions, but we should stop if we timeout .
         throw e;
@@ -3271,7 +3272,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         break;
       } else {
-        acquiredRowLocks.add(rowLock);
+        if (rowLock != prevRowLock) {
+          // It is a different row now, add this to the acquiredRowLocks and
+          // set prevRowLock to the new returned rowLock
+          acquiredRowLocks.add(rowLock);
+          prevRowLock = rowLock;
+        }
       }
       lastIndexExclusive++;
@@ -3368,7 +3374,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now);

           // Acquire row locks. If not, the whole batch will fail.
-          acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true));
+          acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true, null));

           // Returned mutations from coprocessor correspond to the Mutation at index i. We can
           // directly add the cells from those mutations to the familyMaps of this mutation.
@@ -5464,17 +5470,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
     // Make sure the row is inside of this region before getting the lock for it.
     checkRow(row, "row lock");
-    return getRowLockInternal(row, readLock, waitForLock);
+    return getRowLockInternal(row, readLock, waitForLock, null);
   }

   // getRowLock calls checkRow. Call this to skip checkRow.
   protected RowLock getRowLockInternal(byte[] row) throws IOException {
-    return getRowLockInternal(row, false, true);
+    return getRowLockInternal(row, false, true, null);
   }

-  protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock)
-      throws IOException {
+  protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock,
+      final RowLock prevRowLock) throws IOException {
     // create an object to use a a key in the row lock map
     HashedBytes rowKey = new HashedBytes(row);

@@ -5508,6 +5514,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       //
       // This can fail as
       if (readLock) {
+        // For read lock, if the caller has locked the same row previously, it will not try
+        // to acquire the same read lock. It simply returns the previous row lock.
+        RowLockImpl prevRowLockImpl = (RowLockImpl)prevRowLock;
+        if ((prevRowLockImpl != null) && (prevRowLockImpl.getLock() ==
+            rowLockContext.readWriteLock.readLock())) {
+          success = true;
+          return prevRowLock;
+        }
         result = rowLockContext.newReadLock();
       } else {
         result = rowLockContext.newWriteLock();
       }
@@ -5569,6 +5583,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       Thread.currentThread().interrupt();
       throw iie;
+    } catch (Error error) {
+      // The maximum lock count for read lock is 64K (hardcoded), when this maximum count
+      // is reached, it will throw out an Error. This Error needs to be caught so it can
+      // go ahead to process the minibatch with lock acquired.
+      LOG.warn("Error to get row lock for " + Bytes.toStringBinary(row) + ", cause: " + error);
+      IOException ioe = new IOException();
+      ioe.initCause(error);
+      if (traceScope != null) {
+        traceScope.getSpan().addTimelineAnnotation("Error getting row lock");
+      }
+      throw ioe;
     } finally {
       // Clean up the counts just in case this was the thing keeping the context alive.
       if (!success && rowLockContext != null) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 385d4ad..515c989 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -413,6 +413,33 @@ public class TestFromClientSide3 {
     }
   }

+  // Test Table.batch with large amount of mutations against the same key.
+  // It used to trigger read lock's "Maximum lock count exceeded" Error.
+  @Test
+  public void testHTableWithLargeBatch() throws Exception {
+    Table table = TEST_UTIL.createTable(TableName.valueOf("testHTableWithLargeBatch"),
+        new byte[][] { FAMILY });
+    int sixtyFourK = 64 * 1024;
+    try {
+      List<Row> actions = new ArrayList<Row>();
+      Object[] results = new Object[(sixtyFourK + 1) * 2];
+
+      for (int i = 0; i < sixtyFourK + 1; i ++) {
+        Put put1 = new Put(ROW);
+        put1.addColumn(FAMILY, QUALIFIER, VALUE);
+        actions.add(put1);
+
+        Put put2 = new Put(ANOTHERROW);
+        put2.addColumn(FAMILY, QUALIFIER, VALUE);
+        actions.add(put2);
+      }
+
+      table.batch(actions, results);
+    } finally {
+      table.close();
+    }
+  }
+
   @Test
   public void testBatchWithRowMutation() throws Exception {
     LOG.info("Starting testBatchWithRowMutation");
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index d9415df..b9f2290 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -663,12 +663,12 @@ public class TestAtomicOperation {
     }

     @Override
-    public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock)
-        throws IOException {
+    public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock,
+        final RowLock prevRowLock) throws IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
         latch.countDown();
       }
-      return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock));
+      return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock, prevRowLock));
     }

     public class WrappedRowLock implements RowLock {
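Note on the Error handling added above: rowLockContext.readWriteLock is a java.util.concurrent.locks.ReentrantReadWriteLock, which keeps its read-hold count in 16 bits of lock state and therefore supports at most 65535 read holds; the next acquisition throws java.lang.Error("Maximum lock count exceeded"). That is why a batch that read-locks the same row once per mutation fails around 64K mutations against one row, and why returning prevRowLock for a repeated row keeps the hold count at one per distinct row. The standalone sketch below, which is not part of the patch (the class name ReadLockLimitDemo is made up for illustration), reproduces the JDK limit on its own:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockLimitDemo {
  public static void main(String[] args) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    int held = 0;
    try {
      // Acquisitions 1..65535 succeed; the 65536th throws
      // java.lang.Error: Maximum lock count exceeded.
      for (int i = 0; i < 65536; i++) {
        lock.readLock().lock();
        held++;
      }
    } catch (Error e) {
      System.out.println("Read lock failed after " + held + " holds: " + e);
    } finally {
      // Release only what was actually acquired.
      for (int i = 0; i < held; i++) {
        lock.readLock().unlock();
      }
    }
  }
}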