diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..d7e4956 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -33,7 +33,6 @@ import util.Random
  * Helper functions common to clients (producer, consumer, or admin)
  */
 object ClientUtils extends Logging{
-
   /**
    * Used by the producer to send a metadata request since it has access to the ProducerConfig
    * @param topics The topics for which the metadata needs to be fetched
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..9d9b346 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -17,20 +17,20 @@ package kafka.consumer

-import org.I0Itec.zkclient.ZkClient
-import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
-import kafka.cluster.{Cluster, Broker}
-import scala.collection.immutable
-import scala.collection.Map
-import collection.mutable.HashMap
-import scala.collection.mutable
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
+
+import kafka.client.ClientUtils
+import kafka.cluster.{Broker, Cluster}
+import kafka.common.TopicAndPartition
+import kafka.server.{AbstractFetcherManager, AbstractFetcherThread, BrokerAndInitialOffset}
 import kafka.utils.Utils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
-import kafka.common.TopicAndPartition
-import kafka.client.ClientUtils
-import java.util.concurrent.atomic.AtomicInteger
+import org.I0Itec.zkclient.ZkClient
+
+import scala.collection.{immutable, mutable}
+import scala.collection.mutable.HashMap

 /**
  * Usage:
@@ -39,9 +39,10 @@ import java.util.concurrent.atomic.AtomicInteger
  */
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
-                             private val zkClient : ZkClient)
+                             private val zkClient : ZkClient,
+                             private val metricOwner: Any = null)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
-                                       config.clientId, config.numConsumerFetchers) {
+                                       config.clientId, config.numConsumerFetchers, metricOwner) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index ac491b4..f85677e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -17,12 +17,13 @@ package kafka.consumer

-import kafka.utils.{IteratorTemplate, Logging, Utils}
-import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
-import kafka.message.{MessageAndOffset, MessageAndMetadata}
+import java.util.concurrent.{BlockingQueue, TimeUnit}
+
 import kafka.common.{KafkaException, MessageSizeTooLargeException}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.{IteratorTemplate, Logging}


 /**
@@ -34,13 +35,14 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val clientId: String)
+                             val clientId: String,
+                             val metricOwner : Any = null)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {

   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
-  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
+  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId, metricOwner)

   override def next(): MessageAndMetadata[K, V] = {
     val item = super.next()
@@ -49,7 +51,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+    consumerTopicStats.getConsumerTopicStats(topic, metricOwner).messageRate.mark()
     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
     item
   }
@@ -110,6 +112,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
       current.set(null)
     }
   }
+
 }

 class ConsumerTimeoutException() extends RuntimeException()
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index ff5f470..3a688a4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -23,24 +23,24 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.ClientIdAndTopic

 @threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdAndTopic, metricOwner : Any) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS, metricOwner)
+  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS, metricOwner)
 }

 /**
  * Tracks metrics for each topic the given consumer client has consumed data from.
  * @param clientId The clientId of the given consumer client.
  */
-class ConsumerTopicStats(clientId: String) extends Logging {
-  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+class ConsumerTopicStats(clientId: String, metricOwner : Any) extends Logging {
+  private val valueFactory = ((k: ClientIdAndTopic, metricOwner: Any) => new ConsumerTopicMetrics(k, metricOwner)).tupled
+  private val stats = new Pool[(ClientIdAndTopic, Any), ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"), metricOwner) // to differentiate from a topic named AllTopics

   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats

   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+  def getConsumerTopicStats(topic: String, metricOwner: Any): ConsumerTopicMetrics = {
+    stats.getAndMaybePut((new ClientIdAndTopic(clientId, topic + "-"), metricOwner))
   }
 }

@@ -48,10 +48,14 @@ class ConsumerTopicStats(clientId: String) {
  * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map.
  */
 object ConsumerTopicStatsRegistry {
-  private val valueFactory = (k: String) => new ConsumerTopicStats(k)
-  private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))
+  private val valueFactory = ((k: String, metricOwner: Any) => new ConsumerTopicStats(k, metricOwner)).tupled
+  private val globalStats = new Pool[(String, Any), ConsumerTopicStats](Some(valueFactory))

-  def getConsumerTopicStat(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
+  def getConsumerTopicStat(clientId: String, metricOwner: Any) = {
+    globalStats.getAndMaybePut((clientId, metricOwner))
+  }
+
+  def removeConsumerTopicStat(clientId: String, metricOwner: Any) = {
+    globalStats.remove((clientId, metricOwner))
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 875eeeb..4afb57f 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -22,24 +22,24 @@ import kafka.utils.Pool
 import java.util.concurrent.TimeUnit
 import kafka.common.ClientIdAndBroker

-class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker, metricOwner: Any) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, metricOwner))
+  val requestSizeHist = newHistogram(metricId + "FetchResponseSize", true, metricOwner)
 }

 /**
  * Tracks metrics of the requests made by a given consumer client to all brokers, and the responses obtained from the brokers.
  * @param clientId ClientId of the given consumer
  */
-class FetchRequestAndResponseStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+class FetchRequestAndResponseStats(clientId: String, metricOwner: Any) {
+  private val valueFactory = ((k: ClientIdAndBroker, metricOwner: Any) => new FetchRequestAndResponseMetrics(k, metricOwner)).tupled
+  private val stats = new Pool[(ClientIdAndBroker, Any), FetchRequestAndResponseMetrics](Some(valueFactory))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"), metricOwner)

   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats

   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(((new ClientIdAndBroker(clientId, brokerInfo + "-")), metricOwner))
   }
 }

@@ -47,11 +47,15 @@ class FetchRequestAndResponseStats(clientId: String) {
  * Stores the fetch request and response stats information of each consumer client in a (clientId -> FetchRequestAndResponseStats) map.
  */
 object FetchRequestAndResponseStatsRegistry {
-  private val valueFactory = (k: String) => new FetchRequestAndResponseStats(k)
-  private val globalStats = new Pool[String, FetchRequestAndResponseStats](Some(valueFactory))
+  private val valueFactory = ((k: String, metricOwner: Any) => new FetchRequestAndResponseStats(k, metricOwner)).tupled
+  private val globalStats = new Pool[(String, Any), FetchRequestAndResponseStats](Some(valueFactory))

-  def getFetchRequestAndResponseStats(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
+  def getFetchRequestAndResponseStats(clientId: String, metricOwner: Any) = {
+    globalStats.getAndMaybePut((clientId, metricOwner))
+  }
+
+  def removeFetchRequestAndResponseStats(clientId: String, metricOwner: Any) = {
+    globalStats.remove((clientId, metricOwner))
   }
 }

diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 805e916..ad919d4 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                        consumerTimeoutMs: Int,
                        private val keyDecoder: Decoder[K],
                        private val valueDecoder: Decoder[V],
-                       val clientId: String)
+                       val clientId: String,
+                       metricOwner: Any = null)
   extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {

   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId, metricOwner)

   /**
    * Create an iterator over messages in the stream.
@@ -48,4 +49,5 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
   override def toString(): String = {
     "%s kafka stream".format(clientId)
   }
+
 }
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9c779ce..d7716e5 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -28,12 +28,13 @@ class PartitionTopicInfo(val topic: String,
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
                          private val fetchSize: AtomicInteger,
-                         private val clientId: String) extends Logging {
+                         private val clientId: String,
+                         private val metricOwner: Any = null) extends Logging {

   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)

-  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
+  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId, metricOwner)

   def getConsumeOffset() = consumedOffset.get

@@ -60,7 +61,7 @@ class PartitionTopicInfo(val topic: String,
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
       debug("updated fetch offset of (%s) to %d".format(this, next))
-      consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
+      consumerTopicStats.getConsumerTopicStats(topic, metricOwner).byteRate.mark(size)
       consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
     } else if(messages.sizeInBytes > 0) {
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 0e64632..bfbc27d 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -18,6 +18,7 @@ package kafka.consumer

 import kafka.api._
+import kafka.metrics.KafkaMetricsGroup
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}

@@ -30,13 +31,13 @@ class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
                      val bufferSize: Int,
-                     val clientId: String) extends Logging {
+                     val clientId: String) extends Logging with KafkaMetricsGroup {
   ConsumerConfig.validateClientId(clientId)

   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
-  private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId, this)
   private var isClosed = false

   private def connect(): BlockingChannel = {
@@ -57,6 +58,8 @@ class SimpleConsumer(val host: String,
   def close() {
     lock synchronized {
+      removeAllMetrics(this)
+      FetchRequestAndResponseStatsRegistry.removeFetchRequestAndResponseStats(clientId, this)
       disconnect()
       isClosed = true
     }
   }
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 65f518d..1e6569a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -17,28 +17,28 @@ package kafka.consumer

+import java.net.InetAddress
+import java.util.UUID
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import locks.ReentrantLock
-import collection._
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api._
+import kafka.client.ClientUtils
 import kafka.cluster._
-import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import java.util.UUID
-import kafka.serializer._
-import kafka.utils.ZkUtils._
-import kafka.utils.Utils.inLock
 import kafka.common._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics._
 import kafka.network.BlockingChannel
-import kafka.client.ClientUtils
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
+import kafka.serializer._
+import kafka.utils.Utils.inLock
+import kafka.utils.ZkUtils._
+import kafka.utils._
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+import scala.collection._


 /**
@@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null

   // useful for tracking migration of consumers to store offsets in kafka
-  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, this)
+  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, this)
+  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, this))

   val consumerIdString = {
     var consumerUuid : String = null
@@ -162,7 +162,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,

   private def createFetcher() {
     if (enableFetcher)
-      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
+      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient, this))
   }

   private def connectZk() {
@@ -181,6 +181,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }

   def shutdown() {
+    removeAllMetrics(this)
+    ConsumerTopicStatsRegistry.removeConsumerTopicStat(config.clientId, this)
     val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       info("ZKConsumerConnector shutting down")
@@ -228,7 +230,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       threadIdSet.map(_ => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId, this)
         (queue, stream)
       })
     ).flatten.toList
@@ -511,7 +513,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }

   class ZKRebalancerListener(val group: String, val consumerIdString: String,
-                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
+                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]],
+                             val metricOwner: Any)
     extends IZkChildListener {
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
@@ -819,7 +822,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchMessageMaxBytes),
-                                                 config.clientId)
+                                                 config.clientId,
+                                                 metricOwner)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
       checkpointedOffsets.put(TopicAndPartition(topic, partition), offset)
@@ -836,7 +840,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if (loadBalancerListener == null) {
       val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
       loadBalancerListener = new ZKRebalancerListener(
-        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
+        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]], this)
     }

     // create listener for session expired event if not exist yet
@@ -884,7 +888,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
           def value = q.size
-        }
+        }, this
       )
     })

diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index a20ab90..e880a4a 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -18,14 +18,17 @@ package kafka.metrics

-import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.utils.Logging
 import java.util.concurrent.TimeUnit
+
 import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.utils.{Logging, Pool}
+import scala.collection._

-trait KafkaMetricsGroup extends Logging {
+trait KafkaMetricsGroup extends Logging {
+  val metricRegistry = KafkaMetricsGroup.kafkaMetricRegistry;

   /**
    * Creates a new MetricName object for gauges, meters, etc. created for this
    * metrics group.
@@ -39,16 +42,62 @@ trait KafkaMetricsGroup extends Logging {
     new MetricName(pkg, simpleName, name)
   }

-  def newGauge[T](name: String, metric: Gauge[T]) =
-    Metrics.defaultRegistry().newGauge(metricName(name), metric)
+  def newGauge[T](name: String, metric: Gauge[T], metricOwner : Any = null) = {
+    val meter = Metrics.defaultRegistry().newGauge(metricName(name), metric)
+    metricRegistry.getAndMaybePut(metricOwner).synchronized {
+      metricRegistry.get(metricOwner) += metricName(name)
+    }
+    debug("Added gauge for " + metricOwner + " to metricMap, map size = " + metricRegistry.size)
+    meter
+  }
+
+  def newMeter(name: String, eventType: String, timeUnit: TimeUnit, metricOwner : Any = null) = {
+    val meter = Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+    metricRegistry.getAndMaybePut(metricOwner).synchronized {
+      metricRegistry.get(metricOwner) += metricName(name)
+    }
+    debug("Added " + metricOwner + " to metricMap, map size = " + metricRegistry.size)
+    meter
+  }

-  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
-    Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+  def newHistogram(name: String, biased: Boolean = true, metricOwner : Any = null) = {
+    val meter = Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+    metricRegistry.getAndMaybePut(metricOwner).synchronized {
+      metricRegistry.get(metricOwner) += metricName(name)
+    }
+    debug("Added " + metricOwner + " to metricMap, map size = " + metricRegistry.size)
+    meter
+  }

-  def newHistogram(name: String, biased: Boolean = true) =
-    Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, metricOwner : Any = null) = {
+    val meter = Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+    metricRegistry.getAndMaybePut(metricOwner).synchronized {
+      metricRegistry.get(metricOwner) += metricName(name)
+    }
+    debug("Added " + metricOwner + " to metricMap, map size = " + metricRegistry.size)
+    meter
+  }

-  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
-    Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+  def removeMetric(name: String) {
+    Metrics.defaultRegistry().removeMetric(metricName(name))
+  }
+  def removeAllMetrics(metricOwner : Any) {
+    metricRegistry.getAndMaybePut(metricOwner).synchronized {
+      if (metricRegistry.get(metricOwner) == null) {
+        debug("Metrics for " + metricOwner + " has already been removed.")
+      } else {
+        debug("Removing " + metricOwner + " from metricMap, map size = " + metricRegistry.size + "," + metricRegistry.get(metricOwner))
+        metricRegistry.get(metricOwner).foreach(metric => Metrics.defaultRegistry().removeMetric(metric))
+        metricRegistry.remove(metricOwner)
+        debug("Removed " + metricOwner + " from metricMap, map size = " + metricRegistry.size)
+      }
+    }
+  }
 }
+
+object KafkaMetricsGroup {
+  val valueFactory = (metricOwner: Any) => new mutable.HashSet[MetricName]
+  val kafkaMetricRegistry = new Pool[Any, mutable.HashSet[MetricName]](Some(valueFactory))
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 4798481..1ea7e20 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -16,19 +16,19 @@
  */
 package kafka.producer

-import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
-import kafka.utils._
-import java.util.Random
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
 import kafka.common.QueueFullException
 import kafka.metrics._
+import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
+import kafka.serializer.Encoder
+import kafka.utils._


 class Producer[K,V](val config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V])  // only for unit testing
-  extends Logging {
+  extends Logging with KafkaMetricsGroup {

   private val hasShutdown = new AtomicBoolean(false)
   private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
@@ -50,7 +50,7 @@ class Producer[K,V](val config: ProducerConfig,
     producerSendThread.start()
   }

-  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
+  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId, this)

   KafkaMetricsReporter.startReporters(config.props)

@@ -126,6 +126,8 @@ class Producer[K,V](val config: ProducerConfig,
     val canShutdown = hasShutdown.compareAndSet(false, true)
     if(canShutdown) {
       info("Shutting down producer")
+      removeAllMetrics(this)
+      ProducerTopicStatsRegistry.removeProducerTopicStats(config.clientId, this)
       if (producerSendThread != null)
         producerSendThread.shutdown
       eventHandler.close
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 9694220..1994952 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -21,24 +21,24 @@ import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 import kafka.common.ClientIdAndBroker

-class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+class ProducerRequestMetrics(metricId: ClientIdAndBroker, metricOwner: Any) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, metricOwner))
+  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize", true, metricOwner)
 }

 /**
  * Tracks metrics of requests made by a given producer client to all brokers.
  * @param clientId ClientId of the given producer
  */
-class ProducerRequestStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+class ProducerRequestStats(clientId: String, metricOwner: Any) {
+  private val valueFactory = ((k: ClientIdAndBroker, metricOwner: Any) => new ProducerRequestMetrics(k, metricOwner)).tupled
+  private val stats = new Pool[(ClientIdAndBroker, Any), ProducerRequestMetrics](Some(valueFactory))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"), metricOwner)

   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats

   def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut((new ClientIdAndBroker(clientId, brokerInfo + "-"), metricOwner))
   }
 }

@@ -46,11 +46,15 @@ class ProducerRequestStats(clientId: String) {
  * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
  */
 object ProducerRequestStatsRegistry {
-  private val valueFactory = (k: String) => new ProducerRequestStats(k)
-  private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
+  private val valueFactory = ((k: String, metricOwner: Any) => new ProducerRequestStats(k, metricOwner)).tupled
+  private val globalStats = new Pool[(String, Any), ProducerRequestStats](Some(valueFactory))

-  def getProducerRequestStats(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
+  def getProducerRequestStats(clientId: String, metricOwner: Any) = {
+    globalStats.getAndMaybePut((clientId, metricOwner))
+  }
+
+  def removeProducerRequestStats(clientId: String, metricOwner: Any) = {
+    globalStats.remove((clientId, metricOwner))
   }
 }

diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index e1610d3..f493439 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -20,20 +20,24 @@ import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool

-class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+class ProducerStats(clientId: String, metricOwner: Any) extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, metricOwner)
+  val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS, metricOwner)
+  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, metricOwner)
 }

 /**
  * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
  */
 object ProducerStatsRegistry {
-  private val valueFactory = (k: String) => new ProducerStats(k)
-  private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
+  private val valueFactory = ((k: String, metricOwner: Any) => new ProducerStats(k, metricOwner)).tupled
+  private val statsRegistry = new Pool[(String, Any), ProducerStats](Some(valueFactory))

-  def getProducerStats(clientId: String) = {
-    statsRegistry.getAndMaybePut(clientId)
+  def getProducerStats(clientId: String, metricOwner: Any) = {
+    statsRegistry.getAndMaybePut((clientId, metricOwner))
+  }
+
+  def removeProducerStats(clientId: String, metricOwner: Any) = {
+    statsRegistry.remove((clientId, metricOwner))
   }
 }

diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index ed209f4..b757d26 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -23,25 +23,25 @@ import java.util.concurrent.TimeUnit


 @threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdAndTopic, metricOwner: Any) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS, metricOwner)
+  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS, metricOwner)
+  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, metricOwner)
 }

 /**
  * Tracks metrics for each topic the given producer client has produced data to.
  * @param clientId The clientId of the given producer client.
  */
-class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+class ProducerTopicStats(clientId: String, metricOwner: Any) {
+  private val valueFactory = ((k: ClientIdAndTopic, metricOwner: Any) => new ProducerTopicMetrics(k, metricOwner)).tupled
+  private val stats = new Pool[(ClientIdAndTopic, Any), ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"), metricOwner) // to differentiate from a topic named AllTopics

   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats

   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut((new ClientIdAndTopic(clientId, topic + "-"), metricOwner))
   }
 }

@@ -49,10 +49,14 @@ class ProducerTopicStats(clientId: String) {
  * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
  */
 object ProducerTopicStatsRegistry {
-  private val valueFactory = (k: String) => new ProducerTopicStats(k)
-  private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
+  private val valueFactory = ((k: String, metricOwner: Any) => new ProducerTopicStats(k, metricOwner)).tupled
+  private val globalStats = new Pool[(String, Any), ProducerTopicStats](Some(valueFactory))

-  def getProducerTopicStats(clientId: String) = {
-    globalStats.getAndMaybePut(clientId)
+  def getProducerTopicStats(clientId: String, metricOwner: Any) = {
+    globalStats.getAndMaybePut((clientId, metricOwner))
+  }
+
+  def removeProducerTopicStats(clientId: String, metricOwner: Any) = {
+    globalStats.remove((clientId, metricOwner))
   }
 }

diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f007..be39f1e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -17,10 +17,12 @@ package kafka.producer

+import java.util.Random
+
 import kafka.api._
+import kafka.metrics.KafkaMetricsGroup
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
-import java.util.Random

 object SyncProducer {
   val RequestKey: Short = 0
@@ -31,14 +33,14 @@ object SyncProducer {
  * Send a message set.
  */
 @threadsafe
-class SyncProducer(val config: SyncProducerConfig) extends Logging {
+class SyncProducer(val config: SyncProducerConfig) extends Logging with KafkaMetricsGroup {

   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
                                                     config.sendBufferBytes, config.requestTimeoutMs)
   val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
-  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
+  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId, this)

   trace("Instantiating Scala Sync Producer")

@@ -116,6 +118,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   def close() = {
     lock synchronized {
       disconnect()
+      removeAllMetrics(this)
+      ProducerRequestStatsRegistry.removeProducerRequestStats(config.clientId, this)
       shutdown = true
     }
   }
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..7cc0763 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -19,6 +19,7 @@ package kafka.producer.async

 import kafka.common._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging, SystemTime}
@@ -34,7 +35,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
                                private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
-  extends EventHandler[K,V] with Logging {
+  extends EventHandler[K,V] with Logging with KafkaMetricsGroup {

   val isSync = ("sync" == config.producerType)
   val correlationId = new AtomicInteger(0)
@@ -45,8 +46,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,

   private val topicMetadataToRefresh = Set.empty[String]
   private val sendPartitionPerTopicCache = HashMap.empty[String, Int]
-  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
-  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
+  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId, this)
+  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId, this)

   def handle(events: Seq[KeyedMessage[K,V]]) {
     val serializedData = serialize(events)
@@ -332,5 +333,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   def close() {
     if (producerPool != null)
       producerPool.close
+    removeAllMetrics(this)
+    ProducerTopicStatsRegistry.removeProducerTopicStats(config.clientId, this)
+    ProducerStatsRegistry.removeProducerStats(config.clientId, this)
   }
 }
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..3d6c499 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -37,7 +37,7 @@ class ProducerSendThread[K,V](val threadName: String,
   newGauge(clientId + "-ProducerQueueSize",
           new Gauge[Int] {
             def value = queue.size
-          })
+          }, this)

   override def run {
     try {
@@ -50,6 +50,7 @@ class ProducerSendThread[K,V](val threadName: String,
   }

   def shutdown = {
+    removeAllMetrics(this)
    info("Begin shutting down ProducerSendThread")
    queue.put(shutdownCommand)
    shutdownLatch.await
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..16fd884 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge

-abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1, metricOwner: Any = null)
   extends Logging with KafkaMetricsGroup {
   // map of (source broker_id, fetcher_id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
@@ -42,7 +42,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
          curMaxThread.max(fetcherLagStatsEntry._2.lag)
        }).max(curMaxAll)
      })
-    }
+    }, metricOwner
   )

   newGauge(
@@ -59,7 +59,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
        })
      }
    }
-    }
+    }, metricOwner
   )

   private def getFetcherId(topic: String, partitionId: Int) : Int = {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..ea7fbe9 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -40,15 +40,15 @@ import java.util.concurrent.atomic.AtomicLong
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
                                      isInterruptible: Boolean = true)
-  extends ShutdownableThread(name, isInterruptible) {
+  extends ShutdownableThread(name, isInterruptible) with KafkaMetricsGroup {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
   private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
-  val fetcherStats = new FetcherStats(metricId)
-  val fetcherLagStats = new FetcherLagStats(metricId)
+  val fetcherStats = new FetcherStats(metricId, this)
+  val fetcherLagStats = new FetcherLagStats(metricId, this)
   val fetchRequestBuilder = new FetchRequestBuilder().
           clientId(clientId).
           replicaId(fetcherBrokerId).
@@ -69,6 +69,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke

   override def shutdown(){
     super.shutdown()
+    removeAllMetrics(this)
     simpleConsumer.close()
   }

@@ -203,13 +204,13 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 }

-class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition, metricOwner: Any) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
     metricId + "-ConsumerLag",
     new Gauge[Long] {
       def value = lagVal.get
-    }
+    }, metricOwner
   )

   def lag_=(newLag: Long) {
@@ -219,18 +220,18 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
   def lag = lagVal.get
 }

-class FetcherLagStats(metricId: ClientIdAndBroker) {
-  private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(metricId: ClientIdAndBroker, metricOwner: Any) {
+  private val valueFactory = ((k: ClientIdBrokerTopicPartition, metricOwner: Any) => new FetcherLagMetrics(k, metricOwner)).tupled
+  val stats = new Pool[(ClientIdBrokerTopicPartition, Any), FetcherLagMetrics](Some(valueFactory))

   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
+    stats.getAndMaybePut((new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId), metricOwner))
   }
 }

-class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+class FetcherStats(metricId: ClientIdAndBroker, metricOwner: Any) extends KafkaMetricsGroup {
+  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS, metricOwner)
+  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS, metricOwner)
 }

 case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
diff --git a/gradle.properties b/gradle.properties
index 4827769..236e243 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@

 group=org.apache.kafka
 version=0.8.1
-scalaVersion=2.8.0
+scalaVersion=2.9.2
 task=build

 mavenUrl=