Index: core/src/main/scala/kafka/cluster/Partition.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(revision )
@@ -16,6 +16,8 @@
  */
 package kafka.cluster
 
+import javax.management.ObjectName
+
 import kafka.common._
 import kafka.admin.AdminUtils
 import kafka.utils._
@@ -63,7 +65,7 @@
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
   newGauge(
-    topic + "-" + partitionId + "-UnderReplicated",
+    "UnderReplicated", "topic=%s,partitionId=%s".format(ObjectName.quote(topic), ObjectName.quote(partitionId.toString)),
     new Gauge[Int] {
       def value = {
         if (isUnderReplicated) 1 else 0
Index: core/src/test/scala/other/kafka/TestOffsetManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/test/scala/other/kafka/TestOffsetManager.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/test/scala/other/kafka/TestOffsetManager.scala	(revision )
@@ -106,7 +106,7 @@
         extends ShutdownableThread("fetch-thread")
         with KafkaMetricsGroup {
 
-    private val timer = newTimer("fetch-thread", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    private val timer = newTimer("fetch-thread", null, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
     private val fetchTimer = new KafkaTimer(timer)
 
     private val channels = mutable.Map[Int, BlockingChannel]()
Index: core/src/main/scala/kafka/network/SocketServer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(revision )
@@ -23,6 +23,7 @@
 import java.net._
 import java.io._
 import java.nio.channels._
+import javax.management.ObjectName
 
 import scala.collection._
 
@@ -67,7 +68,7 @@
                                     time, 
                                     maxRequestSize, 
                                     aggregateIdleMeter,
-                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+                                    newMeter("IdlePercent", "NetworkProcessorNum=" + ObjectName.quote(i.toString), "percent", TimeUnit.NANOSECONDS),
                                     numProcessorThreads, 
                                     requestChannel,
                                     quotas,
Index: core/src/main/scala/kafka/tools/MirrorMaker.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/MirrorMaker.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/tools/MirrorMaker.scala	(revision )
@@ -196,7 +196,7 @@
     // time should be discounted by # threads.
     private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
     private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
-    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
+    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size", true)
 
     def put(record: ProducerRecord) {
       // If the key of the message is empty, use round-robin to select the queue
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision )
@@ -22,6 +22,7 @@
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.concurrent.locks.ReentrantLock
+import javax.management.ObjectName
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -104,9 +105,9 @@
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
   // useful for tracking migration of consumers to store offsets in kafka
-  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "clientId=%s".format(ObjectName.quote(config.clientId)), "commits", TimeUnit.SECONDS)
+  private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "clientId=%s".format(ObjectName.quote(config.clientId)), "commits", TimeUnit.SECONDS)
+  private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", "clientId=%s".format(ObjectName.quote(config.clientId)), TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -115,11 +116,11 @@
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
       => val uuid = UUID.randomUUID()
-      consumerUuid = "%s-%d-%s".format(
-        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
-        uuid.getMostSignificantBits().toHexString.substring(0,8))
+      consumerUuid = "consumerHostName=%s,timestamp=%s,uuid=%s".format(
+        ObjectName.quote(InetAddress.getLocalHost.getHostName), ObjectName.quote(System.currentTimeMillis.toString),
+        ObjectName.quote(uuid.getMostSignificantBits().toHexString.substring(0,8)))
     }
-    config.groupId + "_" + consumerUuid
+    "groupId=" + ObjectName.quote(config.groupId) + "," + consumerUuid
   }
   this.logIdent = "[" + consumerIdString + "], "
 
@@ -229,7 +230,7 @@
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, "clientId=%s".format(ObjectName.quote(config.clientId)))
         (queue, stream)
       })
     ).flatten.toList
@@ -518,12 +519,12 @@
     private val cond = lock.newCondition()
     
     @volatile private var allTopicsOwnedPartitionsCount = 0
