From a9ed4c57980580b6bbbcb902d835e5a285f61f3a Mon Sep 17 00:00:00 2001 From: huaxiangsun Date: Wed, 15 Nov 2017 17:37:49 -0800 Subject: [PATCH] HBASE-19163 Maximum lock count exceeded from region server's batch processing --- .../apache/hadoop/hbase/regionserver/HRegion.java | 50 +++++++++++++--------- .../hadoop/hbase/client/TestFromClientSide3.java | 25 +++++++++++ 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3a3cb03..4e45c6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3149,33 +3149,43 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks) throws IOException { int readyToWriteCount = 0; int lastIndexExclusive = 0; + byte[] lastLockedRow = null; for (; lastIndexExclusive < size(); lastIndexExclusive++) { if (!isOperationPending(lastIndexExclusive)) { continue; } Mutation mutation = getMutation(lastIndexExclusive); - // If we haven't got any rows in our batch, we should block to get the next one. - RowLock rowLock = null; - try { - // if atomic then get exclusive lock, else shared lock - rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); - } catch (TimeoutIOException e) { - // We will retry when other exceptions, but we should stop if we timeout . - throw e; - } catch (IOException ioe) { - LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); - if (isAtomic()) { // fail, atomic means all or none - throw ioe; + + // After HBASE-17924, mutations in the batch is sorted. For mutations with the same key, + // it only needs to acquire one read lock. + if ((lastLockedRow == null) || !Bytes.equals( + lastLockedRow, 0, lastLockedRow.length, + mutation.getRow(), 0, mutation.getRow().length)) { + + // If we haven't got any rows in our batch, we should block to get the next one. + RowLock rowLock = null; + try { + // if atomic then get exclusive lock, else shared lock + rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); + } catch (TimeoutIOException e) { + // We will retry when other exceptions, but we should stop if we timeout . + throw e; + } catch (IOException ioe) { + LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + if (isAtomic()) { // fail, atomic means all or none + throw ioe; + } } - } - if (rowLock == null) { - // We failed to grab another lock - if (isAtomic()) { - throw new IOException("Can't apply all operations atomically!"); + if (rowLock == null) { + // We failed to grab another lock + if (isAtomic()) { + throw new IOException("Can't apply all operations atomically!"); + } + break; // Stop acquiring more rows for this batch + } else { + acquiredRowLocks.add(rowLock); + lastLockedRow = mutation.getRow(); } - break; // Stop acquiring more rows for this batch - } else { - acquiredRowLocks.add(rowLock); } readyToWriteCount++; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 5f7622a..961fefb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -412,6 +412,31 @@ public class TestFromClientSide3 { } @Test + public void testHTableLargeBatch ()throws Exception { + Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + new byte[][] { FAMILY }); + int sixtyFourK = 64 * 1024; + try { + List actions = (List) new ArrayList(); + Object[] results = new Object[(sixtyFourK + 1) * 2]; + + for (int i = 0; i < sixtyFourK + 1; i ++) { + Put put1 = new Put(ROW); + put1.addColumn(FAMILY, QUALIFIER, VALUE); + actions.add(put1); + + Put put2 = new Put(ANOTHERROW); + put2.addColumn(FAMILY, QUALIFIER, VALUE); + actions.add(put2); + } + + table.batch(actions, results); + } finally { + table.close(); + } + } + + @Test public void testBatchWithRowMutation() throws Exception { LOG.info("Starting testBatchWithRowMutation"); final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation"); -- 2.5.2