diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f706c9..70d1b81 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -104,10 +104,14 @@ object ReassignPartitionsCommand extends Logging {
         }
       }
     } else if(options.has(topicsToMoveJsonFileOpt)) {
+      if(!options.has(brokerListOpt)) {
+        System.err.println("broker-list is required if topics-to-move-json-file is used")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
       val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
-      val brokerList = options.valueOf(brokerListOpt)
+      val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt)
       val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
-      val brokerListToReassign = brokerList.split(',') map (_.toInt)
       val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
       val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
 
@@ -117,7 +121,6 @@ object ReassignPartitionsCommand extends Logging {
           topicInfo._2.head._2.size)
         partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
       }
-
     } else if (options.has(manualAssignmentJsonFileOpt)) {
       val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt)
       val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
@@ -175,8 +178,11 @@ object ReassignPartitionsCommand extends Logging {
         val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
         if(assignedReplicas == newReplicas)
           ReassignmentCompleted
-        else
+        else {
+          println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
+            " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
           ReassignmentFailed
+        }
       }
     }
   }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d8078bd..02ccc17 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import scala.collection._
 import kafka.admin.AdminUtils
 import kafka.utils._
 import java.lang.Object
-import kafka.api.LeaderAndIsr
+import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
 
 
 /**
@@ -131,25 +131,34 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+  def removeReplica(replicaId: Int) {
+    assignedReplicaMap.remove(replicaId)
+  }
+
   def getLeaderEpoch(): Int = {
     leaderIsrUpdateLock synchronized {
       return this.leaderEpoch
     }
   }
 
-
   /**
    * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    * and setting the new leader and ISR
    */
-  def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int,
+                 partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val allReplicas = partitionStateInfo.allReplicas
+      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-
+      // add replicas that are new
+      allReplicas.foreach(replica => getOrCreateReplica(replica))
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // remove assigned replicas that have been removed by the controller
+      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
       // reset LogEndOffset for remote replicas
       assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
       inSyncReplicas = newInSyncReplicas
@@ -165,16 +174,24 @@ class Partition(val topic: String,
   /**
    * Make the local replica the follower by setting the new leader and ISR to empty
    */
-  def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int,
+                   partitionStateInfo: PartitionStateInfo,
+                   leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val allReplicas = partitionStateInfo.allReplicas
+      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       val newLeaderBrokerId: Int = leaderAndIsr.leader
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
       leaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
-          // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-          // to maintain the decision maker controller's epoch in the zookeeper path
-          controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+          // add replicas that are new
+          allReplicas.foreach(r => getOrCreateReplica(r))
+          // remove assigned replicas that have been removed by the controller
+          (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion
@@ -192,7 +209,13 @@ class Partition(val topic: String,
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
-      val replica = getOrCreateReplica(replicaId)
+      val replicaOpt = getReplica(replicaId)
+      if(!replicaOpt.isDefined) {
+        throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
+          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+          offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+      }
+      val replica = replicaOpt.get
       replica.logEndOffset = offset
 
       // check if this replica needs to be added to the ISR
@@ -200,7 +223,11 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
+          // For a replica to get added back to ISR, it has to satisfy 3 conditions-
+          // 1. It is not already in the ISR
+          // 2. It is part of the assigned replica list. See KAFKA-1097
+          // 3. Its log end offset >= leader's highwatermark
+          if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
diff --git a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
new file mode 100644
index 0000000..409d112
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class NotAssignedReplicaException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
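
[Reviewer note, not part of the patch] The Partition changes above make ISR re-entry conditional on membership in the assigned replica list. A minimal, runnable sketch of that guard; `Replica`, `mayExpandIsr`, and the sample values are illustrative stand-ins, not Kafka's actual classes:

    // Sketch of the ISR re-entry rule enforced by updateLeaderHWAndMaybeExpandIsr.
    object IsrExpansionSketch {
      case class Replica(brokerId: Int, logEndOffset: Long)

      // A follower rejoins the ISR only if it is (1) not already in the ISR,
      // (2) still in the assigned replica list (the KAFKA-1097 fix), and
      // (3) caught up to the leader's high watermark.
      def mayExpandIsr(replica: Replica, isr: Set[Int], assigned: Set[Int], leaderHW: Long): Boolean =
        !isr.contains(replica.brokerId) &&
        assigned.contains(replica.brokerId) &&
        replica.logEndOffset >= leaderHW

      def main(args: Array[String]) {
        // A replica reassigned away (3 is not in the assigned list) stays out of the ISR
        // even though it has caught up; previously getOrCreateReplica re-created it and
        // let it back in.
        println(mayExpandIsr(Replica(3, 100L), Set(1, 2), Set(1, 2), 90L)) // false
        println(mayExpandIsr(Replica(2, 100L), Set(1), Set(1, 2), 90L))    // true
      }
    }
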
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88d130f..510f42f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,6 +35,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
+import org.apache.log4j.Logger
 
 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int,
@@ -105,6 +106,7 @@ object KafkaController extends Logging {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
@@ -359,17 +361,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Reassigning replicas for a partition goes through a few stages -
    * RAR = Reassigned replicas
    * AR = Original list of replicas for partition
-   * 1. Start new replicas RAR - AR.
-   * 2. Wait until new replicas are in sync with the leader
-   * 3. If the leader is not in RAR, elect a new leader from RAR
-   * 4. Stop old replicas AR - RAR
-   * 5. Write new AR
-   * 6. Remove partition from the /admin/reassign_partitions path
+   * 1. Write new AR = AR + RAR
+   * 2. Start new replicas RAR - AR.
+   * 3. Wait until new replicas are in sync with the leader
+   * 4. If the leader is not in RAR, elect a new leader from RAR
+   * 5. Stop old replicas AR - RAR
+   * 6. Write new AR = RAR
+   * 7. Remove partition from the /admin/reassign_partitions path
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
       case true =>
+        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
         // mark the new replicas as online
         reassignedReplicas.foreach { replica =>
           replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
@@ -378,9 +382,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
         moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
         // stop older replicas
-        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
         // write the new list of replicas for this partition in zookeeper
-        updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext)
+        updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext.newReplicas)
         // update the /admin/reassign_partitions path to remove this partition
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
@@ -390,8 +394,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       case false =>
         info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned not yet caught up with the leader")
+        val newReplicas = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+        // write the expanded list of replicas to zookeeper
+        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+        // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+          newAndOldReplicas.toSeq)
         // start new replicas
-        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicas)
         info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned to catch up with the leader")
     }
@@ -602,6 +613,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                                                        reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+    // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
+    // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
+    val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -613,6 +628,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         case true =>
           info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
             "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+          // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+          updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
         case false =>
           info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
             "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
@@ -622,12 +639,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
-                                                   reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                   reassignedPartitionContext: ReassignedPartitionsContext,
+                                                   oldReplicas: Set[Int]) {
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    // send stop replica state change request to the old replicas
-    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
     // first move the replica to offline state (the controller removes it from the ISR)
     oldReplicas.foreach { replica =>
       replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
     }
@@ -639,31 +654,44 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                                 reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                 replicas: Seq[Int]) {
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
-    partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas)
+    partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
     updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(",")))
+    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
     // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
-    // stop watching the ISR changes for this partition
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
   }
 
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
-                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
+                                                     reassignedPartitionContext: ReassignedPartitionsContext,
+                                                     newReplicas: Set[Int]) {
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
-    val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
-    val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq
     newReplicas.foreach { replica =>
       replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
         replica)), NewReplica)
     }
   }
 
+  private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+    brokerRequestBatch.newBatch()
+    updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+      case Some(updatedLeaderIsrAndControllerEpoch) =>
+        // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer
+        // allows old replicas to enter ISR
+        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
+          topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+        brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+        stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
+          "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
+          newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
+      case None => // fail the reassignment
+        stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
+          "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
+          newAssignedReplicas.mkString(","), topicAndPartition))
+    }
+  }
+
   private def registerReassignedPartitionsListener() = {
     zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
   }
@@ -677,6 +705,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
+    // stop watching the ISR changes for this partition
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
     // read the current list of reassigned partitions from zookeeper
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
     // remove this partition from that list
@@ -793,6 +824,52 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     finalLeaderIsrAndControllerEpoch
   }
 
+  /**
+   * Does not change leader or isr, but just increments the leader epoch
+   * @param topic topic
+   * @param partition partition
+   * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
+   */
+  private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
+    var zkWriteCompleteOrUnnecessary = false
+    while (!zkWriteCompleteOrUnnecessary) {
+      // refresh leader and isr from zookeeper again
+      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+        case Some(leaderIsrAndEpoch) =>
+          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+          if(controllerEpoch > epoch)
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+          // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
+          // assigned replica list
+          val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+            leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
+          // update the new leadership decision in zookeeper or retry
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+            zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
+            leaderAndIsr.zkVersion)
+          newLeaderAndIsr.zkVersion = newVersion
+          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+          if (updateSucceeded)
+            info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch))
+          updateSucceeded
+        case None =>
+          throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. " +
+            "This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition))
+          true
+      }
+    }
+    finalLeaderIsrAndControllerEpoch
+  }
+
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
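
[Reviewer note, not part of the patch] updateLeaderEpoch above is a conditional-update retry loop against zookeeper. A minimal sketch of the same compare-and-set pattern; `readVersioned` and `conditionalWrite` are hypothetical stand-ins for ZkUtils.getLeaderIsrAndEpochForPartition and ZkUtils.conditionalUpdatePersistentPath:

    // Sketch of the compare-and-set retry loop used to bump the leader epoch.
    object EpochBumpSketch {
      case class Versioned(leaderEpoch: Int, zkVersion: Int)

      // Bump the epoch even though leader and ISR are unchanged, so that followers
      // accept the next LeaderAndIsrRequest carrying the updated replica list.
      def bumpEpoch(readVersioned: () => Versioned,
                    conditionalWrite: Versioned => Boolean): Versioned = {
        var result: Option[Versioned] = None
        while (result.isEmpty) {
          val current = readVersioned()
          val next = Versioned(current.leaderEpoch + 1, current.zkVersion + 1)
          // The conditional write succeeds only if zkVersion still matches what was
          // read; on a lost race, re-read the path and retry.
          if (conditionalWrite(next)) result = Some(next)
        }
        result.get
      }
    }
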
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 212c05d..c52225a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -165,6 +165,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          // send stop replica command to the replica so that it stops fetching from the leader
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean = controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7b8f89e..161f581 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 
@@ -143,7 +143,7 @@ class ReplicaManager(val config: KafkaConfig,
     controllerEpoch = stopReplicaRequest.controllerEpoch
     val responseMap = new HashMap[(String, Int), Short]
     // First stop fetchers for all partitions, then stop the corresponding replicas
-    replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{
+    replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map {
       case (topic, partition) => TopicAndPartition(topic, partition)
     })
     for((topic, partitionId) <- stopReplicaRequest.partitions){
@@ -222,27 +222,27 @@ class ReplicaManager(val config: KafkaConfig,
     controllerEpoch = leaderAndISRRequest.controllerEpoch
 
     // First check partition's leader epoch
-    val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]()
+    val partitionState = new HashMap[Partition, PartitionStateInfo]()
     leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
       val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
       val partitionLeaderEpoch = partition.getLeaderEpoch()
       if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
         // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
         // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-        partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch)
+        partitionState.put(partition, partitionStateInfo)
       } else {
         // Otherwise record the error code in response
         stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
           "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
           .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
-          partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition, partitionLeaderEpoch))
+          partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
         responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
       }
     }
 
-    val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch
-      .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
-    val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys)
+    val partitionsTobeLeader = partitionState
+      .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+    val partitionsTobeFollower = (partitionState -- partitionsTobeLeader.keys)
 
     if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
     if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
@@ -271,28 +271,30 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    * TODO: the above may need to be fixed later
    */
-  private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+  private def makeLeaders(controllerId: Int, epoch: Int,
+                          partitionState: Map[Partition, PartitionStateInfo],
                           correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "starting the become-leader transition for partitions %s")
       .format(localBrokerId, correlationId, controllerId, epoch,
-        partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+        partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
 
-    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+    for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
      // First stop fetchers for all the partitions
-      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
       stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
       // Update the partition information to be the leader
-      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)}
+      partitionState.foreach{ case (partition, partitionStateInfo) =>
+        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
       // Finally add these partitions to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
-        leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet
+        leaderPartitions ++= partitionState.keySet
       }
     } catch {
       case e: Throwable =>
@@ -305,7 +307,8 @@ class ReplicaManager(val config: KafkaConfig,
 
     stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "for the become-leader transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+      .format(localBrokerId, correlationId, controllerId, epoch,
+        partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
   }
 
   /*
@@ -325,30 +328,33 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    * TODO: the above may need to be fixed later
    */
-  private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+  private def makeFollowers(controllerId: Int, epoch: Int,
+                            partitionState: Map[Partition, PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "starting the become-follower transition for partitions %s")
       .format(localBrokerId, correlationId, controllerId, epoch,
-        partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+        partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
 
-    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+    for (partition <- partitionState.keys)
      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
-      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
       stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
-      logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+      logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
        new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
      })
       stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
       if (!isShuttingDown.get()) {
-        replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
-          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)}
+        replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) =>
+          new TopicAndPartition(partition) ->
+            BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
+              partition.getReplica().get.logEndOffset)}
         )
       }
       else {
@@ -357,10 +363,11 @@ class ReplicaManager(val config: KafkaConfig,
           .format(localBrokerId, correlationId, controllerId, epoch))
       }
 
-      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+      partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
+        partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
 
       leaderPartitionsLock synchronized {
-        leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet
+        leaderPartitions --= partitionState.keySet
       }
     } catch {
       case e: Throwable =>
@@ -373,7 +380,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "for the become-follower transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+      .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
   }
 
   private def maybeShrinkIsr(): Unit = {
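
[Reviewer note, not part of the patch] becomeLeaderOrFollower above only applies partition state whose leader epoch is strictly newer than the locally cached one, which is why the controller bumps the epoch before resending the shrunk replica list. A sketch of that filter with simplified stand-in types (not Kafka's actual PartitionStateInfo):

    object StaleEpochFilterSketch {
      case class PartitionStateInfo(leaderEpoch: Int)

      // Keep only the partitions whose requested leader epoch is newer than the epoch
      // cached locally; the stale ones are answered with StaleLeaderEpochCode instead.
      def freshStates(requested: Map[String, PartitionStateInfo],
                      currentEpochs: Map[String, Int]): Map[String, PartitionStateInfo] =
        requested.filter { case (partition, stateInfo) =>
          currentEpochs.getOrElse(partition, -1) < stateInfo.leaderEpoch
        }

      def main(args: Array[String]) {
        val requested = Map("t-0" -> PartitionStateInfo(5), "t-1" -> PartitionStateInfo(2))
        val current = Map("t-0" -> 4, "t-1" -> 2)
        println(freshStates(requested, current)) // keeps t-0, drops the stale t-1
      }
    }
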
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index daf2c35..7fa6cb5 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -95,12 +95,12 @@ def main():
 
   comment="Created reviewboard "
   if not opt.reviewboard:
-    print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch
+    print 'Created a new reviewboard ',rb_url,
   else:
-    print 'Updated reviewboard',opt.reviewboard
+    print 'Updated reviewboard'
     comment="Updated reviewboard "
-  comment = comment + rb_url
+  comment = comment + rb_url + ' against branch ' + opt.branch
   jira.add_comment(opt.jira, comment)
 
 if __name__ == '__main__':
diff --git a/project/Build.scala b/project/Build.scala
index bcd1ca5..40e0c4f 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -44,7 +44,7 @@ object KafkaBuild extends Build {
     crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
     excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
     scalaVersion := "2.8.0",
-    version := "0.8.0",
+    version := "0.8.1",
     publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
     credentials += Credentials(Path.userHome / ".m2" / ".credentials"),
     buildNumber := System.getProperty("build.number", ""),
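
[Reviewer note, not part of the patch] The reassignment stages documented in KafkaController's onPartitionReassignment reduce to simple set arithmetic over AR and RAR. A runnable sketch with made-up replica ids:

    object ReassignmentSetsSketch {
      def main(args: Array[String]) {
        val ar  = Set(1, 2, 3) // AR: current assigned replicas
        val rar = Set(3, 4, 5) // RAR: reassigned replicas

        val expandedAr  = ar ++ rar // step 1: write AR = AR + RAR to zookeeper
        val toStart     = rar -- ar // step 2: start new replicas RAR - AR
        val toStop      = ar -- rar // step 5: stop old replicas AR - RAR once RAR is in sync
        val finalAr     = rar       // step 6: write AR = RAR

        println("expanded=%s start=%s stop=%s final=%s".format(expandedAr, toStart, toStop, finalAr))
      }
    }

Widening the assignment before starting the new replicas is what lets a leader elected mid-reassignment keep accepting fetches from both old and new replicas, and shrinking it (with a leader epoch bump) is what finally locks the old replicas out of the ISR.
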