diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index fad4f45..678f0d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -76,6 +78,9 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 
@@ -798,11 +803,27 @@ public class TestFromClientSide3 {
     assertEquals(0, readLockCount);
   }
 
+  public static class RecordingEndpoint extends MultiRowMutationEndpoint {
+    Exception ex = null;
+    public RecordingEndpoint() {
+      ex = null;
+    }
+    @Override
+    public void mutateRows(RpcController controller, MutateRowsRequest request,
+        RpcCallback<MutateRowsResponse> done) {
+      super.mutateRows(controller, request, done);
+      if (controller instanceof ServerRpcController) {
+        ex = ((ServerRpcController)controller).getFailedOn();
+        LOG.debug("mutateRows encounters " + ex);
+      }
+    }
+  }
+
   @Test
   public void testMultiRowMutations() throws Exception, Throwable {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
+    desc.addCoprocessor(RecordingEndpoint.class.getName());
     desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
     desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
     desc.addFamily(new HColumnDescriptor(FAMILY));
@@ -829,8 +850,12 @@ public class TestFromClientSide3 {
       }
     });
     ExecutorService cpService = Executors.newSingleThreadExecutor();
+    WaitingForMultiMutationsObserver observer = find(tableName,
+        WaitingForMultiMutationsObserver.class);
+    HRegion region = find(tableName);
+    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(RecordingEndpoint.class.getName());
+    final RecordingEndpoint endpoint = RecordingEndpoint.class.cast(cp);
     cpService.execute(() -> {
-      boolean threw;
       Put put1 = new Put(row);
       Put put2 = new Put(rowLocked);
       put1.addColumn(FAMILY, QUALIFIER, value1);
@@ -844,26 +869,21 @@ public class TestFromClientSide3 {
             org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
           .build();
         table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
-          ROW, ROW,
-          (MultiRowMutationProtos.MultiRowMutationService exe) -> {
-            ServerRpcController controller = new ServerRpcController();
-            CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
+            ROW, ROW,
+            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
+              ServerRpcController controller = new ServerRpcController();
+              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
               rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
-            exe.mutateRows(controller, request, rpcCallback);
-            return rpcCallback.get();
-          });
-        threw = false;
+              exe.mutateRows(controller, request, rpcCallback);
+              MultiRowMutationProtos.MutateRowsResponse resp = rpcCallback.get();
+              return resp;
+            });
       } catch (Throwable ex) {
-        threw = true;
-      }
-      if (!threw) {
-        // Can't call fail() earlier because the catch would eat it.
-        fail("This cp should fail because the target lock is blocked by previous put");
+        LOG.debug("encountered " + ex);
       }
     });
     cpService.shutdown();
     cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
-    WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
     observer.latch.countDown();
     putService.shutdown();
     putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -877,6 +897,9 @@ public class TestFromClientSide3 {
       assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
     }
     assertNoLocks(tableName);
+    if (endpoint.ex == null) {
+      fail("This cp should fail because the target lock is blocked by previous put");
+    }
   }
 
 }