From 0803ef027031b0de7f7542ca5dc988c89d0a9f4d Mon Sep 17 00:00:00 2001 From: huaxiangsun Date: Mon, 20 Nov 2017 14:12:31 -0800 Subject: [PATCH] HBASE-19163 Maximum lock count exceeded from region server's batch processing --- .../apache/hadoop/hbase/regionserver/HRegion.java | 72 ++++++++++++++++++---- .../hadoop/hbase/client/TestFromClientSide3.java | 27 ++++++++ .../client/TestSnapshotCloneIndependence.java | 7 +++ .../master/cleaner/TestSnapshotFromMaster.java | 7 +++ .../hbase/regionserver/TestAtomicOperation.java | 5 +- .../hbase/regionserver/TestCompactSplitThread.java | 7 +++ .../hbase/snapshot/TestRegionSnapshotTask.java | 7 +++ 7 files changed, 118 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3a3cb03..c78ec41 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -226,6 +226,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize"; public static final int DEFAULT_MAX_CELL_SIZE = 10485760; + public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE = + "hbase.regionserver.minibatch.size"; + public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 10000; + /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. @@ -338,6 +342,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // in bytes final long maxCellSize; + // Number of mutations for minibatch processing. 
+ private final int miniBatchSize; + // negative number indicates infinite timeout static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); @@ -809,6 +816,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE); + this.miniBatchSize = conf.getInt(HBASE_REGIONSERVER_MINIBATCH_SIZE, + DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE); } void setHTableSpecificConf() { @@ -3149,7 +3158,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks) throws IOException { int readyToWriteCount = 0; int lastIndexExclusive = 0; + RowLock prevRowLock = null; for (; lastIndexExclusive < size(); lastIndexExclusive++) { + // It reaches the miniBatchSize, stop here and process the miniBatch + // This only applies to non-atomic batch operations. + if (!isAtomic() && (readyToWriteCount == region.miniBatchSize)) { + break; + } + if (!isOperationPending(lastIndexExclusive)) { continue; } @@ -3158,7 +3174,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RowLock rowLock = null; try { // if atomic then get exclusive lock, else shared lock - rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); + rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock); } catch (TimeoutIOException e) { // We will retry when other exceptions, but we should stop if we timeout . 
throw e; @@ -3175,8 +3191,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } break; // Stop acquiring more rows for this batch } else { - acquiredRowLocks.add(rowLock); + if (rowLock != prevRowLock) { + // It is a different row now, add this to the acquiredRowLocks and + // set prevRowLock to the new returned rowLock + acquiredRowLocks.add(rowLock); + prevRowLock = rowLock; + } } + readyToWriteCount++; } return createMiniBatch(lastIndexExclusive, readyToWriteCount); @@ -3565,7 +3587,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.checkAndPrepareMutation(cpMutation, timestamp); // Acquire row locks. If not, the whole batch will fail. - acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true)); + acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true, null)); // Returned mutations from coprocessor correspond to the Mutation at index i. We can // directly add the cells from those mutations to the familyMaps of this mutation. @@ -3942,7 +3964,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi get.addColumn(family, qualifier); // Lock row - note that doBatchMutate will relock this row if called checkRow(row, "doCheckAndRowMutate"); - RowLock rowLock = getRowLockInternal(get.getRow(), false); + RowLock rowLock = getRowLockInternal(get.getRow(), false, null); try { if (mutation != null && this.getCoprocessorHost() != null) { // Call coprocessor. 
@@ -5557,10 +5579,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { checkRow(row, "row lock"); - return getRowLockInternal(row, readLock); + return getRowLockInternal(row, readLock, null); } - protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException { + protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock prevRowLock) + throws IOException { // create an object to use a a key in the row lock map HashedBytes rowKey = new HashedBytes(row); @@ -5577,6 +5600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Now try an get the lock. // This can fail as if (readLock) { + // For read lock, if the caller has locked the same row previously, it will not try + // to acquire the same read lock. It simply returns the previous row lock. + RowLockImpl prevRowLockImpl = (RowLockImpl)prevRowLock; + if ((prevRowLockImpl != null) && (prevRowLockImpl.getLock() == + rowLockContext.readWriteLock.readLock())) { + success = true; + return prevRowLock; + } result = rowLockContext.newReadLock(); } else { result = rowLockContext.newWriteLock(); @@ -5599,7 +5630,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) { TraceUtil.addTimelineAnnotation("Failed to get row lock"); - result = null; String message = "Timed out waiting for lock for row: " + rowKey + " in region " + getRegionInfo().getEncodedName(); if (reachDeadlineFirst) { @@ -5619,6 +5649,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock"); Thread.currentThread().interrupt(); throw iie; + } catch (Error error) { + // The maximum lock count for read lock is 64K (hardcoded), when this maximum count + // 
is reached, it will throw out an Error. This Error needs to be caught so it can + // go ahead to process the minibatch with lock acquired. + LOG.warn("Error to get row lock for " + Bytes.toStringBinary(row) + ", cause: " + error); + IOException ioe = new IOException(); + ioe.initCause(error); + TraceUtil.addTimelineAnnotation("Error getting row lock"); + throw ioe; } finally { // Clean up the counts just in case this was the thing keeping the context alive. if (!success && rowLockContext != null) { @@ -7166,10 +7205,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public MiniBatchOperationInProgress lockRowsAndBuildMiniBatch( List acquiredRowLocks) throws IOException { + RowLock prevRowLock = null; for (byte[] row : rowsToLock) { try { - RowLock rowLock = region.getRowLockInternal(row, false); // write lock - acquiredRowLocks.add(rowLock); + RowLock rowLock = region.getRowLockInternal(row, false, prevRowLock); // write lock + if (rowLock != prevRowLock) { + acquiredRowLocks.add(rowLock); + prevRowLock = rowLock; + } } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe); throw ioe; @@ -7256,10 +7299,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { // STEP 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList<>(rowsToLock.size()); + RowLock prevRowLock = null; for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out // use a writer lock for mixed reads and writes - acquiredRowLocks.add(getRowLockInternal(row, false)); + RowLock rowLock = getRowLockInternal(row, false, prevRowLock); + if (rowLock != prevRowLock) { + acquiredRowLocks.add(rowLock); + prevRowLock = rowLock; + } } // STEP 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 
1 : acquiredRowLocks.size()); @@ -7437,7 +7485,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RowLock rowLock = null; MemStoreSizing memstoreAccounting = new MemStoreSizing(); try { - rowLock = getRowLockInternal(mutation.getRow(), false); + rowLock = getRowLockInternal(mutation.getRow(), false, null); lock(this.updatesLock.readLock()); try { Result cpResult = doCoprocessorPreCall(op, mutation); @@ -7786,7 +7834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 3 * Bytes.SIZEOF_BOOLEAN); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 5f7622a..e5d5324 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -411,6 +411,33 @@ public class TestFromClientSide3 { } } + // Test Table.batch with large amount of mutations against the same key. + // It used to trigger read lock's "Maximum lock count exceeded" Error. 
+ @Test + public void testHTableWithLargeBatch() throws Exception { + Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + new byte[][] { FAMILY }); + int sixtyFourK = 64 * 1024; + try { + List actions = new ArrayList(); + Object[] results = new Object[(sixtyFourK + 1) * 2]; + + for (int i = 0; i < sixtyFourK + 1; i ++) { + Put put1 = new Put(ROW); + put1.addColumn(FAMILY, QUALIFIER, VALUE); + actions.add(put1); + + Put put2 = new Put(ANOTHERROW); + put2.addColumn(FAMILY, QUALIFIER, VALUE); + actions.add(put2); + } + + table.batch(actions, results); + } finally { + table.close(); + } + } + @Test public void testBatchWithRowMutation() throws Exception { LOG.info("Starting testBatchWithRowMutation"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java index 5688617..095fb01 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotCloneIndependence.java @@ -118,6 +118,13 @@ public class TestSnapshotCloneIndependence { // will even trigger races between creating the directory containing back references and // the back reference itself. conf.setInt("hbase.master.hfilecleaner.ttl", CLEANER_INTERVAL); + + // Set the minibatch size to 64000, the default behaviour is that it is unlimited + // so the batch can be processed in one batch. Right now, the default minibatch size + // is set to 10000, so for huge batch, it is divided into multiple minibatches. + // If one of the minibatches fails, the whole batch is going to be retried. + // Set minibatchSize to a large number to avoid that. 
+ conf.setInt("hbase.regionserver.minibatch.size", 64000); } @Before diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java index 756152e..a5659b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java @@ -113,6 +113,13 @@ public class TestSnapshotFromMaster { conf.setInt("hbase.regionsever.info.port", -1); // change the flush size to a small amount, regulating number of store files conf.setInt("hbase.hregion.memstore.flush.size", 25000); + + // Set the minibatch size to 64000, the default behaviour is that it is unlimited + // so the batch can be processed in one batch. Right now, the default minibatch size + // is set to 10000, so for huge batch, it is divided into multiple minibatches. + // If one of the minibatches fails, the whole batch is going to be retried. + // Set minibatchSize to a large number to avoid that. 
+ conf.setInt("hbase.regionserver.minibatch.size", 64000); // so make sure we get a compaction when doing a load, but keep around some // files in the store conf.setInt("hbase.hstore.compaction.min", 2); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 3aed91c..af82692 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -667,11 +667,12 @@ public class TestAtomicOperation { } @Override - public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException { + public RowLock getRowLockInternal(final byte[] row, boolean readLock, + final RowLock prevRowlock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return new WrappedRowLock(super.getRowLockInternal(row, readLock)); + return new WrappedRowLock(super.getRowLockInternal(row, readLock, null)); } public class WrappedRowLock implements RowLock { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java index 6d06494..3f69c87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java @@ -74,6 +74,13 @@ public class TestCompactSplitThread { // change the flush size to a small amount, regulating number of store files conf.setInt("hbase.hregion.memstore.flush.size", 25000); + // Set the minibatch size to 64000, the default behaviour is that it is unlimited + // so the batch can be processed in one batch. 
Right now, the default minibatch size + // is set to 10000, so for huge batch, it is divided into multiple minibatches. + // If one of the minibatches fails, the whole batch is going to be retried. + // Set minibatchSize to a large number to avoid that. + conf.setInt("hbase.regionserver.minibatch.size", 64000); + // block writes if we get to blockingStoreFiles store files conf.setInt("hbase.hstore.blockingStoreFiles", blockingStoreFiles); // Ensure no extra cleaners on by default (e.g. TimeToLiveHFileCleaner) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java index df8fc64..a712bb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java @@ -78,6 +78,13 @@ public class TestRegionSnapshotTask { conf.setInt("hbase.hfile.compaction.discharger.interval", 1000); conf.setInt("hbase.master.hfilecleaner.ttl", 1000); + // Set the minibatch size to 64000, the default behaviour is that it is unlimited + // so the batch can be processed in one batch. Right now, the default minibatch size + // is set to 10000, so for huge batch, it is divided into multiple minibatches. + // If one of the minibatches fails, the whole batch is going to be retried. + // Set minibatchSize to a large number to avoid that. + conf.setInt("hbase.regionserver.minibatch.size", 64000); + TEST_UTIL.startMiniCluster(1); TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); -- 2.5.2