diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4d35b51..c372faa 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7044,8 +7044,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } - boolean locked; - List acquiredRowLocks; + boolean locked = false; + List acquiredRowLocks = null; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); // This is assigned by mvcc either explicity in the below or in the guts of the WAL append @@ -7053,19 +7053,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WriteEntry writeEntry = null; MemstoreSize memstoreSize = new MemstoreSize(); try { - // STEP 2. Acquire the row lock(s) - acquiredRowLocks = new ArrayList(rowsToLock.size()); - for (byte[] row : rowsToLock) { - // Attempt to lock all involved rows, throw if any lock times out - // use a writer lock for mixed reads and writes - acquiredRowLocks.add(getRowLockInternal(row, false)); - } - // STEP 3. Region lock - lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); - locked = true; boolean success = false; - long now = EnvironmentEdgeManager.currentTime(); try { + // STEP 2. Acquire the row lock(s) + acquiredRowLocks = new ArrayList<>(rowsToLock.size()); + for (byte[] row : rowsToLock) { + // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes + acquiredRowLocks.add(getRowLockInternal(row, false)); + } + // STEP 3. Region lock + lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); + locked = true; + long now = EnvironmentEdgeManager.currentTime(); // STEP 4. Let the processor scan the rows, generate mutations and add waledits doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 82fbe77..cbc97a2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -23,13 +23,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -43,10 +47,17 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -54,6 +65,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -494,7 +506,7 @@ public class TestFromClientSide3 { } @Test(timeout = 60000) - public void testPutWithPreBatchMutate ()throws Exception { + public void testPutWithPreBatchMutate() throws Exception { TableName tableName = TableName.valueOf("testPutWithPreBatchMutate"); testPreBatchMutate(tableName, () -> { try { @@ -509,7 +521,7 @@ public class TestFromClientSide3 { } @Test(timeout = 60000) - public void testRowMutationsWithPreBatchMutate ()throws Exception { + public void testRowMutationsWithPreBatchMutate() throws Exception { TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate"); testPreBatchMutate(tableName, () -> { try { @@ -525,7 +537,7 @@ public class TestFromClientSide3 { }); } - private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception { + private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception { HTableDescriptor desc = new HTableDescriptor(tableName); desc.addCoprocessor(WatiingForScanObserver.class.getName()); desc.addFamily(new HColumnDescriptor(FAMILY)); @@ -555,22 +567,118 @@ public class TestFromClientSide3 { TEST_UTIL.deleteTable(tableName); } - private static T find(final TableName tableName, - Class clz) throws IOException, InterruptedException { + @Test(timeout = 30000) + public void testMultiRowMutations() throws Exception, Throwable { + TableName tableName = TableName.valueOf("testMultiRowMutations"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addCoprocessor(MultiRowMutationEndpoint.class.getName()); + desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName()); + desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000)); + desc.addFamily(new HColumnDescriptor(FAMILY)); + TEST_UTIL.getAdmin().createTable(desc); + // new a connection for lower retry number. + Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); + copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + try (Connection con = ConnectionFactory.createConnection(copy)) { + byte[] row = Bytes.toBytes("ROW-0"); + byte[] rowLocked= Bytes.toBytes("ROW-1"); + byte[] value0 = Bytes.toBytes("VALUE-0"); + byte[] value1 = Bytes.toBytes("VALUE-1"); + byte[] value2 = Bytes.toBytes("VALUE-2"); + assertNoLocks(tableName); + ExecutorService putService = Executors.newSingleThreadExecutor(); + putService.execute(() -> { + try (Table table = con.getTable(tableName)) { + Put put0 = new Put(rowLocked); + put0.addColumn(FAMILY, QUALIFIER, value0); + // the put will be blocked by WatiingForMultiMutationsObserver. + table.put(put0); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + ExecutorService cpService = Executors.newSingleThreadExecutor(); + cpService.execute(() -> { + Put put1 = new Put(row); + Put put2 = new Put(rowLocked); + put1.addColumn(FAMILY, QUALIFIER, value1); + put2.addColumn(FAMILY, QUALIFIER, value2); + try (Table table = con.getTable(tableName)) { + MultiRowMutationProtos.MutateRowsRequest request + = MultiRowMutationProtos.MutateRowsRequest.newBuilder() + .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1)) + .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2)) + .build(); + table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, + ROW, ROW, + (MultiRowMutationProtos.MultiRowMutationService exe) -> { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback + rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); + exe.mutateRows(controller, request, rpcCallback); + return rpcCallback.get(); + }); + fail("This cp should fail because the target lock is blocked by previous put"); + } catch (Throwable ex) { + } + }); + cpService.shutdown(); + cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class); + observer.latch.countDown(); + putService.shutdown(); + putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + try (Table table = con.getTable(tableName)) { + Get g0 = new Get(row); + Get g1 = new Get(rowLocked); + Result r0 = table.get(g0); + Result r1 = table.get(g1); + assertTrue(r0.isEmpty()); + assertFalse(r1.isEmpty()); + assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); + } + assertNoLocks(tableName); + } + } + + private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException { + HRegion region = (HRegion) find(tableName); + assertEquals(0, region.getLockedRows().size()); + } + private static Region find(final TableName tableName) + throws IOException, InterruptedException { HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); List regions = rs.getOnlineRegions(tableName); assertEquals(1, regions.size()); - Region region = regions.get(0); + return regions.get(0); + } + + private static T find(final TableName tableName, + Class clz) throws IOException, InterruptedException { + Region region = find(tableName); Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); assertTrue("The cp instance should be " + clz.getName() + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); return clz.cast(cp); } - public static class WatiingForScanObserver extends BaseRegionObserver { + public static class WatiingForMultiMutationsObserver extends BaseRegionObserver { + final CountDownLatch latch = new CountDownLatch(1); + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + try { + latch.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + } + public static class WatiingForScanObserver extends BaseRegionObserver { private final CountDownLatch latch = new CountDownLatch(1); - @Override public void postBatchMutate(final ObserverContext c, final MiniBatchOperationInProgress miniBatchOp) throws IOException {