From 3767d8815576a478b6eb8710c86ec0a4d31108b3 Mon Sep 17 00:00:00 2001 From: Esteban Gutierrez Date: Thu, 5 Feb 2015 16:28:09 -0800 Subject: [PATCH] HBASE-12956 Binding to 0.0.0.0 is broken after HBASE-10569 --- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 30 +++++++++++++++++----- .../hadoop/hbase/regionserver/RSRpcServices.java | 13 +++++----- .../hadoop/hbase/TestHBaseTestingUtility.java | 12 +++++++++ 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index f04ec1e..b8fd908 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -195,7 +195,7 @@ public class RpcServer implements RpcServerInterface { static final ThreadLocal MONITORED_RPC = new ThreadLocal(); - protected final InetSocketAddress isa; + protected final InetSocketAddress bindAddress; protected int port; // port we listen on private int readThreads; // number of read threads protected int maxIdleTime; // the maximum idle time after @@ -513,15 +513,16 @@ public class RpcServer implements RpcServerInterface { private ExecutorService readPool; - public Listener(final String name) throws IOException { + public Listener(final String name, final InetSocketAddress isa, + final InetSocketAddress bindAddress) throws IOException { super(name); backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); - // Bind the server socket to the local host and port - bind(acceptChannel.socket(), isa, backlogLength); + // Bind the server socket to the binding addrees (can be different from the default interface) + bind(acceptChannel.socket(), isa.equals(bindAddress) ? isa : bindAddress, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); @@ -1875,10 +1876,27 @@ public class RpcServer implements RpcServerInterface { final List services, final InetSocketAddress isa, Configuration conf, RpcScheduler scheduler) + throws IOException { + this(server, name, services, isa, isa, conf, scheduler); + } + /** + * Constructs a server listening on the named port and address. + * @param server hosting instance of {@link Server}. We will do authentications if an + * instance else pass null for no authentication check. + * @param name Used keying this rpc servers' metrics and for naming the Listener thread. + * @param services A list of services. + * @param isa Hostname/port pair + * @param bindAddress Where to listen + * @throws IOException + */ + public RpcServer(final Server server, final String name, + final List services, + final InetSocketAddress isa, final InetSocketAddress bindAddress, Configuration conf, + RpcScheduler scheduler) throws IOException { this.server = server; this.services = services; - this.isa = isa; + this.bindAddress = bindAddress; this.conf = conf; this.socketSendBufferSize = 0; this.maxQueueSize = @@ -1893,7 +1911,7 @@ public class RpcServer implements RpcServerInterface { this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); // Start the listener here and let it bind to the port - listener = new Listener(name); + listener = new Listener(name, isa, bindAddress); this.port = listener.getAddress().getPort(); this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 51a01b0..a5656b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -777,6 +777,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, HConstants.DEFAULT_REGIONSERVER_PORT); // Creation of a HSA will force a resolve. InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); + InetSocketAddress bindAddress = new InetSocketAddress(rs.conf.get("hbase.regionserver.ipc.address", hostname), port); if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initialIsa); } @@ -785,7 +786,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Set how many times to retry talking to another server over HConnection. ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); rpcServer = new RpcServer(rs, name, getServices(), - initialIsa, // BindAddress is IP we got for this server. + initialIsa, + bindAddress, // BindAddress is IP we got for this server. rs.conf, rpcSchedulerFactory.create(rs.conf, this, rs)); @@ -796,17 +798,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - // Set our address. - isa = rpcServer.getListenerAddress(); + // Set our address, however we need the final port that was given to rpcServer + isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort()); rpcServer.setErrorHandler(this); rs.setName(name); } public static String getHostname(Configuration conf) throws UnknownHostException { - return conf.get("hbase.regionserver.ipc.address", - Strings.domainNamePointerToHostName(DNS.getDefaultHost( + return Strings.domainNamePointerToHostName(DNS.getDefaultHost( conf.get("hbase.regionserver.dns.interface", "default"), - conf.get("hbase.regionserver.dns.nameserver", "default")))); + conf.get("hbase.regionserver.dns.nameserver", "default"))); } RegionScanner getScanner(long scannerId) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java index ecd7480..856cc9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java @@ -124,6 +124,18 @@ public class TestHBaseTestingUtility { } } + @Test + public void testMiniClusterBindToWildcard() throws Exception { + HBaseTestingUtility hbt = new HBaseTestingUtility(); + hbt.getConfiguration().set("hbase.regionserver.ipc.address", "0.0.0.0"); + MiniHBaseCluster cluster = hbt.startMiniCluster(); + try { + assertEquals(1, cluster.getLiveRegionServerThreads().size()); + } finally { + hbt.shutdownMiniCluster(); + } + } + /** * Test that we can start and stop multiple time a cluster * with the same HBaseTestingUtility. -- 2.2.2