diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala index 9571fd5..a38a567 100644 --- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala +++ b/core/src/main/scala/kafka/admin/ShutdownBroker.scala @@ -29,27 +29,32 @@ import scala.Some object ShutdownBroker extends Logging { - private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer, jmxUrl: String) + private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer) private def invokeShutdown(params: ShutdownParams): Boolean = { var zkClient: ZkClient = null try { zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer) + val controllerBrokerId = ZkUtils.getController(zkClient) - val controllerOpt = ZkUtils.getBrokerInfo(zkClient, controllerBrokerId) - controllerOpt match { - case Some(controller) => - val jmxUrl = new JMXServiceURL(params.jmxUrl) + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1 match { + case Some(controllerInfo) => + val parsed = controllerInfo.split(":") + val controllerHost = parsed(0) + val controllerJmxPort = parsed(2) + val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi" + .format(controllerHost, controllerJmxPort)) + info("Connecting to jmx url " + jmxUrl) val jmxc = JMXConnectorFactory.connect(jmxUrl, null) val mbsc = jmxc.getMBeanServerConnection val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName), - "shutdownBroker", - Array(params.brokerId), - Array(classOf[Int].getName)).asInstanceOf[Int] + "shutdownBroker", + Array(params.brokerId), + Array(classOf[Int].getName)).asInstanceOf[Int] val shutdownComplete = (leaderPartitionsRemaining == 0) info("Shutdown status: " + (if (shutdownComplete) - "complete" else - "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining))) + "complete" else + "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining))) shutdownComplete case None => error("Operation failed due to controller failure on %d.".format(controllerBrokerId)) @@ -88,11 +93,6 @@ object ShutdownBroker extends Logging { .describedAs("retry interval in ms (> 1000)") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - val jmxUrlOpt = parser.accepts("jmx.url", "Controller's JMX URL.") - .withRequiredArg - .describedAs("JMX url.") - .ofType(classOf[String]) - .defaultsTo("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi") val options = parser.parse(args : _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt) @@ -100,8 +100,7 @@ object ShutdownBroker extends Logging { val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000) val numRetries = options.valueOf(numRetriesOpt).intValue - val shutdownParams = - ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt), options.valueOf(jmxUrlOpt)) + val shutdownParams = ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt)) if (!invokeShutdown(shutdownParams)) { (1 to numRetries).takeWhile(attempt => { diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index e1c11f2..d8120df 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -43,7 +43,8 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { private def registerBrokerInZk() { info("Registering broker " + brokerIdPath) val hostName = config.hostName - ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port) + val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt + ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort) } /** diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 358c4fd..ec5a504 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -180,11 +180,11 @@ object ZkUtils extends Logging { replicas.contains(brokerId.toString) } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) { + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val broker = new Broker(id, host, port) try { - createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString) + createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString + ":" + jmxPort) } catch { case e: ZkNodeExistsException => throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index cebb371..a508895 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -334,7 +334,7 @@ object TestUtils extends Logging { def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { val brokers = ids.map(id => new Broker(id, "localhost", 6667)) - brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port)) + brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, jmxPort = -1)) brokers }