From d132839321231526d256eb671c66f12e6b23869f Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Mon, 1 Dec 2014 12:34:20 +0100 Subject: [PATCH] Create a new RpcClientInterface and let RpcClient implement it and ConnectionManager use it. --- .../hadoop/hbase/client/ConnectionManager.java | 9 +++--- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 5 +++- .../hadoop/hbase/ipc/RpcClientInterface.java | 34 ++++++++++++++++++++++ .../org/apache/hadoop/hbase/client/TestHCM.java | 3 +- 4 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientInterface.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index f822709..c1b0101 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientInterface; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -573,7 +574,7 @@ class ConnectionManager { private final TableConfiguration tableConfig; // Client rpc instance. - private RpcClient rpcClient; + private RpcClientInterface rpcClient; private MetaCache metaCache = new MetaCache(); @@ -789,8 +790,8 @@ class ConnectionManager { * @return Previous rpcClient */ @VisibleForTesting - RpcClient setRpcClient(final RpcClient rpcClient) { - RpcClient oldRpcClient = this.rpcClient; + RpcClientInterface setRpcClient(final RpcClientInterface rpcClient) { + RpcClientInterface oldRpcClient = this.rpcClient; this.rpcClient = rpcClient; return oldRpcClient; } @@ -799,7 +800,7 @@ class ConnectionManager { * For tests only. */ @VisibleForTesting - RpcClient getRpcClient() { + RpcClientInterface getRpcClient() { return rpcClient; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 4586e3e..cbf64c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -113,7 +113,7 @@ import com.google.protobuf.ServiceException; *

See HBaseServer */ @InterfaceAudience.Private -public class RpcClient { +public class RpcClient implements RpcClientInterface { public static final Log LOG = LogFactory.getLog(RpcClient.class); protected final PoolMap connections; @@ -1432,6 +1432,7 @@ public class RpcClient { /** Stop all threads related to this client. No further calls may be made * using this client. */ + @Override public void stop() { if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client"); if (!running.compareAndSet(true, false)) return; @@ -1578,6 +1579,7 @@ public class RpcClient { * process died) or no route to host: i.e. their next retries should be faster and with a * safe exception. */ + @Override public void cancelConnections(String hostname, int port) { synchronized (connections) { for (Connection connection : connections.values()) { @@ -1718,6 +1720,7 @@ public class RpcClient { * protobuf blocking stubs. * @return A blocking rpc channel that goes via this rpc client instance. */ + @Override public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, int defaultOperationTimeout) { return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientInterface.java new file mode 100644 index 0000000..1f19edc --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientInterface.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.BlockingRpcChannel; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.security.User; + +/** + * Interface for RpcClient implementations so ConnectionManager can handle it. + */ +public interface RpcClientInterface { + /** + * Interrupt the connections to the given ip:port server. This should be called if the server + * is known as actually dead. This will not prevent current operation to be retried, and, + * depending on their own behavior, they may retry on the same server. This can be a feature, + * for example at startup. In any case, they're likely to get connection refused (if the + * process died) or no route to host: i.e. their next retries should be faster and with a + * safe exception. + */ + public void cancelConnections(String hostname, int port); + + /** + * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up + * protobuf blocking stubs. + * + * @return A blocking rpc channel that goes via this rpc client instance. + */ + public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout); + + /** + * Stop all threads related to this client. No further calls may be made + * using this client. + */ + public void stop(); +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 3c99270..6c6260d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientInterface; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -390,7 +391,7 @@ public class TestHCM { ServerName sn = table.getRegionLocation(ROW).getServerName(); ConnectionManager.HConnectionImplementation conn = (ConnectionManager.HConnectionImplementation) table.getConnection(); - RpcClient rpcClient = conn.getRpcClient(); + RpcClientInterface rpcClient = conn.getRpcClient(); LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); for (int i = 0; i < 5000; i++) { -- 1.9.3 (Apple Git-50)