diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 1423b28503d8b2533d402a31d45ce042afab7ba5..f091f1773667bcb36d62c0a7b5079718bcf2bbd9 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -49,6 +49,30 @@ class KafkaZooKeeper(config: KafkaConfig, private val leaderChangeLock = new Object def startup() { + /* check that chroot exists */ + val zkChRootPath = config.zkConnect.split("/") + + if (zkChRootPath.size > 1) + { + info("checking that ZK chroot exists") + + val connect = zkChRootPath(0) + var path = "" + + zkChRootPath.zipWithIndex foreach { case(value, index) => + if (index > 0) + { + path += ( "/" + value ) + } + } + + val client = new ZkClient(connect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + + ZkUtils.makeSurePersistentPathExists(client, path) + + client.close(); + } + /* start client */ info("connecting to ZK: " + config.zkConnect) zkClient = KafkaZookeeperClient.getZookeeperClient(config)