From db9d743d05b1e993da89bb91dded01cc5b45fe41 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Tue, 28 Nov 2017 09:14:58 -0800
Subject: [PATCH] HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)

This patch plus a sorting of the batch (HBASE-17924) fixes a regression in
Increment/CheckAndPut-style operations.

Signed-off-by: Yu Li
Signed-off-by: Allan Yang
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  82 ++++++++----
 .../apache/hadoop/hbase/regionserver/Region.java   |   4 +-
 .../hadoop/hbase/client/TestMultiParallel.java     | 141 +++++++++++++++
 .../hbase/regionserver/TestAtomicOperation.java    |  13 +-
 4 files changed, 212 insertions(+), 28 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a7ed011e7c..fe7d905e44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2102,7 +2102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Should the store be flushed because it is old enough.
    * <p>
    * Every FlushPolicy should call this to determine whether a store is old enough to flush(except
-   * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
+   * that you always flush all stores). Otherwise the shouldFlush() method will always
    * returns true which will make a lot of flush requests.
    */
   boolean shouldFlushStore(Store store) {
@@ -3135,18 +3135,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           continue;
         }
+
+        //HBASE-18233
         // If we haven't got any rows in our batch, we should block to
-        // get the next one.
+        // get the next one's read lock. We need at least one row to mutate.
+        // If we have got rows, do not block when the lock is not available,
+        // so that we can fail fast and go on with the rows with locks in
+        // the batch. By doing this, we can reduce contention and prevent
+        // possible deadlocks.
+        // The unfinished rows in the batch will be detected in batchMutate,
+        // and it will try to finish them by calling doMiniBatchMutation again.
+        boolean shouldBlock = numReadyToWrite == 0;
         RowLock rowLock = null;
         try {
-          rowLock = getRowLockInternal(mutation.getRow(), true);
+          rowLock = getRowLock(mutation.getRow(), true, shouldBlock);
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock in batch put, row="
             + Bytes.toStringBinary(mutation.getRow()), ioe);
         }
         if (rowLock == null) {
-          // We failed to grab another lock
-          break; // stop acquiring more rows for this batch
+          // We failed to grab another lock. Stop acquiring more rows for this
+          // batch and go on with the rows we have already locked.
+          break;
         } else {
           acquiredRowLocks.add(rowLock);
         }
@@ -3242,7 +3253,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now);
 
               // Acquire row locks. If not, the whole batch will fail.
-              acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
+              acquiredRowLocks.add(getRowLock(cpMutation.getRow(), true));
 
               if (cpMutation.getDurability() == Durability.SKIP_WAL) {
                 recordMutationWithoutWal(cpFamilyMap);
@@ -3537,7 +3548,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       get.addColumn(family, qualifier);
       checkRow(row, "checkAndMutate");
       // Lock row - note that doBatchMutate will relock this row if called
-      RowLock rowLock = getRowLockInternal(get.getRow(), false);
+      RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.await();
       try {
@@ -3647,7 +3658,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       get.addColumn(family, qualifier);
       checkRow(row, "checkAndRowMutate");
       // Lock row - note that doBatchMutate will relock this row if called
-      RowLock rowLock = getRowLockInternal(get.getRow(), false);
+      RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
       mvcc.await();
       try {
@@ -3874,9 +3885,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * not check the families for validity.
    *
    * @param familyMap Map of kvs per family
-   * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
-   *        If null, then this method internally creates a mvcc transaction.
-   * @param output newly added KVs into memstore
    * @param isInReplay true when adding replayed KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    *         new entries.
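
The comment block added above is the heart of the change: block for a row lock only while the batch holds no locks at all, otherwise try-lock and fall back to the rows already acquired. Below is a rough, self-contained sketch of that shape using plain java.util.concurrent locks and invented names (FailFastRowLocking, acquireBatch, waitMillis); it is an illustration of the strategy, not HBase's RowLock API.

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;

  public class FailFastRowLocking {
    // Block (with a timeout) only while we hold nothing; once at least one row
    // is locked, use a non-blocking tryLock and stop at the first miss.
    static List<ReentrantLock> acquireBatch(List<ReentrantLock> rowLocks, long waitMillis)
        throws InterruptedException {
      List<ReentrantLock> acquired = new ArrayList<>();
      for (ReentrantLock lock : rowLocks) {
        boolean shouldBlock = acquired.isEmpty();
        boolean locked = shouldBlock
            ? lock.tryLock(waitMillis, TimeUnit.MILLISECONDS) // wait only while we hold no locks
            : lock.tryLock();                                 // fail fast for the rest
        if (!locked) {
          break; // leave the remaining rows for the next round of the batch
        }
        acquired.add(lock);
      }
      return acquired;
    }
  }

The rows that miss their lock are simply left over; in the patch, batchMutate detects the unfinished mutations and calls doMiniBatchMutation again for them.
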
@@ -5292,7 +5300,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Get an exclusive ( write lock ) lock on a given row.
    * @param row Which row to lock.
    * @return A locked RowLock. The lock is exclusive and already aqquired.
-   * @throws IOException
+   * @throws IOException if any error occurred
    */
   public RowLock getRowLock(byte[] row) throws IOException {
     return getRowLock(row, false);
   }
@@ -5306,17 +5314,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * started (the calling thread has already acquired the region-close-guard lock).
    * @param row The row actions will be performed against
    * @param readLock is the lock reader or writer. True indicates that a non-exlcusive
-   * lock is requested
+   *          lock is requested
+   * @return A locked RowLock.
+   * @throws IOException if any error occurred
    */
   @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
-    // Make sure the row is inside of this region before getting the lock for it.
-    checkRow(row, "row lock");
-    return getRowLockInternal(row, readLock);
+    return getRowLock(row, readLock, true);
   }
 
-  protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
-    // create an object to use a a key in the row lock map
+  /**
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
+   * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row The row actions will be performed against
+   * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+   *          lock is requested
+   * @param waitForLock whether we should wait for this lock
+   * @return A locked RowLock, or null if {@code waitForLock} is set to false and tryLock failed
+   * @throws IOException if any error occurred
+   */
+  public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
+    // Make sure the row is inside of this region before getting the lock for it.
+    checkRow(row, "row lock");
     HashedBytes rowKey = new HashedBytes(row);
     RowLockContext rowLockContext = null;
@@ -5354,12 +5376,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           result = rowLockContext.newWriteLock();
         }
       }
-      if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+      boolean lockAvailable = false;
+      if (waitForLock) {
+        // if waiting for lock, wait for rowLockWaitDuration milliseconds
+        lockAvailable = result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS);
+      } else {
+        // if we are not waiting for lock, tryLock() will return immediately whether we have got
+        // this lock or not
+        lockAvailable = result.getLock().tryLock();
+      }
+      if (!lockAvailable) {
         if (traceScope != null) {
           traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
         }
         result = null;
-        throw new IOException("Timed out waiting for lock for row: " + rowKey);
+        if (waitForLock) {
+          throw new IOException("Timed out waiting for lock for row: " + rowKey);
+        } else {
+          return null;
+        }
       }
       rowLockContext.setThreadName(Thread.currentThread().getName());
       success = true;
@@ -7233,7 +7268,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       for (byte[] row : rowsToLock) {
         // Attempt to lock all involved rows, throw if any lock times out
         // use a writer lock for mixed reads and writes
-        acquiredRowLocks.add(getRowLockInternal(row, false));
+        acquiredRowLocks.add(getRowLock(row));
       }
       // 3. Region lock
       lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
@@ -7491,7 +7526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WALKey walKey = null;
     boolean doRollBackMemstore = false;
     try {
-      rowLock = getRowLockInternal(row, false);
+      rowLock = getRowLock(row);
       assert rowLock != null;
       try {
         lock(this.updatesLock.readLock());
@@ -7775,7 +7810,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     List<Cell> memstoreCells = new ArrayList<Cell>();
     Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
     try {
-      rowLock = getRowLockInternal(increment.getRow(), false);
+      rowLock = getRowLock(increment.getRow());
       long txid = 0;
       try {
         lock(this.updatesLock.readLock());
@@ -7985,7 +8020,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * from incrementCoordinates only.
    * @param increment
    * @param columnFamily
-   * @param incrementCoordinates
    * @return Return the Cells to Increment
    * @throws IOException
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 465102136b..59c1999257 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -277,14 +277,14 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * Tries to acquire a lock on the given row.
-   * @param waitForLock if true, will block until the lock is available.
+   * @param readlock if true, will block until the lock is available.
    *        Otherwise, just tries to obtain the lock and returns
    *        false if unavailable.
    * @return the row lock if acquired,
    *         null if waitForLock was false and the lock was not acquired
    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
    */
-  RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+  RowLock getRowLock(byte[] row, boolean readlock) throws IOException;
 
   /**
    * If the given list of row locks is not null, releases all locks.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 484bc0ea82..055e4acf52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -63,6 +64,7 @@ public class TestMultiParallel {
   private static final byte[] QUALIFIER = Bytes.toBytes("qual");
   private static final String FAMILY = "family";
   private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
+  private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2");
   private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
@@ -727,4 +729,143 @@ public class TestMultiParallel {
       validateEmpty(result);
     }
   }
+
+  private static class MultiThread extends Thread {
+    public Throwable throwable = null;
+    private CountDownLatch endLatch;
+    private CountDownLatch beginLatch;
+    List<Put> puts;
+    public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+    @Override
+    public void run() {
+      HTable table = null;
+      try {
+        table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        table.setAutoFlush(false);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for (Put put : puts) {
+            table.put(put);
+          }
+          table.flushCommits();
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when put:", t);
+      } finally {
+        endLatch.countDown();
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            LOG.error("Error when close table", ioe);
+          }
+        }
+      }
+    }
+  }
+
+
+  private static class IncrementThread extends Thread {
+    public Throwable throwable = null;
+    private CountDownLatch endLatch;
+    private CountDownLatch beginLatch;
+    List<Put> puts;
+    public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+    @Override
+    public void run() {
+      HTable table = null;
+      try {
+        table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for (Put put : puts) {
+            Increment inc = new Increment(put.getRow());
+            inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1);
+            table.increment(inc);
+          }
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when incr:", t);
+      } finally {
+        endLatch.countDown();
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            LOG.error("Error when close table", ioe);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Test for HBASE-18233: batch mutations that touch the same rows in a different order and
+   * concurrent increments should not deadlock against each other.
+   * @throws Exception if any error occurred
+   */
+  @Test(timeout=300000)
+  public void testMultiThreadWithRowLocks() throws Exception {
+    // set a short timeout to get a timeout exception when getting a row lock fails
+    UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000);
+    UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000);
+    UTIL.getConfiguration().setInt("hbase.client.retries.number", 10);
+
+    UTIL.createTable(TEST_TABLE2, BYTES_FAMILY);
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long) 0));
+      puts.add(put);
+    }
+    List<Put> reversePuts = new ArrayList<>(puts);
+    Collections.reverse(reversePuts);
+    int NUM_OF_THREAD = 12;
+    CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD);
+    CountDownLatch beginLatch = new CountDownLatch(1);
+    int threadNum = NUM_OF_THREAD / 4;
+    List<MultiThread> multiThreads = new ArrayList<>();
+    List<IncrementThread> incThreads = new ArrayList<>();
+    for (int i = 0; i < threadNum; i++) {
+      MultiThread thread = new MultiThread(reversePuts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for (int i = 0; i < threadNum; i++) {
+      MultiThread thread = new MultiThread(puts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for (int i = 0; i < threadNum; i++) {
+      IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    for (int i = 0; i < threadNum; i++) {
+      IncrementThread thread = new IncrementThread(puts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    long timeBegin = System.currentTimeMillis();
+    beginLatch.countDown();
+    latch.await();
+    LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin));
+    for (MultiThread thread : multiThreads) {
+      Assert.assertTrue(thread.throwable == null);
+    }
+    for (IncrementThread thread : incThreads) {
+      Assert.assertTrue(thread.throwable == null);
+    }
+
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index edef89992d..e0f13ea26b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -663,11 +663,20 @@ public class TestAtomicOperation {
     }
 
     @Override
-    public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
+    public RowLock getRowLock(final byte[] row, boolean readLock, boolean waitForLock)
+        throws IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
       }
-      return new WrappedRowLock(super.getRowLockInternal(row, readLock));
+      return new WrappedRowLock(super.getRowLock(row, readLock, waitForLock));
+    }
+
+    @Override
+    public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException {
+      if (testStep == TestStep.CHECKANDPUT_STARTED) {
+        latch.countDown();
+      }
+      return new WrappedRowLock(super.getRowLock(row, readLock));
     }
 
     public class WrappedRowLock implements RowLock {
-- 
2.11.0 (Apple Git-81)
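
For context on what the new TestMultiParallel test exercises: clients write the same ten rows in opposite orders while other clients increment them, and under blocking row-lock acquisition two handlers can each end up holding one row while waiting on the other until the row-lock timeout fires. The standalone sketch below shows why failing fast on the second lock breaks that cycle; the class and method names (OppositeOrderDemo, lockPair) are made up for illustration and none of this is HBase API.

  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;

  public class OppositeOrderDemo {
    public static void main(String[] args) throws Exception {
      ReentrantLock rowA = new ReentrantLock();
      ReentrantLock rowB = new ReentrantLock();
      Thread t1 = new Thread(() -> lockPair(rowA, rowB)); // locks the rows in one order
      Thread t2 = new Thread(() -> lockPair(rowB, rowA)); // locks them in the opposite order
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Both threads finished without deadlocking");
    }

    // Block for the first lock only; merely try the second. Backing off instead of
    // waiting while already holding a lock is what breaks the deadlock cycle.
    static void lockPair(ReentrantLock first, ReentrantLock second) {
      for (int attempt = 0; attempt < 1000; attempt++) {
        first.lock();
        try {
          if (second.tryLock()) {
            try {
              return; // both rows held; real code would apply the mutations here
            } finally {
              second.unlock();
            }
          }
        } finally {
          first.unlock();
        }
        // Missed the second lock: release everything and retry, much as batchMutate
        // re-runs doMiniBatchMutation for the rows that were not finished.
        try {
          TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

If lockPair instead called second.lock() unconditionally, the two threads could sit waiting on each other; in the pre-patch code path that wait only ends when the rowLockWaitDuration timeout throws "Timed out waiting for lock for row".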