diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1087a2e..f665dcd 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -30,6 +30,8 @@ import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} import java.io.IOException +import scala.Some +import kafka.common.TopicAndPartition /** @@ -193,7 +195,7 @@ class Partition(val topic: String, */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - leaders: Set[Broker], correlationId: Int): Boolean = { + correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -202,23 +204,15 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 - leaders.find(_.id == newLeaderBrokerId) match { - case Some(leaderBroker) => - // add replicas that are new - allReplicas.foreach(r => getOrCreateReplica(r)) - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) - inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndIsr.leaderEpoch - zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt = Some(newLeaderBrokerId) - case None => // we should not come here - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] new leader %d") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - topic, partitionId, newLeaderBrokerId)) - } + // add replicas that are new + allReplicas.foreach(r => getOrCreateReplica(r)) + // remove assigned replicas that have been removed by the controller + (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion + leaderReplicaIdOpt = Some(newLeaderBrokerId) + true } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f9d10d3..6e604ff 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,21 +16,21 @@ */ package kafka.server -import kafka.cluster.{Broker, Partition, Replica} import collection._ import mutable.HashMap -import org.I0Itec.zkclient.ZkClient -import java.io.{File, IOException} -import java.util.concurrent.atomic.AtomicBoolean +import kafka.cluster.{Broker, Partition, Replica} import kafka.utils._ import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge -import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController import org.apache.log4j.Logger +import org.I0Itec.zkclient.ZkClient +import com.yammer.metrics.core.Gauge +import java.util.concurrent.atomic.AtomicBoolean +import java.io.{IOException, File} +import java.util.concurrent.TimeUnit object ReplicaManager { @@ -214,9 +214,9 @@ class ReplicaManager(val config: KafkaConfig, val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." + - " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerEpoch, controllerEpoch)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %s with correlation id %d since its controller epoch %d is old." + + " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) } (responseMap, ErrorMapping.StaleControllerEpochCode) } else { @@ -235,17 +235,17 @@ class ReplicaManager(val config: KafkaConfig, if(partitionStateInfo.allReplicas.contains(config.brokerId)) partitionState.put(partition, partitionStateInfo) else { - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, + topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) } } else { // Otherwise record the error code in response - stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, + topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } } @@ -357,48 +357,60 @@ class ReplicaManager(val config: KafkaConfig, leaderPartitions --= partitionState.keySet } - partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => - partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} + var partitionsToMakeFollower: Set[Partition] = Set() - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => + // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + partitionState.foreach{ case (partition, partitionStateInfo) => + val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch + val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader + leaders.find(_.id == newLeaderBrokerId) match { + case Some(leaderBroker) => + partition.makeFollower(controllerId, partitionStateInfo, correlationId) + partitionsToMakeFollower += partition + case None => + // During controller failover, the controller always first sends the current partition state info + // before doing the leader election. So, it could happen that the leader broker is not present in the leaderAndIsrRequest. + // In this case, we should ignore the transition process for this partition + stateChangeLogger.warn(("Broker %d aborted the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + partition.topic, partition.partitionId, newLeaderBrokerId)) + } + } + + replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) + partitionsToMakeFollower.foreach(partition => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) + ) - logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => - new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark - }) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + + logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap) + + partitionsToMakeFollower.foreach(partition => + stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) - } - if (!isShuttingDown.get()) { - val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() - partitionState.foreach { - case (partition, partitionStateInfo) => - val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == leader) match { - case Some(leaderBroker) => - partitionAndOffsets.put(new TopicAndPartition(partition), - BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) - case None => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + - "controller %d epoch %d for partition %s since the designated leader %d " + - "cannot be found in live or shutting down brokers %s").format(localBrokerId, - correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) - } - } - replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) + partition.topic, partition.partitionId, correlationId, controllerId, epoch)) + ) + + if (isShuttingDown.get()) { + partitionsToMakeFollower.foreach(partition => + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId, + controllerId, epoch, partition.topic, partition.partitionId)) + ) } else { - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + // we do not need to check if the leader exists again since this has been done at the beginning of this process + val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => + new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + + partitionsToMakeFollower.foreach(partition => + stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " + + "%d epoch %d with correlation id %d for partition [%s,%d]") + .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId)) + ) } } catch { case e: Throwable => @@ -409,11 +421,11 @@ class ReplicaManager(val config: KafkaConfig, throw e } - partitionState.foreach { state => + partitionState.foreach(state => stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + "for the become-follower transition for partition %s") .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + ) } private def maybeShrinkIsr(): Unit = {