-    newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
+    newGauge("OwnedPartitionsCount", "clientId=%s,groupId=%s,allTopics=%s".format(ObjectName.quote(config.clientId), ObjectName.quote(config.groupId), ObjectName.quote("true")), new Gauge[Int] {
       def value() = allTopicsOwnedPartitionsCount
     })
 
     private def ownedPartitionsCountMetricName(topic: String) =
-      "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+      "clientId=%s,groupId=%s,topic=%s".format(ObjectName.quote(config.clientId), ObjectName.quote(config.groupId), ObjectName.quote(topic))
 
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
@@ -576,7 +577,7 @@
         for(partition <- infos.keys) {
           deletePartitionOwnershipFromZK(topic, partition)
         }
-        removeMetric(ownedPartitionsCountMetricName(topic))
+        removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricName(topic))
         localTopicRegistry.remove(topic)
       }
       allTopicsOwnedPartitionsCount = 0
@@ -679,7 +680,7 @@
 
             partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
-              newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
+              newGauge("OwnedPartitionsCount", ownedPartitionsCountMetricName(topic), new Gauge[Int] {
                 def value() = partitionThreadPairs.size
               })
             }
@@ -801,7 +802,7 @@
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchMessageMaxBytes),
-                                                 config.clientId)
+                                                 "clientId=%s".format(ObjectName.quote(config.clientId)))
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
       checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
@@ -863,7 +864,7 @@
       topicThreadIdAndQueues.put(topicThreadId, q)
       debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
-        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        "FetchQueueSize", "clientId=%s,topicThreadId=%s,%s".format(ObjectName.quote(config.clientId), ObjectName.quote(topicThreadId._1), topicThreadId._2.toString),
         new Gauge[Int] {
           def value = q.size
         }
@@ -910,7 +911,7 @@
                                           config.consumerTimeoutMs,
                                           keyDecoder,
                                           valueDecoder,
-                                          config.clientId)
+                                          "clientId=%s".format(ObjectName.quote(config.clientId)))
         (queue, stream)
     }).toList
 
Index: core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala	(revision )
@@ -20,7 +20,7 @@
 import java.net.URI
 import java.text.SimpleDateFormat
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
-import kafka.consumer.SimpleConsumer
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
@@ -42,6 +42,7 @@
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
+    ConsumerConfig.validateClientId(config.clientId)
     val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
 
     // reset to latest or smallest offset
Index: core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala	(revision )
@@ -31,7 +31,7 @@
   this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
 
   private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
-    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+    val expiredRequestMeter = newMeter("ExpiresPerSecond", keyLabel, "requests", TimeUnit.SECONDS)
   }
 
   private val producerRequestMetricsForKey = {
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(revision )
@@ -87,8 +87,8 @@
       def value = underReplicatedPartitionCount()
     }
   )
-  val isrExpandRate = newMeter("IsrExpandsPerSec",  "expands", TimeUnit.SECONDS)
+  val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
-  val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
+  val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount(): Int = {
       getLeaderPartitions().count(_.isUnderReplicated)
Index: core/src/main/scala/kafka/common/ClientIdTopic.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/common/ClientIdTopic.scala	(revision )
+++ core/src/main/scala/kafka/common/ClientIdTopic.scala	(revision )
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import javax.management.ObjectName
+
+/**
+ * Convenience case class since (clientId, topic) pairs are used in the creation
+ * of many Stats objects.
+ */
+trait ClientIdTopic {
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
+  override def toString = if (clientId == "") "topic=%s".format(ObjectName.quote(topic)) else "%s,topic=%s".format(clientId, ObjectName.quote(topic))
+}
+
+case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
+  override def toString = if (clientId == "") "allTopics=%s".format(ObjectName.quote("true")) else "%s,allTopics=%s".format(clientId, ObjectName.quote("true"))
+}
Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.producer.async
 
+import javax.management.ObjectName
+
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
 import collection.mutable.ArrayBuffer
@@ -34,7 +36,7 @@
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize",
+  newGauge("ProducerQueueSize", "clientId=%s".format(ObjectName.quote(clientId)),
           new Gauge[Int] {
             def value = queue.size
           })
