diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..a9c0465 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
@@ -216,7 +216,7 @@ class Partition(val topic: String,
     inSyncReplicas = Set.empty[Replica]
     leaderEpoch = leaderAndIsr.leaderEpoch
     zkVersion = leaderAndIsr.zkVersion
-    
+
     leaderReplicaIdOpt.foreach { leaderReplica =>
       if (topic == OffsetManager.OffsetsTopicName &&
          /* if we are making a leader->follower transition */
@@ -261,7 +261,15 @@ class Partition(val topic: String,
           info("Expanding ISR for partition [%s,%d] from %s to %s"
                        .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in ZK and cache
-          updateIsr(newInSyncReplicas)
+          val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+            leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+          if(updateSucceeded) {
+            inSyncReplicas = newInSyncReplicas
+            zkVersion = newVersion
+            trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+          } else {
+            info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+          }
           replicaManager.isrExpandRate.mark()
         }
         maybeIncrementLeaderHW(leaderReplica)
@@ -325,7 +333,15 @@ class Partition(val topic: String,
       info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
         inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
       // update ISR in zk and in cache
-      updateIsr(newInSyncReplicas)
+      val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+        leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+      if(updateSucceeded) {
+        inSyncReplicas = newInSyncReplicas
+        zkVersion = newVersion
+        trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+      } else {
+        info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+      }
       // we may need to increment high watermark since ISR could be down to 1
       maybeIncrementLeaderHW(leaderReplica)
       replicaManager.isrShrinkRate.mark()
@@ -373,22 +389,6 @@ class Partition(val topic: String,
     }
   }
 
-  private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
-      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
-      inSyncReplicas = newIsr
-      zkVersion = newVersion
-      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
-    } else {
-      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-    }
-  }
-
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e776423..d20a951 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -951,7 +951,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
       zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
         case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
           val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -981,7 +981,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-            leaderAndIsr.zkVersion)
+            leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
           newLeaderAndIsr.zkVersion = newVersion
 
           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
@@ -1017,7 +1017,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
       zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
         case Some(leaderIsrAndEpoch) =>
           val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -1035,7 +1035,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-            leaderAndIsr.zkVersion)
+            leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
           newLeaderAndIsr.zkVersion = newVersion
           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
           if (updateSucceeded)
@@ -1333,6 +1333,16 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case n: LeaderIsrAndControllerEpoch =>
+        leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted &&
+          leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch
+      case _ => false
+    }
+  }
 }
 
 object ControllerStats extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 6457b56..8c816e8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,7 @@ import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.utils.{Logging, ZkUtils}
+import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.log4j.Logger
@@ -293,7 +293,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     } catch {
       case e: ZkNodeExistsException =>
         // read the controller epoch
-        val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+        val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
           topicAndPartition.partition).get
         val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                        "exists with value %s and controller epoch %d")
@@ -336,7 +336,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
@@ -383,7 +383,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
      case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
      case None =>
        val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
@@ -521,5 +521,3 @@ case object NewPartition extends PartitionState { val state: Byte = 0 }
 case object OnlinePartition extends PartitionState { val state: Byte = 1 }
 case object OfflinePartition extends PartitionState { val state: Byte = 2 }
 case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
-
-
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2f0f29d..ad9c7c4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
-import kafka.utils.{ZkUtils, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
 import org.apache.log4j.Logger
 import kafka.controller.Callbacks._
@@ -153,7 +153,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         case NewReplica =>
           assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its partition
-          val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
@@ -367,5 +367,3 @@ case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4}
 case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
 case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6}
 case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
-
-
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
new file mode 100644
index 0000000..54e5439
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+import kafka.cluster.Replica
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.zookeeper.data.Stat
+import org.I0Itec.zkclient.ZkClient
+
+import scala.Some
+import scala.collection._
+
+object ReplicationUtils extends Logging {
+
+  def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int,
+                controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
+    val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
+    val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion,
+      Some(checkLeaderAndIsrZkData))
+  }
+
+  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String,newLeaderData: String, zkVersion: Int): (Boolean,Int) = {
+    try {
+      val newLeaderStat: Stat = new Stat()
+      newLeaderStat.setVersion(zkVersion)
+      val newLeader = parseLeaderAndIsr(newLeaderData, path, newLeaderStat)
+      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
+      val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
+      val writtenStat = writtenLeaderAndIsrInfo._2
+      writtenLeaderOpt match {
+        case Some(writtenData) =>
+          val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
+          (newLeader,writtenLeader) match {
+            case (Some(newLeader),Some(writtenLeader)) =>
+              if(newLeader.equals(writtenLeader))
+                return (true,writtenStat.getVersion())
+            case _ =>
+          }
+        case None =>
+      }
+    } catch {
+      case e1: Exception =>
+    }
+    (false,-1)
+  }
+
+  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
+    val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath)
+    val leaderAndIsrOpt = leaderAndIsrInfo._1
+    val stat = leaderAndIsrInfo._2
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)
+      case None => None
+    }
+  }
+
+  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
+      : Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(leaderAndIsrStr) match {
+      case Some(m) =>
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+        val zkPathVersion = stat.getVersion
+        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
+          isr.toString(), zkPathVersion, path))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
+      case None => None
+    }
+  }
+
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fcbe269..54a6d42 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -20,7 +20,8 @@ package kafka.utils
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
+  ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import collection._
 import kafka.api.LeaderAndIsr
@@ -58,7 +59,7 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
-  def getTopicConfigPath(topic: String): String = 
+  def getTopicConfigPath(topic: String): String =
     TopicConfigPath + "/" + topic
 
   def getDeleteTopicPath(topic: String): String =
@@ -85,43 +86,15 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
-    val leaderAndIsrOpt = leaderAndIsrInfo._1
-    val stat = leaderAndIsrInfo._2
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat)
-      case None => None
-    }
-  }
-
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
-    getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
+    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
-  
+
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
-    : Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr) match {
-      case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
-        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
-        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
-        val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition [%s,%d]".format(leader, epoch,
-          isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
-      case None => None
-    }
-  }
-
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
@@ -379,17 +352,30 @@ object ZkUtils extends Logging {
   /**
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
    * exist, the current version is not the expected version, etc.) return (false, -1)
+   *
+   * Upon receiving a ZkBadVersionException, an optional checker method will be called to compare the passed-in data
+   * with the data currently in zookeeper, ignoring the version. On a successful match it returns (true, zookeeper version);
+   * otherwise it returns (false, -1)
    */
-  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int,
+    optionalChecker:Option[(ZkClient, String,String,Int) => (Boolean,Int)] = None): (Boolean, Int) = {
     try {
       val stat = client.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
-      case e: Exception =>
-        error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e.getMessage))
+      case e1: ZkBadVersionException =>
+        optionalChecker match {
+          case Some(checker) => return checker(client,path,data,expectVersion)
+          case _ => debug("Checker method is not passed, skipping zkData match")
+        }
+        warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+          expectVersion, e1.getMessage))
+        (false, -1)
+      case e2: Exception =>
+        warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+          expectVersion, e2.getMessage))
         (false, -1)
     }
   }
@@ -428,7 +414,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
@@ -451,7 +437,7 @@ object ZkUtils extends Logging {
      case e2: Throwable => throw e2
    }
  }
-  
+
  def maybeDeletePath(zkUrl: String, dir: String) {
    try {
      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
@@ -518,7 +504,7 @@ object ZkUtils extends Logging {
       : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
-      ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match {
+      ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match {
        case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
        case None =>
      }
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
new file mode 100644
index 0000000..47beda2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.cluster.{Replica, Partition}
+import kafka.server.{ReplicaFetcherManager, KafkaConfig}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.common.TopicAndPartition
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import org.junit.Test
+import org.easymock.EasyMock
+
+
+class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val topic = "my-topic-test"
+  val partitionId = 0
+  val brokerId = 1
+  val leaderEpoch = 1
+  val controllerEpoch = 1
+  val zkVersion = 1
+  val topicPath = "/brokers/topics/my-topic-test/partitions/0/state"
+  val topicData = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
+    "versions" -> 1, "leader_epoch" -> 1,"isr" -> List(1,2)))
+  val topicDataVersionMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
+    "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(2,1)))
+  val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
+    "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
+
+
+  override def setUp() {
+    super.setUp()
+    ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
+  }
+
+  @Test
+  def testCheckLeaderAndIsrZkData() {
+    //mismatched zkVersion with the same data
+    val(dataMatched1,newZkVersion1) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataVersionMismatch,1)
+    assertTrue(dataMatched1)
+    assertEquals(newZkVersion1,0)
+
+    //mismatched zkVersion and leaderEpoch
+    val(dataMatched2,newZkVersion2) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataMismatch,1)
+    assertFalse(dataMatched2)
+    assertEquals(newZkVersion2,-1)
+  }
+
+  @Test
+  def testUpdateIsr() {
+    val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
+    EasyMock.expect(log)
+    EasyMock.replay(log)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
+    EasyMock.replay(logManager)
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.replay(replicaManager)
+
+    val partition = new Partition(topic,0,1,new MockTime,replicaManager)
+    val replicas = Set(new Replica(1,partition),new Replica(2,partition))
+
+    // regular update
+    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateIsr(zkClient,
+      "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, 0, replicas)
+    assertTrue(updateSucceeded1)
+    assertEquals(newZkVersion1,1)
+
+    // mismatched zkVersion with the same data
+    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateIsr(zkClient,
+      "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, zkVersion + 1, replicas)
+    assertTrue(updateSucceeded2)
+    // returns true with existing zkVersion
+    assertEquals(newZkVersion2,1)
+
+    // mismatched zkVersion and leaderEpoch
+    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateIsr(zkClient,
+      "my-topic-test", partitionId, brokerId, leaderEpoch + 1, controllerEpoch, zkVersion + 1, replicas)
+    assertFalse(updateSucceeded3)
+    assertEquals(newZkVersion3,-1)
+  }
+
+}
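
Illustrative usage sketch (not part of the patch): it shows how a caller is expected to pass the new optional checker into ZkUtils.conditionalUpdatePersistentPath, mirroring what ReplicationUtils.updateIsr does above. It assumes a ZooKeeper ensemble reachable at localhost:2181 and an existing leader-and-ISR znode for a hypothetical topic "my-topic", partition 0; the object name, topic name, and numeric values are made up for the example.

import org.I0Itec.zkclient.ZkClient
import kafka.api.LeaderAndIsr
import kafka.utils.{ReplicationUtils, ZKStringSerializer, ZkUtils}

object ConditionalIsrUpdateSketch {
  def main(args: Array[String]): Unit = {
    // Connect the same way maybeDeletePath does: 30s session and connection timeouts.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    val path = ZkUtils.getTopicPartitionLeaderAndIsrPath("my-topic", 0)
    // Leader 1, leader epoch 1, ISR {1,2}; the trailing 0 is the zkVersion the caller has cached.
    val newLeaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2), 0)
    val data = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, 1)

    // Without the checker, a ZkBadVersionException (e.g. from a retried write that had already
    // succeeded) surfaces as (false, -1) and the caller's cached zkVersion goes stale.
    // With checkLeaderAndIsrZkData, a write whose payload already matches the stored data is
    // treated as a success and the znode's real zkVersion is returned.
    val (succeeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
      zkClient, path, data, 0, Some(ReplicationUtils.checkLeaderAndIsrZkData))
    println("succeeded=%s newVersion=%d".format(succeeded, newVersion))
    zkClient.close()
  }
}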