diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index fad4f45..6835683 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,6 +58,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -798,11 +803,31 @@ public class TestFromClientSide3 {
     assertEquals(0, readLockCount);
   }
 
+  /**
+   * Endpoint that delegates to {@link MultiRowMutationEndpoint} and then records the failure, if
+   * any, that the call left on the {@link ServerRpcController}, so the test can inspect it later.
+   */
+  public static class RecordingEndpoint extends MultiRowMutationEndpoint {
+    // Assigned in mutateRows() and read by the test method, possibly from another thread;
+    // volatile makes the assignment visible to that reader.
+    volatile Exception ex;
+
+    @Override
+    public void mutateRows(RpcController controller, MutateRowsRequest request,
+        RpcCallback<MutateRowsResponse> done) {
+      super.mutateRows(controller, request, done);
+      if (controller instanceof ServerRpcController) {
+        ex = ((ServerRpcController)controller).getFailedOn();
+        LOG.debug("mutateRows encounters " + ex);
+      }
+    }
+  }
+
   @Test
   public void testMultiRowMutations() throws Exception, Throwable {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
+    desc.addCoprocessor(RecordingEndpoint.class.getName());
     desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
     desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
     desc.addFamily(new HColumnDescriptor(FAMILY));
@@ -829,8 +854,13 @@ public class TestFromClientSide3 {
         }
       });
       ExecutorService cpService = Executors.newSingleThreadExecutor();
+      WaitingForMultiMutationsObserver observer = find(tableName,
+          WaitingForMultiMutationsObserver.class);
+      HRegion region = find(tableName);
+      Coprocessor cp = region.getCoprocessorHost().findCoprocessor(
+          RecordingEndpoint.class.getName());
+      final RecordingEndpoint endpoint = RecordingEndpoint.class.cast(cp);
       cpService.execute(() -> {
-        boolean threw;
         Put put1 = new Put(row);
         Put put2 = new Put(rowLocked);
         put1.addColumn(FAMILY, QUALIFIER, value1);
@@ -844,26 +874,21 @@ public class TestFromClientSide3 {
                   org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
               .build();
           table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
             ROW, ROW,
             (MultiRowMutationProtos.MultiRowMutationService exe) -> {
               ServerRpcController controller = new ServerRpcController();
               CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
                 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
               exe.mutateRows(controller, request, rpcCallback);
-              return rpcCallback.get();
+              MultiRowMutationProtos.MutateRowsResponse resp = rpcCallback.get();
+              return resp;
             });
-          threw = false;
         } catch (Throwable ex) {
-          threw = true;
-        }
-        if (!threw) {
-          // Can't call fail() earlier because the catch would eat it.
-          fail("This cp should fail because the target lock is blocked by previous put");
+          LOG.debug("encountered " + ex);
         }
       });
       cpService.shutdown();
       cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
-      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
       observer.latch.countDown();
       putService.shutdown();
       putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -877,6 +902,9 @@ public class TestFromClientSide3 {
         assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
       }
       assertNoLocks(tableName);
+      if (endpoint.ex == null) {
+        fail("This cp should fail because the target lock is blocked by previous put");
+      }
     }
   }
 