Index: core/src/main/scala/kafka/utils/Throttler.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/utils/Throttler.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/utils/Throttler.scala	(revision )
@@ -37,11 +37,12 @@
                 val checkIntervalMs: Long = 100L, 
                 val throttleDown: Boolean = true,
                 metricName: String = "throttler",
                 units: String = "entries",
+                mBeanName: String = null,
                 val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   
   private val lock = new Object
-  private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
+  private val meter = newMeter(metricName, mBeanName, units, TimeUnit.SECONDS)
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
   
\ No newline at end of file
Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
@@ -94,22 +96,22 @@
 }
 
 class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
-  val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
-  val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec",  "bytes", TimeUnit.SECONDS)
-  val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val messagesInRate = newMeter("MessagesInPerSec", name,  "messages", TimeUnit.SECONDS)
+  val bytesInRate = newMeter("BytesInPerSec", name,  "bytes", TimeUnit.SECONDS)
+  val bytesOutRate = newMeter("BytesOutPerSec", name,  "bytes", TimeUnit.SECONDS)
+  val bytesRejectedRate = newMeter("BytesRejectedPerSec", name,  "bytes", TimeUnit.SECONDS)
+  val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", name,  "requests", TimeUnit.SECONDS)
+  val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", name,  "requests", TimeUnit.SECONDS)
 }
 
 object BrokerTopicStats extends Logging {
   private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
+  private val allTopicsStats = new BrokerTopicMetrics("allTopics=" + ObjectName.quote("true"))
 
   def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
-    stats.getAndMaybePut(topic + "-")
+    stats.getAndMaybePut("topic=%s".format(ObjectName.quote(topic)))
   }
 }
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.api._
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
@@ -39,8 +41,8 @@
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.sendBufferBytes, config.requestTimeoutMs)
-  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
-  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
+  val brokerInfo = "brokerHost=%s,brokerPort=%s".format(ObjectName.quote(config.host), ObjectName.quote(config.port.toString))
+  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats("clientId=" + ObjectName.quote(config.clientId))
 
   trace("Instantiating Scala Sync Producer")
 
