From 9a4371a7f843c8c3b34b963e535427a850ca15d5 Mon Sep 17 00:00:00 2001
From: Jiangjie Qin
Date: Fri, 8 Aug 2014 18:41:23 -0700
Subject: [PATCH] Addressed Jun's comments.

---
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |   4 +
 .../consumer/FetchRequestAndResponseStats.scala    |  16 ++-
 .../consumer/ZookeeperConsumerConnector.scala      |  37 +++---
 .../scala/kafka/metrics/KafkaMetricsGroup.scala    | 127 ++++++++++++++++++++-
 core/src/main/scala/kafka/producer/Producer.scala  |  13 ++-
 .../kafka/producer/ProducerRequestStats.scala      |   4 +
 .../main/scala/kafka/producer/ProducerStats.scala  |   4 +
 .../scala/kafka/producer/ProducerTopicStats.scala  |   4 +
 8 files changed, 182 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index ff5f470..f63e6c5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -54,4 +54,8 @@ object ConsumerTopicStatsRegistry {
   def getConsumerTopicStat(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeConsumerTopicStat(clientId: String) {
+    globalStats.remove(clientId)
+  }
 }
\ No newline at end of file

diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 875eeeb..93372d5 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -17,10 +17,11 @@
 package kafka.consumer
 
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.utils.Pool
 import java.util.concurrent.TimeUnit
+
 import kafka.common.ClientIdAndBroker
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.utils.Pool
 
 class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
   val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
@@ -53,6 +54,17 @@ object FetchRequestAndResponseStatsRegistry {
   def getFetchRequestAndResponseStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeFetchRequestAndResponseStats(clientId: String) {
+    val pattern = (".*" + clientId + ".*").r
+    val keys = globalStats.keys
+    for (key <- keys) {
+      pattern.findFirstIn(key) match {
+        case Some(_) => globalStats.remove(key)
+        case _ =>
+      }
+    }
+  }
 }

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 65f518d..acfd064 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -17,28 +17,28 @@
 package kafka.consumer
 
+import java.net.InetAddress
+import java.util.UUID
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import locks.ReentrantLock
-import collection._
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api._
+import kafka.client.ClientUtils
 import kafka.cluster._
-import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import java.util.UUID
-import kafka.serializer._
-import kafka.utils.ZkUtils._
-import kafka.utils.Utils.inLock
 import kafka.common._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics._
 import kafka.network.BlockingChannel
-import kafka.client.ClientUtils
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
+import kafka.serializer._
+import kafka.utils.Utils.inLock
+import kafka.utils.ZkUtils._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._
 
 
 /**
@@ -184,7 +184,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
-
+      val startTime = System.nanoTime()
+      KafkaMetricsGroup.removeAllConsumerMetrics(config.clientId)
       rebalanceLock synchronized {
         if (wildcardTopicWatcher != null)
           wildcardTopicWatcher.shutdown()
@@ -208,7 +209,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         case e: Throwable =>
           fatal("error during consumer connector shutdown", e)
       }
-      info("ZKConsumerConnector shut down completed")
+      info("ZKConsumerConnector shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
     }
   }

diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index a20ab90..081dc91 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -18,10 +18,15 @@
 package kafka.metrics
 
-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.Logging
 import java.util.concurrent.TimeUnit
+
 import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
+import kafka.producer.{ProducerStatsRegistry, ProducerTopicStatsRegistry, ProducerRequestStatsRegistry}
+import kafka.utils.Logging
+
+import scala.collection.immutable
 
 
 trait KafkaMetricsGroup extends Logging {
@@ -51,4 +56,122 @@ trait KafkaMetricsGroup extends Logging {
 
   def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
     Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+
+  def removeMetric(name: String) =
+    Metrics.defaultRegistry().removeMetric(metricName(name))
+
+}
+
+object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
+  /**
+   * To make sure all metrics are de-registered when a consumer or producer is closed, their metric names must be
+   * listed in the metric name sets below. Some metric names are shared by the consumer, the producer and the server;
+   * in that case the package name is used to select the correct metrics instance for de-registration.
+   */
+  private val consumerMetricNameSet: immutable.HashSet[String] = immutable.HashSet(
+    // kafka.consumer.ZookeeperConsumerConnector
+    "FetchQueueSize",
+    "KafkaCommitsPerSec",
+    "ZookeeperCommitsPerSec",
+    "RebalanceRateAndTime",
+
+    // kafka.server.AbstractFetcherManager
+    "MaxLag",
+    "MinFetchRate",
+
+    // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
+    "ConsumerLag",
+
+    // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
+    "MessagesPerSec",
+
+    // kafka.consumer.ConsumerTopicStats
+    // kafka.server.AbstractFetcherThread
+    "BytesPerSec",
+
+    // kafka.server.{AbstractFetcherThread, KafkaRequestHandler}
+    // kafka.network.RequestChannel
+    "RequestsPerSec",
+
+    // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
+    "FetchResponseSize",
+    "FetchRequestRateAndTimeMs",
+
+    /**
+     * ProducerRequestStats <-- SyncProducer
+     * The metric for the SyncProducer used in fetchTopicMetaData() needs to be removed when the consumer is closed.
+     */
+    "ProducerRequestRateAndTimeMs",
+    "ProducerRequestSize"
+  )
+
+  private val producerMetricNameSet: immutable.HashSet[String] = immutable.HashSet(
+    // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
+    "SerializationErrorsPerSec",
+    "ResendsPerSec",
+    "FailedSendsPerSec",
+
+    // kafka.producer.ProducerSendThread
+    "ProducerQueueSize",
+
+    // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
+    "MessagesPerSec", // "DroppedMessagesPerSec" is also matched by the "MessagesPerSec" pattern
+    "BytesPerSec",
+
+    // kafka.producer.ProducerRequestStats <-- SyncProducer
+    "ProducerRequestRateAndTimeMs",
+    "ProducerRequestSize"
+  )
+
+  def removeAllConsumerMetrics(clientId: String) {
+    FetchRequestAndResponseStatsRegistry.removeFetchRequestAndResponseStats(clientId)
+    ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    KafkaMetricsGroup.consumerMetricNameSet.foreach(metric => {
+      val pattern = (clientId + ".*" + metric + ".*").r
+      val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
+      for (registeredMetric <- registeredMetrics) {
+        /**
+         * For the consumer, the FetcherStats and FetcherLagStats are in the kafka.server package.
+         * The class name ProducerRequestMetrics is used to remove SyncProducer metrics while excluding other
+         * sensor names shared by consumer and producer.
+         */
+        if (registeredMetric.getGroup.startsWith("kafka.consumer") ||
+            registeredMetric.getGroup.startsWith("kafka.server") ||
+            registeredMetric.getType.equals("ProducerRequestMetrics")) {
+          pattern.findFirstIn(registeredMetric.getName) match {
+            case Some(_) =>
+              val beforeRemoval = Metrics.defaultRegistry().allMetrics().keySet().size
+              Metrics.defaultRegistry().removeMetric(registeredMetric)
+              val afterRemoval = Metrics.defaultRegistry().allMetrics().keySet().size
+              trace("[%d -> %d] removing metric ".format(beforeRemoval, afterRemoval) + registeredMetric.toString)
+            case _ =>
+          }
+        }
+      }
+    })
+  }
+
+  def removeAllProducerMetrics(clientId: String) {
+    ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
+    ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
+    ProducerStatsRegistry.removeProducerStats(clientId)
+    KafkaMetricsGroup.producerMetricNameSet.foreach(metric => {
+      val pattern = (clientId + ".*" + metric + ".*").r
+      val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
+      for (registeredMetric <- registeredMetrics) {
+        if (registeredMetric.getGroup.startsWith("kafka.producer")) {
+          pattern.findFirstIn(registeredMetric.getName) match {
+            case Some(_) =>
+              val beforeRemoval = Metrics.defaultRegistry().allMetrics().keySet().size
+              Metrics.defaultRegistry().removeMetric(registeredMetric)
+              val afterRemoval = Metrics.defaultRegistry().allMetrics().keySet().size
+              trace("[%d -> %d] removing metric ".format(beforeRemoval, afterRemoval) + registeredMetric.toString)
+            case _ =>
+          }
+        }
+      }
+    })
+  }
 }

diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4798481..cd634f6 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -16,14 +16,14 @@
  */
 package kafka.producer
 
-import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
-import kafka.utils._
-import java.util.Random
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
 import kafka.common.QueueFullException
 import kafka.metrics._
+import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
+import kafka.serializer.Encoder
+import kafka.utils._
 
 
 class Producer[K,V](val config: ProducerConfig,
@@ -126,9 +126,12 @@ class Producer[K,V](val config: ProducerConfig,
     val canShutdown = hasShutdown.compareAndSet(false, true)
     if(canShutdown) {
       info("Shutting down producer")
+      val startTime = System.nanoTime()
+      KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
       if (producerSendThread != null)
         producerSendThread.shutdown
       eventHandler.close
+      info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
     }
   }
 }

diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 9694220..1c46d72 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -52,5 +52,9 @@ object ProducerRequestStatsRegistry {
   def getProducerRequestStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeProducerRequestStats(clientId: String) {
+    globalStats.remove(clientId)
+  }
 }
 
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index e1610d3..35e3aae 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -36,4 +36,8 @@ object ProducerStatsRegistry {
   def getProducerStats(clientId: String) = {
     statsRegistry.getAndMaybePut(clientId)
  }
+
+  def removeProducerStats(clientId: String) {
+    statsRegistry.remove(clientId)
+  }
 }

diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index ed209f4..9bb1419 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -55,4 +55,8 @@ object ProducerTopicStatsRegistry {
  def getProducerTopicStats(clientId: String) = {
     globalStats.getAndMaybePut(clientId)
   }
+
+  def removeProducerTopicStats(clientId: String) {
+    globalStats.remove(clientId)
+  }
 }
-- 
1.7.12.4
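
Note for reviewers (placed after the signature delimiter, so `git am` ignores it): a minimal,
self-contained sketch of the pattern-based removal that removeAllConsumerMetrics and
removeAllProducerMetrics perform. It substitutes a plain Scala map for
Metrics.defaultRegistry().allMetrics(); the registry keys and client ids are made up for
illustration.

    import scala.collection.mutable

    object MetricRemovalSketch extends App {
      // Hypothetical stand-in for the Yammer metrics registry; keys mimic the
      // "clientId-...-MetricName" formatting used by the per-client stats classes.
      val registry = mutable.Map(
        "client1-host1:9092-FetchRequestRateAndTimeMs" -> (),
        "client1-topicA-MessagesPerSec" -> (),
        "client1-topicA-DroppedMessagesPerSec" -> (),
        "client2-topicA-MessagesPerSec" -> ()
      )

      def removeClientMetrics(clientId: String, metricNames: Set[String]): Unit = {
        for (metric <- metricNames) {
          // Same regex shape as the patch: clientId + ".*" + metric + ".*"
          val pattern = (clientId + ".*" + metric + ".*").r
          // Snapshot the keys before mutating the map to avoid concurrent-modification issues
          for (key <- registry.keys.toList if pattern.findFirstIn(key).isDefined)
            registry.remove(key)
        }
      }

      removeClientMetrics("client1", Set("MessagesPerSec", "FetchRequestRateAndTimeMs"))
      // Only client2's metric survives; note that the "MessagesPerSec" pattern also
      // matches "DroppedMessagesPerSec", which is why the name set need not list it.
      println(registry.keys.mkString("\n"))
    }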