diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 48aac3ef179d9cfb166c05b7daf625b0b2c5e36d..1b1904f386953547460c391a0382323c19a85b22 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -39,6 +39,30 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Loggin val lock = new Object() def startup() { + /* check that chroot exists */ + val zkChRootPath = config.zkConnect.split("/") + + if (zkChRootPath.size > 1) + { + info("checking that ZK chroot exists") + + val connect = zkChRootPath(0) + var path = "" + + zkChRootPath.zipWithIndex foreach { case(value, index) => + if (index > 0) + { + path += ( "/" + value ) + } + } + + val client = new ZkClient(connect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + + ZkUtils.makeSurePersistentPathExists(client, path) + + client.close(); + } + /* start client */ info("connecting to ZK: " + config.zkConnect) zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)