From 83b05c778e71882175db6249e7297167ce953cfe Mon Sep 17 00:00:00 2001
From: wawanawna <sematex@mail.com>
Date: Thu, 30 Oct 2014 21:34:55 +0300
Subject: [PATCH] kafka-1481; JMX renaming

---
 core/src/main/scala/kafka/cluster/Partition.scala  |   6 +-
 .../scala/kafka/common/ClientIdAndBroker.scala     |  14 +-
 .../main/scala/kafka/common/ClientIdAndTopic.scala |  14 +-
 .../kafka/consumer/ConsumerFetcherThread.scala     |   2 +-
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |  18 ++-
 .../consumer/FetchRequestAndResponseStats.scala    |  27 ++--
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   5 +-
 .../consumer/ZookeeperConsumerConnector.scala      |  27 ++--
 core/src/main/scala/kafka/log/Log.scala            |  38 +++--
 .../scala/kafka/metrics/KafkaMetricsGroup.scala    | 139 +++++++++++-------
 .../main/scala/kafka/network/RequestChannel.scala  |  27 ++--
 .../main/scala/kafka/network/SocketServer.scala    |   2 +-
 .../kafka/producer/ProducerRequestStats.scala      |  23 +--
 .../main/scala/kafka/producer/ProducerStats.scala  |   9 +-
 .../scala/kafka/producer/ProducerTopicStats.scala  |  25 ++--
 .../main/scala/kafka/producer/SyncProducer.scala   |   5 +-
 .../kafka/producer/async/ProducerSendThread.scala  |   5 +-
 .../kafka/server/AbstractFetcherManager.scala      |  31 ++--
 .../scala/kafka/server/AbstractFetcherThread.scala |  36 +++--
 .../scala/kafka/server/DelayedRequestKey.scala     |   4 -
 .../scala/kafka/server/KafkaRequestHandler.scala   |  25 ++--
 .../kafka/server/ProducerRequestPurgatory.scala    |  22 ++-
 .../scala/unit/kafka/metrics/MetricsTest.scala     | 161 +++++++++++++++++++++
 23 files changed, 464 insertions(+), 201 deletions(-)
 create mode 100644 core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e88ecf2..43110b6 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -62,13 +62,13 @@ class Partition(val topic: String,
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
-  newGauge(
-    topic + "-" + partitionId + "-UnderReplicated",
+  newGauge("UnderReplicated",
     new Gauge[Int] {
       def value = {
         if (isUnderReplicated) 1 else 0
       }
-    }
+    },
+    toMap("topic" -> topic, "partitionId" -> partitionId.toString)
   )
 
   def isUnderReplicated(): Boolean = {
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
index 93223a9..3b09041 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -8,7 +8,7 @@ package kafka.common
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,14 @@ package kafka.common
  * Convenience case class since (clientId, brokerInfo) pairs are used to create
  * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
  */
-case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
-  override def toString = "%s-%s".format(clientId, brokerInfo)
+
+trait ClientIdBroker {
+}
+
+case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker {
+  override def toString = "%s-%s-%d".format(clientId, brokerHost, brokerPort)
+}
+
+case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker {
+  override def toString = "%s-%s".format(clientId, "AllBrokers")
 }
diff --git a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
index 7acf9e7..5825aad 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
@@ -1,5 +1,3 @@
-package kafka.common
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,11 +15,21 @@ package kafka.common
  * limitations under the License.
  */
 
+package kafka.common
+
 /**
  * Convenience case class since (clientId, topic) pairs are used in the creation
  * of many Stats objects.
  */
-case class ClientIdAndTopic(clientId: String, topic: String) {
+trait ClientIdTopic {
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
   override def toString = "%s-%s".format(clientId, topic)
 }
 
+case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
+  override def toString = "%s-%s".format(clientId, "AllTopics")
+}
+
+
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f8c1b4e..ee6139c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId + "-" + name,
+                                      clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketReceiveBufferBytes,
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index f63e6c5..9ca4161 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -20,12 +20,18 @@ package kafka.consumer
 import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
 
 @threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val tags = metricId
+  match {
+    case ClientIdAndTopic(clientId, topic) => toMap("clientId" -> clientId, "topic" -> topic)
+    case ClientIdAllTopics(clientId) => toMap("clientId" -> clientId, "allTopics" -> "true")
+  }
+
+  val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
+  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
 }
 
 /**
@@ -35,12 +41,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
 class ConsumerTopicStats(clientId: String) extends Logging {
   private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 5243f41..e28f448 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -19,13 +19,20 @@ package kafka.consumer
 
 import java.util.concurrent.TimeUnit
 
-import kafka.common.ClientIdAndBroker
+import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
 
-class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
+  val tags = metricId
+  match {
+    case ClientIdAndBroker(clientId, brokerHost, brokerPort) => toMap("clientId" -> clientId, "brokerHost" -> brokerHost,
+      "brokerPort" -> brokerPort.toString)
+    case ClientIdAllBrokers(clientId) => toMap("clientId" -> clientId, "allBrokers" -> "true")
+  }
+
+  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
+  val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags)
 }
 
 /**
@@ -33,14 +40,14 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM
  * @param clientId ClientId of the given consumer
  */
 class FetchRequestAndResponseStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k)
+  private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAllBrokers(clientId))
 
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
-  def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+  def getFetchRequestAndResponseStats(brokerHost: String, brokerPort: Int): FetchRequestAndResponseMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
   }
 }
 
@@ -56,7 +63,7 @@ object FetchRequestAndResponseStatsRegistry {
   }
 
   def removeConsumerFetchRequestAndResponseStats(clientId: String) {
-    val pattern = (clientId + "-ConsumerFetcherThread.*").r
+    val pattern = (".*" + clientId + ".*").r
     val keys = globalStats.keys
     for (key <- keys) {
       pattern.findFirstIn(key) match {
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d349a30..e53ee51 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -36,7 +36,6 @@ class SimpleConsumer(val host: String,
   ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
@@ -106,7 +105,7 @@ class SimpleConsumer(val host: String,
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+    val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
     val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
     aggregateTimer.time {
       specificTimer.time {
@@ -115,7 +114,7 @@ class SimpleConsumer(val host: String,
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
     fetchResponse
   }
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..df4a253 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
   // useful for tracking migration of consumers to store offsets in kafka
-  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, toMap("clientId" -> config.clientId))
+  private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, toMap("clientId" -> config.clientId))
+  private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, toMap("clientId" -> config.clientId)))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -516,14 +516,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
-    
+
     @volatile private var allTopicsOwnedPartitionsCount = 0
-    newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
+    newGauge("OwnedPartitionsCount", new Gauge[Int] {
       def value() = allTopicsOwnedPartitionsCount
-    })
+    }, toMap("clientId" -> config.clientId, "groupId" -> config.groupId, "allTopics" -> "true"))
 
     private def ownedPartitionsCountMetricName(topic: String) =
-      "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+      toMap("clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic)
 
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
@@ -576,7 +576,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         for(partition <- infos.keys) {
           deletePartitionOwnershipFromZK(topic, partition)
         }
-        removeMetric(ownedPartitionsCountMetricName(topic))
+        removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricName(topic))
         localTopicRegistry.remove(topic)
       }
       allTopicsOwnedPartitionsCount = 0
@@ -679,9 +679,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
-              newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
+              newGauge("OwnedPartitionsCount", new Gauge[Int] {
                 def value() = partitionThreadPairs.size
-              })
+              }, ownedPartitionsCountMetricName(topic))
             }
 
             topicRegistry = currentTopicRegistry
@@ -863,10 +863,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicThreadIdAndQueues.put(topicThreadId, q)
       debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
-        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        "FetchQueueSize",
         new Gauge[Int] {
           def value = q.size
-        }
+        },
+        toMap("clientId" -> config.clientId,
+          "topic" -> topicThreadId._1,
+          "threadId" -> topicThreadId._2.threadId.toString)
       )
     })
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 157d673..55247b9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -73,17 +73,31 @@ class Log(val dir: File,
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
-  newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value = numberOfSegments })
-
-  newGauge(name + "-" + "LogStartOffset",
-           new Gauge[Long] { def value = logStartOffset })
-
-  newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value = logEndOffset })
-           
-  newGauge(name + "-" + "Size", 
-           new Gauge[Long] {def value = size})
+  val tags = toMap("topic" -> topicAndPartition.topic, "partitionId" -> topicAndPartition.partition.toString)
+
+  newGauge("NumLogSegments",
+    new Gauge[Int] {
+      def value = numberOfSegments
+    },
+    tags)
+
+  newGauge("LogStartOffset",
+    new Gauge[Long] {
+      def value = logStartOffset
+    },
+    tags)
+
+  newGauge("LogEndOffset",
+    new Gauge[Long] {
+      def value = logEndOffset
+    },
+    tags)
+
+  newGauge("Size",
+    new Gauge[Long] {
+      def value = size
+    },
+    tags)
 
   /** The name of this log */
   def name  = dir.getName()
@@ -153,7 +167,7 @@ class Log(val dir: File,
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir, 
+      segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 2313a57..0794708 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,7 +26,7 @@ import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsR
 import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
 import kafka.utils.Logging
 
-import scala.collection.immutable
+import scala.collection.{mutable, immutable}
 
 
 trait KafkaMetricsGroup extends Logging {
@@ -35,29 +35,54 @@ trait KafkaMetricsGroup extends Logging {
    * Creates a new MetricName object for gauges, meters, etc. created for this
    * metrics group.
    * @param name Descriptive name of the metric.
+   * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String) = {
+  private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-    new MetricName(pkg, simpleName, name)
+
+    explicitMetricName(pkg, simpleName, name, tags)
+  }
+
+
+  private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+    val nameBuilder: StringBuilder = new StringBuilder
+
+    nameBuilder.append(group)
+
+    nameBuilder.append(":type=")
+
+    nameBuilder.append(typeName)
+
+    if (name.length > 0) {
+      nameBuilder.append(",name=")
+      nameBuilder.append(name)
+    }
+
+    KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
+
+    new MetricName(group, typeName, name, null, nameBuilder.toString())
   }
 
-  def newGauge[T](name: String, metric: Gauge[T]) =
-    Metrics.defaultRegistry().newGauge(metricName(name), metric)
+  def toMap(entries: (String, String)*) = mutable.LinkedHashMap(entries: _*)
+
+  def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)
 
-  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
-    Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+  def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)
 
-  def newHistogram(name: String, biased: Boolean = true) =
-    Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+  def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)
 
-  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
-    Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)
+
+  def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
+    Metrics.defaultRegistry().removeMetric(metricName(name, tags))
 
-  def removeMetric(name: String) =
-    Metrics.defaultRegistry().removeMetric(metricName(name))
 
 }
 
@@ -68,72 +93,75 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
    */
   private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.consumer.ZookeeperConsumerConnector
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
 
     // kafka.consumer.ConsumerFetcherManager
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+    new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
 
     // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
 
     // kafka.consumer.ConsumerTopicStats
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
-    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
 
     // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
 
     /**
      * ProducerRequestStats <-- SyncProducer
      * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
      */
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
-  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
+  private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
-    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
 
     // kafka.producer.ProducerSendThread
-    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+    new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
 
     // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
 
     // kafka.producer.ProducerRequestStats <-- SyncProducer
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
+  def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
+    val filteredTags = tags
+      .filter(_._2 != "")
+    if (filteredTags.nonEmpty) {
+      val tagsString = filteredTags
+        .filter(_._2 != "").map { case (key, value) => "%s=%s".format(key, value)}
+        .mkString(",")
+
+      Some(tagsString)
+    }
+    else {
+      None
+    }
+  }
+
   def removeAllConsumerMetrics(clientId: String) {
     FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
     ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
@@ -150,18 +178,19 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
 
   private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
     metricNameList.foreach(metric => {
-      val pattern = (clientId + ".*" + metric.getName +".*").r
+      val pattern = (".*" + metric.getName + ".*" + clientId + ".*").r
       val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
       for (registeredMetric <- registeredMetrics) {
         if (registeredMetric.getGroup == metric.getGroup &&
+          registeredMetric.getName == metric.getName &&
           registeredMetric.getType == metric.getType) {
-          pattern.findFirstIn(registeredMetric.getName) match {
+          pattern.findFirstIn(registeredMetric.getMBeanName) match {
             case Some(_) => {
               val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
               Metrics.defaultRegistry().removeMetric(registeredMetric)
               val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
               trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
-                  registeredMetric, beforeRemovalSize, afterRemovalSize))
+                registeredMetric, beforeRemovalSize, afterRemovalSize))
             }
             case _ =>
           }
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4560d8f..e77d72e 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -125,12 +125,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
   })
 
-  for(i <- 0 until numProcessors) {
-    newGauge(
-      "Processor-" + i + "-ResponseQueueSize",
+  for (i <- 0 until numProcessors) {
+    newGauge("ResponseQueueSize",
       new Gauge[Int] {
         def value = responseQueues(i).size()
-      }
+      },
+      toMap("Processor" -> i.toString)
     )
   }
 
@@ -187,24 +187,25 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
 object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower"
   (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val tags = toMap("request" -> name)
+  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
+  val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeMs")
+  val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
+  val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+  val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
+  val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
 }
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index cee76b3..e099276 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -67,7 +67,7 @@ class SocketServer(val brokerId: Int,
                                     time, 
                                     maxRequestSize, 
                                     aggregateIdleMeter,
-                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+                                    newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, toMap("NetworkProcessor" -> i.toString)),
                                     numProcessorThreads, 
                                     requestChannel,
                                     quotas,
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 1c46d72..6b1c740 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -19,11 +19,16 @@ package kafka.producer
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
-import kafka.common.ClientIdAndBroker
+import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 
-class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
+  val tags = metricId match {
+    case ClientIdAndBroker(clientId, brokerHost, brokerPort) => toMap("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort"->brokerPort.toString)
+    case ClientIdAllBrokers(clientId) => toMap("clientId" -> clientId, "allBrokers" -> "true")
+  }
+
+  val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
+  val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
 }
 
 /**
@@ -31,14 +36,14 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
  * @param clientId ClientId of the given producer
  */
 class ProducerRequestStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
-  private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
+  private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId))
 
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
-  def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+  def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 35e3aae..162f4be 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -20,10 +20,13 @@ import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 
+import scala.collection.mutable
+
 class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val tags: mutable.LinkedHashMap[String, String] = toMap("clientId" -> clientId)
+  val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
+  val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags)
+  val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags)
 }
 
 /**
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 9bb1419..80a6da0 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -17,16 +17,23 @@
 package kafka.producer
 
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
 import kafka.utils.{Pool, threadsafe}
 import java.util.concurrent.TimeUnit
 
+import scala.collection.mutable
+
 
 @threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val tags = metricId match {
+    case ClientIdAndTopic(clientId, topic) => toMap("clientId" -> clientId, "topic" -> topic)
+    case ClientIdAllTopics(clientId) => toMap("clientId" -> clientId, "allTopics" -> "true")
+  }
+
+  val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
+  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags)
 }
 
 /**
@@ -34,14 +41,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
  * @param clientId The clientId of the given producer client.
  */
 class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..86fc10c 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -39,7 +39,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.sendBufferBytes, config.requestTimeoutMs)
-  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
   trace("Instantiating Scala Sync Producer")
@@ -93,11 +92,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     val requestSize = producerRequest.sizeInBytes
-    producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize)
+    producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
     producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
 
     var response: Receive = null
-    val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer
+    val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
     val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
     aggregateTimer.time {
       specificTimer.time {
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..19d06a2 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,10 +34,11 @@ class ProducerSendThread[K,V](val threadName: String,
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize",
+  newGauge("ProducerQueueSize",
           new Gauge[Int] {
             def value = queue.size
-          })
+          },
+          toMap("clientId" -> clientId))
 
   override def run {
     try {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..b27b30e 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
 
-abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
   // map of (source broker_id, fetcher_id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
@@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   this.logIdent = "[" + name + "] "
 
   newGauge(
-    metricPrefix + "-MaxLag",
+    "MaxLag",
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
       def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -42,24 +42,25 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
           curMaxThread.max(fetcherLagStatsEntry._2.lag)
         }).max(curMaxAll)
       })
-    }
+    },
+    toMap("clientId" -> clientId)
   )
 
   newGauge(
-    metricPrefix + "-MinFetchRate",
-    {
-      new Gauge[Double] {
-        // current min fetch rate across all fetchers/topics/partitions
-        def value = {
-          val headRate: Double =
-            fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
-
-          fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
-            fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
-          })
-        }
+  "MinFetchRate", {
+    new Gauge[Double] {
+      // current min fetch rate across all fetchers/topics/partitions
+      def value = {
+        val headRate: Double =
+          fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
+
+        fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
+          fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
+        })
       }
     }
+  },
+  toMap("clientId" -> clientId)
   )
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2e9532e..4dcdcd5 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -26,9 +26,7 @@ import kafka.utils.Utils.inLock
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 
-import scala.collection.mutable
-import scala.collection.Set
-import scala.collection.Map
+import scala.collection.{mutable, Set, Map}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.atomic.AtomicLong
@@ -46,8 +44,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
-  private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+  private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
@@ -204,13 +201,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 }
 
-class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
-  newGauge(
-    metricId + "-ConsumerLag",
+  newGauge("ConsumerLag",
     new Gauge[Long] {
       def value = lagVal.get
-    }
+    },
+    toMap("clientId" -> metricId.clientId,
+      "topic" -> metricId.topic,
+      "partitionId" -> metricId.partitionId.toString)
   )
 
   def lag_=(newLag: Long) {
@@ -221,20 +220,25 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 }
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
-  private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+  val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
+    stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val tags = toMap("clientId" -> metricId.clientId,
+    "brokerHost" -> metricId.brokerHost,
+    "brokerPort" -> metricId.brokerPort.toString)
+
+  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+
+  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
 }
 
-case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
 
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
index 628ef59..06ee023 100644
--- a/core/src/main/scala/kafka/server/DelayedRequestKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
@@ -26,10 +26,6 @@ trait DelayedRequestKey {
   def keyLabel: String
 }
 
-object DelayedRequestKey {
-  val globalLabel = "All"
-}
-
 case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
 
   def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 00bcc06..2e27300 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -93,23 +93,28 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   }
 }
 
-class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
-  val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
-  val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec",  "bytes", TimeUnit.SECONDS)
-  val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
+class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
+  val tags = name match {
+    case None => toMap("allTopics" -> "true")
+    case Some(topic) => toMap("topic" -> topic)
+  }
+
+  val messagesInRate = newMeter("MessagesInPerSec", "messages", TimeUnit.SECONDS, tags)
+  val bytesInRate = newMeter("BytesInPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val bytesOutRate = newMeter("BytesOutPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
+  val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
 }
 
 object BrokerTopicStats extends Logging {
-  private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
+  private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
+  private val allTopicsStats = new BrokerTopicMetrics(None)
 
   def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
-    stats.getAndMaybePut(topic + "-")
+    stats.getAndMaybePut(topic)
   }
 }
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
index d4a7d4a..21e9f27 100644
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
@@ -30,19 +31,24 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
   extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
   this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
 
-  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
-    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  private class DelayedProducerRequestMetrics(metricId: Option[TopicAndPartition]) extends KafkaMetricsGroup {
+    val tags = metricId match {
+      case Some(topicAndPartition) => toMap("topic" -> topicAndPartition.topic, "partitionId" -> topicAndPartition.partition.toString)
+      case None => toMap("allTopics" -> "true")
+    }
+
+    val expiredRequestMeter = newMeter("ExpiresPerSecond", "requests", TimeUnit.SECONDS, tags)
   }
 
   private val producerRequestMetricsForKey = {
-    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
-    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+    val valueFactory = (k: TopicAndPartition) => new DelayedProducerRequestMetrics(Some(k))
+    new Pool[TopicAndPartition, DelayedProducerRequestMetrics](Some(valueFactory))
   }
 
-  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics(None)
 
-  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
-    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+  private def recordDelayedProducerKeyExpired(metricId: TopicAndPartition) {
+    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(metricId)
     List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
   }
 
@@ -57,7 +63,7 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
   def expire(delayedProduce: DelayedProduce) {
     debug("Expiring produce request %s.".format(delayedProduce.produce))
     for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
-      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+      recordDelayedProducerKeyExpired(topicPartition)
     respond(delayedProduce)
   }
 
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
new file mode 100644
index 0000000..3bfed93
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import java.util.{Properties}
+
+import com.yammer.metrics.Metrics
+import junit.framework.Assert._
+import kafka.integration.KafkaServerTestHarness
+import kafka.server._
+import scala.collection._
+import org.scalatest.junit.JUnit3Suite
+import kafka.message._
+import kafka.serializer._
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import kafka.producer.{KeyedMessage, Producer}
+import kafka.utils.TestUtils._
+
+class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+
+
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val numNodes = 2
+  val numParts = 2
+  val topic = "topic1"
+  val configs =
+    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = zookeeperConnect
+      override val numPartitions = numParts
+    }
+  val nMessages = 2
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testMetricNames() {
+    val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
+
+    // create topic topic1 with 1 partition on broker 0
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    // force creation of the static (non-client-specific) metrics.
+    createAndShutdownStep("group1", "consumer1", "producer1")
+
+    val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+    // TODO(review): this test computes countOfStaticMetrics but makes no
+    // assertions on the expected metric names; add them before relying on it.
+    zkClient.close()
+  }
+
+  def testMetricsLeak() {
+    val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
+
+    // create topic topic1 with 1 partition on broker 0
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    // force creation of the static (non-client-specific) metrics.
+    createAndShutdownStep("group1", "consumer1", "producer1")
+
+    val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+
+    createAndShutdownStep("group1", "consumer1", "producer1")
+
+    assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+
+    createAndShutdownStep("group2", "consumer1", "producer1")
+
+    assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+
+    createAndShutdownStep("group3", "consumer2", "producer1")
+
+    assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+
+    createAndShutdownStep("group1", "consumer1", "producer2")
+
+    assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+
+    createAndShutdownStep("group2", "consumer1", "producer2")
+
+    assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+
+    createAndShutdownStep("group3", "consumer2", "producer2")
+
+    assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+
+    zkClient.close()
+  }
+
+  def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
+    // send some messages to each broker
+    val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1, producerId)
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+
+    zkConsumerConnector1.shutdown()
+  }
+
+  def sendMessages(config: KafkaConfig,
+                   messagesPerNode: Int,
+                   header: String,
+                   compression: CompressionCodec,
+                   numParts: Int,
+                   producerId: String): List[String] = {
+    var messages: List[String] = Nil
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    props.put("client.id", producerId)
+    val producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    for (partition <- 0 until numParts) {
+      val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
+      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)): _*)
+      messages ++= ms
+      debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
+    }
+    producer.close()
+    messages
+  }
+
+  def getMessages(nMessagesPerThread: Int,
+                  topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
+    var messages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessagesPerThread) {
+          assertTrue(iterator.hasNext)
+          val message = iterator.next.message
+          messages ::= message
+          debug("received message: " + message)
+        }
+      }
+    }
+    messages.reverse
+  }
+}
-- 
1.9.1

