diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5c9307d..d52e24e 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -28,7 +28,9 @@ import kafka.metrics.KafkaMetricsGroup import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet -import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} +import kafka.common._ +import scala.Some +import kafka.common.TopicAndPartition /** @@ -173,10 +175,11 @@ class Partition(val topic: String, /** * Make the local replica the follower by setting the new leader and ISR to empty + * Return the leader broker info from the list of leaders back to the caller */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - leaders: Set[Broker], correlationId: Int): Boolean = { + correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -185,23 +188,15 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 - leaders.find(_.id == newLeaderBrokerId) match { - case Some(leaderBroker) => - // add replicas that are new - allReplicas.foreach(r => getOrCreateReplica(r)) - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) - inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndIsr.leaderEpoch - zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt = Some(newLeaderBrokerId) - case None => // we should not come here - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] new leader %d") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - topic, partitionId, newLeaderBrokerId)) - } + // add replicas that are new + allReplicas.foreach(r => getOrCreateReplica(r)) + // remove assigned replicas that have been removed by the controller + (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion + leaderReplicaIdOpt = Some(newLeaderBrokerId) + true } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 242c18d..364a252 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -214,9 +214,9 @@ class ReplicaManager(val config: KafkaConfig, val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." + - " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerEpoch, controllerEpoch)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %s with correlation id %d since its controller epoch %d is old." + + " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) } (responseMap, ErrorMapping.StaleControllerEpochCode) } else { @@ -235,17 +235,17 @@ class ReplicaManager(val config: KafkaConfig, if(partitionStateInfo.allReplicas.contains(config.brokerId)) partitionState.put(partition, partitionStateInfo) else { - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, + topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) } } else { // Otherwise record the error code in response - stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d") .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) + topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } } @@ -357,47 +357,57 @@ class ReplicaManager(val config: KafkaConfig, leaderPartitions --= partitionState.keySet } - partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => - partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} + val partitionsToMakeFollower = mutable.Map[Partition, BrokerAndInitialOffset]() - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => + // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + partitionState.foreach{ case (partition, partitionStateInfo) => + val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch + val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader + leaders.find(_.id == newLeaderBrokerId) match { + case Some(leaderBroker) => + partition.makeFollower(controllerId, partitionStateInfo, correlationId) + partitionsToMakeFollower.put(partition, + BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) + case None => + // During controller failover, the controller always first sends the current partition state info + // before doing the leader election. So, it could happen that the leader broker is not present in the leaderAndIsrRequest. + // In this case, we should ignore the transition process for this partition + stateChangeLogger.warn(("Broker %d aborted the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + partition.topic, partition.partitionId, newLeaderBrokerId)) + } + } + + replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet.map(new TopicAndPartition(_))) + partitionsToMakeFollower.keys.foreach { partition => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) } - logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => + logManager.truncateTo(partitionsToMakeFollower.map{ case (partition, brokerAndOffset) => new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark }) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + + partitionsToMakeFollower.keys.foreach { partition => + stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) + partition.topic, partition.partitionId, correlationId, controllerId, epoch)) } - if (!isShuttingDown.get()) { - val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() - partitionState.foreach { - case (partition, partitionStateInfo) => - val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == leader) match { - case Some(leaderBroker) => - partitionAndOffsets.put(new TopicAndPartition(partition), - BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) - case None => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + - "controller %d epoch %d for partition %s since the designated leader %d " + - "cannot be found in live or shutting down brokers %s").format(localBrokerId, - correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) - } + + if (isShuttingDown.get()) { + partitionsToMakeFollower.keys.foreach { partition => + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId, + controllerId, epoch, partition.topic, partition.partitionId)) } - replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) } else { - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower.map{ case (partition, brokerAndOffset) => new TopicAndPartition(partition) -> brokerAndOffset}) + partitionsToMakeFollower.keys.foreach { partition => + stateChangeLogger.trace(("Broker %d started fetchers to new leader as part of become-follower request from controller " + + "%d epoch %d with correlation id %d for partition [%s,%d]") + .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId)) } } } catch {