diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c7d46a3..15e8ed3 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -21,7 +21,7 @@ import org.apache.log4j.Logger import kafka.log.LogManager import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean -import kafka.utils.{Utils, SystemTime, KafkaScheduler} +import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler} import kafka.network.{SocketServerStats, SocketServer} import java.io.File @@ -70,6 +70,7 @@ class KafkaServer(val config: KafkaConfig) { handlers.handlerFor) Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName)) socketServer.startup + Mx4jLoader.maybeLoad /** * Registers this broker in ZK. After this, consumers can connect to broker. * So this should happen after socket server start. diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala new file mode 100644 index 0000000..20c2ad0 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package kafka.utils
+
+import java.lang.management.ManagementFactory
+import javax.management.MBeanServer
+import javax.management.ObjectName
+import org.apache.log4j.Logger
+
+/**
+ * If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
+ *
+ * The default port is 8082. To override that provide e.g. -Dmx4jport=8083
+ * The default listen address is 0.0.0.0. To override that provide -Dmx4jaddress=127.0.0.1
+ *
+ * This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
+ */
+object Mx4jLoader {
+  private val logger = Logger.getLogger(getClass())
+
+  /**
+   * Loads the mx4j HTTP adaptor through reflection (so mx4j stays an optional dependency),
+   * registers it and its XSLT processor with the platform MBean server, and starts it.
+   *
+   * @return true if the adaptor was started; false if mx4j is not on the classpath or any
+   *         other step failed (the reason is logged)
+   */
+  def maybeLoad(): Boolean = {
+    val address = System.getProperty("mx4jaddress", "0.0.0.0")
+    val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
+    // The try/catch is the method's final expression, so each branch yields the result.
+    try {
+      logger.debug("Will try to load MX4j now, if it's in the classpath")
+      val mbs: MBeanServer = ManagementFactory.getPlatformMBeanServer()
+      val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
+      val httpAdaptor = httpAdaptorClass.newInstance()
+      httpAdaptorClass.getMethod("setHost", classOf[String]).invoke(httpAdaptor, address.asInstanceOf[AnyRef])
+      httpAdaptorClass.getMethod("setPort", Integer.TYPE).invoke(httpAdaptor, port.asInstanceOf[AnyRef])
+      mbs.registerMBean(httpAdaptor, new ObjectName("system:name=http"))
+
+      val xsltProcessorClass = Class.forName("mx4j.tools.adaptor.http.XSLTProcessor")
+      val xsltProcessor = xsltProcessorClass.newInstance()
+      httpAdaptorClass.getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean")).invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef])
+      mbs.registerMBean(xsltProcessor, new ObjectName("Server:name=XSLTProcessor"))
+      httpAdaptorClass.getMethod("start").invoke(httpAdaptor)
+      logger.info("mx4j successfuly loaded")
+      true
+    } catch {
+      case e: ClassNotFoundException =>
+        logger.info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
+        false
+      case e: Throwable =>
+        logger.warn("Could not start register mbean in JMX", e)
+        false
+    }
+  }
+}