Index: core/src/main/scala/kafka/producer/ProducerRequestStats.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/ProducerRequestStats.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/producer/ProducerRequestStats.scala	(revision )
@@ -16,14 +16,16 @@
  */
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 import kafka.common.ClientIdAndBroker
 
 class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+  val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", metricId.toString, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram("ProducerRequestSize", metricId.toString)
 }
 
 /**
@@ -33,12 +35,12 @@
 class ProducerRequestStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
   private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "allBrokers=" + ObjectName.quote("true")))
 
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
   def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
   }
 }
 
Index: core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala	(revision )
@@ -19,9 +19,10 @@
 
 
 import java.util.concurrent.TimeUnit
+import javax.management.ObjectName
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Gauge, MetricName}
+import com.yammer.metrics.core._
 import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
 import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
 import kafka.utils.Logging
@@ -35,30 +36,67 @@
    * Creates a new MetricName object for gauges, meters, etc. created for this
    * metrics group.
    * @param name Descriptive name of the metric.
+   * @param mBeanName Descriptive mBeanName of the metric.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String) = {
+  private def metricName(name: String, mBeanName: String) = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-    new MetricName(pkg, simpleName, name)
+    if (mBeanName == null) new MetricName(pkg, simpleName, name)
+    else {
+      val nameBuilder: StringBuilder = new StringBuilder
+
+      nameBuilder.append(ObjectName.quote(pkg))
+
+      nameBuilder.append(":type=")
+
+      nameBuilder.append(ObjectName.quote(simpleName))
+
+      if (name.length > 0) {
+        nameBuilder.append(",name=")
+        nameBuilder.append(ObjectName.quote(name))
-  }
+      }
 
-  def newGauge[T](name: String, metric: Gauge[T]) =
-    Metrics.defaultRegistry().newGauge(metricName(name), metric)
+      if (mBeanName.length > 0) {
+        nameBuilder.append(",")
+        nameBuilder.append(mBeanName)
+      }
 
-  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
-    Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+      new MetricName(pkg, simpleName, name, null, nameBuilder.toString())
+    }
+  }
 
-  def newHistogram(name: String, biased: Boolean = true) =
-    Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+  def newGauge[T](name: String, mBeanName: String, metric: Gauge[T]) =
+    Metrics.defaultRegistry().newGauge(metricName(name, mBeanName), metric)
 
-  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
-    Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+  def newMeter(name: String, mBeanName: String, eventType: String, timeUnit: TimeUnit) =
+    Metrics.defaultRegistry().newMeter(metricName(name, mBeanName), eventType, timeUnit)
 
-  def removeMetric(name: String) =
-    Metrics.defaultRegistry().removeMetric(metricName(name))
+  def newHistogram(name: String, mBeanName: String, biased: Boolean = true) =
+    Metrics.defaultRegistry().newHistogram(metricName(name, mBeanName), biased)
 
+  def newTimer(name: String, mBeanName: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
+    Metrics.defaultRegistry().newTimer(metricName(name, mBeanName), durationUnit, rateUnit)
+
+  def removeMetric(name: String, mBeanName: String) =
+    Metrics.defaultRegistry().removeMetric(metricName(name, mBeanName))
+
+  def newGauge[T](name: String, metric: Gauge[T]) : Gauge[T] =
+    newGauge(name, null, metric)
+
+  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) : Meter =
+    newMeter(name, null, eventType, timeUnit)
+
+  def newHistogram(name: String, biased: Boolean) : Histogram =
+    newHistogram(name, null, biased)
+
+  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) : Timer =
+    newTimer(name, null, durationUnit, rateUnit)
+
+  def removeMetric(name: String) : Unit =
+    removeMetric(name, null)
+
 }
 
 object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
@@ -68,70 +106,70 @@
    */
   private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.consumer.ZookeeperConsumerConnector
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
 
     // kafka.consumer.ConsumerFetcherManager
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+    new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
 
     // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
 
     // kafka.consumer.ConsumerTopicStats
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
-    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
 
     // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
 
     /**
      * ProducerRequestStats <-- SyncProducer
      * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
      */
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
   private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
     // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
-    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
 
     // kafka.producer.ProducerSendThread
-    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+    new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
 
     // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
 
     // kafka.producer.ProducerRequestStats <-- SyncProducer
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
   def removeAllConsumerMetrics(clientId: String) {
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala	(revision )
@@ -30,7 +30,7 @@
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId + "-" + name,
+                                      clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketReceiveBufferBytes,
Index: core/src/main/scala/kafka/log/LogCleanerManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/LogCleanerManager.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/log/LogCleanerManager.scala	(revision )
@@ -59,7 +59,7 @@
   
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
-  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
+  newGauge("MaxDirtyPercent", null, new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
 
   /**
    * @return the position processed for all logs.
Index: core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala	(revision )
@@ -17,11 +17,13 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
-                                       "Replica", brokerConfig.numReplicaFetchers) {
+                                       "source=" + ObjectName.quote("Replica"), brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
\ No newline at end of file
Index: core/src/main/scala/kafka/server/DelayedRequestKey.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/DelayedRequestKey.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/DelayedRequestKey.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.common.TopicAndPartition
 
 /**
@@ -27,12 +29,12 @@
 }
 
 object DelayedRequestKey {
-  val globalLabel = "All"
+  val globalLabel = "allTopics=" + ObjectName.quote("true")
 }
 
 case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
 
   def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
 
-  override def keyLabel = "%s-%d".format(topic, partition)
+  override def keyLabel = "topic=%s,partitionId=%s".format(ObjectName.quote(topic), ObjectName.quote(partition.toString))
 }
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision )
@@ -29,7 +29,7 @@
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = name,
+                                clientId = brokerConfig.brokerId.toString,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.consumer
 
+import javax.management.ObjectName
+
 import kafka.api._
 import kafka.network._
 import kafka.utils._
@@ -32,11 +34,9 @@
                      val soTimeout: Int,
                      val bufferSize: Int,
                      val clientId: String) extends Logging {
-
-  ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  val brokerInfo = "host_%s-port_%s".format(host, port)
+  val brokerInfo = "brokerHost=%s,brokerPort=%s".format(ObjectName.quote(host), ObjectName.quote(port.toString))
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
Index: core/src/main/scala/kafka/producer/ProducerTopicStats.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/ProducerTopicStats.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/producer/ProducerTopicStats.scala	(revision )
@@ -16,17 +16,19 @@
  */
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdAndTopic, ClientIdAllTopics, ClientIdTopic}
 import kafka.utils.{Pool, threadsafe}
 import java.util.concurrent.TimeUnit
 
 
 @threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter("MessagesPerSec", metricId.toString, "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter("BytesPerSec", metricId.toString, "bytes", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter("DroppedMessagesPerSec", metricId.toString, "drops", TimeUnit.SECONDS)
 }
 
 /**
@@ -34,14 +36,14 @@
  * @param clientId The clientId of the given producer client.
  */
 class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics("clientId=%s".format(ObjectName.quote(clientId)))) // to differentiate from a topic named AllTopics
 
   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic("clientId=%s".format(ObjectName.quote(clientId)), topic))
   }
 }
 
