Index: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (revision 1495925) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (working copy) @@ -30,6 +30,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; @@ -125,6 +126,7 @@ protected final SocketFactory socketFactory; // how to create sockets protected String clusterId; + protected final SocketAddress localAddr; private final boolean fallbackAllowed; @@ -551,6 +553,9 @@ this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(tcpKeepAlive); + if (localAddr != null) { + this.socket.bind(localAddr); + } // connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf)); @@ -1164,13 +1169,25 @@ super(msg); } } + + /** + * Construct an IPC cluster client whose values are of the {@link Message} class. + * @param conf configuration + * @param clusterId + * @param factory socket factory + */ + RpcClient(Configuration conf, String clusterId, SocketFactory factory) { + this(conf, clusterId, factory, null); + } /** * Construct an IPC cluster client whose values are of the {@link Message} class. * @param conf configuration + * @param clusterId * @param factory socket factory + * @param localAddr client socket bind address */ - RpcClient(Configuration conf, String clusterId, SocketFactory factory) { + RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) { this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.failureSleep = conf.getInt("hbase.client.pause", 1000); @@ -1187,6 +1204,7 @@ this.failedServers = new FailedServers(conf); this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + this.localAddr = localAddr; if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + this.tcpKeepAlive + @@ -1194,7 +1212,8 @@ ", maxIdleTime=" + this.maxIdleTime + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" + this.fallbackAllowed + - ", ping interval=" + this.pingInterval + "ms."); + ", ping interval=" + this.pingInterval + "ms" + + ", bind address=" + (this.localAddr != null ? this.localAddr : "null")); } } @@ -1204,8 +1223,18 @@ * @param clusterId */ public RpcClient(Configuration conf, String clusterId) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf)); + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); } + + /** + * Construct an IPC client for the cluster clusterId with the default SocketFactory + * @param conf configuration + * @param clusterId + * @param localAddr client socket bind address. + */ + public RpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr); + } /** * Encapsulate the ugly casting and RuntimeException conversion in private method. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1495925) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -765,7 +765,8 @@ movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); // Setup RPC client for master communication - rpcClient = new RpcClient(conf, clusterId); + rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress( + this.isa.getAddress(), 0)); } /**