diff --git core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 26f2bd8..141cbd2 100644 --- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -48,7 +48,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int } def sizeInBytes(): Int = { - val size = 4 + 4 + (2 + isr.mkString(",").length) + 4 + val size = 4 /* leader broker id */ + 4 /* leader epoch */ + (2 + isr.mkString(",").length) + 4 /* zk version */ size } @@ -73,6 +73,7 @@ object LeaderAndIsrRequest { val versionId = buffer.getShort val clientId = Utils.readShortString(buffer) val ackTimeoutMs = buffer.getInt + val controllerEpoch = buffer.getInt val leaderAndISRRequestCount = buffer.getInt val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr] @@ -83,7 +84,7 @@ object LeaderAndIsrRequest { leaderAndISRInfos.put((topic, partition), leaderAndISRRequest) } - new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos) + new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, controllerEpoch, leaderAndISRInfos) } } @@ -91,17 +92,20 @@ object LeaderAndIsrRequest { case class LeaderAndIsrRequest (versionId: Short, clientId: String, ackTimeoutMs: Int, + controllerEpoch: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { - def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = { - this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos) + def this(controllerEpoch: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = { + this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, + controllerEpoch, leaderAndISRInfos) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) Utils.writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) + buffer.putInt(controllerEpoch) buffer.putInt(leaderAndISRInfos.size) for((key, value) <- leaderAndISRInfos){ Utils.writeShortString(buffer, key._1, "UTF-8") @@ -111,7 +115,8 @@ case class LeaderAndIsrRequest (versionId: Short, } def sizeInBytes(): Int = { - var size = 1 + 2 + (2 + clientId.length) + 4 + 4 + var size = 2 /* version id */ + (2 + clientId.length) + 4 /* ack timeout ms */ + + 4 /* controller epoch */ + 4 /* number of leader and isr entries */ for((key, value) <- leaderAndISRInfos) size += (2 + key._1.length) + 4 + value.sizeInBytes size diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala index 2f2ba44..032c4d0 100644 --- core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -32,28 +32,32 @@ object StopReplicaRequest { val versionId = buffer.getShort val clientId = Utils.readShortString(buffer) val ackTimeoutMs = buffer.getInt + val controllerEpoch = buffer.getInt val topicPartitionPairCount = buffer.getInt val topicPartitionPairSet = new HashSet[(String, Int)]() for (i <- 0 until topicPartitionPairCount){ topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt)) } - new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet) + new StopReplicaRequest(versionId, clientId, ackTimeoutMs, controllerEpoch, topicPartitionPairSet) } } case class StopReplicaRequest(versionId: Short, clientId: String, 
ackTimeoutMs: Int, + controllerEpoch: Int, stopReplicaSet: Set[(String, Int)]) extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { - def this(stopReplicaSet: Set[(String, Int)]) = { - this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet) + def this(controllerEpoch: Int, stopReplicaSet: Set[(String, Int)]) = { + this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, + controllerEpoch, stopReplicaSet) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) Utils.writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) + buffer.putInt(controllerEpoch) buffer.putInt(stopReplicaSet.size) for ((topic, partitionId) <- stopReplicaSet){ Utils.writeShortString(buffer, topic, "UTF-8") @@ -62,7 +66,8 @@ case class StopReplicaRequest(versionId: Short, } def sizeInBytes(): Int = { - var size = 2 + (2 + clientId.length()) + 4 + 4 + var size = 2 /* version id */ + (2 + clientId.length()) + 4 /* ack timeout ms */ + 4 /* controller epoch */ + + 4 /* number of replicas */ for ((topic, partitionId) <- stopReplicaSet){ size += (2 + topic.length()) + 4 } diff --git core/src/main/scala/kafka/cluster/Partition.scala core/src/main/scala/kafka/cluster/Partition.scala index 3aa6eab..148e740 100644 --- core/src/main/scala/kafka/cluster/Partition.scala +++ core/src/main/scala/kafka/cluster/Partition.scala @@ -113,7 +113,8 @@ class Partition(val topic: String, /** * If the leaderEpoch of the incoming request is higher than the locally cached epoch, make it the new leader or a follower to the new leader. */ - def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = { + def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, + controllerEpoch: Int, isMakingLeader: Boolean): Boolean = { leaderISRUpdateLock synchronized { if (leaderEpoch >= leaderAndISR.leaderEpoch){ info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request" @@ -292,7 +293,8 @@ class Partition(val topic: String, info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(","))) val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion) + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), + ZkUtils.leaderAndIsrZkData(newLeaderAndISR, replicaManager.controllerEpoch), zkVersion) if (updateSucceeded){ inSyncReplicas = newISR zkVersion = newVersion diff --git core/src/main/scala/kafka/common/ControllerMovedException.scala core/src/main/scala/kafka/common/ControllerMovedException.scala new file mode 100644 index 0000000..be15231 --- /dev/null +++ core/src/main/scala/kafka/common/ControllerMovedException.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.common + +class ControllerMovedException(message: String) extends RuntimeException(message) { + def this(message: String, cause: Throwable) = this(message + " Root cause -> " + kafka.utils.Utils.stackTrace(cause)) + def this() = this(null) +} \ No newline at end of file diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala index ba6f352..e127746 100644 --- core/src/main/scala/kafka/common/ErrorMapping.scala +++ core/src/main/scala/kafka/common/ErrorMapping.scala @@ -40,6 +40,7 @@ object ErrorMapping { val BrokerNotAvailableCode: Short = 8 val ReplicaNotAvailableCode: Short = 9 val MessageSizeTooLargeCode: Short = 10 + val StaleControllerEpochCode: Short = 11 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -52,7 +53,8 @@ object ErrorMapping { classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode, classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode, classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode, - classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode + classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode, + classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode ).withDefaultValue(UnknownCode) /* invert the mapping */ diff --git core/src/main/scala/kafka/common/StateChangeFailedException.scala core/src/main/scala/kafka/common/StateChangeFailedException.scala index a78ca6b..ed25828 100644 --- core/src/main/scala/kafka/common/StateChangeFailedException.scala +++ core/src/main/scala/kafka/common/StateChangeFailedException.scala @@ -18,6 +18,6 @@ package kafka.common class StateChangeFailedException(message: String) extends RuntimeException(message) { - def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString) + def this(message: String, cause: Throwable) = this(message + " Root cause -> " + kafka.utils.Utils.stackTrace(cause)) def this() = this(null) } \ No newline at end of file diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 90cf187..220c25d 100644 --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -137,8 +137,6 @@ class RequestSendThread(val controllerId: Int, } } -// TODO: When we add more types of requests, we can generalize this class a bit. 
Right now, it just handles LeaderAndIsr -// request class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit) extends Logging { val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]] @@ -158,11 +156,11 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques } } - def sendRequestsToBrokers() { + def sendRequestsToBrokers(controllerEpoch: Int) { brokerRequestMap.foreach { m => val broker = m._1 val leaderAndIsr = m._2 - val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr) + val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerEpoch, leaderAndIsr) info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest)) sendRequest(broker, leaderAndIsrRequest, null) } diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala index d43af7f..9ed21dc 100644 --- core/src/main/scala/kafka/controller/KafkaController.scala +++ core/src/main/scala/kafka/controller/KafkaController.scala @@ -28,6 +28,8 @@ import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import com.yammer.metrics.core.Gauge +import org.I0Itec.zkclient.exception.ZkNodeExistsException +import kafka.common.ControllerMovedException class ControllerContext(val zkClient: ZkClient, var controllerChannelManager: ControllerChannelManager = null, @@ -46,6 +48,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, config.brokerId) + var epoch = 1 newGauge( "ActiveControllerCount", @@ -57,15 +60,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg /** * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller. * It does the following things on the become-controller state change - - * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and + * 1. Increments the controller epoch + * 2. Initializes the controller's context object that holds cache objects for current topics, live brokers and * leaders for all existing partitions. - * 2. Starts the controller's channel manager - * 3. Starts the replica state machine - * 4. Starts the partition state machine + * 3. Starts the controller's channel manager + * 4. Starts the replica state machine + * 5. Starts the partition state machine */ def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) + // increment the controller epoch + epoch = incrementControllerEpoch(zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() @@ -151,23 +157,23 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg /* TODO: kafka-330 This API is unused until we introduce the delete topic functionality. 
remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/ - def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) { - val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]] - for((topicPartition, brokers) <- replicaAssignment){ - for (broker <- brokers){ - if (!brokerToPartitionToStopReplicaMap.contains(broker)) - brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)]) - brokerToPartitionToStopReplicaMap(broker).add(topicPartition) - } - controllerContext.allLeaders.remove(topicPartition) - ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2)) - } - for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){ - val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica) - info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest)) - sendRequest(broker, stopReplicaRequest) - } - } +// def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) { +// val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]] +// for((topicPartition, brokers) <- replicaAssignment){ +// for (broker <- brokers){ +// if (!brokerToPartitionToStopReplicaMap.contains(broker)) +// brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)]) +// brokerToPartitionToStopReplicaMap(broker).add(topicPartition) +// } +// controllerContext.allLeaders.remove(topicPartition) +// ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2)) +// } +// for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){ +// val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica) +// info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest)) +// sendRequest(broker, stopReplicaRequest) +// } +// } /** * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker @@ -244,6 +250,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } + def incrementControllerEpoch(zkClient: ZkClient): Int = { + var newControllerEpoch = 1 + // if controller persistent path doesn't exist, create one + ZkUtils.pathExists(zkClient, ZkUtils.ControllerEpochPath) match { + case true => // read the current value and increment by one + val previousControllerData = ZkUtils.readData(zkClient, ZkUtils.ControllerEpochPath) + newControllerEpoch = previousControllerData._1.toInt + 1 + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.ControllerEpochPath, + newControllerEpoch.toString, previousControllerData._2.getVersion) + if(!updateSucceeded) + throw new ControllerMovedException("Controller moved to another broker. 
Aborting controller startup procedure") + case false => // if path doesn't exist, this is the first controller whose epoch should be 1 + // the following call can still fail if another controller gets elected between checking if the path exists and + // trying to create the controller epoch path + try { + zkClient.createPersistent(ZkUtils.ControllerEpochPath, "1") + }catch { + case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " + + "Aborting controller startup procedure") + } + } + newControllerEpoch + } + class SessionExpireListener() extends IZkStateListener with Logging { this.logIdent = "[Controller " + config.brokerId + "], " @throws(classOf[Exception]) @@ -274,6 +304,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } +case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) + object ControllerStat extends KafkaMetricsGroup { val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS) val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala index a9c094c..a82e41f 100644 --- core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -84,7 +84,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach { partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition) } - brokerRequestBatch.sendRequestsToBrokers() + brokerRequestBatch.sendRequestsToBrokers(controller.epoch) }catch { case e => error("Error while moving some partitions to the online state", e) } @@ -102,7 +102,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitions.foreach { topicAndPartition => handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState) } - brokerRequestBatch.sendRequestsToBrokers() + brokerRequestBatch.sendRequestsToBrokers(controller.epoch) }catch { case e => error("Error while moving some partitions to %s state".format(targetState), e) } @@ -172,9 +172,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val partition = topicPartition._2 // check if leader and isr path exists for partition. If not, then it is in NEW state ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match { - case Some(currentLeaderAndIsr) => + case Some(currentLeaderIsrAndEpoch) => // else, check if the leader for partition is alive. 
If yes, it is in Online state, else it is in Offline state - controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match { + controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match { case true => // leader is alive partitionState.put(topicPartition, OnlinePartition) case false => @@ -233,8 +233,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList) try { ZkUtils.createPersistentPath(controllerContext.zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString) - // TODO: the above write can fail only if the current controller lost its zk session and the new controller + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), + ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch)) + // the above write can fail only if the current controller lost its zk session and the new controller // took over and initialized this partition. This can happen if the current controller went into a long // GC pause brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr) @@ -242,9 +243,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitionState.put((topic, partition), OnlinePartition) }catch { case e: ZkNodeExistsException => + // read the controller epoch + val leaderIsrAndEpoch = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition).get ControllerStat.offlinePartitionRate.mark() throw new StateChangeFailedException("Error while changing partition [%s, %d]'s state from New to Online" - .format(topic, partition) + " since Leader and ISR path already exists") + .format(topic, partition) + " since Leader and isr path already exists with value " + + "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), + leaderIsrAndEpoch.controllerEpoch)) } } } @@ -303,7 +308,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { var newLeaderAndIsr: LeaderAndIsr = null while(!zookeeperPathUpdateSucceeded) { newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match { - case Some(currentLeaderAndIsr) => + case Some(currentLeaderIsrAndEpoch) => + val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr + val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch + if(controllerEpoch > controller.epoch) + throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " + + "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) + + "controller was elected with epoch %d. 
Aborting state change by this controller".format(controllerEpoch)) var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) @@ -327,7 +338,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val newLeader = liveAssignedReplicasToThisPartition.head warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + "There's potential data loss") - new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion+1) } case false => val newLeader = liveBrokersInIsr.head @@ -338,7 +349,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // update the new leadership decision in zookeeper or retry val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion) + ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion zookeeperPathUpdateSucceeded = updateSucceeded newLeaderAndIsr diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 3574d3a..662756d 100644 --- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -86,7 +86,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if(partitionsAssignedToThisBroker.size == 0) info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(","))) } - brokerRequestBatch.sendRequestsToBrokers() + brokerRequestBatch.sendRequestsToBrokers(controller.epoch) }catch { case e => error("Error while moving some replicas to %s state".format(targetState), e) } @@ -105,12 +105,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { targetState match { case OnlineReplica => assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState) - // check if the leader for this partition is alive or even exists - // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper - // for the ISR anyways - val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - leaderAndIsrOpt match { - case Some(leaderAndIsr) => + val leaderIsrAndEpochOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + leaderIsrAndEpochOpt match { + case Some(leaderIsrAndEpoch) => + val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match { case true => // leader is alive brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr) @@ -125,18 +123,25 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { // As an optimization, the controller removes dead replicas from the ISR var zookeeperPathUpdateSucceeded: Boolean = false var newLeaderAndIsr: LeaderAndIsr = null + var retry = false while(!zookeeperPathUpdateSucceeded) { // refresh leader and isr from 
zookeeper again - val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - leaderAndIsrOpt match { - case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes + val leaderIsrAndEpochOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + leaderIsrAndEpochOpt match { + case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes + val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr + val controllerEpoch = leaderIsrAndEpoch.controllerEpoch + if(controllerEpoch > controller.epoch) + throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " + + "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) + + "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch)) newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) // update the new leadership decision in zookeeper or retry val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString, - leaderAndIsr.zkVersion) + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), + ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controller.epoch), leaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion zookeeperPathUpdateSucceeded = updateSucceeded case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) + diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala index 4fe8248..b963962 100644 --- core/src/main/scala/kafka/server/KafkaApis.scala +++ core/src/main/scala/kafka/server/KafkaApis.scala @@ -70,8 +70,8 @@ class KafkaApis(val requestChannel: RequestChannel, requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest) - val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest) - val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) + val (responseMap, error) = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest) + val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse))) } @@ -82,13 +82,8 @@ class KafkaApis(val requestChannel: RequestChannel, requestLogger.trace("Handling stop replica request " + stopReplicaRequest) trace("Handling stop replica request " + stopReplicaRequest) - val responseMap = new HashMap[(String, Int), Short] - - for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){ - val errorCode = replicaManager.stopReplica(topic, partitionId) - responseMap.put((topic, partitionId), errorCode) - } - val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap) + val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) + val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) } diff --git 
core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala index f078b99..56d1532 100644 --- core/src/main/scala/kafka/server/ReplicaManager.scala +++ core/src/main/scala/kafka/server/ReplicaManager.scala @@ -22,11 +22,11 @@ import org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean import kafka.utils._ import kafka.log.LogManager -import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr} import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit +import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest, LeaderAndIsr} object ReplicaManager { val UnknownLogEndOffset = -1L @@ -39,7 +39,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient private val leaderPartitionsLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " - + var controllerEpoch = 1 private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) @@ -74,7 +74,23 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) } - def stopReplica(topic: String, partitionId: Int): Short = { + def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = { + val responseMap = new collection.mutable.HashMap[(String, Int), Short] + (stopReplicaRequest.controllerEpoch < controllerEpoch) match { + case true => + error("Received stop replica request from an old controller with epoch " + stopReplicaRequest.controllerEpoch) + (responseMap, ErrorMapping.StaleControllerEpochCode) + case false => + controllerEpoch = stopReplicaRequest.controllerEpoch + for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){ + val errorCode = stopReplica(topic, partitionId) + responseMap.put((topic, partitionId), errorCode) + } + (responseMap, ErrorMapping.NoError) + } + } + + private def stopReplica(topic: String, partitionId: Int): Short = { trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId)) val errorCode = ErrorMapping.NoError getReplica(topic, partitionId) match { @@ -137,48 +153,40 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient } } - def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = { - info("Handling leader and isr request %s".format(leaderAndISRRequest)) + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = { val responseMap = new collection.mutable.HashMap[(String, Int), Short] - - for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ - var errorCode = ErrorMapping.NoError - val topic = partitionInfo._1 - val partitionId = partitionInfo._2 - - val requestedLeaderId = leaderAndISR.leader - try { - if(requestedLeaderId == config.brokerId) - makeLeader(topic, partitionId, leaderAndISR) - else - makeFollower(topic, partitionId, leaderAndISR) - } catch { - case e => - error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e) - errorCode = 
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { + error("Received leader and isr request from an old controller with epoch " + leaderAndISRRequest.controllerEpoch) + (responseMap, ErrorMapping.StaleControllerEpochCode) + }else { + info("Handling leader and isr request %s".format(leaderAndISRRequest)) + controllerEpoch = leaderAndISRRequest.controllerEpoch + for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ + var errorCode = ErrorMapping.NoError + val topic = partitionInfo._1 + val partitionId = partitionInfo._2 + + val requestedLeaderId = leaderAndISR.leader + try { + if(requestedLeaderId == config.brokerId) + makeLeader(topic, partitionId, leaderAndISR) + else + makeFollower(topic, partitionId, leaderAndISR) + } catch { + case e => + error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e) + errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + } + responseMap.put(partitionInfo, errorCode) } - responseMap.put(partitionInfo, errorCode) + (responseMap, ErrorMapping.NoError) } - - /** - * If IsInit flag is on, this means that the controller wants to treat topics not in the request - * as deleted. - * TODO: Handle this properly as part of KAFKA-330 - */ -// if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){ -// startHighWaterMarksCheckPointThread -// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1) -// info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) -// partitionsToRemove.foreach(p => stopReplica(p._1, p._2)) -// } - - responseMap } private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = { info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId) - if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) { + if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, controllerEpoch, true)) { // also add this partition to the list of partitions for which the leader is the current broker leaderPartitionsLock synchronized { leaderPartitions += partition @@ -193,7 +201,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient .format(leaderBrokerId, topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId) - if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) { + if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, controllerEpoch, false)) { // remove this replica's partition from the ISR expiration queue leaderPartitionsLock synchronized { leaderPartitions -= partition diff --git core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index ef04d6a..f5cc02c 100644 --- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -44,8 +44,6 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: } } - def amILeader : Boolean = leaderId == brokerId - def elect: Boolean = { controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) try { @@ -56,10 +54,12 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: } catch { case e: ZkNodeExistsException => // If someone 
else has written the path, then - debug("Someone else was elected as leader other than " + brokerId) val data: String = controllerContext.zkClient.readData(electionPath, true) - if (data != null) leaderId = data.toInt - case e2 => throw e2 + debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId)) + if (data != null) { + leaderId = data.toInt + } + case e2 => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) } amILeader } @@ -68,6 +68,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: leaderId = -1 } + def amILeader : Boolean = leaderId == brokerId + /** * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will * have its own session expiration listener and handler @@ -79,6 +81,10 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: */ @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { + controllerContext.controllerLock synchronized { + leaderId = data.toString.toInt + info("New leader is %d".format(leaderId)) + } } /** diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala index 66332a4..5c107cc 100644 --- core/src/main/scala/kafka/utils/ZkUtils.scala +++ core/src/main/scala/kafka/utils/ZkUtils.scala @@ -24,15 +24,18 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, import org.I0Itec.zkclient.serialize.ZkSerializer import scala.collection._ import kafka.api.LeaderAndIsr +import mutable.HashMap import org.apache.zookeeper.data.Stat import java.util.concurrent.locks.{ReentrantLock, Condition} import kafka.common.{KafkaException, NoEpochForPartitionException} +import kafka.controller.LeaderIsrAndControllerEpoch object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" val ControllerPath = "/controller" + val ControllerEpochPath = "/controllerEpoch" def getTopicPath(topic: String): String ={ BrokerTopicsPath + "/" + topic @@ -70,7 +73,7 @@ object ZkUtils extends Logging { brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) } - def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { + def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = { val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition) val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath) val leaderAndIsrOpt = leaderAndIsrInfo._1 @@ -82,11 +85,12 @@ object ZkUtils extends Logging { val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get + val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt val isr = Utils.getCSVList(isrString).map(r => r.toInt) val zkPathVersion = stat.getVersion debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, isr.toString(), zkPathVersion, topic, partition)) - Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion)) + Some(new LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch)) case None => None } case None => None // TODO: Handle if leader and isr 
info is not available in zookeeper @@ -182,6 +186,15 @@ object ZkUtils extends Logging { topicDirs.consumerOwnerDir + "/" + partition } + def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { + val jsonDataMap = new HashMap[String, String] + jsonDataMap.put("leader", leaderAndIsr.leader.toString) + jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString) + jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(",")) + jsonDataMap.put("controllerEpoch", controllerEpoch.toString) + Utils.stringMapToJsonString(jsonDataMap) + } + /** * make sure a persistent path exists in ZK. Create the path if not exist. */ @@ -426,7 +439,7 @@ object ZkUtils extends Logging { for((topic, partitions) <- partitionsForTopics) { for(partition <- partitions) { ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match { - case Some(leaderAndIsr) => ret.put((topic, partition.toInt), leaderAndIsr) + case Some(leaderIsrAndEpoch) => ret.put((topic, partition.toInt), leaderIsrAndEpoch.leaderAndIsr) case None => } } diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala index c9e2229..b132a5b 100644 --- core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap) + TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head .partitionsMetadata.map(p => p.replicas) val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList @@ -189,7 +189,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3)) AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap) + TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head newTopicMetadata.errorCode match { diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 203e0ff..f1f8075 100644 --- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -69,7 +69,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderForPartitionMap = Map( 0 -> configs.head.brokerId ) - TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap) + TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val topicMetadata = mockLogManagerAndTestTopic(topic) assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) diff --git core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala index c463763..cc558a6 100644 --- 
core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala +++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala @@ -72,7 +72,7 @@ object RpcDataSerializationTestUtils{ val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2) val map = Map(((topic1, 0), leaderAndISR1), ((topic2, 0), leaderAndISR2)) - new LeaderAndIsrRequest(map) + new LeaderAndIsrRequest(1, map) } def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { @@ -82,7 +82,7 @@ object RpcDataSerializationTestUtils{ } def createTestStopReplicaRequest() : StopReplicaRequest = { - new StopReplicaRequest(Set((topic1, 0), (topic2, 0))) + new StopReplicaRequest(1, Set((topic1, 0), (topic2, 0))) } def createTestStopReplicaResponse() : StopReplicaResponse = { diff --git core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index d926813..28475af 100644 --- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -23,6 +23,11 @@ import kafka.admin.CreateTopicCommand import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.controller.{ControllerChannelManager, KafkaController} +import kafka.cluster.Broker +import kafka.api.{LeaderAndISRResponse, RequestOrResponse, LeaderAndIsr, LeaderAndIsrRequest} +import kafka.common.ErrorMapping +import collection.mutable.HashMap class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val brokerId1 = 0 @@ -35,6 +40,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var staleControllerEpochDetected = false + override def setUp() { super.setUp() // start both servers @@ -93,4 +100,47 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { else assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3) } + + def testLeaderElectionWithStaleControllerEpoch() { + // start 2 brokers + val topic = "new-topic" + val partitionId = 0 + + // create topic with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + + // wait until leader is elected + val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) + val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId) + debug("leader Epoch: " + leaderEpoch1) + debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) + assertTrue("Leader should get elected", leader1.isDefined) + // NOTE: this is to avoid transient test failures + assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertEquals("First epoch value should be 0", 0, leaderEpoch1) + + // start another controller + val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort())) + val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port)) + val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig) + controllerChannelManager.startup() + val staleControllerEpoch = 0 + val leaderAndIsr = new HashMap[(String, Int), LeaderAndIsr] + leaderAndIsr.put((topic, partitionId), new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2))) + val leaderAndIsrRequest = new 
LeaderAndIsrRequest(staleControllerEpoch, leaderAndIsr) + + controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback) + TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000) + assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected) + + controllerChannelManager.shutdown() + } + + private def staleControllerEpochCallback(response: RequestOrResponse): Unit = { + val leaderAndIsrResponse = response.asInstanceOf[LeaderAndISRResponse] + staleControllerEpochDetected = leaderAndIsrResponse.errorCode match { + case ErrorMapping.StaleControllerEpochCode => true + case _ => false + } + } } \ No newline at end of file diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0e47daf..ac7082a 100644 --- core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -372,7 +372,9 @@ object TestUtils extends Logging { new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*)) } - def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) { + def makeLeaderForPartition(zkClient: ZkClient, topic: String, + leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int], + controllerEpoch: Int) { leaderPerPartitionMap.foreach { leaderForPartition => { @@ -384,12 +386,13 @@ object TestUtils extends Logging { if(currentLeaderAndISROpt == None) newLeaderAndISR = new LeaderAndIsr(leader, List(leader)) else{ - newLeaderAndISR = currentLeaderAndISROpt.get + newLeaderAndISR = currentLeaderAndISROpt.get.leaderAndIsr newLeaderAndISR.leader = leader newLeaderAndISR.leaderEpoch += 1 newLeaderAndISR.zkVersion += 1 } - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), newLeaderAndISR.toString) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), + ZkUtils.leaderAndIsrZkData(newLeaderAndISR, controllerEpoch)) } catch { case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe) } diff --git core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 3600873..85eec6f 100644 --- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package kafka.zk @@ -35,7 +35,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness { try { ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created") } catch { - case e: Exception => println("Exception in creating ephemeral node") + case e: Exception => } var testData: String = null
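Reviewer note, not part of the patch: the common thread in the ReplicaManager changes above is controller-epoch fencing. Each broker caches the highest controller epoch it has seen and rejects leader-and-isr or stop-replica requests that carry a smaller epoch with StaleControllerEpochCode. The snippet below is a minimal standalone sketch of that check only; the object and method names are illustrative and are not Kafka APIs.

object ControllerEpochFencingSketch {
  val StaleControllerEpochCode: Short = 11  // mirrors ErrorMapping.StaleControllerEpochCode added by the patch
  val NoError: Short = 0

  // highest controller epoch this broker has seen so far
  private var cachedControllerEpoch = 1

  // Accept a request only if its controller epoch is at least as new as the cached one;
  // otherwise report it as coming from a deposed controller.
  def checkControllerEpoch(requestControllerEpoch: Int): Short = {
    if (requestControllerEpoch < cachedControllerEpoch)
      StaleControllerEpochCode
    else {
      cachedControllerEpoch = requestControllerEpoch
      NoError
    }
  }

  def main(args: Array[String]): Unit = {
    println(checkControllerEpoch(2)) // 0: accepted, cached epoch advances to 2
    println(checkControllerEpoch(1)) // 11: rejected as a stale controller epoch
  }
}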