diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a0267ae..f4f00b2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -546,7 +546,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) { + def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) try { controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions @@ -554,7 +554,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) } finally { - removePartitionsFromPreferredReplicaElection(partitions) + removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) } } @@ -823,7 +823,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) { + def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], + isTriggeredByAutoRebalance : Boolean) { for(partition <- partitionsToBeRemoved) { // check the status val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader @@ -834,7 +835,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader)) } } - ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath) + if (!isTriggeredByAutoRebalance) + ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath) controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved } @@ -1018,26 +1020,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions // that need to be on this broker if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { - controllerContext.controllerLock synchronized { - // do this check only if the broker is live and there are no partitions being reassigned currently - // and preferred replica election is not in progress - if (controllerContext.liveBrokerIds.contains(leaderBroker) && - controllerContext.partitionsBeingReassigned.size == 0 && - controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) { - val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath - val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition)) - val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList)) - try { - ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) - info("Created preferred replica election path with %s".format(jsonData)) - } catch { - case e2: ZkNodeExistsException => - val partitionsUndergoingPreferredReplicaElection = - PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) - error("Preferred replica leader election currently in progress for " + - "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)); - case e3: Throwable => - error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys)) + topicsNotInPreferredReplica.foreach { + case(topicPartition, replicas) => { + controllerContext.controllerLock synchronized { + // do this check only if the broker is live and there are no partitions being reassigned currently + // and preferred replica election is not in progress + if (controllerContext.liveBrokerIds.contains(leaderBroker) && + controllerContext.partitionsBeingReassigned.size == 0 && + controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) { + onPreferredReplicaElection(Set(topicPartition), false) + } } } }