Index: core/src/main/scala/kafka/log/Log.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/log/Log.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import javax.management.ObjectName
+
 import kafka.utils._
 import kafka.message._
 import kafka.common._
@@ -73,20 +75,32 @@
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
-  newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value = numberOfSegments })
 
-  newGauge(name + "-" + "LogStartOffset",
-           new Gauge[Long] { def value = logStartOffset })
+  val metricName = buildMetricName(Log.parseTopicPartitionName(name))
 
-  newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value = logEndOffset })
+  newGauge("NumLogSegments", metricName,
+    new Gauge[Int] {
+      def value = numberOfSegments
+    })
-           
+
-  newGauge(name + "-" + "Size", 
-           new Gauge[Long] {def value = size})
 
+  newGauge("LogEndOffset", metricName,
+    new Gauge[Long] {
+      def value = logEndOffset
+    })
+
+
+  newGauge("Size", metricName,
+    new Gauge[Long] {
+      def value = size
+    })
+
+  private def buildMetricName(topicAndPartition: TopicAndPartition) = {
+    "topic=%s,partitionId=%s".format(ObjectName.quote(topicAndPartition.topic), ObjectName.quote(topicAndPartition.partition.toString))
+  }
+
   /** The name of this log */
-  def name  = dir.getName()
+  def name = dir.getName
 
   /* Load the log segments from the log files on disk */
   private def loadSegments() {
@@ -152,7 +166,7 @@
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir, 
+      segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
Index: core/src/main/scala/kafka/consumer/TopicCount.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/TopicCount.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/TopicCount.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.consumer
 
+import javax.management.ObjectName
+
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils}
@@ -31,7 +33,7 @@
 }
 
 case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
-  override def toString = "%s-%d".format(consumer, threadId)
+  override def toString = "%s,threadId=%s".format(consumer, ObjectName.quote(threadId.toString))
 
   def compare(that: ConsumerThreadId) = toString.compare(that.toString)
 }
Index: core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala	(revision )
@@ -18,14 +18,15 @@
 package kafka.consumer
 
 import java.util.concurrent.TimeUnit
+import javax.management.ObjectName
 
 import kafka.common.ClientIdAndBroker
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
 
 class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", metricId.toString, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram("FetchResponseSize", metricId.toString)
 }
 
 /**
@@ -35,12 +36,12 @@
 class FetchRequestAndResponseStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
   private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "allBrokers=" + ObjectName.quote("true")))
 
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
   }
 }
 
@@ -56,7 +57,7 @@
   }
 
   def removeConsumerFetchRequestAndResponseStats(clientId: String) {
-    val pattern = (clientId + "-ConsumerFetcherThread.*").r
+    val pattern = (clientId + ".*ConsumerFetcherThread.*").r
     val keys = globalStats.keys
     for (key <- keys) {
       pattern.findFirstIn(key) match {
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision )
@@ -167,6 +167,7 @@
       System.exit(1)
     }
     if (startingOffset < 0) {
+      ConsumerConfig.validateClientId(clientId)
       val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
                                               ConsumerConfig.SocketBufferSize, clientId)
       try {
@@ -189,6 +190,7 @@
     val replicaString = if(replicaId > 0) "leader" else "replica"
     info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
                  .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+    ConsumerConfig.validateClientId(clientId)
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
Index: core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/FetchRequestPurgatory.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/FetchRequestPurgatory.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel
 import kafka.api.FetchResponseSend
@@ -31,9 +33,9 @@
   this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
 
   private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
-    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+    private val metricPrefix = "source=" + ObjectName.quote(if (forFollower) "Follower" else "Consumer")
 
-    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+    val expiredRequestMeter = newMeter("ExpiresPerSecond", metricPrefix, "requests", TimeUnit.SECONDS)
   }
 
   private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision )
@@ -109,8 +109,8 @@
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer1,threadId=\"0\""))
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -134,8 +134,8 @@
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer2-0"))
+    val expected_2 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer2,threadId=\"0\""))
     assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
@@ -197,8 +197,8 @@
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer1,threadId=\"0\""))
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -222,8 +222,8 @@
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer2-0"))
+    val expected_2 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer2,threadId=\"0\""))
     assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
@@ -268,8 +268,8 @@
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer0-0"),
-                           ("1", "group1_consumer0-0"))
+    val expected_2 = List( ("0", "groupId=\"group1\",consumer0,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer0,threadId=\"0\""))
     assertEquals(expected_2, actual_2)
 
     zkConsumerConnector1.shutdown
@@ -336,7 +336,7 @@
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""))
     assertEquals(expected_1, actual_1)
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.cluster.Broker
 import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
@@ -45,13 +47,17 @@
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
-  private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+
+  private val brokerInfo = "%s,brokerHost=%s,brokerPort=%s".format(name, ObjectName.quote(sourceBroker.host), ObjectName.quote(sourceBroker.port.toString))
+  private val clientInfo = "clientId=%s".format(ObjectName.quote(clientId))
+  private val metricId = new ClientIdAndBroker(clientInfo, brokerInfo)
+
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientInfo + "," + name)
+
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
-          clientId(clientId).
+          clientId(clientInfo + "," + name).
           replicaId(fetcherBrokerId).
           maxWait(maxWait).
           minBytes(minBytes)
@@ -207,7 +213,7 @@
 class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
-    metricId + "-ConsumerLag",
+    "ConsumerLag", metricId.toString,
     new Gauge[Long] {
       def value = lagVal.get
     }
@@ -230,11 +236,12 @@
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val requestRate = newMeter("RequestsPerSec", metricId.toString, "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter("BytesPerSec", metricId.toString, "bytes", TimeUnit.SECONDS)
 }
 
 case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+  override def toString = "%s,%s,topic=%s,partitionId=%s".format(clientId, brokerInfo,
+    ObjectName.quote(topic), ObjectName.quote(partitionId.toString))
 }
 
Index: core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala	(revision )
@@ -20,12 +20,12 @@
 import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdAllTopics, ClientIdAndTopic, ClientIdTopic}
 
 @threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter("MessagesPerSec", metricId.toString, "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter("BytesPerSec", metricId.toString, "bytes", TimeUnit.SECONDS)
 }
 
 /**
@@ -33,14 +33,14 @@
  * @param clientId The clientId of the given consumer client.
  */
 class ConsumerTopicStats(clientId: String) extends Logging {
-  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ConsumerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
\ No newline at end of file
Index: core/src/main/scala/kafka/server/KafkaServer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(revision )
@@ -59,7 +59,7 @@
   var zkClient: ZkClient = null
 
   newGauge(
-    "BrokerState",
+    "BrokerState", null,
     new Gauge[Int] {
       def value = brokerState.currentState
     }
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(revision )
@@ -17,11 +17,12 @@
 
 package kafka.consumer
 
+import javax.management.ObjectName
+
 import org.I0Itec.zkclient.ZkClient
 import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
 import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
-import scala.collection.Map
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
@@ -41,7 +42,7 @@
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
-                                       config.clientId, config.numConsumerFetchers) {
+                                       "clientId=%s".format(ObjectName.quote(config.clientId)), config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
@@ -116,7 +117,7 @@
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      "threadName=%s,%s,fetcherId=%s,sourceBrokerId=%s".format(ObjectName.quote("ConsumerFetcherThread"), consumerIdString, ObjectName.quote(fetcherId.toString), ObjectName.quote(sourceBroker.id.toString)),
       config, sourceBroker, partitionMap, this)
   }
 
\ No newline at end of file
Index: core/src/main/scala/kafka/producer/Producer.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/producer/Producer.scala	(revision )
@@ -18,6 +18,7 @@
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import javax.management.ObjectName
 
 import kafka.common.QueueFullException
 import kafka.metrics._
Index: core/src/main/scala/kafka/network/RequestChannel.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/network/RequestChannel.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/network/RequestChannel.scala	(revision )
@@ -18,6 +18,7 @@
 package kafka.network
 
 import java.util.concurrent._
+import javax.management.ObjectName
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
@@ -127,7 +128,7 @@
 
   for(i <- 0 until numProcessors) {
     newGauge(
-      "Processor-" + i + "-ResponseQueueSize",
+      "ResponseQueueSize", "ProcessorNum=" + ObjectName.quote(i.toString),
       new Gauge[Int] {
         def value = responseQueues(i).size()
       }
@@ -187,24 +188,24 @@
 
 object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  val consumerFetchMetricName = "request=" + ObjectName.quote("FetchConsumer")
+  val followFetchMetricName = "request=" + ObjectName.quote("FetchFollower")
   (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val requestRate = newMeter("RequestsPerSec", name,  "requests", TimeUnit.SECONDS)
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
+  val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", name)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeMs")
+  val localTimeHist = newHistogram("LocalTimeMs", name)
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  val remoteTimeHist = newHistogram("RemoteTimeMs", name)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
+  val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", name)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+  val responseSendTimeHist = newHistogram("ResponseSendTimeMs", name)
+  val totalTimeHist = newHistogram("TotalTimeMs", name)
 }
 
Index: core/src/main/scala/kafka/log/LogCleaner.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/log/LogCleaner.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/log/LogCleaner.scala	(revision )
@@ -78,7 +78,8 @@
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
                                         checkIntervalMs = 300, 
                                         throttleDown = true, 
-                                        "cleaner-io",
+                                        "CleanerIO",
+                                        null,
                                         "bytes",
                                         time = time)
   
@@ -86,12 +87,12 @@
   private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
   
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
-  newGauge("max-buffer-utilization-percent", 
+  newGauge("MaxBufferUtilizationPercent",
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt
            })
   /* a metric to track the recopy rate of each thread's last cleaning */
-  newGauge("cleaner-recopy-percent", 
+  newGauge("CleanerRecopyPercent",
            new Gauge[Int] {
              def value: Int = {
                val stats = cleaners.map(_.lastStats)
@@ -100,7 +101,7 @@
              }
            })
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  newGauge("max-clean-time-secs",
+  newGauge("MaxCleanTimeSecs",
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
\ No newline at end of file
Index: core/src/main/scala/kafka/common/ClientIdAndTopic.scala
===================================================================
--- core/src/main/scala/kafka/common/ClientIdAndTopic.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/common/ClientIdAndTopic.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
@@ -1,27 +0,0 @@
-package kafka.common
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Convenience case class since (clientId, topic) pairs are used in the creation
- * of many Stats objects.
- */
-case class ClientIdAndTopic(clientId: String, topic: String) {
-  override def toString = "%s-%s".format(clientId, topic)
-}
-
Index: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherManager.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala	(revision )
@@ -34,7 +34,7 @@
   this.logIdent = "[" + name + "] "
 
   newGauge(
-    metricPrefix + "-MaxLag",
+    "MaxLag", metricPrefix,
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
       def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -46,7 +46,7 @@
   )
 
   newGauge(
-    metricPrefix + "-MinFetchRate",
+    "MinFetchRate", metricPrefix,
     {
       new Gauge[Double] {
         // current min fetch rate across all fetchers/topics/partitions
\ No newline at end of file
Index: core/src/main/scala/kafka/producer/ProducerStats.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/producer/ProducerStats.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/producer/ProducerStats.scala	(revision )
@@ -16,14 +16,16 @@
  */
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 
 class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val serializationErrorRate = newMeter("SerializationErrorsPerSec", "clientId=%s".format(ObjectName.quote(clientId)), "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter("ResendsPerSec", "clientId=%s".format(ObjectName.quote(clientId)), "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter("FailedSendsPerSec", "clientId=%s".format(ObjectName.quote(clientId)), "failed sends", TimeUnit.SECONDS)
 }
 
 /**
Index: core/src/main/scala/kafka/api/RequestKeys.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/api/RequestKeys.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/api/RequestKeys.scala	(revision )
@@ -17,6 +17,8 @@
 
 package kafka.api
 
+import javax.management.ObjectName
+
 import kafka.common.KafkaException
 import java.nio.ByteBuffer
 
@@ -36,19 +38,19 @@
   val HeartbeatKey: Short = 12
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
-    Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
-        FetchKey -> ("Fetch", FetchRequest.readFrom),
-        OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
-        MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
-        LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
-        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
-        ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
-        OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
-        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
-        JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+    Map(ProduceKey -> ("request=" + ObjectName.quote("Produce"), ProducerRequest.readFrom),
+        FetchKey -> ("request=" + ObjectName.quote("Fetch"), FetchRequest.readFrom),
+        OffsetsKey -> ("request=" + ObjectName.quote("Offsets"), OffsetRequest.readFrom),
+        MetadataKey -> ("request=" + ObjectName.quote("Metadata"), TopicMetadataRequest.readFrom),
+        LeaderAndIsrKey -> ("request=" + ObjectName.quote("LeaderAndIsr"), LeaderAndIsrRequest.readFrom),
+        StopReplicaKey -> ("request=" + ObjectName.quote("StopReplica"), StopReplicaRequest.readFrom),
+        UpdateMetadataKey -> ("request=" + ObjectName.quote("UpdateMetadata"), UpdateMetadataRequest.readFrom),
+        ControlledShutdownKey -> ("request=" + ObjectName.quote("ControlledShutdown"), ControlledShutdownRequest.readFrom),
+        OffsetCommitKey -> ("request=" + ObjectName.quote("OffsetCommit"), OffsetCommitRequest.readFrom),
+        OffsetFetchKey -> ("request=" + ObjectName.quote("OffsetFetch"), OffsetFetchRequest.readFrom),
+        ConsumerMetadataKey -> ("request=" + ObjectName.quote("ConsumerMetadata"), ConsumerMetadataRequest.readFrom),
+        JoinGroupKey -> ("request=" + ObjectName.quote("JoinGroup"), JoinGroupRequestAndHeader.readFrom),
+        HeartbeatKey -> ("request=" + ObjectName.quote("Heartbeat"), HeartbeatRequestAndHeader.readFrom)
     )
 
   def nameForKey(key: Short): String = {
Index: core/src/main/scala/kafka/common/ClientIdAndBroker.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/common/ClientIdAndBroker.scala	(revision c63e06200d341345a1756e35ff0f48af2ec34d4a)
+++ core/src/main/scala/kafka/common/ClientIdAndBroker.scala	(revision )
@@ -1,5 +1,7 @@
 package kafka.common
 
+import javax.management.ObjectName
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -22,5 +24,5 @@
  * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
  */
 case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
-  override def toString = "%s-%s".format(clientId, brokerInfo)
+  override def toString = if (clientId == "") "%s".format(brokerInfo) else "%s,%s".format(clientId, brokerInfo)
 }
