diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5c9307d..fe0877f 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -28,7 +28,9 @@ import kafka.metrics.KafkaMetricsGroup import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet -import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} +import kafka.common._ +import scala.Some +import kafka.common.TopicAndPartition /** @@ -173,10 +175,11 @@ class Partition(val topic: String, /** * Make the local replica the follower by setting the new leader and ISR to empty + * Return the leader broker info from the list of leaders back to the caller */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - leaders: Set[Broker], correlationId: Int): Boolean = { + leaders: Set[Broker], correlationId: Int): Broker = { leaderIsrUpdateLock synchronized { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -186,23 +189,29 @@ class Partition(val topic: String, // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 - leaders.find(_.id == newLeaderBrokerId) match { - case Some(leaderBroker) => - // add replicas that are new - allReplicas.foreach(r => getOrCreateReplica(r)) - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) - inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndIsr.leaderEpoch - zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt = Some(newLeaderBrokerId) - case None => // we should not come here - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] new leader %d") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - topic, partitionId, newLeaderBrokerId)) + val leaderBroker = leaders.find(_.id == newLeaderBrokerId).getOrElse(null) + + if (leaderBroker == null) { + // We could come to here if the leaderAndIsr request received is stale but with + // the updated leader epoch, in this case an exception is thrown to let the replica manager + // not to continue processing this request + throw new LeaderNotAvailableException(("Broker %d aborted the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + topic, partitionId, newLeaderBrokerId)) } - true + else { + // add replicas that are new + allReplicas.foreach(r => getOrCreateReplica(r)) + // remove assigned replicas that have been removed by the controller + (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion + leaderReplicaIdOpt = Some(newLeaderBrokerId) + } + + leaderBroker } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 242c18d..fb64f22 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -214,9 +214,9 @@ class ReplicaManager(val config: KafkaConfig, val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." + - " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerEpoch, controllerEpoch)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %s with correlation id %d since its controller epoch %d is old." + + " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) } (responseMap, ErrorMapping.StaleControllerEpochCode) } else { @@ -235,17 +235,17 @@ class ReplicaManager(val config: KafkaConfig, if(partitionStateInfo.allReplicas.contains(config.brokerId)) partitionState.put(partition, partitionStateInfo) else { - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") + .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, + topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) } } else { // Otherwise record the error code in response - stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + + "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d") .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) + topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } } @@ -344,75 +344,72 @@ class ReplicaManager(val config: KafkaConfig, */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { - partitionState.foreach(state => - stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) - - for (partition <- partitionState.keys) - responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) - - try { - leaderPartitionsLock synchronized { - leaderPartitions --= partitionState.keySet + if (isShuttingDown.get()) { + partitionState.foreach { state => + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, + controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } + } + else { + partitionState.foreach(state => + stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-follower transition for partition %s") + .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) - partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => - partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} + for (partition <- partitionState.keys) + responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + - "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + try { + leaderPartitionsLock synchronized { + leaderPartitions --= partitionState.keySet + } - logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => - new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark - }) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + - "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) - } - if (!isShuttingDown.get()) { val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() - partitionState.foreach { - case (partition, partitionStateInfo) => - val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == leader) match { - case Some(leaderBroker) => - partitionAndOffsets.put(new TopicAndPartition(partition), - BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) - case None => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + - "controller %d epoch %d for partition %s since the designated leader %d " + - "cannot be found in live or shutting down brokers %s").format(localBrokerId, - correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) - } + + partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => + val leaderBroker = partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId) + partitionAndOffsets.put(new TopicAndPartition(partition), + BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) + } + + replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) + partitionState.foreach { state => + stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + + "%d epoch %d with correlation id %d for partition %s") + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) } + + logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => + new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark + }) + partitionState.foreach { state => + stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + + "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, + TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) + } + replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) - } - else { partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + stateChangeLogger.trace(("Broker %d started fetchers to new leader as part of become-follower request from controller " + + "%d epoch %d with correlation id %d for partition %s") + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) } + } catch { + case e: LeaderNotAvailableException => stateChangeLogger.warn(e.getMessage) + case e2: Throwable => + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " + + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) + stateChangeLogger.error(errorMsg, e2) + // Re-throw the exception for it to be caught in KafkaApis + throw e2 } - } catch { - case e: Throwable => - val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " + - "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) - stateChangeLogger.error(errorMsg, e) - // Re-throw the exception for it to be caught in KafkaApis - throw e - } - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "for the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + partitionState.foreach { state => + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-follower transition for partition %s") + .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) + } } }