diff --git a/core/build.sbt b/core/build.sbt index 405ea55..c54cf44 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -12,7 +12,7 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ ) libraryDependencies ++= Seq( "org.apache.zookeeper" % "zookeeper" % "3.3.4", - "com.101tec" % "zkclient" % "0.2", + "com.101tec" % "zkclient" % "0.3", "org.xerial.snappy" % "snappy-java" % "1.0.4.1", "com.yammer.metrics" % "metrics-core" % "2.2.0", "com.yammer.metrics" % "metrics-annotation" % "2.2.0", diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 2f5dff6..63ea87e 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -298,22 +298,17 @@ object ZkUtils extends Logging { * create parrent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ - def updatePersistentPath(client: ZkClient, path: String, data: String): Int = { - var stat: Stat = null + def updatePersistentPath(client: ZkClient, path: String, data: String) = { try { - stat = client.writeData(path, data) - return stat.getVersion + client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) try { client.createPersistent(path, data) - // When the new path is created, its zkVersion always starts from 0 - return 0 } catch { case e: ZkNodeExistsException => - stat = client.writeData(path, data) - return stat.getVersion + client.writeData(path, data) case e2 => throw e2 } } @@ -327,7 +322,7 @@ object ZkUtils extends Logging { */ def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = client.writeData(path, data, expectVersion) + val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) @@ -345,7 +340,7 @@ object ZkUtils extends Logging { */ def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = client.writeData(path, data, expectVersion) + val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion)