diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 00a1f98..f12ffc2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -603,7 +603,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) { + def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) try { controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions @@ -612,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) } finally { - removePartitionsFromPreferredReplicaElection(partitions) + removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic)) } } @@ -914,7 +914,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) { + def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], + isTriggeredByAutoRebalance : Boolean) { for(partition <- partitionsToBeRemoved) { // check the status val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader @@ -925,7 +926,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader)) } } - ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath) + if (!isTriggeredByAutoRebalance) + ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath) controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved } @@ -1090,6 +1092,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case(topicPartition, replicas) => { + controllerContext.partitionLeadershipInfo.contains(topicPartition) && controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker } } @@ -1102,26 +1105,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions // that need to be on this broker if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { - inLock(controllerContext.controllerLock) { - // do this check only if the broker is live and there are no partitions being reassigned currently - // and preferred replica election is not in progress - if (controllerContext.liveBrokerIds.contains(leaderBroker) && - controllerContext.partitionsBeingReassigned.size == 0 && - controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) { - val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath - val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition)) - val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList)) - try { - ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) - info("Created preferred replica election path with %s".format(jsonData)) - } catch { - case e2: ZkNodeExistsException => - val partitionsUndergoingPreferredReplicaElection = - PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) - error("Preferred replica leader election currently in progress for " + - "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)); - case e3: Throwable => - error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys)) + topicsNotInPreferredReplica.foreach { + case(topicPartition, replicas) => { + inLock(controllerContext.controllerLock) { + // do this check only if the broker is live and there are no partitions being reassigned currently + // and preferred replica election is not in progress + if (controllerContext.liveBrokerIds.contains(leaderBroker) && + controllerContext.partitionsBeingReassigned.size == 0 && + controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 && + !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && + !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) && + controllerContext.allTopics.contains(topicPartition.topic)) { + onPreferredReplicaElection(Set(topicPartition), false) + } } } }