Index: core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (revision 1402398)
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (working copy)
@@ -87,7 +87,7 @@
     val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map)
+    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala (revision 1402398)
+++ core/src/main/scala/kafka/cluster/Partition.scala (working copy)
@@ -112,73 +112,72 @@
   /**
-   * If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader.
+   * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
+   * 1. stop the existing replica fetcher
+   * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
+   * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
+   * 4. set the new leader and ISR
    */
-  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
+  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean = {
     leaderIsrUpdateLock synchronized {
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      if(isMakingLeader)
-        makeLeader(topic, partitionId, leaderAndIsr)
-      else
-        makeFollower(topic, partitionId, leaderAndIsr)
+      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      // stop replica fetcher thread, if any
+      replicaFetcherManager.removeFetcher(topic, partitionId)
+
+      val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // reset LogEndOffset for remote replicas
+      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+      inSyncReplicas = newInSyncReplicas
+      leaderEpoch = leaderAndIsr.leaderEpoch
+      zkVersion = leaderAndIsr.zkVersion
+      leaderReplicaIdOpt = Some(localBrokerId)
+      // we may need to increment high watermark since ISR could be down to 1
+      maybeIncrementLeaderHW(getReplica().get)
       true
     }
   }
 
   /**
-   * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
-   * 1. stop the existing replica fetcher
-   * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
-   * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
-   * 4. set the new leader and ISR
-   */
-  private def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) {
-    trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
-    // stop replica fetcher thread, if any
-    replicaFetcherManager.removeFetcher(topic, partitionId)
-
-    val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
-    // reset LogEndOffset for remote replicas
-    assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
-    inSyncReplicas = newInSyncReplicas
-    leaderEpoch = leaderAndIsr.leaderEpoch
-    zkVersion = leaderAndIsr.zkVersion
-    leaderReplicaIdOpt = Some(localBrokerId)
-    // we may need to increment high watermark since ISR could be down to 1
-    maybeIncrementLeaderHW(getReplica().get)
-  }
-
-  /**
+   * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the follower in the following steps.
    * 1. stop any existing fetcher on this partition from the local replica
    * 2. make sure local replica exists and truncate the log to high watermark
    * 3. set the leader and set ISR to empty
    * 4. start a fetcher to the new leader
    */
-  private def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) = {
-    trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
-    val newLeaderBrokerId: Int = leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-      .format(newLeaderBrokerId, topic, partitionId))
-    ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
-      case Some(leaderBroker) =>
-        // stop fetcher thread to previous leader
-        replicaFetcherManager.removeFetcher(topic, partitionId)
-        // make sure local replica exists
-        val localReplica = getOrCreateReplica()
-        localReplica.log.get.truncateTo(localReplica.highWatermark)
-        inSyncReplicas = Set.empty[Replica]
-        leaderEpoch = leaderAndIsr.leaderEpoch
-        zkVersion = leaderAndIsr.zkVersion
-        leaderReplicaIdOpt = Some(newLeaderBrokerId)
-        // start fetcher thread to current leader
-        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
-      case None => // leader went down
-        warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
-          " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
+    leaderIsrUpdateLock synchronized {
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+        return false
+      }
+      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      val newLeaderBrokerId: Int = leaderAndIsr.leader
+      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+        .format(newLeaderBrokerId, topic, partitionId))
+      liveBrokers.find(_.id == newLeaderBrokerId) match {
+        case Some(leaderBroker) =>
+          // stop fetcher thread to previous leader
+          replicaFetcherManager.removeFetcher(topic, partitionId)
+          // make sure local replica exists
+          val localReplica = getOrCreateReplica()
+          localReplica.log.get.truncateTo(localReplica.highWatermark)
+          inSyncReplicas = Set.empty[Replica]
+          leaderEpoch = leaderAndIsr.leaderEpoch
+          zkVersion = leaderAndIsr.zkVersion
+          leaderReplicaIdOpt = Some(newLeaderBrokerId)
+          // start fetcher thread to current leader
+          replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+        case None => // leader went down
+          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
+            " topic %s partition %d became unavailable during the state change operation".format(topic, partitionId))
+      }
+      true
    }
  }
@@ -325,4 +324,4 @@
     partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
-}
\ No newline at end of file
+}
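
The substantive change in Partition.makeFollower above is how the new leader broker is resolved: instead of a ZooKeeper lookup (ZkUtils.getBrokerInfo), the follower now searches the liveBrokers set carried in the LeaderAndIsrRequest. The following is a minimal, self-contained sketch of that lookup, separate from the patch; the Broker case class and the FollowerLeaderLookup/findLeader names are simplified stand-ins introduced only for illustration.

    // Simplified stand-in for kafka.cluster.Broker, for illustration only.
    case class Broker(id: Int, host: String, port: Int)

    object FollowerLeaderLookup {
      // Mirrors liveBrokers.find(_.id == newLeaderBrokerId) in makeFollower:
      // the leader is taken from the request's live-broker set, not from ZooKeeper.
      def findLeader(liveBrokers: Set[Broker], newLeaderBrokerId: Int): Option[Broker] =
        liveBrokers.find(_.id == newLeaderBrokerId)

      def main(args: Array[String]) {
        val liveBrokers = Set(Broker(0, "host0", 9092), Broker(1, "host1", 9092))
        println(findLeader(liveBrokers, 1))  // Some(...): start a fetcher to this broker
        println(findLeader(liveBrokers, 2))  // None: leader not alive, abort the transition with a warning
      }
    }

As in the case None branch of the patch, a leader id that is absent from the live-broker set aborts the state change rather than blocking on ZooKeeper.
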
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala (revision 1402398)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala (working copy)
@@ -86,7 +86,7 @@
         partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition, offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -105,7 +105,7 @@
       partitions.foreach { topicAndPartition =>
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala (revision 1402398)
+++ core/src/main/scala/kafka/controller/KafkaController.scala (working copy)
@@ -177,7 +177,7 @@
           }
         }
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (revision 1402398)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (working copy)
@@ -83,7 +83,7 @@
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
Index: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
===================================================================
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala (revision 1402398)
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala (working copy)
@@ -183,11 +183,11 @@
     }
   }
 
-  def sendRequestsToBrokers() {
+  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, liveBrokers)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
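
Every controller call site above now flushes its batched requests with controllerContext.liveBrokers, and sendRequestsToBrokers stamps that set onto each LeaderAndIsrRequest it builds, so the receiving broker gets the leader's connection details without a ZooKeeper read. Below is a small, self-contained sketch of that batching pattern, separate from the patch; Broker, PartitionStateInfo, LeaderAndIsrRequest and buildRequests here are simplified stand-ins for the real controller and API classes.

    // Simplified stand-ins, for illustration only.
    case class Broker(id: Int, host: String, port: Int)
    case class PartitionStateInfo(leader: Int, isr: List[Int])
    case class LeaderAndIsrRequest(partitionStateInfos: Map[(String, Int), PartitionStateInfo],
                                   liveBrokers: Set[Broker])

    object ControllerBatchSketch {
      // One request per target broker; every request carries the same live-broker set.
      def buildRequests(leaderAndIsrRequestMap: Map[Int, Map[(String, Int), PartitionStateInfo]],
                        liveBrokers: Set[Broker]): Map[Int, LeaderAndIsrRequest] =
        leaderAndIsrRequestMap.map { case (brokerId, partitionStateInfos) =>
          brokerId -> LeaderAndIsrRequest(partitionStateInfos, liveBrokers)
        }

      def main(args: Array[String]) {
        val liveBrokers = Set(Broker(0, "host0", 9092), Broker(1, "host1", 9092))
        val batch = Map(1 -> Map(("topic1", 0) -> PartitionStateInfo(0, List(0, 1))))
        buildRequests(batch, liveBrokers).foreach(println)
      }
    }
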
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1402398)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala (working copy)
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.cluster.{Partition, Replica}
+import kafka.cluster.{Broker, Partition, Replica}
 import collection._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
@@ -173,7 +173,7 @@
           if(requestedLeaderId == config.brokerId)
             makeLeader(topic, partitionId, partitionStateInfo)
           else
-            makeFollower(topic, partitionId, partitionStateInfo)
+            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.liveBrokers)
         } catch {
           case e =>
             error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
@@ -201,7 +201,7 @@
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) {
+    if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -210,14 +210,14 @@
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) {
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker]) {
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     val leaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
       .format(leaderBrokerId, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) {
+    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (revision 1402398)
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (working copy)
@@ -23,6 +23,7 @@
 import kafka.api.ApiUtils._
 import collection.mutable.Map
 import collection.mutable.HashMap
+import kafka.cluster.Broker
 
 
 object LeaderAndIsr {
@@ -92,7 +93,13 @@
       partitionStateInfos.put((topic, partition), partitionStateInfo)
     }
 
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos)
+
+    val brokersCount = buffer.getInt
+    var liveBrokers = Set[Broker]()
+    for (i <- 0 until brokersCount)
+      liveBrokers += Broker.readFrom(buffer)
+
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, liveBrokers)
   }
 }
 
@@ -100,11 +107,12 @@
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
-                                partitionStateInfos: Map[(String, Int), PartitionStateInfo])
+                                partitionStateInfos: Map[(String, Int), PartitionStateInfo],
+                                liveBrokers: Set[Broker])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -117,12 +125,17 @@
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }
+    buffer.putInt(liveBrokers.size)
+    liveBrokers.foreach(_.writeTo(buffer))
   }
 
   def sizeInBytes(): Int = {
     var size = 1 + 2 + (2 + clientId.length) + 4 + 4
     for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) + 4 + value.sizeInBytes
+    size += 4
+    for(broker <- liveBrokers)
+      size += broker.sizeInBytes
     size
   }
 }
\ No newline at end of file
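
For reference, the wire-format effect of the LeaderAndIsrRequest changes above: after the per-partition entries, writeTo now appends a 4-byte broker count followed by each live broker, readFrom consumes the same suffix, and sizeInBytes grows by 4 plus the per-broker sizes. The following is a minimal, self-contained round-trip sketch of just that suffix, separate from the patch; the Broker stand-in's (id, host, port) byte layout is an assumption made only for illustration, since the real layout is whatever kafka.cluster.Broker.writeTo/readFrom define.

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    // Simplified stand-in for kafka.cluster.Broker; the byte layout here is illustrative only.
    case class Broker(id: Int, host: String, port: Int) {
      def sizeInBytes: Int = 4 + 2 + host.getBytes(StandardCharsets.UTF_8).length + 4
      def writeTo(buffer: ByteBuffer) {
        val hostBytes = host.getBytes(StandardCharsets.UTF_8)
        buffer.putInt(id)
        buffer.putShort(hostBytes.length.toShort)
        buffer.put(hostBytes)
        buffer.putInt(port)
      }
    }

    object Broker {
      def readFrom(buffer: ByteBuffer): Broker = {
        val id = buffer.getInt
        val hostBytes = new Array[Byte](buffer.getShort.toInt)
        buffer.get(hostBytes)
        val port = buffer.getInt
        Broker(id, new String(hostBytes, StandardCharsets.UTF_8), port)
      }
    }

    object LiveBrokersRoundTrip {
      def main(args: Array[String]) {
        val liveBrokers = Set(Broker(0, "host0", 9092), Broker(1, "host1", 9092))

        // Size the buffer the same way the patched sizeInBytes does: 4 for the count, plus each broker.
        var size = 4
        for (broker <- liveBrokers)
          size += broker.sizeInBytes
        val buffer = ByteBuffer.allocate(size)

        // writeTo side: int32 broker count, then each broker (appended after the partition entries).
        buffer.putInt(liveBrokers.size)
        liveBrokers.foreach(_.writeTo(buffer))
        buffer.rewind()

        // readFrom side: mirrors the added loop, read the count, then each broker.
        val brokersCount = buffer.getInt
        var decoded = Set[Broker]()
        for (i <- 0 until brokersCount)
          decoded += Broker.readFrom(buffer)

        println(decoded == liveBrokers)  // true
      }
    }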