diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fcbe269..f785e33 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -20,7 +20,7 @@ package kafka.utils import kafka.cluster.{Broker, Cluster} import kafka.consumer.TopicCount import org.I0Itec.zkclient.{IZkDataListener, ZkClient} -import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} +import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer import collection._ import kafka.api.LeaderAndIsr @@ -58,7 +58,7 @@ object ZkUtils extends Logging { getTopicPath(topic) + "/partitions" } - def getTopicConfigPath(topic: String): String = + def getTopicConfigPath(topic: String): String = TopicConfigPath + "/" + topic def getDeleteTopicPath(topic: String): String = @@ -99,7 +99,7 @@ object ZkUtils extends Logging { def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } - + def setupCommonPaths(zkClient: ZkClient) { for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath)) makeSurePersistentPathExists(zkClient, path) @@ -280,7 +280,7 @@ object ZkUtils extends Logging { storedData = readData(client, path)._1 } catch { case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this - case e2: Throwable => throw e2 + case e2: Throwable => throw e2 } if (storedData == null || storedData != data) { info("conflict in " + path + " data: " + data + " stored data: " + storedData) @@ -387,13 +387,53 @@ object ZkUtils extends Logging { .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) } catch { - case e: Exception => + case b: ZkBadVersionException => { + // in case of zkClient session timesout and when it reconnects to zookeeeper + // it might be using the older zkVersion for the conditionalUpdate. + // checking to see if the new leaderandisr data matches the one in zookeeper + // except the version. If it matches returns true and version from zookeeper. + try { + val newLeader = getLeaderIsrAndControllerEpochFromJson(data) + val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(client,path) + val writtenLeaderOpt = writtenLeaderAndIsrInfo._1 + val writtenStat = writtenLeaderAndIsrInfo._2 + writtenLeaderOpt match { + case Some(writtenData) => + val writtenLeader = getLeaderIsrAndControllerEpochFromJson(writtenData) + if (newLeader.equals(writtenLeader)) + return (true,writtenStat.getVersion) + else + throw b + case None => throw b + } + } catch { + case e1: Exception => + error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + expectVersion, e1.getMessage)) + (false, -1) + } + } + case e2: Exception => error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, - expectVersion, e.getMessage)) + expectVersion, e2.getMessage)) (false, -1) } } + def getLeaderIsrAndControllerEpochFromJson(data: String): LeaderIsrAndControllerEpoch = { + Json.parseFull(data) match { + case Some(m) => + val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]] + val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int] + val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int] + val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]].sorted + val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int] + val zkPathVersion = leaderIsrAndEpochInfo.get("version").get.asInstanceOf[Int] + new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch) + case None => null + } + } + /** * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the current * version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException @@ -428,7 +468,7 @@ object ZkUtils extends Logging { case e2: Throwable => throw e2 } } - + def deletePath(client: ZkClient, path: String): Boolean = { try { client.delete(path) @@ -451,7 +491,7 @@ object ZkUtils extends Logging { case e2: Throwable => throw e2 } } - + def maybeDeletePath(zkUrl: String, dir: String) { try { val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)