diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd..9bdfcc21554ab858956189cdffdd11510711d207 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer +import org.apache.kafka.common.config.ConfigException import collection._ import kafka.api.LeaderAndIsr import org.apache.zookeeper.data.Stat @@ -208,7 +209,7 @@ object ZkUtils extends Logging { */ def makeSurePersistentPathExists(client: ZkClient, path: String) { if (!client.exists(path)) - client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException + new ZkPath(client).createPersistent(path, true) // won't throw NoNodeException or NodeExistsException } /** @@ -216,20 +217,22 @@ object ZkUtils extends Logging { */ private def createParentPath(client: ZkClient, path: String): Unit = { val parentDir = path.substring(0, path.lastIndexOf('/')) - if (parentDir.length != 0) - client.createPersistent(parentDir, true) + if (parentDir.length != 0) { + new ZkPath(client).createPersistent(parentDir, true) + } } /** * Create an ephemeral node with the given path and data. Create parents if necessary. */ private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = { + val zkPath = new ZkPath(client) try { - client.createEphemeral(path, data) + zkPath.createEphemeral(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - client.createEphemeral(path, data) + zkPath.createEphemeral(path, data) } } } @@ -308,18 +311,19 @@ object ZkUtils extends Logging { * Create an persistent node with the given path and data. Create parents if necessary. */ def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = { + val zkPath = new ZkPath(client) try { - client.createPersistent(path, data) + zkPath.createPersistent(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - client.createPersistent(path, data) + zkPath.createPersistent(path, data) } } } def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = { - client.createPersistentSequential(path, data) + new ZkPath(client).createPersistentSequential(path, data) } /** @@ -334,7 +338,7 @@ object ZkUtils extends Logging { case e: ZkNoNodeException => { createParentPath(client, path) try { - client.createPersistent(path, data) + new ZkPath(client).createPersistent(path, data) } catch { case e: ZkNodeExistsException => client.writeData(path, data) @@ -405,7 +409,7 @@ object ZkUtils extends Logging { } catch { case e: ZkNoNodeException => { createParentPath(client, path) - client.createEphemeral(path, data) + new ZkPath(client).createEphemeral(path, data) } case e2: Throwable => throw e2 } @@ -754,3 +758,25 @@ class ZKConfig(props: VerifiableProperties) { /** how far a ZK follower can be behind a ZK leader */ val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000) } + +class ZkPath(client: ZkClient) { + if (!client.exists("/")) { + throw new ConfigException("Zookeeper namespace does not exist") + } + + def createPersistent(path: String, data: Object) { + client.createPersistent(path, data) + } + + def createPersistent(path: String, createParents: Boolean) { + client.createPersistent(path, createParents) + } + + def createEphemeral(path: String, data: Object) { + client.createEphemeral(path, data) + } + + def createPersistentSequential(path: String, data: Object): String = { + client.createPersistentSequential(path, data) + } +}