diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 03ef9cf..3d87252 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -69,6 +69,26 @@ class ControllerContext(val zkClient: ZkClient,
   def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
+
+  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
+    partitionReplicaAssignment
+      .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
+      .map { case(topicAndPartition, replicas) => topicAndPartition }
+      .toSet
+  }
+
+  def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
+    brokerIds.map { brokerId =>
+      partitionReplicaAssignment
+        .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
+        .map { case(topicAndPartition, replicas) =>
+          new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
+    }.flatten.toSet
+  }
+
+  def allLiveReplicas(): Set[PartitionAndReplica] = {
+    replicasOnBrokers(liveBrokerIds)
+  }
 }
 
 trait KafkaControllerMBean {
@@ -190,13 +210,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
     }
 
-    val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized {
-      getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map {
-        case(topic, partition) =>
-          val topicAndPartition = TopicAndPartition(topic, partition)
-          (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)
+    val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
+      controllerContext.controllerLock synchronized {
+        controllerContext.partitionsOnBroker(id)
+          .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
       }
-    }
 
     allPartitionsAndReplicationFactorOnBroker.foreach {
       case(topicAndPartition, replicationFactor) =>
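Editor's note on the new ControllerContext helpers: they answer broker-to-replica questions from the controller's cached partitionReplicaAssignment instead of re-reading ZooKeeper, which is what lets the call sites below drop their zkClient and topic-list arguments. A self-contained sketch of the lookup logic with toy data (a simplified re-implementation, not an excerpt from the patch):

object BrokerLookupSketch extends App {
  // Local stand-ins for the Kafka classes of the same names.
  case class TopicAndPartition(topic: String, partition: Int)
  case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Same shape as ControllerContext.partitionReplicaAssignment: partition -> assigned replica ids (illustrative values).
  val assignment: Map[TopicAndPartition, Seq[Int]] = Map(
    TopicAndPartition("t", 0) -> Seq(1, 2, 3),
    TopicAndPartition("t", 1) -> Seq(2, 3, 4))

  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] =
    assignment.filter { case (_, replicas) => replicas.contains(brokerId) }.keySet

  def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] =
    brokerIds.flatMap(id => partitionsOnBroker(id).map(tp => PartitionAndReplica(tp.topic, tp.partition, id)))

  println(partitionsOnBroker(1))          // Set(TopicAndPartition(t,0))
  println(replicasOnBrokers(Set(1, 4)))   // replica 1 of t-0 and replica 4 of t-1
}

allLiveReplicas() in the patch is simply replicasOnBrokers(liveBrokerIds), so its result shrinks as brokers drop out of the live set.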
@@ -328,7 +346,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     sendUpdateMetadataRequest(newBrokers)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(newBrokersSet), OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -366,7 +384,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(deadBrokersSet), OfflineReplica)
   }
 
   /**
@@ -399,57 +417,60 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   /**
    * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
    * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
-   * Reassigning replicas for a partition goes through a few stages -
+   * Reassigning replicas for a partition goes through a few steps listed in the code.
    * RAR = Reassigned replicas
-   * AR = Original list of replicas for partition
-   * 1. Write new AR = AR + RAR. At this time, update the leader epoch in zookeeper and send a LeaderAndIsr request with
-   *    AR = AR + RAR to all replicas in (AR + RAR)
-   * 2. Start new replicas RAR - AR.
-   * 3. Wait until new replicas are in sync with the leader
-   * 4. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
-   *    will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
-   *    In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
-   *    RAR - AR back in the ISR
-   * 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part
-   *    of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to
-   *    the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in
-   *    RAR - AR. As part of the NonExistentReplica state change, we delete replicas in RAR - AR.
-   * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR.
-   * 7. Remove partition from the /admin/reassign_partitions path
+   * OAR = Original list of replicas for partition
+   *
+   * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
+   * may go through the following transition.
+   * AR                 leader/isr
+   * {1,2,3}            1/{1,2,3}           (initial state)
+   * {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
+   * {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
+   * {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
+   * {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
+   * {4,5,6}            4/{4,5,6}           (step 10)
+   *
+   * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
+   * This way, if the controller crashes before that step, we can still recover.
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
       case true =>
+        //4. wait until replicas in RAR join isr
        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
-        // mark the new replicas as online
+        //5. replicas in RAR -> OnlineReplica
        reassignedReplicas.foreach { replica =>
          replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
            replica)), OnlineReplica)
        }
-        // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
+        //6. set AR to RAR in memory
+        //7. send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
+        //   a new assigned replica list (using RAR) and same isr to every broker in RAR
        moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
-        // stop older replicas
+        //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
+        //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
-        // write the new list of replicas for this partition in zookeeper
+        //10. update assigned replica list to RAR in ZK
        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
-        // update the /admin/reassign_partitions path to remove this partition
+        //11. update the /admin/reassign_partitions path in ZK to remove this partition
        removePartitionFromReassignedPartitions(topicAndPartition)
        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
-        // after electing leader, the replicas and isr information changes, so resend the update metadata request
+        //12. after electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
      case false =>
        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
          "reassigned not yet caught up with the leader")
        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
-        // write the expanded list of replicas to zookeeper
+        //1. update assigned replica list in ZK with OAR + RAR replicas
        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
-        // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+        //2. send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR)
        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
          newAndOldReplicas.toSeq)
-        // start new replicas
+        //3. replicas in RAR - OAR -> NewReplica
        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
          "reassigned to catch up with the leader")
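Editor's note: the numbered steps in onPartitionReassignment all reduce to set arithmetic on OAR and RAR. A runnable sketch of those sets for the example used in the comment above (illustrative only, not part of the patch):

object ReassignmentSets extends App {
  val oar = Set(1, 2, 3)            // original assigned replicas
  val rar = Set(4, 5, 6)            // reassigned replicas

  val expandedAR      = oar ++ rar  // step 1: AR written to ZK while RAR catches up -> {1,2,3,4,5,6}
  val replicasToStart = rar -- oar  // step 3: RAR - OAR, the replicas moved to NewReplica -> {4,5,6}
  val replicasToStop  = oar -- rar  // steps 8-9: OAR - RAR, taken offline and then deleted -> {1,2,3}
  val finalAR         = rar         // step 10: AR rewritten to RAR once everything is in sync

  println(s"expanded=$expandedAR start=$replicasToStart stop=$replicasToStop final=$finalAR")
}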
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index a47b142..7cc41df 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -26,19 +26,20 @@ trait PartitionLeaderSelector {
    * @param topicAndPartition The topic and partition whose leader needs to be elected
    * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
    * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
-   * @return The leader and isr request, with the newly selected leader info, to send to the brokers
-   *         Also, returns the list of replicas the returned leader and isr request should be sent to
-   * This API selects a new leader for the input partition
+   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
+   *         the LeaderAndIsrRequest.
    */
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
 }
 
 /**
- * This API selects a new leader for the input partition -
- * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
- * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+ * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
+ * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
+ *    isr as the new isr.
+ * 2. Else, it picks some alive broker from the assigned replica list as the new leader, and that broker becomes the new isr.
 * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
+ * receiving replicas = live assigned replicas
 * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
 */
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
@@ -82,7 +83,9 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
 }
 
 /**
- * Picks one of the alive in-sync reassigned replicas as the new leader.
+ * new leader = a live in-sync reassigned replica
+ * new isr = current isr
+ * receiving replicas = reassigned replicas
 */
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[ReassignedPartitionLeaderSelector]: "
@@ -94,7 +97,8 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
     val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
+      currentLeaderAndIsr.isr.contains(r))
     val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
     newLeaderOpt match {
       case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
@@ -106,16 +110,16 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
             " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
         case _ =>
           throw new StateChangeFailedException("None of the reassigned replicas for partition " +
-            "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
+            "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
       }
     }
   }
 }
 
 /**
- * Picks the preferred replica as the new leader if -
- * 1. It is already not the current leader
- * 2. It is alive
+ * new leader = preferred (first assigned) replica (if in isr and alive);
+ * new isr = current isr;
+ * receiving replicas = assigned replicas
 */
 class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
 with Logging {
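Editor's note: the OfflinePartitionLeaderSelector rules above fit in a few lines. A simplified, self-contained model of just the selection rule (the real class also bumps the leader epoch and zkVersion, logs the unclean election, and returns the live assigned replicas as receivers; names and broker ids here are illustrative):

object OfflineLeaderSketch extends App {
  def selectOfflineLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): (Int, Seq[Int]) = {
    val liveIsr      = isr.filter(liveBrokers.contains)
    val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
    if (liveIsr.nonEmpty)           (liveIsr.head, liveIsr)                      // rule 1: leader from live isr, isr = live isr
    else if (liveAssigned.nonEmpty) (liveAssigned.head, List(liveAssigned.head)) // rule 2: leader from live AR, becomes the whole isr
    else sys.error("no assigned replica is alive")                               // rule 3: NoReplicaOnlineException in the real code
  }

  println(selectOfflineLeader(Seq(1, 2, 3), isr = Seq(2, 3), liveBrokers = Set(1, 3)))  // (3,List(3))
}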
@@ -145,8 +149,9 @@ with Logging {
 }
 
 /**
- * Picks one of the alive replicas (other than the current leader) in ISR as
- * new leader, fails if there are no other replicas in ISR.
+ * new leader = replica in isr that's not being shutdown;
+ * new isr = current isr - shutdown replica;
+ * receiving replicas = live assigned replicas
 */
 class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         extends PartitionLeaderSelector
@@ -164,8 +169,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
-    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader &&
-      !controllerContext.shuttingDownBrokerIds.contains(brokerId))
+    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
     val newLeaderOpt = newIsr.headOption
     newLeaderOpt match {
       case Some(newLeader) =>
@@ -174,8 +178,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
-          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
+        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
+          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
 }
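Editor's note: the behavioral change in ControlledShutdownLeaderSelector is that the current leader now stays in the new isr unless it is itself shutting down; only replicas on shutting-down brokers are dropped. A minimal model of just that computation (assumed simplification; the real selector also handles the leader epoch, zkVersion and the receiving-replica list):

object ControlledShutdownIsrSketch extends App {
  def shrinkIsr(currentIsr: Seq[Int], shuttingDownBrokers: Set[Int]): (Option[Int], Seq[Int]) = {
    val newIsr = currentIsr.filterNot(shuttingDownBrokers.contains) // drop only replicas on shutting-down brokers
    (newIsr.headOption, newIsr)                                     // new leader = first surviving isr member, if any
  }

  // Before this change the current leader was always excluded from the new isr; now it is kept
  // unless it is the broker being shut down.
  println(shrinkIsr(currentIsr = Seq(1, 2, 3), shuttingDownBrokers = Set(2)))  // (Some(1),List(1, 3))
}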
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5859ce7..f7d53fd 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -119,7 +119,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
   /**
    * This API exercises the partition's state machine. It ensures that every state transition happens from a legal
-   * previous state to the target state.
+   * previous state to the target state. Valid state transitions are:
+   * NonExistentPartition -> NewPartition:
+   * --load assigned replicas from ZK to controller cache
+   *
+   * NewPartition -> OnlinePartition
+   * --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
+   * --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker
+   *
+   * OnlinePartition,OfflinePartition -> OnlinePartition
+   * --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
+   * --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
+   *
+   * NewPartition,OnlinePartition -> OfflinePartition
+   * --nothing other than marking partition state as Offline
+   *
+   * OfflinePartition -> NonExistentPartition
+   * --nothing other than marking the partition state as NonExistentPartition
    * @param topic The topic of the partition for which the state transition is invoked
    * @param partition The partition for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
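Editor's note: the transition list above is easiest to read as a table from target state to legal previous states. A self-contained sketch of that table (local case objects, not the ones in kafka.controller, and only one possible way to express the check):

object PartitionTransitionTable extends App {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition         extends PartitionState
  case object OnlinePartition      extends PartitionState
  case object OfflinePartition     extends PartitionState

  // Target state -> legal previous states, taken directly from the comment above.
  val legalPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
    NewPartition         -> Set(NonExistentPartition),
    OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> Set(NewPartition, OnlinePartition),
    NonExistentPartition -> Set(OfflinePartition))

  def isLegal(from: PartitionState, to: PartitionState): Boolean =
    legalPreviousStates.get(to).exists(_.contains(from))

  println(isLegal(OfflinePartition, OnlinePartition))   // true: an offline partition can be re-elected online
  println(isLegal(NewPartition, NonExistentPartition))  // false: a partition must go offline before deletion
}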
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ad4ee53..483559a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -58,8 +58,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     initializeReplicaState()
     hasStarted.set(true)
     // move all Online replicas to Online
-    handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
-      controllerContext.liveBrokerIds.toSeq), OnlineReplica)
+    handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
 
@@ -95,7 +94,23 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 
   /**
    * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
-   * previous state to the target state.
+   * previous state to the target state. Valid state transitions are:
+   * NonExistentReplica --> NewReplica
+   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+   *
+   * NewReplica -> OnlineReplica
+   * --add the new replica to the assigned replica list if needed
+   *
+   * OnlineReplica,OfflineReplica -> OnlineReplica
+   * --send LeaderAndIsr request with current leader and isr to the replica and UpdateMetadata request for the partition to every live broker
+   *
+   * NewReplica,OnlineReplica -> OfflineReplica
+   * --send StopReplicaRequest to the replica (w/o deletion)
+   * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker.
+   *
+   * OfflineReplica -> NonExistentReplica
+   * --send StopReplicaRequest to the replica (with deletion)
+   *
    * @param topic The topic of the replica for which the state transition is invoked
    * @param partition The partition of the replica for which the state transition is invoked
    * @param replicaId The replica for which the state transition is invoked
@@ -228,20 +243,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
-    brokerIds.map { brokerId =>
-      val partitionsAssignedToThisBroker =
-        controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
-      if(partitionsAssignedToThisBroker.size == 0)
-        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
-      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
-    }.flatten.toSet
-  }
-
-  def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
-    controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
-  }
-
   /**
    * This is the zookeeper listener that triggers all the state transitions for a replica
    */
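Editor's note: for comparison with the partition table, the replica transitions above collapse to the following target-to-legal-previous-states table (REPL-style sketch using plain strings rather than the ReplicaState objects in the codebase):

val legalPreviousReplicaStates: Map[String, Set[String]] = Map(
  "NewReplica"         -> Set("NonExistentReplica"),
  "OnlineReplica"      -> Set("NewReplica", "OnlineReplica", "OfflineReplica"),
  "OfflineReplica"     -> Set("NewReplica", "OnlineReplica"),
  "NonExistentReplica" -> Set("OfflineReplica"))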
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 73902b2..b42e52b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -185,12 +185,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
-    val replicas = getReplicasForPartition(zkClient, topic, partition)
-    debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas))
-    replicas.contains(brokerId.toString)
-  }
-
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
@@ -500,8 +494,6 @@ object ZkUtils extends Logging {
     client.exists(path)
   }
 
-  def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1)
-
   def getCluster(zkClient: ZkClient) : Cluster = {
     val cluster = new Cluster
     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
@@ -571,17 +563,6 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]):
-  mutable.Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    for((topic, partitionAssignment) <- topicPartitionAssignment){
-      for((partition, replicaAssignment) <- partitionAssignment){
-        ret.put((topic, partition), replicaAssignment)
-      }
-    }
-    ret
-  }
-
   def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
     getPartitionAssignmentForTopics(zkClient, topics).map { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
@@ -591,19 +572,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Seq[(String, Int)] = {
-    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics)
-    topicsAndPartitions.map { topicAndPartitionMap =>
-      val topic = topicAndPartitionMap._1
-      val partitionMap = topicAndPartitionMap._2
-      val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
-      val relevantPartitions = relevantPartitionsMap.map(_._1)
-      for(relevantPartition <- relevantPartitions) yield {
-        (topic, relevantPartition)
-      }
-    }.flatten[(String, Int)].toSeq
-  }
-
   def getPartitionsBeingReassigned(zkClient: ZkClient): Map[TopicAndPartition, ReassignedPartitionsContext] = {
     // read the partitions and their new replica list
     val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1
@@ -677,17 +645,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
-    Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
-      // read all the partitions and their assigned replicas into a map organized by
-      // { replica id -> partition 1, partition 2...
-      val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId)
-      if(partitionsAssignedToThisBroker.size == 0)
-        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
-      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
-    }.flatten
-  }
-
   def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = {
     // read the partitions and their new replica list
     val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1