diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d2785bf..33a2f76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3050,18 +3050,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } + + //HBASE-18233 // If we haven't got any rows in our batch, we should block to - // get the next one. + // get the next one's read lock. We need at least one row to mutate. + // If we have got rows, do not block when lock is not available, + // so that we can fail fast and go on with the rows with locks in + // the batch. By doing this, we can reduce contention and prevent + // possible deadlocks. + // The unfinished rows in the batch will be detected in batchMutate, + // and it wil try to finish them by calling doMiniBatchMutation again. + boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLock(mutation.getRow(), true); + rowLock = getRowLock(mutation.getRow(), true, shouldBlock); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); } if (rowLock == null) { - // We failed to grab another lock - break; // stop acquiring more rows for this batch + // We failed to grab another lock. Stop acquiring more rows for this + // batch and go on with the gotten ones + break; + } else { acquiredRowLocks.add(rowLock); } @@ -5151,9 +5162,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * started (the calling thread has already acquired the region-close-guard lock). * @param row The row actions will be performed against * @param readLock is the lock reader or writer. True indicates that a non-exlcusive - * lock is requested + * lock is requested + * */ public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { + return getRowLock(row, readLock, true); + } + + /** + * + * Get a row lock for the specified row. All locks are reentrant. + * + * Before calling this function make sure that a region operation has already been + * started (the calling thread has already acquired the region-close-guard lock). + * @param row The row actions will be performed against + * @param readLock is the lock reader or writer. True indicates that a non-exlcusive + * lock is requested + * @param waitForLock whether should wait for this lock + */ + public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException { // Make sure the row is inside of this region before getting the lock for it. checkRow(row, "row lock"); // create an object to use a a key in the row lock map @@ -5194,12 +5221,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = rowLockContext.newWriteLock(); } } - if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { + boolean lockAvailable = false; + if(waitForLock) { + //if waiting for lock, wait for rowLockWaitDuration milliseconds + lockAvailable = result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS); + } else { + //if we are not waiting for lock, tryLock() will return immediately whether we have got + //this lock or not + lockAvailable = result.getLock().tryLock(); + } + if(!lockAvailable) { if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("Failed to get row lock"); } result = null; - throw new IOException("Timed out waiting for lock for row: " + rowKey); + if(waitForLock) { + throw new IOException("Timed out waiting for lock for row: " + rowKey); + } else { + return null; + } } rowLockContext.setThreadName(Thread.currentThread().getName()); success = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 81fb0b9..dcf151c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -270,14 +270,14 @@ public interface Region extends ConfigurationObserver { /** * Tries to acquire a lock on the given row. - * @param waitForLock if true, will block until the lock is available. + * @param readlock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns * false if unavailable. * @return the row lock if acquired, * null if waitForLock was false and the lock was not acquired * @throws IOException if waitForLock was true and the lock could not be acquired after waiting */ - RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException; + RowLock getRowLock(byte[] row, boolean readlock) throws IOException; /** * If the given list of row locks is not null, releases all locks. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 935f6e8..9ebc1e3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -63,6 +64,7 @@ public class TestMultiParallel { private static final byte[] QUALIFIER = Bytes.toBytes("qual"); private static final String FAMILY = "family"; private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); + private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2"); private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); @@ -728,4 +730,143 @@ public class TestMultiParallel { validateEmpty(result); } } + + private static class MultiThread extends Thread { + public Throwable throwable = null; + private CountDownLatch endLatch; + private CountDownLatch beginLatch; + List puts; + public MultiThread(List puts, CountDownLatch beginLatch, CountDownLatch endLatch) { + this.puts = puts; + this.beginLatch = beginLatch; + this.endLatch = endLatch; + } + @Override + public void run() { + HTable table = null; + try { + table = new HTable(UTIL.getConfiguration(), TEST_TABLE2); + table.setAutoFlush(false); + beginLatch.await(); + for (int i = 0; i < 100; i++) { + for(Put put : puts) { + table.put(put); + } + table.flushCommits(); + } + } catch (Throwable t) { + throwable = t; + LOG.warn("Error when put:", t); + } finally { + endLatch.countDown(); + if(table != null) { + try { + table.close(); + } catch (IOException ioe) { + LOG.error("Error when close table", ioe); + } + } + } + } + } + + + private static class IncrementThread extends Thread { + public Throwable throwable = null; + private CountDownLatch endLatch; + private CountDownLatch beginLatch; + List puts; + public IncrementThread(List puts, CountDownLatch beginLatch, CountDownLatch endLatch) { + this.puts = puts; + this.beginLatch = beginLatch; + this.endLatch = endLatch; + } + @Override + public void run() { + HTable table = null; + try { + table = new HTable(UTIL.getConfiguration(), TEST_TABLE2); + beginLatch.await(); + for (int i = 0; i < 100; i++) { + for(Put put : puts) { + Increment inc = new Increment(put.getRow()); + inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1); + table.increment(inc); + } + } + } catch (Throwable t) { + throwable = t; + LOG.warn("Error when incr:", t); + } finally { + endLatch.countDown(); + if(table != null) { + try { + table.close(); + } catch (IOException ioe) { + LOG.error("Error when close table", ioe); + } + } + } + } + } + + /** + * UT for HBASE-18233, test for disordered batch mutation thread and + * increment won't lock each other + * @throws Exception + */ + @Test + public void testMultiThreadWithRowLocks() throws Exception { + //set a short timeout to get timeout exception when getting row lock fail + UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000); + UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000); + UTIL.getConfiguration().setInt("hbase.client.retries.number", 10); + + UTIL.createTable(TEST_TABLE2, BYTES_FAMILY); + List puts = new ArrayList<>(); + for(int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long)0)); + puts.add(put); + } + List reversePuts = new ArrayList<>(puts); + Collections.reverse(reversePuts); + int NUM_OF_THREAD = 12; + CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD); + CountDownLatch beginLatch = new CountDownLatch(1); + int threadNum = NUM_OF_THREAD / 4; + List multiThreads = new ArrayList<>(); + List incThreads = new ArrayList<>(); + for(int i = 0; i < threadNum; i ++) { + MultiThread thread = new MultiThread(reversePuts, beginLatch, latch); + thread.start(); + multiThreads.add(thread); + } + for(int i = 0; i < threadNum; i++) { + MultiThread thread = new MultiThread(puts, beginLatch, latch); + thread.start(); + multiThreads.add(thread); + } + for(int i = 0; i < threadNum; i ++) { + IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch); + thread.start(); + incThreads.add(thread); + } + for(int i = 0; i < threadNum; i++) { + IncrementThread thread = new IncrementThread(puts, beginLatch, latch); + thread.start(); + incThreads.add(thread); + } + long timeBegin = System.currentTimeMillis(); + beginLatch.countDown(); + latch.await(); + LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin)); + for(MultiThread thread : multiThreads) { + Assert.assertTrue(thread.throwable == null); + } + for(IncrementThread thread : incThreads) { + Assert.assertTrue(thread.throwable == null); + } + + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index a02f56a..e5f3e40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -626,6 +626,14 @@ public class TestAtomicOperation { } @Override + public RowLock getRowLock(final byte[] row, boolean readLock, boolean waitForLock) throws IOException { + if (testStep == TestStep.CHECKANDPUT_STARTED) { + latch.countDown(); + } + return new WrappedRowLock(super.getRowLock(row, readLock, waitForLock)); + } + + @Override public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown();