From 7adaca1a89fdacbd43641bf425afef9e9fd6d137 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Thu, 7 Aug 2014 14:24:43 -0700 Subject: [PATCH] Followed Jun's proposal to remove the metrics based on regex. --- .../scala/kafka/consumer/ConsumerTopicStats.scala | 4 ++ .../consumer/FetchRequestAndResponseStats.scala | 16 +++++- .../consumer/ZookeeperConsumerConnector.scala | 37 +++++++------ .../scala/kafka/metrics/KafkaMetricsGroup.scala | 64 +++++++++++++++++++++- core/src/main/scala/kafka/producer/Producer.scala | 13 +++-- .../kafka/producer/ProducerRequestStats.scala | 4 ++ .../main/scala/kafka/producer/ProducerStats.scala | 4 ++ .../scala/kafka/producer/ProducerTopicStats.scala | 4 ++ 8 files changed, 119 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala index ff5f470..f63e6c5 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala @@ -54,4 +54,8 @@ object ConsumerTopicStatsRegistry { def getConsumerTopicStat(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeConsumerTopicStat(clientId: String) { + globalStats.remove(clientId) + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala index 875eeeb..93372d5 100644 --- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala +++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala @@ -17,10 +17,11 @@ package kafka.consumer -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} -import kafka.utils.Pool import java.util.concurrent.TimeUnit + import kafka.common.ClientIdAndBroker +import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} +import kafka.utils.Pool class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends 
KafkaMetricsGroup { val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) @@ -53,6 +54,17 @@ object FetchRequestAndResponseStatsRegistry { def getFetchRequestAndResponseStats(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeFetchRequestAndResponseStats(clientId: String) { + val pattern = (".*" + java.util.regex.Pattern.quote(clientId) + ".*").r + val keys = globalStats.keys + for (key <- keys) { + pattern.findFirstIn(key) match { + case Some(_) => globalStats.remove(key) + case _ => + } + } + } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 65f518d..8436452 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -17,28 +17,28 @@ package kafka.consumer +import java.net.InetAddress +import java.util.UUID import java.util.concurrent._ import java.util.concurrent.atomic._ -import locks.ReentrantLock -import collection._ +import java.util.concurrent.locks.ReentrantLock + +import com.yammer.metrics.core.Gauge +import kafka.api._ +import kafka.client.ClientUtils import kafka.cluster._ -import kafka.utils._ -import org.I0Itec.zkclient.exception.ZkNodeExistsException -import java.net.InetAddress -import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient} -import org.apache.zookeeper.Watcher.Event.KeeperState -import java.util.UUID -import kafka.serializer._ -import kafka.utils.ZkUtils._ -import kafka.utils.Utils.inLock import kafka.common._ -import com.yammer.metrics.core.Gauge import kafka.metrics._ import kafka.network.BlockingChannel -import kafka.client.ClientUtils -import kafka.api._ -import scala.Some -import kafka.common.TopicAndPartition +import kafka.serializer._ +import kafka.utils.Utils.inLock +import kafka.utils.ZkUtils._ +import kafka.utils._ +import 
org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} +import org.apache.zookeeper.Watcher.Event.KeeperState + +import scala.collection._ /** @@ -184,7 +184,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown) { info("ZKConsumerConnector shutting down") - + val startTime = System.nanoTime() + KafkaMetricsGroup.removeAllMetrics(config.clientId) rebalanceLock synchronized { if (wildcardTopicWatcher != null) wildcardTopicWatcher.shutdown() @@ -208,7 +209,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case e: Throwable => fatal("error during consumer connector shutdown", e) } - info("ZKConsumerConnector shut down completed") + info("ZKConsumerConnector shut down completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") } } } diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index a20ab90..193becd 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -18,10 +18,15 @@ package kafka.metrics -import com.yammer.metrics.core.{Gauge, MetricName} -import kafka.utils.Logging import java.util.concurrent.TimeUnit + import com.yammer.metrics.Metrics +import com.yammer.metrics.core.{Gauge, MetricName} +import kafka.consumer.{FetchRequestAndResponseStatsRegistry, ConsumerTopicStatsRegistry} +import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry} +import kafka.utils.Logging + +import scala.collection.immutable trait KafkaMetricsGroup extends Logging { @@ -51,4 +56,59 @@ trait KafkaMetricsGroup extends Logging { def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) = Metrics.defaultRegistry().newTimer(metricName(name), 
durationUnit, rateUnit) + def removeMetric(name: String) = + Metrics.defaultRegistry().removeMetric(metricName(name)) + + +} + +object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { + + private val metricNameSet: immutable.HashSet[String] = immutable.HashSet( + "FetchQueueSize", + "ProducerQueueSize", + "MaxLag", + "MinFetchRate", + "ConsumerLag", + "MessagesPerSec", + "BytesPerSec", + "KafkaCommitsPerSec", + "ZookeeperCommitsPerSec", + "SerializationErrorsPerSec", + "ResendPerSec", + "FailedSendPerSec", + "RequestsPerSec", + "BytesPerSec", + "FetchResponseSize", + "ProducerRequestSize", + "FetchRequestRateAndTimeMs", + "FetchResponseSize", + "ProducerRequestRateAndTimeMs", + "KafkaCommitsPerSec", + "ZooKeeperCommitsPerSec", + "RebalanceRateAndTime" + ) + + def removeAllMetrics(clientId: String) { + KafkaMetricsGroup.metricNameSet.foreach(metric => { + val pattern = (".*" + java.util.regex.Pattern.quote(clientId) + ".*" + metric + ".*").r + val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet()) + for (registeredMetric <- registeredMetrics) { + pattern.findFirstIn(registeredMetric.getName) match { + case Some(_) => { + val beforeRemoval = Metrics.defaultRegistry().allMetrics().keySet().size + Metrics.defaultRegistry().removeMetric(registeredMetric) + val afterRemoval = Metrics.defaultRegistry().allMetrics().keySet().size + trace("[%d -> %d] removing metric ".format(beforeRemoval, afterRemoval) + registeredMetric.toString) + } + case _ => + } + } + }) + FetchRequestAndResponseStatsRegistry.removeFetchRequestAndResponseStats(clientId) + ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId) + ProducerRequestStatsRegistry.removeProducerRequestStats(clientId) + ProducerTopicStatsRegistry.removeProducerTopicStats(clientId) + ProducerStatsRegistry.removeProducerStats(clientId) + } } diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 4798481..02814de 100644 
--- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -16,14 +16,14 @@ */ package kafka.producer -import async.{DefaultEventHandler, ProducerSendThread, EventHandler} -import kafka.utils._ -import java.util.Random -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} -import kafka.serializer.Encoder import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + import kafka.common.QueueFullException import kafka.metrics._ +import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread} +import kafka.serializer.Encoder +import kafka.utils._ class Producer[K,V](val config: ProducerConfig, @@ -126,9 +126,12 @@ class Producer[K,V](val config: ProducerConfig, val canShutdown = hasShutdown.compareAndSet(false, true) if(canShutdown) { info("Shutting down producer") + val startTime = System.nanoTime() + KafkaMetricsGroup.removeAllMetrics(config.clientId) if (producerSendThread != null) producerSendThread.shutdown eventHandler.close + info("Producer shut down completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") } } } diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index 9694220..1c46d72 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -52,5 +52,9 @@ object ProducerRequestStatsRegistry { def getProducerRequestStats(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeProducerRequestStats(clientId: String) { + globalStats.remove(clientId) + } } diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala index e1610d3..35e3aae 100644 --- a/core/src/main/scala/kafka/producer/ProducerStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerStats.scala @@ 
-36,4 +36,8 @@ object ProducerStatsRegistry { def getProducerStats(clientId: String) = { statsRegistry.getAndMaybePut(clientId) } + + def removeProducerStats(clientId: String) { + statsRegistry.remove(clientId) + } } diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala index ed209f4..9bb1419 100644 --- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala @@ -55,4 +55,8 @@ object ProducerTopicStatsRegistry { def getProducerTopicStats(clientId: String) = { globalStats.getAndMaybePut(clientId) } + + def removeProducerTopicStats(clientId: String) { + globalStats.remove(clientId) + } } -- 1.7.12.4