diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 390fef500d7e0027e698c259d777454ba5a0f5e8..dd93c23e1b54388a1430a6575a4ee90b9513dc7a 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -25,6 +25,7 @@ import kafka.utils._ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File +import java.net.BindException import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.Broker @@ -129,11 +130,34 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) + + val chroot = { + if (config.zkConnect.indexOf("/") > 0) + config.zkConnect.substring(config.zkConnect.indexOf("/")) + else + "" + } + + if (chroot.length > 1) { + val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) + val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val sepIndex = zkConnForChrootCreation.indexOf('/') + if (sepIndex >0) + { + val connect = zkConnForChrootCreation.substring(0, sepIndex) + val path = zkConnForChrootCreation.substring(sepIndex) + ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, path) + info("Created zookeeper path " + path) + } + zkClientForChrootCreation.close() + } + val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.setupCommonPaths(zkClient) zkClient } + /** * Forces some dynamic jmx beans to be registered on server startup. */