From d7a199cb4ab1f226d899eb62d02222ef7aae4f40 Mon Sep 17 00:00:00 2001 From: chenheng Date: Sat, 10 Sep 2016 16:03:48 +0800 Subject: [PATCH] HBASE-16607 Make NoncedRegionServerCallable extends CancellableRegionServerCallable --- .../org/apache/hadoop/hbase/client/HTable.java | 52 +++++++-------- .../hbase/client/NoncedRegionServerCallable.java | 74 ++-------------------- .../org/apache/hadoop/hbase/master/HMaster.java | 3 +- 3 files changed, 32 insertions(+), 97 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index e98424c..0d1b156 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -630,17 +630,17 @@ public class HTable implements Table { public Result append(final Append append) throws IOException { checkHasFamilies(append); NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, - this.rpcControllerFactory, getName(), append.getRow()) { - @Override - protected Result call(HBaseRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } - }; + new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, + getName(), append.getRow()) { + @Override + protected Result rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + } + }; return rpcCallerFactory. newCaller(this.writeRpcTimeout). callWithRetries(callable, this.operationTimeout); } @@ -652,16 +652,16 @@ public class HTable implements Table { public Result increment(final Increment increment) throws IOException { checkHasFamilies(increment); NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, - this.rpcControllerFactory, getName(), increment.getRow()) { - @Override - protected Result call(HBaseRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - // Should this check for null like append does? - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } + new NoncedRegionServerCallable(this.connection, + this.rpcControllerFactory, getName(), increment.getRow()) { + @Override + protected Result rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + // Should this check for null like append does? + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + } }; return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); @@ -701,12 +701,12 @@ public class HTable implements Table { new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), row) { @Override - protected Long call(HBaseRpcController controller) throws Exception { + protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 7c98861..7c01e21 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -18,15 +18,9 @@ package org.apache.hadoop.hbase.client; -import java.io.IOException; - -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; /** * Implementations make an rpc call against a RegionService via a protobuf Service. @@ -44,9 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; * @param the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class NoncedRegionServerCallable extends AbstractRegionServerCallable { - private ClientService.BlockingInterface stub; - private final HBaseRpcController rpcController; +public abstract class NoncedRegionServerCallable extends CancellableRegionServerCallable { private final long nonce; /** @@ -54,69 +46,13 @@ public abstract class NoncedRegionServerCallable extends AbstractRegionServer * @param tableName Table name to which row belongs. * @param row The row we want in tableName. */ - public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, - TableName tableName, byte [] row) { - this(connection, rpcControllerFactory.newController(), tableName, row); - } - - public NoncedRegionServerCallable(Connection connection, HBaseRpcController rpcController, - TableName tableName, byte [] row) { - super(connection, tableName, row); - this.rpcController = rpcController; + public NoncedRegionServerCallable(Connection connection, + RpcControllerFactory rpcControllerFactory, + TableName tableName, byte [] row) { + super(connection, tableName, row, rpcControllerFactory); this.nonce = getConnection().getNonceGenerator().newNonce(); } - void setClientByServiceName(ServerName service) throws IOException { - this.setStub(getConnection().getClient(service)); - } - - /** - * @return Client Rpc protobuf communication stub - */ - protected ClientService.BlockingInterface getStub() { - return this.stub; - } - - /** - * Set the client protobuf communication stub - * @param stub to set - */ - void setStub(final ClientService.BlockingInterface stub) { - this.stub = stub; - } - - /** - * Override that changes Exception from {@link Exception} to {@link IOException}. It also does - * setup of an rpcController and calls through to the unimplemented - * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. - */ - @Override - public T call(int callTimeout) throws IOException { - if (this.rpcController != null) { - this.rpcController.reset(); - this.rpcController.setPriority(tableName); - this.rpcController.setCallTimeout(callTimeout); - } - try { - return call(this.rpcController); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - - /** - * Run RPC call. - * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a - * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this - * class. - * @throws Exception - */ - protected abstract T call(HBaseRpcController rpcController) throws Exception; - - public HBaseRpcController getRpcController() { - return this.rpcController; - } - long getNonceGroup() { return getConnection().getNonceGenerator().getNonceGroup(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0e07ae0..c70444d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -195,8 +195,7 @@ import org.mortbay.jetty.servlet.Context; @SuppressWarnings("deprecation") public class HMaster extends HRegionServer implements MasterServices { private static final Log LOG = LogFactory.getLog(HMaster.class.getName()); - - + /** * Protection against zombie master. Started once Master accepts active responsibility and * starts taking over responsibilities. Allows a finite time window before giving up ownership. -- 2.9.3