diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e776423..14a2f33 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1333,6 +1333,17 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")") leaderAndIsrInfo.toString() } + + override def equals(obj: Any): Boolean = { + obj match { + case null => false + case n: LeaderIsrAndControllerEpoch => + leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted && + leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch + case _ => false + } + } + } object ControllerStats extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fcbe269..06e87fa 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -20,7 +20,7 @@ package kafka.utils import kafka.cluster.{Broker, Cluster} import kafka.consumer.TopicCount import org.I0Itec.zkclient.{IZkDataListener, ZkClient} -import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} +import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer import collection._ import kafka.api.LeaderAndIsr @@ -58,7 +58,7 @@ object ZkUtils extends Logging { getTopicPath(topic) + "/partitions" } - def getTopicConfigPath(topic: String): String = + def getTopicConfigPath(topic: String): String = TopicConfigPath + "/" + topic def getDeleteTopicPath(topic: String): String = @@ -99,7 +99,7 @@ object ZkUtils extends Logging { def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } - + def setupCommonPaths(zkClient: ZkClient) { for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath)) makeSurePersistentPathExists(zkClient, path) @@ -280,7 +280,7 @@ object ZkUtils extends Logging { storedData = readData(client, path)._1 } catch { case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this - case e2: Throwable => throw e2 + case e2: Throwable => throw e2 } if (storedData == null || storedData != data) { info("conflict in " + path + " data: " + data + " stored data: " + storedData) @@ -387,9 +387,40 @@ object ZkUtils extends Logging { .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) } catch { - case e: Exception => + case b: ZkBadVersionException => { + // in case of zkClient session timesout and when it reconnects to zookeeeper + // it might be using the older zkVersion for the conditionalUpdate. + // checking to see if the new leaderandisr data matches the one in zookeeper + // except the version. If it matches returns true and version from zookeeper. + try { + val newLeaderStat: Stat = new Stat() + val newLeader = parseLeaderAndIsr(data,"",-1,newLeaderStat) + val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(client,path) + val writtenLeaderOpt = writtenLeaderAndIsrInfo._1 + val writtenStat = writtenLeaderAndIsrInfo._2 + writtenLeaderOpt match { + case Some(writtenData) => + val writtenLeader = parseLeaderAndIsr(writtenData,"",-1,writtenStat) + (newLeader,writtenLeader) match { + case (Some(newLeader),Some(writtenLeader)) => + if(newLeader.equals(writtenLeader)) + return (true,writtenStat.getVersion) + throw b + case _ => + throw b + } + case None => throw b + } + } catch { + case e1: Exception => + error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + expectVersion, e1.getMessage)) + (false, -1) + } + } + case e2: Exception => error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, - expectVersion, e.getMessage)) + expectVersion, e2.getMessage)) (false, -1) } } @@ -428,7 +459,7 @@ object ZkUtils extends Logging { case e2: Throwable => throw e2 } } - + def deletePath(client: ZkClient, path: String): Boolean = { try { client.delete(path) @@ -451,7 +482,7 @@ object ZkUtils extends Logging { case e2: Throwable => throw e2 } } - + def maybeDeletePath(zkUrl: String, dir: String) { try { val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)