diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0d79ca9..fc27ea9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -23,16 +23,8 @@ import java.io.InterruptedIOException; import java.net.BindException; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -735,6 +727,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; try { + /** HBASE-17924 + * mutationActionMap is a map to map the relation between mutations and actions + * since the mutation array may have been reordered. In order to return the right + * result or exception to the corresponding actions, we need to know which action + * the mutation belongs to. We can't sort the ClientProtos.Action array, since they + * are bound to cellscanners. 
+ */ + Map mutationActionMap = new HashMap(); int i = 0; for (ClientProtos.Action action: mutations) { MutationProto m = action.getMutation(); @@ -746,6 +746,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, mutation = ProtobufUtil.toDelete(m, cells); batchContainsDelete = true; } + mutationActionMap.put(mutation, action); mArray[i++] = mutation; quota.addMutation(mutation); } @@ -753,11 +754,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } - + // HBASE-17924 + // sort to improve lock efficiency + Arrays.sort(mArray); OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, HConstants.NO_NONCE); for (i = 0; i < codes.length; i++) { - int index = mutations.get(i).getIndex(); + Mutation currentMutation = mArray[i]; + ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); + int index = currentAction.getIndex(); Exception e = null; switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: @@ -1760,6 +1765,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, walEntries.add(walEntry); } if(edits!=null && !edits.isEmpty()) { + // HBASE-17924 + // sort to improve lock efficiency + Collections.sort(edits); long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 28bbe27..d0098c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -2221,7 +2221,7 @@ public class WALSplitter { } /** A struct used by getMutationsFromWALEntry */ - public static class MutationReplay { + public static class MutationReplay implements Comparable { public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) { this.type = type; this.mutation = mutation; @@ -2237,6 +2237,25 @@ public class WALSplitter { public final Mutation mutation; public final long nonceGroup; public final long nonce; + + @Override + public int compareTo(final MutationReplay d) { + return this.mutation.compareTo(d.mutation); + } + + @Override + public boolean equals(Object obj) { + if(!(obj instanceof MutationReplay)) { + return false; + } else { + return this.compareTo((MutationReplay)obj) == 0; + } + } + + @Override + public int hashCode() { + return this.mutation.hashCode(); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 935f6e8..13f7cb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -63,6 +64,7 @@ public class 
TestMultiParallel { private static final byte[] QUALIFIER = Bytes.toBytes("qual"); private static final String FAMILY = "family"; private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); + private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2"); private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); @@ -728,4 +730,122 @@ public class TestMultiParallel { validateEmpty(result); } } + + private static class MultiThread extends Thread { + public Throwable throwable = null; + private CountDownLatch endLatch; + private CountDownLatch beginLatch; + List puts; + public MultiThread(List puts, CountDownLatch beginLatch, CountDownLatch endLatch) { + this.puts = puts; + this.beginLatch = beginLatch; + this.endLatch = endLatch; + } + @Override + public void run() { + try { + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE2); + table.setAutoFlush(false); + beginLatch.await(); + for (int i = 0; i < 100; i++) { + for(Put put : puts) { + table.put(put); + } + table.flushCommits(); + } + } catch (Throwable t) { + throwable = t; + LOG.warn("Error when put:", t); + } finally { + endLatch.countDown(); + } + } + } + + + private static class IncrementThread extends Thread { + public Throwable throwable = null; + private CountDownLatch endLatch; + private CountDownLatch beginLatch; + List puts; + public IncrementThread(List puts, CountDownLatch beginLatch, CountDownLatch endLatch) { + this.puts = puts; + this.beginLatch = beginLatch; + this.endLatch = endLatch; + } + @Override + public void run() { + try { + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE2); + beginLatch.await(); + for (int i = 0; i < 100; i++) { + for(Put put : puts) { + Increment inc = new Increment(put.getRow()); + inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1); + table.increment(inc); + } + } + } catch (Throwable t) { + 
throwable = t; + LOG.warn("Error when incr:", t); + } finally { + endLatch.countDown(); + } + } + } + + @Test + public void testMultiThreadWithRowLocks() throws Exception { + //set a short timeout to get timeout exception when getting row lock fail + UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000); + UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000); + UTIL.getConfiguration().setInt("hbase.client.retries.number", 10); + + UTIL.createTable(TEST_TABLE2, BYTES_FAMILY); + List puts = new ArrayList<>(); + for(int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long)0)); + puts.add(put); + } + List reversePuts = new ArrayList<>(puts); + Collections.reverse(reversePuts); + int NUM_OF_THREAD = 12; + CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD); + CountDownLatch beginLatch = new CountDownLatch(1); + int threadNum = NUM_OF_THREAD / 4; + List multiThreads = new ArrayList<>(); + List incThreads = new ArrayList<>(); + for(int i = 0; i < threadNum; i ++) { + MultiThread thread = new MultiThread(reversePuts, beginLatch, latch); + thread.start(); + multiThreads.add(thread); + } + for(int i = 0; i < threadNum; i++) { + MultiThread thread = new MultiThread(puts, beginLatch, latch); + thread.start(); + multiThreads.add(thread); + } + for(int i = 0; i < threadNum; i ++) { + IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch); + thread.start(); + incThreads.add(thread); + } + for(int i = 0; i < threadNum; i++) { + IncrementThread thread = new IncrementThread(puts, beginLatch, latch); + thread.start(); + incThreads.add(thread); + } + long timeBegin = System.currentTimeMillis(); + beginLatch.countDown(); + latch.await(); + LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin)); + for(MultiThread thread : multiThreads) { + Assert.assertTrue(thread.throwable == null); + } + for(IncrementThread thread : incThreads) { + 
Assert.assertTrue(thread.throwable == null); + } + + } }