From f637e85fdfab4a67e765087372a5214efbb36c77 Mon Sep 17 00:00:00 2001 From: chenheng Date: Sat, 10 Sep 2016 16:03:48 +0800 Subject: [PATCH] HBASE-16607 Make NoncedRegionServerCallable extends CancellableRegionServerCallable --- .../org/apache/hadoop/hbase/client/HTable.java | 52 +++++++++--------- .../hbase/client/NoncedRegionServerCallable.java | 63 +--------------------- 2 files changed, 28 insertions(+), 87 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index e98424c..0d1b156 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -630,17 +630,17 @@ public class HTable implements Table { public Result append(final Append append) throws IOException { checkHasFamilies(append); NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, - this.rpcControllerFactory, getName(), append.getRow()) { - @Override - protected Result call(HBaseRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } - }; + new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, + getName(), append.getRow()) { + @Override + protected Result rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + } + }; return rpcCallerFactory. newCaller(this.writeRpcTimeout). callWithRetries(callable, this.operationTimeout); } @@ -652,16 +652,16 @@ public class HTable implements Table { public Result increment(final Increment increment) throws IOException { checkHasFamilies(increment); NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, - this.rpcControllerFactory, getName(), increment.getRow()) { - @Override - protected Result call(HBaseRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - // Should this check for null like append does? - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } + new NoncedRegionServerCallable(this.connection, + this.rpcControllerFactory, getName(), increment.getRow()) { + @Override + protected Result rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + // Should this check for null like append does? + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + } }; return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); @@ -701,12 +701,12 @@ public class HTable implements Table { new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), row) { @Override - protected Long call(HBaseRpcController controller) throws Exception { + protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 7c98861..2d27630 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -44,9 +44,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; * @param the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class NoncedRegionServerCallable extends AbstractRegionServerCallable { - private ClientService.BlockingInterface stub; - private final HBaseRpcController rpcController; +public abstract class NoncedRegionServerCallable extends CancellableRegionServerCallable { private final long nonce; /** @@ -56,67 +54,10 @@ public abstract class NoncedRegionServerCallable extends AbstractRegionServer */ public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, TableName tableName, byte [] row) { - this(connection, rpcControllerFactory.newController(), tableName, row); - } - - public NoncedRegionServerCallable(Connection connection, HBaseRpcController rpcController, - TableName tableName, byte [] row) { - super(connection, tableName, row); - this.rpcController = rpcController; + super(connection, tableName, row, rpcControllerFactory); this.nonce = getConnection().getNonceGenerator().newNonce(); } - void setClientByServiceName(ServerName service) throws IOException { - this.setStub(getConnection().getClient(service)); - } - - /** - * @return Client Rpc protobuf communication stub - */ - protected ClientService.BlockingInterface getStub() { - return this.stub; - } - - /** - * Set the client protobuf communication stub - * @param stub to set - */ - void setStub(final ClientService.BlockingInterface stub) { - this.stub = stub; - } - - /** - * Override that changes Exception from {@link Exception} to {@link IOException}. It also does - * setup of an rpcController and calls through to the unimplemented - * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. - */ - @Override - public T call(int callTimeout) throws IOException { - if (this.rpcController != null) { - this.rpcController.reset(); - this.rpcController.setPriority(tableName); - this.rpcController.setCallTimeout(callTimeout); - } - try { - return call(this.rpcController); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - - /** - * Run RPC call. - * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a - * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this - * class. - * @throws Exception - */ - protected abstract T call(HBaseRpcController rpcController) throws Exception; - - public HBaseRpcController getRpcController() { - return this.rpcController; - } - long getNonceGroup() { return getConnection().getNonceGenerator().getNonceGroup(); } -- 2.9.3