Index: hbase-common/src/main/java/org/apache/hadoop/hbase/jmx/JMXServer.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/jmx/JMXServer.java (revision 0) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/jmx/JMXServer.java (working copy) @@ -0,0 +1,200 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.jmx; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.ServerSocket; +import java.net.Socket; +import java.rmi.registry.LocateRegistry; +import java.rmi.server.RMIServerSocketFactory; +import java.util.HashMap; +import java.util.Map; + +import javax.management.MBeanServer; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.rmi.RMIConnectorServer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * Customer JMX server intended to give a option to user to control the port used in default java + * JMX server. This can be initiated by any process by passing the configuration object and process + * name. JMX server will be started if the configuration object passed contains + * 'processname'.rmi.registry.port. + */ +public class JMXServer { + + /** Configuration key for "rmi registry port" */ + public static final String RMI_REGISTRY_PORT = ".rmi.registry.port"; + + /** Configuration key for "rmi connector port" */ + public static final String RMI_CONNECTOR_PORT = ".rmi.connector.port"; + + /** Configuration key for "rmi registry ip" */ + public static final String RMI_REGISTRY_IP = ".rmi.registry.ip"; + + /** Configuration key for "rmi connector name" */ + public static final String RMI_REGISTRY_CONNECTORNAME = ".rmi.registry.connectorname"; + + /** Default rmi registry ip value */ + public static final String RMI_REGISTRY_IP_DEFAULT = "127.0.0.1"; + + private static final Log LOG = LogFactory.getLog(JMXServer.class); + private String jmxIp = null; + private MBeanServer mBeanServer = null; + private String connectorName = null; + private JMXConnectorServer jmxConnectorServer = null; + private Configuration conf; + private String process = ""; + JMXServiceURL url = null; + + public JMXServer(Configuration conf, String process) { + this.conf = conf; + this.process = process; + } + + /** + * Starts the JMX server + * @param conf configuration to be used to read the ports + * @param process process name + * @throws IOException + */ + public void start() throws IOException { + + // Start JMX server only if registry port configured. + String jmxPortString = conf.get(process + RMI_REGISTRY_PORT); + if (jmxPortString == null) { + LOG.warn(process + RMI_REGISTRY_PORT + " is not configured for the process " + process + + ", so not starting the JMX server"); + return; + } + + int jmxPort = validatePort(process + RMI_REGISTRY_PORT, jmxPortString); + + mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + String jmxConnectorServerPortString = conf.get(process + RMI_CONNECTOR_PORT); + int jmxConnectorServerPort; + if (jmxConnectorServerPortString == null) { + jmxConnectorServerPort = jmxPort; + } else { + jmxConnectorServerPort = validatePort(process + RMI_REGISTRY_PORT, jmxPortString); + } + connectorName = conf.get(process + RMI_REGISTRY_CONNECTORNAME, process); + jmxIp = conf.get(process + RMI_REGISTRY_IP, RMI_REGISTRY_IP_DEFAULT); + + RMIServerSocketFactory ssFactory = + new RMIServerSocketFactoryImpl(InetAddress.getByName(jmxIp)); + LocateRegistry.createRegistry(jmxPort, null, ssFactory); + Map env = new HashMap(2); + env.put(RMIConnectorServer.JNDI_REBIND_ATTRIBUTE, Boolean.TRUE.toString()); + env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssFactory); + + url = getJmxUrl(jmxPort, jmxConnectorServerPort, connectorName); + jmxConnectorServer = + JMXConnectorServerFactory.newJMXConnectorServer(url, env, mBeanServer); + jmxConnectorServer.start(); + LOG.info("JMX service started successfully URL : " + url); + } + + /** + * Stops the JMXServer + * @throws IOException + */ + public void stop() throws IOException { + if (null != jmxConnectorServer) { + jmxConnectorServer.stop(); + LOG.info("JMX Server stopped successfully"); + } + } + + private JMXServiceURL getJmxUrl(int p_port, int c_port, String p_serviceName) + throws MalformedURLException { + StringBuilder builder = new StringBuilder(); + builder.append("service:jmx:rmi://"); + + if (c_port != -1) { + builder.append(jmxIp); + builder.append(":"); + builder.append(c_port); + } + + builder.append("/jndi/rmi://"); + builder.append(jmxIp); + builder.append(":"); + builder.append(p_port); + builder.append("/"); + builder.append(p_serviceName); + return new JMXServiceURL(builder.toString()); + } + + public JMXServiceURL getUrl() { + return url; + } + + private int validatePort(String property, String portString) { + int port = -1; + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException ne) { + throw new IllegalArgumentException("Invalid value (" + portString + ") configured for " + + "\"" + property + "\"" + "Allowed range [1024-65535]."); + } + if (port < 1024 || port > 65535) { + throw new IllegalArgumentException("Invalid value (" + portString + ") configured for " + + "\"" + property + "\"" + "Allowed range [1024-65535]."); + } + return port; + } + + /** + * Custom server socket factory to control the binding of server address to one IP address. + */ + public static class RMIServerSocketFactoryImpl implements RMIServerSocketFactory { + private InetAddress bindAddress = null; + + public RMIServerSocketFactoryImpl() { + } + + public RMIServerSocketFactoryImpl(final InetAddress bindAddress) { + this.bindAddress = bindAddress; + } + + public Socket createSocket(String host, int port) throws IOException { + return new Socket(host, port); + } + + public ServerSocket createServerSocket(int port) throws IOException { + if (null != bindAddress) { + LOG.info("Creating socket " + bindAddress.getHostAddress() + ":" + port + + " for JMX Server"); + return new ServerSocket(port, 50, bindAddress); + } + + return new ServerSocket(port); + } + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (revision 1561480) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.ServerCommandLine; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.jmx.JMXServer; import org.apache.zookeeper.KeeperException; @InterfaceAudience.Private @@ -61,6 +62,8 @@ private final Class masterClass; + private JMXServer jmxServer = null; + public HMasterCommandLine(Class masterClass) { this.masterClass = masterClass; } @@ -186,6 +189,8 @@ LOG.info("Won't bring the Master up as a shutdown is requested"); return 1; } + jmxServer = new JMXServer(conf, "hbase.master"); + jmxServer.start(); master.start(); master.join(); if(master.isAborted()) @@ -193,6 +198,7 @@ } } catch (Throwable t) { LOG.error("Master exiting", t); + stopJMXServer(); return 1; } return 0; @@ -220,6 +226,8 @@ } catch (Throwable t) { LOG.error("Failed to stop master", t); return 1; + } finally { + stopJMXServer(); } return 0; } @@ -272,4 +280,16 @@ this.zkcluster = zkcluster; } } -} + + private void stopJMXServer() { + if (jmxServer != null) { + try { + jmxServer.stop(); + } catch (IOException e) { + LOG.warn("Exception during JMX server stop.", e); + } finally { + jmxServer = null; + } + } + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1561480) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -240,6 +240,7 @@ import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; import com.google.protobuf.ZeroCopyLiteralByteString; +import org.apache.hadoop.hbase.jmx.JMXServer; /** * HRegionServer makes a set of HRegions available to clients. It checks in with @@ -520,6 +521,8 @@ private UserProvider userProvider; + private JMXServer jmxServer = null; + /** * Starts a HRegionServer at the default location * @@ -628,6 +631,7 @@ // that port is occupied. Adjust serverInfo if this is the case. this.rsInfo.setInfoPort(putUpWebUI()); this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf); + jmxServer = new JMXServer(conf, "hbase.regionserver"); } /** @@ -906,6 +910,8 @@ String prefix = t instanceof YouAreDeadException? "": "Unhandled: "; abort(prefix + t.getMessage(), t); } + } finally { + stopJMXServer(); } // Run shutdown. if (mxBean != null) { @@ -1643,6 +1649,7 @@ sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this); splitLogWorker.start(); + if (jmxServer != null) jmxServer.start(); } /** @@ -4573,4 +4580,16 @@ respBuilder.setResponse(openInfoList.size()); return respBuilder.build(); } + + private void stopJMXServer() { + if (jmxServer != null) { + try { + jmxServer.stop(); + } catch (IOException e) { + LOG.warn("Exception during JMX server stop.", e); + } finally { + jmxServer = null; + } + } + } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/jmx/TestJMXServer.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/jmx/TestJMXServer.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/jmx/TestJMXServer.java (working copy) @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.jmx; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.Inet4Address; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestJMXServer { + + private Configuration conf; + private JMXServer server; + + @Before + public void setup() { + conf = new Configuration(); + conf.set("jmxtest.rmi.registry.port", "2356"); + conf.set("jmxtest.rmi.registry.port", "2356"); + } + + @After + public void tearDown() throws IOException { + if (server != null) { + server.stop(); + } + } + + public static interface TestJMXMBean { + String getName(); + } + + public static class TestJMX implements TestJMXMBean { + + public String getName() { + return "Hbase"; + } + } + + @Test + public void testJMXStart() throws Exception { + conf.set("jmxtest.rmi.registry.ip", Inet4Address.getLocalHost().getHostAddress().toString()); + System.out.println("TestJMXServer.testJMXStart()"); + server = new JMXServer(conf, "jmxtest"); + server.start(); + + // Create and register test mbean + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName name = new ObjectName("Hadoop:service=TestJMX,name=TestJMXInfo"); + TestJMXMBean mbean = new TestJMX(); + mbs.registerMBean(mbean, name); + + // Get the mbean and + JMXServiceURL jmxUrl = server.getUrl(); + JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null); + MBeanServerConnection mbeansServerConnection = jmxConnector.getMBeanServerConnection(); + + ObjectName objectName = new ObjectName("Hadoop:service=TestJMX,name=TestJMXInfo"); + + MBeanInfo mBeanInfo = mbeansServerConnection.getMBeanInfo(objectName); + MBeanAttributeInfo[] attributes = mBeanInfo.getAttributes(); + + Object attribute = mbeansServerConnection.getAttribute(objectName, attributes[0].getName()); + + assertTrue("Mbean not registered with JMX Server", + attribute.toString().equalsIgnoreCase("Hbase")); + } + + @Test + public void testPortValidation() throws Exception { + conf.set("jmxtest.rmi.registry.port", "1023"); + server = new JMXServer(conf, "jmxtest"); + try { + server.start(); + fail("Port not validated"); + } catch (IllegalArgumentException e) { + // success + } + } + +} \ No newline at end of file