diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala index e9b4dc6..fea85d4 100644 --- core/src/main/scala/kafka/controller/KafkaController.scala +++ core/src/main/scala/kafka/controller/KafkaController.scala @@ -1336,7 +1336,13 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle } } -object ControllerStats extends KafkaMetricsGroup { +class ControllerStats() extends KafkaMetricsGroup { val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) } + +object ControllerStats extends KafkaMetricsGroup { + val controllerStats = new ControllerStats() + + def getControllerStats():ControllerStats = controllerStats +} diff --git core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 4a31c72..013d882 100644 --- core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -76,7 +76,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + " Assigned replicas are: [%s]".format(assignedReplicas)) case false => - ControllerStats.uncleanLeaderElectionRate.mark() + ControllerStats.getControllerStats().uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicas.head warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 3e87e1d..8466554 100644 --- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -355,7 +355,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(","))) inLock(controllerContext.controllerLock) { if (hasStarted.get) { - ControllerStats.leaderElectionTimer.time { + ControllerStats.getControllerStats().leaderElectionTimer.time { try { val curBrokerIds = currentBrokerList.map(_.toInt).toSet val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds diff --git core/src/main/scala/kafka/server/KafkaServer.scala core/src/main/scala/kafka/server/KafkaServer.scala index 426e522..6d6138b 100644 --- core/src/main/scala/kafka/server/KafkaServer.scala +++ core/src/main/scala/kafka/server/KafkaServer.scala @@ -203,8 +203,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg */ private def registerStats() { BrokerTopicStats.getBrokerAllTopicsStats() - ControllerStats.uncleanLeaderElectionRate - ControllerStats.leaderElectionTimer + ControllerStats.getControllerStats() } /**