diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d0d457f..6961b6e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3017,9 +3017,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // If we haven't got any rows in our batch, we should block to
       // get the next one.
+      boolean shouldBlock = numReadyToWrite == 0;
       RowLock rowLock = null;
       try {
-        rowLock = getRowLock(mutation.getRow(), true);
+        rowLock = getRowLock(mutation.getRow(), true, shouldBlock);
       } catch (IOException ioe) {
         LOG.warn("Failed getting lock in batch put, row="
           + Bytes.toStringBinary(mutation.getRow()), ioe);
@@ -5117,9 +5118,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * started (the calling thread has already acquired the region-close-guard lock).
    * @param row The row actions will be performed against
    * @param readLock is the lock reader or writer. True indicates that a non-exlcusive
-   *                 lock is requested
+   *                 lock is requested
+   *
    */
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+    return getRowLock(row, readLock, true);
+  }
+
+  /**
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
+   * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row The row actions will be performed against
+   * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+   *                 lock is requested
+   * @param waitForLock whether to wait for the lock if it cannot be acquired immediately
+   */
+  public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
     // Make sure the row is inside of this region before getting the lock for it.
     checkRow(row, "row lock");
     // create an object to use a key in the row lock map
@@ -5159,14 +5176,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         result = rowLockContext.newWriteLock();
       }
     }
-    if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+    boolean lockAvailable = false;
+    if (waitForLock) {
+      lockAvailable = result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS);
+    } else {
+      lockAvailable = result.getLock().tryLock();
+    }
+    if (!lockAvailable) {
       if (traceScope != null) {
         traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
       }
       result = null;
       // Clean up the counts just in case this was the thing keeping the context alive.
       rowLockContext.cleanUp();
-      throw new IOException("Timed out waiting for lock for row: " + rowKey);
+      if (waitForLock) {
+        throw new IOException("Timed out waiting for lock for row: " + rowKey);
+      } else {
+        return result;
+      }
     }
     rowLockContext.setThreadName(Thread.currentThread().getName());
     return result;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 81fb0b9..dcf151c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -270,14 +270,12 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * Tries to acquire a lock on the given row.
-   * @param waitForLock if true, will block until the lock is available.
-   *        Otherwise, just tries to obtain the lock and returns
-   *        false if unavailable.
-   * @return the row lock if acquired,
-   *   null if waitForLock was false and the lock was not acquired
-   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
+   * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+   *        lock is requested
+   * @return the acquired row lock
+   * @throws IOException if the lock could not be acquired after waiting
    */
-  RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+  RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
 
   /**
    * If the given list of row locks is not null, releases all locks.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 935f6e8..264e3cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -63,6 +64,7 @@ public class TestMultiParallel {
   private static final byte[] QUALIFIER = Bytes.toBytes("qual");
   private static final String FAMILY = "family";
   private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
+  private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2");
   private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
@@ -728,4 +730,127 @@ public class TestMultiParallel {
       validateEmpty(result);
     }
   }
+
+  private static class MultiThread extends Thread {
+    public Throwable throwable = null;
+    private CountDownLatch endLatch;
+    private CountDownLatch beginLatch;
+    List<Put> puts;
+    public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+    @Override
+    public void run() {
+      try {
+        HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        table.setAutoFlush(false);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for (Put put : puts) {
+            table.put(put);
+          }
+          table.flushCommits();
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when put:", t);
+      } finally {
+        endLatch.countDown();
+      }
+    }
+  }
+
+  private static class IncrementThread extends Thread {
+    public Throwable throwable = null;
+    private CountDownLatch endLatch;
+    private CountDownLatch beginLatch;
+    List<Put> puts;
+    public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+    @Override
+    public void run() {
+      try {
+        HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for (Put put : puts) {
+            Increment inc = new Increment(put.getRow());
+            inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1);
+            table.increment(inc);
+          }
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when incr:", t);
+      } finally {
+        endLatch.countDown();
+      }
+    }
+  }
+
+  /**
+   * Test for HBASE-18233: batch mutations that take row locks in different orders and
+   * concurrent increments must not block each other.
+   * @throws Exception
+   */
+  @Test
+  public void testMultiThreadWithRowLocks() throws Exception {
+    // set a short timeout so a failed row lock acquisition surfaces quickly as a timeout
+    UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000);
+    UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000);
+    UTIL.getConfiguration().setInt("hbase.client.retries.number", 10);
+
+    UTIL.createTable(TEST_TABLE2, BYTES_FAMILY);
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long) 0));
+      puts.add(put);
+    }
+    List<Put> reversePuts = new ArrayList<>(puts);
+    Collections.reverse(reversePuts);
+    int NUM_OF_THREAD = 12;
+    CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD);
+    CountDownLatch beginLatch = new CountDownLatch(1);
+    int threadNum = NUM_OF_THREAD / 4;
+    List<MultiThread> multiThreads = new ArrayList<>();
+    List<IncrementThread> incThreads = new ArrayList<>();
+    for (int i = 0; i < threadNum; i++) {
+      MultiThread thread = new MultiThread(reversePuts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for (int i = 0; i < threadNum; i++) {
+      MultiThread thread = new MultiThread(puts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for (int i = 0; i < threadNum; i++) {
+      IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    for (int i = 0; i < threadNum; i++) {
+      IncrementThread thread = new IncrementThread(puts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    long timeBegin = System.currentTimeMillis();
+    beginLatch.countDown();
+    latch.await();
+    LOG.info("Time took: " + (System.currentTimeMillis() - timeBegin));
+    for (MultiThread thread : multiThreads) {
+      Assert.assertTrue(thread.throwable == null);
+    }
+    for (IncrementThread thread : incThreads) {
+      Assert.assertTrue(thread.throwable == null);
+    }
+  }
 }
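
A minimal caller-side sketch of the new three-argument getRowLock added above (illustrative only, not part of the patch): with waitForLock set to false the method returns null on contention instead of blocking for rowLockWaitDuration and timing out, so a caller such as doMiniBatchMutation can skip a contended row and pick it up in a later mini-batch. The region reference and the surrounding snippet are assumptions made for illustration; rowLock.release() is assumed to be the usual release call on Region.RowLock.

    // Sketch only: try to take an exclusive row lock without blocking.
    // 'region' is a hypothetical HRegion reference; only the new getRowLock
    // signature comes from the patch above.
    RowLock rowLock = null;
    try {
      rowLock = region.getRowLock(row, false /* readLock */, false /* waitForLock */);
      if (rowLock == null) {
        // Another thread holds the lock; skip this row for now instead of
        // waiting out rowLockWaitDuration and failing with an IOException.
        return;
      }
      // ... apply the mutation while holding the exclusive row lock ...
    } finally {
      if (rowLock != null) {
        rowLock.release();
      }
    }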
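For context on what the test above exercises, here is a self-contained sketch (plain java.util.concurrent locks standing in for HBase row locks; all names are hypothetical) of the lock-ordering stall the patch avoids: two batches locking rows A and B in opposite order. In the old code both threads block on their second row until the row lock wait duration expires; with the patch only the first row of a batch is acquired with a (bounded) wait, subsequent rows use a non-blocking tryLock, and contended rows are simply left for a later retry.

import java.util.concurrent.locks.ReentrantLock;

// Sketch only, not HBase code: two ReentrantLocks stand in for the row locks
// of rows "A" and "B".
public class RowLockOrderingSketch {
  static final ReentrantLock rowA = new ReentrantLock();
  static final ReentrantLock rowB = new ReentrantLock();

  // Mirrors the patched behaviour: block for the first row of the batch
  // (waitForLock == true), then only tryLock() the rest (waitForLock == false)
  // and skip rows currently held by another thread.
  static void acquireBatch(ReentrantLock first, ReentrantLock second) {
    first.lock();
    try {
      if (second.tryLock()) {
        try {
          // both rows locked: apply the whole batch
        } finally {
          second.unlock();
        }
      }
      // else: second row is contended, leave it for a later mini-batch
    } finally {
      first.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> acquireBatch(rowA, rowB));
    Thread t2 = new Thread(() -> acquireBatch(rowB, rowA));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("both batches finished without waiting on each other's locks");
  }
}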