diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1f62272..da022ef 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -22,7 +22,7 @@ import org.apache.log4j.Logger import kafka.log.LogManager import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean -import kafka.utils.{Utils, SystemTime, KafkaScheduler} +import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler} import kafka.network.{SocketServerStats, SocketServer} import java.io.File @@ -72,6 +72,7 @@ class KafkaServer(val config: KafkaConfig) { config.maxSocketRequestSize) Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName)) socketServer.startup + Mx4jLoader.maybeLoad /** * Registers this broker in ZK. After this, consumers can connect to broker. * So this should happen after socket server start. diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala new file mode 100644 index 0000000..fa0a4d2 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
 *
 * The default port is 8082. To override that provide e.g. -Dmx4jport=8083
 * The default listen address is 0.0.0.0. To override that provide -Dmx4jaddress=127.0.0.1
 * If you wish to force mx4j to be disabled despite its presence on the classpath, use
 * -Dmx4jdisable=true
 *
 * This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
 */
object Mx4jLoader {
  private val logger = Logger.getLogger(getClass())

  /**
   * Tries to start the mx4j HTTP adaptor and register it, together with its XSLT
   * processor, on the platform MBean server. All mx4j classes are reached through
   * reflection, so mx4j-tools is an optional runtime dependency only.
   *
   * System properties consulted: `mx4jdisable`, `mx4jaddress` (default "0.0.0.0")
   * and `mx4jport` (default 8082).
   *
   * Loading is best effort: a missing jar or any failure while wiring the adaptor is
   * logged and reported through the return value, never thrown to the caller.
   *
   * @return true only when the adaptor was registered and started; false when loading
   *         is disabled via `mx4jdisable`, mx4j-tools is not on the classpath, or any
   *         step of the setup failed
   */
  def maybeLoad(): Boolean = {
    // The guard must be the value of the whole method. A bare `if (...) false` statement
    // would discard its result and fall through, silently ignoring -Dmx4jdisable=true.
    if (Utils.getBoolean(System.getProperties(), "mx4jdisable", false)) {
      false
    } else {
      val address = System.getProperty("mx4jaddress", "0.0.0.0")
      // NOTE(review): presumably Utils.getInt falls back to 8082 when the property is unset;
      // it is deliberately left outside the try so its own failures surface unchanged.
      val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)

      // The try/catch is the result expression: `true` on success, `false` from each handler.
      try {
        logger.debug("Will try to load MX4j now, if it's in the classpath")

        val mbs = ManagementFactory.getPlatformMBeanServer()
        val processorName = new ObjectName("Server:name=XSLTProcessor")

        // Class.forName throws ClassNotFoundException when mx4j-tools is absent.
        val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
        val httpAdaptor = httpAdaptorClass.newInstance()
        // Method.invoke takes Object varargs, hence the AnyRef casts (the Int port is boxed).
        httpAdaptorClass.getMethod("setHost", classOf[String]).invoke(httpAdaptor, address.asInstanceOf[AnyRef])
        httpAdaptorClass.getMethod("setPort", Integer.TYPE).invoke(httpAdaptor, port.asInstanceOf[AnyRef])

        val httpName = new ObjectName("system:name=http")
        mbs.registerMBean(httpAdaptor, httpName)

        val xsltProcessorClass = Class.forName("mx4j.tools.adaptor.http.XSLTProcessor")
        val xsltProcessor = xsltProcessorClass.newInstance()
        httpAdaptorClass
          .getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean"))
          .invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef])
        mbs.registerMBean(xsltProcessor, processorName)

        httpAdaptorClass.getMethod("start").invoke(httpAdaptor)
        logger.info("mx4j successfuly loaded")
        true
      } catch {
        case _: ClassNotFoundException =>
          logger.info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
          false
        // Deliberately broad: loading mx4j is optional and must never take the caller down.
        case e: Throwable =>
          logger.warn("Could not start register mbean in JMX", e)
          false
      }
    }
  }
}