diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88792c2..e2ad682 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,7 @@ import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, ZkUtils, Logging}
+import kafka.utils.{Json, Utils, ZkUtils, Logging, KafkaScheduler}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
@@ -112,6 +112,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
+  // have a separate scheduler for the controller to be able to start and stop independently of the
+  // kafka server
+  private val autoRebalanceScheduler = new KafkaScheduler(1)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -250,6 +253,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       initializeAndMaybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      if (config.autoLeaderRebalanceEnable) {
+        info("starting the partition rebalance scheduler")
+        autoRebalanceScheduler.startup()
+        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
+      }
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -456,7 +465,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: Boolean = true) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -464,7 +473,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions)
+      removePartitionsFromPreferredReplicaElection(partitions, updateZk)
     }
   }
 
@@ -493,6 +502,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     isRunning = false
     partitionStateMachine.shutdown()
     replicaStateMachine.shutdown()
+    if (config.autoLeaderRebalanceEnable)
+      autoRebalanceScheduler.shutdown()
     if(controllerContext.controllerChannelManager != null) {
       controllerContext.controllerChannelManager.shutdown()
       controllerContext.controllerChannelManager = null
@@ -731,7 +742,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], updateZK : Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -742,7 +753,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    if (updateZK)
+      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -898,6 +910,47 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       }
     }
   }
+
+  private def checkAndTriggerPartitionRebalance(): Unit = {
+    if (isActive()) {
+      info("checking need to trigger partition rebalance")
+      // get all the active brokers
+      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null;
+      controllerContext.controllerLock synchronized {
+        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy(_._2.head)
+      }
+      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+      // for each broker, check if a preferred replica election needs to be triggered
+      preferredReplicasForTopicsByBrokers.foreach( brokerInfo => {
+        var imbalanceRatio: Double = 0
+        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+        controllerContext.controllerLock synchronized {
+          val brokerIds = controllerContext.liveBrokerIds
+          if (brokerIds.contains(brokerInfo._1) &&
+              controllerContext.partitionsBeingReassigned.size == 0) {
+            // do this check only if the broker is live and there are no partitions being reassigned currently
+            topicsNotInPreferredReplica =
+              brokerInfo._2.filter(item => controllerContext.partitionLeadershipInfo(item._1).leaderAndIsr.leader != brokerInfo._1);
+            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+            val totalTopicPartitionsForBroker = brokerInfo._2.size
+            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+            info("leader imbalance ratio for broker %d is %f".format(brokerInfo._1, imbalanceRatio))
+          }
+        }
+        // check ratio and if greater than desired ratio, trigger a rebalance for the topics
+        // that need to be on this broker
+        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+          topicsNotInPreferredReplica.foreach(topicPartition => {
+            controllerContext.controllerLock synchronized {
+              onPreferredReplicaElection(Set(topicPartition._1), false)
+            }
+          })
+        }
+      }
+      )
+    }
+  }
 }
 
 /**
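Note (not part of the patch): the following standalone sketch illustrates the imbalance check that checkAndTriggerPartitionRebalance performs above. Partitions are grouped by preferred replica (the head of the assigned replica list), the fraction of those partitions not currently led by that broker is computed, and a preferred replica election would be triggered when that fraction exceeds leader.imbalance.per.broker.percentage. The object name, the assignment map, and the leader map are hypothetical stand-ins for the controller's in-memory state.

// Standalone sketch of the per-broker imbalance check.
object ImbalanceCheckSketch {
  type TopicAndPartition = (String, Int) // simplified stand-in

  def main(args: Array[String]): Unit = {
    // partition -> assigned replicas; the head of the list is the preferred leader
    val assignment: Map[TopicAndPartition, Seq[Int]] = Map(
      ("foo", 0) -> Seq(1, 2),
      ("foo", 1) -> Seq(1, 3),
      ("bar", 0) -> Seq(2, 1))
    // partition -> current leader (e.g. after broker 1 was bounced and lost leadership)
    val currentLeader: Map[TopicAndPartition, Int] = Map(
      ("foo", 0) -> 2,
      ("foo", 1) -> 1,
      ("bar", 0) -> 2)
    val imbalancePercentage = 10 // leader.imbalance.per.broker.percentage

    // same shape as partitionReplicaAssignment.groupBy(_._2.head) in the patch
    val preferredReplicasByBroker = assignment.groupBy(_._2.head)
    preferredReplicasByBroker.foreach { case (broker, partitions) =>
      // partitions whose preferred leader is this broker but whose current leader is another
      val notLedByBroker = partitions.filter { case (tp, _) => currentLeader(tp) != broker }
      val imbalanceRatio = notLedByBroker.size.toDouble / partitions.size
      println("leader imbalance ratio for broker %d is %f".format(broker, imbalanceRatio))
      if (imbalanceRatio > imbalancePercentage.toDouble / 100)
        // here the controller calls onPreferredReplicaElection(Set(tp), false) per partition
        println("would trigger preferred replica election for " + notLedByBroker.keys.mkString(","))
    }
  }
}

With the sample data, broker 1 is the preferred leader for two partitions but leads only one, so its ratio is 0.5 and an election would be triggered for ("foo", 0); broker 2 leads all of its preferred partitions and is left alone.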
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b324344..921f456 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -229,6 +229,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
+  /* Enables auto leader balancing. A background thread checks and triggers leader
+   * balance if required at regular intervals */
+  val autoLeaderRebalanceEnable = props.getBoolean("auto.leader.rebalance.enable", false)
+
+  /* the ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above
+   * this value per broker. The value is specified in percentage. */
+  val leaderImbalancePerBrokerPercentage = props.getInt("leader.imbalance.per.broker.percentage", 10)
+
+  /* the frequency with which the partition rebalance check is triggered by the controller */
+  val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
+
+
   /*********** Controlled shutdown configuration ***********/
 
   /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens */
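Note (not part of the patch): for reference, enabling the feature from a broker's server.properties would look like the sketch below, using the property names introduced above; the two tuning values shown are just the patch's defaults made explicit.

auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=10
leader.imbalance.check.interval.seconds=300

With these settings the controller's partition-rebalance-thread wakes every 300 seconds and, per broker, triggers a preferred replica election whenever more than 10% of the partitions for which that broker is the preferred replica are led elsewhere.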