diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1087a2e..91cd9ef 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -30,6 +30,8 @@ import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} import java.io.IOException +import scala.Some +import kafka.common.TopicAndPartition /** @@ -193,7 +195,7 @@ class Partition(val topic: String, */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - leaders: Set[Broker], correlationId: Int): Boolean = { + correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -202,23 +204,20 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 - leaders.find(_.id == newLeaderBrokerId) match { - case Some(leaderBroker) => - // add replicas that are new - allReplicas.foreach(r => getOrCreateReplica(r)) - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) - inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndIsr.leaderEpoch - zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt = Some(newLeaderBrokerId) - case None => // we should not come here - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] new leader %d") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - topic, partitionId, newLeaderBrokerId)) - } + // add replicas that are new + allReplicas.foreach(r => getOrCreateReplica(r)) + // remove assigned replicas that have been removed by the controller + (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion + + // If the leader replica id does not change from the current value, + // return false to indicate the replica manager + if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) + return false; + + leaderReplicaIdOpt = Some(newLeaderBrokerId) true } } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 483559a..8e5433c 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -188,8 +188,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case Some(currLeaderIsrAndControllerEpoch) => controller.removeReplicaFromIsr(topic, partition, replicaId) match { case Some(updatedLeaderIsrAndControllerEpoch) => - // send the shrunk ISR state change request only to the leader - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), + // send the shrunk ISR state change request to the rest of the replicas of the partition + val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) replicaState.put((topic, partition, replicaId), OfflineReplica) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f9d10d3..e2c1f8b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,21 +16,21 @@ */ package kafka.server -import kafka.cluster.{Broker, Partition, Replica} import collection._ import mutable.HashMap -import org.I0Itec.zkclient.ZkClient -import java.io.{File, IOException} -import java.util.concurrent.atomic.AtomicBoolean +import kafka.cluster.{Broker, Partition, Replica} import kafka.utils._ import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge -import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController import org.apache.log4j.Logger +import org.I0Itec.zkclient.ZkClient +import com.yammer.metrics.core.Gauge +import java.util.concurrent.atomic.AtomicBoolean +import java.io.{IOException, File} +import java.util.concurrent.TimeUnit object ReplicaManager { @@ -214,9 +214,9 @@ class ReplicaManager(val config: KafkaConfig, val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." + - " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerEpoch, controllerEpoch)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since its controller epoch %d is old." + + " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) } (responseMap, ErrorMapping.StaleControllerEpochCode) } else { @@ -235,17 +235,17 @@ class ReplicaManager(val config: KafkaConfig, if(partitionStateInfo.allReplicas.contains(config.brokerId)) partitionState.put(partition, partitionStateInfo) else { - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, + topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) } } else { // Otherwise record the error code in response - stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, + topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } } @@ -344,10 +344,11 @@ class ReplicaManager(val config: KafkaConfig, */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { - partitionState.foreach(state => + partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) + .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + } for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) @@ -357,47 +358,63 @@ class ReplicaManager(val config: KafkaConfig, leaderPartitions --= partitionState.keySet } - partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => - partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} + var partitionsToMakeFollower: Set[Partition] = Set() - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => + // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + partitionState.foreach{ case (partition, partitionStateInfo) => + val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch + val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader + leaders.find(_.id == newLeaderBrokerId) match { + case Some(leaderBroker) => + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) + partitionsToMakeFollower += partition + else + stateChangeLogger.info(("Broker %d skipped the rest of the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + partition.topic, partition.partitionId, newLeaderBrokerId)) + case None => + // The leader broker should always present in the leaderAndIsrRequest. + // If now, we should record the error message and abort the transition process for this partition + stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + partition.topic, partition.partitionId, newLeaderBrokerId)) + } + } + + replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) + partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) } - logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => - new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark - }) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + + logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap) + + partitionsToMakeFollower.foreach { partition => + stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) + partition.topic, partition.partitionId, correlationId, controllerId, epoch)) } - if (!isShuttingDown.get()) { - val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() - partitionState.foreach { - case (partition, partitionStateInfo) => - val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == leader) match { - case Some(leaderBroker) => - partitionAndOffsets.put(new TopicAndPartition(partition), - BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) - case None => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + - "controller %d epoch %d for partition %s since the designated leader %d " + - "cannot be found in live or shutting down brokers %s").format(localBrokerId, - correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) - } + + if (isShuttingDown.get()) { + partitionsToMakeFollower.foreach { partition => + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId, + controllerId, epoch, partition.topic, partition.partitionId)) } - replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) } else { - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + // we do not need to check if the leader exists again since this has been done at the beginning of this process + val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => + new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + + partitionsToMakeFollower.foreach { partition => + stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " + + "%d epoch %d with correlation id %d for partition [%s,%d]") + .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId)) } } } catch {