From bbb11752ebfadfa01e426c2bffdebb784aa8c076 Mon Sep 17 00:00:00 2001 From: Jonathan Creasy Date: Thu, 25 Sep 2014 00:10:23 -0500 Subject: [PATCH] KAFKA-404 auto-create Zookeeper CHROOT on Startup --- core/src/main/scala/kafka/server/KafkaServer.scala | 18 ++++++++ .../unit/kafka/server/CreateZKChrootTest.scala | 54 ++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 390fef5..52eba74 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -25,6 +25,7 @@ import kafka.utils._ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File +import java.net.BindException import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.Broker @@ -129,11 +130,28 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) + + val chroot = { + if (config.zkConnect.indexOf("/") > 0) + config.zkConnect.substring(config.zkConnect.indexOf("/")) + else + "" + } + + if (chroot.length > 1) { + val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) + val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot) + info("Created zookeeper path " + chroot) + zkClientForChrootCreation.close() + } + val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.setupCommonPaths(zkClient) zkClient } + /** * Forces some dynamic jmx beans to be registered on server startup. */ diff --git a/core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala b/core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala new file mode 100644 index 0000000..bb48c44 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.zk +import kafka.utils.ZkUtils +import kafka.utils.Utils +import kafka.utils.TestUtils + +import kafka.zk.ZooKeeperTestHarness +import junit.framework.Assert._ + +class CreateZKChrootTest extends JUnit3Suite with ZooKeeperTestHarness { + var server : KafkaServer = null + val brokerId = 0 + val zookeeperChroot = "/kafka-chroot-for-unittest" + + override def setUp() { + super.setUp() + val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) + val zooKeeperConnect = props.get("zookeeper.connect") + props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot) + + server = TestUtils.createServer(new KafkaConfig(props)) + } + + override def tearDown() { + server.shutdown() + Utils.rm(server.config.logDirs) + super.tearDown() + } + + def testBrokerCreatesZKChroot { + val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot) + assertTrue(pathExists) + } + +} \ No newline at end of file -- 1.9.3 (Apple Git-50)