diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index a9c0465..f2ca856 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -18,7 +18,7 @@ package kafka.cluster import kafka.common._ import kafka.admin.AdminUtils -import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging} +import kafka.utils.{ReplicationUtils, Pool, Time, Logging} import kafka.utils.Utils.inLock import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig @@ -261,15 +261,7 @@ class Partition(val topic: String, info("Expanding ISR for partition [%s,%d] from %s to %s" .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in ZK and cache - val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId, - leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas) - if(updateSucceeded) { - inSyncReplicas = newInSyncReplicas - zkVersion = newVersion - trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion)) - } else { - info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) - } + updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } maybeIncrementLeaderHW(leaderReplica) @@ -333,15 +325,7 @@ class Partition(val topic: String, info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in zk and in cache - val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId, - leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas) - if(updateSucceeded) { - inSyncReplicas = newInSyncReplicas - zkVersion = newVersion - trace("ISR updated to 
[%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion)) - } else { - info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) - } + updateIsr(newInSyncReplicas) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) replicaManager.isrShrinkRate.mark() @@ -389,6 +373,19 @@ class Partition(val topic: String, } } + private def updateIsr(newIsr: Set[Replica]) { + val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) + val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId, + newLeaderAndIsr, controllerEpoch, zkVersion) + if(updateSucceeded) { + inSyncReplicas = newIsr + zkVersion = newVersion + trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion)) + } else { + info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) + } + } + override def equals(that: Any): Boolean = { if(!(that.isInstanceOf[Partition])) return false diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8af48ab..94bbd33 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -953,7 +953,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt var zkWriteCompleteOrUnnecessary = false while (!zkWriteCompleteOrUnnecessary) { // refresh leader and isr from zookeeper again - val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match { case Some(leaderIsrAndEpoch) => // increment the leader epoch even 
if the ISR changes val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr @@ -979,13 +979,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1, newIsr, leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath( - zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch), - leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData)) - newLeaderAndIsr.zkVersion = newVersion + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition, + newLeaderAndIsr, epoch, leaderAndIsr.zkVersion) + newLeaderAndIsr.zkVersion = newVersion finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch)) controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get) if (updateSucceeded) @@ -1019,7 +1016,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt var zkWriteCompleteOrUnnecessary = false while (!zkWriteCompleteOrUnnecessary) { // refresh leader and isr from zookeeper again - val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match { case Some(leaderIsrAndEpoch) => val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr @@ -1033,11 +1030,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, leaderAndIsr.isr, leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = 
ZkUtils.conditionalUpdatePersistentPath( - zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch), - leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData)) + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, + partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion) + newLeaderAndIsr.zkVersion = newVersion finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch)) if (updateSucceeded) @@ -1335,16 +1330,6 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")") leaderAndIsrInfo.toString() } - - override def equals(obj: Any): Boolean = { - obj match { - case null => false - case n: LeaderIsrAndControllerEpoch => - leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted && - leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch - case _ => false - } - } } object ControllerStats extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index e29e470..34c70b6 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -293,7 +293,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } catch { case e: ZkNodeExistsException => // read the controller epoch - val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, + val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition).get val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr 
path already " + "exists with value %s and controller epoch %d") @@ -334,9 +334,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } // elect new leader or throw exception val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData)) + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition, + leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion) newLeaderAndIsr = leaderAndIsr newLeaderAndIsr.zkVersion = newVersion zookeeperPathUpdateSucceeded = updateSucceeded @@ -383,7 +382,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { val topicAndPartition = TopicAndPartition(topic, partition) - ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match { + ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match { case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch case None => val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state" diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 2f0f29d..ad9c7c4 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -20,7 +20,7 @@ import collection._ import collection.JavaConversions._ import java.util.concurrent.atomic.AtomicBoolean import kafka.common.{TopicAndPartition, StateChangeFailedException} -import 
kafka.utils.{ZkUtils, Logging} +import kafka.utils.{ZkUtils, ReplicationUtils, Logging} import org.I0Itec.zkclient.IZkChildListener import org.apache.log4j.Logger import kafka.controller.Callbacks._ @@ -153,7 +153,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case NewReplica => assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) // start replica as a follower to the current leader for its partition - val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) leaderIsrAndControllerEpochOpt match { case Some(leaderIsrAndControllerEpoch) => if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) @@ -367,5 +367,3 @@ case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4} case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5} case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6} case object NonExistentReplica extends ReplicaState { val state: Byte = 7 } - - diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index eb53837..7157673 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -16,7 +16,7 @@ */ package kafka.utils -import kafka.cluster.Replica + import kafka.api.LeaderAndIsr import kafka.controller.LeaderIsrAndControllerEpoch import org.apache.zookeeper.data.Stat @@ -27,31 +27,27 @@ import scala.collection._ object ReplicationUtils extends Logging { - def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int, - controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = { - debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, 
newIsr.mkString(","))) - val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) + def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int, + zkVersion: Int): (Boolean,Int) = { + debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(","))) val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId) val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch - ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, - Some(checkLeaderAndIsrZkData)) + ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) } - def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String,newLeaderData: String, zkVersion: Int): (Boolean,Int) = { + def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = { try { - val newLeaderStat: Stat = new Stat() - newLeaderStat.setVersion(zkVersion) - val newLeader = parseLeaderAndIsr(newLeaderData, path, newLeaderStat) - val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path) + val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, path) val writtenLeaderOpt = writtenLeaderAndIsrInfo._1 val writtenStat = writtenLeaderAndIsrInfo._2 + val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat) writtenLeaderOpt match { case Some(writtenData) => val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat) - (newLeader,writtenLeader) match { - case (Some(newLeader),Some(writtenLeader)) => - if(newLeader.equals(writtenLeader)) + (expectedLeader,writtenLeader) match { + case (Some(expectedLeader),Some(writtenLeader)) => + if(expectedLeader == 
writtenLeader) return (true,writtenStat.getVersion()) case _ => } @@ -63,7 +59,18 @@ object ReplicationUtils extends Logging { (false,-1) } - def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat) + def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = { + val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition) + val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath) + val leaderAndIsrOpt = leaderAndIsrInfo._1 + val stat = leaderAndIsrInfo._2 + leaderAndIsrOpt match { + case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat) + case None => None + } + } + + private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat) : Option[LeaderIsrAndControllerEpoch] = { Json.parseFull(leaderAndIsrStr) match { case Some(m) => diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 1a23eb4..dcdc1ce 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -19,24 +19,20 @@ package kafka.utils import kafka.cluster.{Broker, Cluster} import kafka.consumer.TopicCount -import org.I0Itec.zkclient.{IZkDataListener, ZkClient} +import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer import collection._ import kafka.api.LeaderAndIsr -import mutable.ListBuffer import org.apache.zookeeper.data.Stat -import java.util.concurrent.locks.{ReentrantLock, Condition} import kafka.admin._ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext -import kafka.controller.PartitionAndReplica import kafka.controller.KafkaController -import scala.{collection, Some} +import scala.Some import 
kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition -import kafka.utils.Utils.inLock import scala.collection object ZkUtils extends Logging { @@ -86,19 +82,8 @@ object ZkUtils extends Logging { brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) } - def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = { - val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition) - val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath) - val leaderAndIsrOpt = leaderAndIsrInfo._1 - val stat = leaderAndIsrInfo._2 - leaderAndIsrOpt match { - case Some(leaderAndIsrStr) => ReplicationUtils.parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat) - case None => None - } - } - def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { - getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) + ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } def setupCommonPaths(zkClient: ZkClient) { @@ -363,26 +348,29 @@ object ZkUtils extends Logging { /** * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't * exist, the current version is not the expected version, etc.) return (false, -1) + * + * When there is a ConnectionLossException during the conditional update, zkClient will retry the update and may fail + * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one). + * In this case, we will run the optionalChecker to further check if the previous write did indeed succeed. 
*/ def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int, - optionalChecker:Option[(ZkClient, String,String,Int) => (Boolean,Int)] = None): (Boolean, Int) = { + optionalChecker:Option[(ZkClient, String, String) => (Boolean,Int)] = None): (Boolean, Int) = { try { val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) } catch { - case e1: ZkBadVersionException => { + case e1: ZkBadVersionException => optionalChecker match { - case Some(checker) => return checker(client,path,data,expectVersion) + case Some(checker) => return checker(client, path, data) case _ => debug("Checker method is not passed skipping zkData match") } - error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, expectVersion, e1.getMessage)) (false, -1) - } case e2: Exception => - error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, expectVersion, e2.getMessage)) (false, -1) } @@ -512,7 +500,7 @@ object ZkUtils extends Logging { : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] for(topicAndPartition <- topicAndPartitions) { - ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match { + ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match { case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, 
leaderIsrAndControllerEpoch) case None => } diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index f364980..84e0855 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -17,22 +17,17 @@ package kafka.utils -import kafka.cluster.{Replica, Partition} import kafka.server.{ReplicaFetcherManager, KafkaConfig} -import kafka.utils.TestUtils._ +import kafka.api.LeaderAndIsr import kafka.zk.ZooKeeperTestHarness -import kafka.log.Log import kafka.common.TopicAndPartition import org.scalatest.junit.JUnit3Suite import org.junit.Assert._ import org.junit.Test -import org.I0Itec.zkclient.ZkClient import org.easymock.EasyMock -import org.apache.log4j.Logger class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { - private val logger = Logger.getLogger(classOf[UtilsTest]) val topic = "my-topic-test" val partitionId = 0 val brokerId = 1 @@ -43,7 +38,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { val topicData = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1, "versions" -> 1, "leader_epoch" -> 1,"isr" -> List(1,2))) val topicDataVersionMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1, - "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(2,1))) + "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(1,2))) val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1, "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2))) @@ -53,58 +48,48 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { ZkUtils.createPersistentPath(zkClient,topicPath,topicData) } - def testCheckLeaderAndIsrZkData() { - //mismatched zkVersion with the same data - val(dataMatched1,newZkVersion1) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataVersionMismatch,1) - assertTrue(dataMatched1) 
- assertEquals(newZkVersion1,0) - - //mismatched zkVersion and leaderEpoch - val(dataMatched2,newZkVersion2) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataMismatch,1) - assertFalse(dataMatched2) - assertEquals(newZkVersion2,-1) + @Test + def testUpdateLeaderAndIsr() { + val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_)) + val log = EasyMock.createMock(classOf[kafka.log.Log]) + EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes() + EasyMock.expect(log) + EasyMock.replay(log) + + val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes() + EasyMock.replay(logManager) + + val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) + EasyMock.expect(replicaManager.config).andReturn(configs.head) + EasyMock.expect(replicaManager.logManager).andReturn(logManager) + EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) + EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.replay(replicaManager) + + val replicas = List(0,1) + + // regular update + val newLeaderAndIsr1 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, 0) + val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient, + "my-topic-test", partitionId, newLeaderAndIsr1, controllerEpoch, 0) + assertTrue(updateSucceeded1) + assertEquals(newZkVersion1, 1) + + // mismatched zkVersion with the same data + val newLeaderAndIsr2 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, zkVersion + 1) + val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient, + "my-topic-test", partitionId, newLeaderAndIsr2, controllerEpoch, zkVersion + 1) + assertTrue(updateSucceeded2) + // returns true with existing zkVersion + assertEquals(newZkVersion2,1) + + // mismatched zkVersion and leaderEpoch + val newLeaderAndIsr3 
= new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion + 1) + val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient, + "my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1) + assertFalse(updateSucceeded3) + assertEquals(newZkVersion3,-1) } - def testUpdateIsr() { - val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_)) - - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes() - EasyMock.expect(log) - EasyMock.replay(log) - - val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes() - EasyMock.replay(logManager) - - val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) - EasyMock.expect(replicaManager.config).andReturn(configs.head) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) - EasyMock.replay(replicaManager) - - val partition = new Partition(topic,0,1,new MockTime,replicaManager) - val replicas = Set(new Replica(1,partition),new Replica(2,partition)) - - // regular update - val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateIsr(zkClient, - "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, 0, replicas) - assertTrue(updateSucceeded1) - assertEquals(newZkVersion1,1) - - // mismatched zkVersion with the same data - val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateIsr(zkClient, - "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, zkVersion + 1, replicas) - assertTrue(updateSucceeded2) - // returns true with existing zkVersion - assertEquals(newZkVersion2,1) - - // mismatched zkVersion and leaderEpoch - val 
(updateSucceeded3,newZkVersion3) = ReplicationUtils.updateIsr(zkClient, - "my-topic-test", partitionId, brokerId, leaderEpoch + 1, controllerEpoch, zkVersion + 1, replicas) - assertFalse(updateSucceeded3) - assertEquals(newZkVersion3,-1) - } - }