diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..4e9a354 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -17,6 +17,8 @@
 
 package kafka.api
 
+import javax.management.ObjectName
+
 import kafka.common.KafkaException
 import java.nio.ByteBuffer
 
@@ -36,19 +38,19 @@ object RequestKeys {
   val HeartbeatKey: Short = 12
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
-    Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
-        FetchKey -> ("Fetch", FetchRequest.readFrom),
-        OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
-        MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
-        LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
-        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
-        ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
-        OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
-        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
-        JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+    Map(ProduceKey -> ("request=" + ObjectName.quote("Produce"), ProducerRequest.readFrom),
+        FetchKey -> ("request=" + ObjectName.quote("Fetch"), FetchRequest.readFrom),
+        OffsetsKey -> ("request=" + ObjectName.quote("Offsets"), OffsetRequest.readFrom),
+        MetadataKey -> ("request=" + ObjectName.quote("Metadata"), TopicMetadataRequest.readFrom),
+        LeaderAndIsrKey -> ("request=" + ObjectName.quote("LeaderAndIsr"), LeaderAndIsrRequest.readFrom),
+        StopReplicaKey -> ("request=" + ObjectName.quote("StopReplica"), StopReplicaRequest.readFrom),
+        UpdateMetadataKey -> ("request=" + ObjectName.quote("UpdateMetadata"), UpdateMetadataRequest.readFrom),
+        ControlledShutdownKey -> ("request=" + ObjectName.quote("ControlledShutdown"), ControlledShutdownRequest.readFrom),
+        OffsetCommitKey -> ("request=" + ObjectName.quote("OffsetCommit"), OffsetCommitRequest.readFrom),
+        OffsetFetchKey -> ("request=" + ObjectName.quote("OffsetFetch"), OffsetFetchRequest.readFrom),
+        ConsumerMetadataKey -> ("request=" + ObjectName.quote("ConsumerMetadata"), ConsumerMetadataRequest.readFrom),
+        JoinGroupKey -> ("request=" + ObjectName.quote("JoinGroup"), JoinGroupRequestAndHeader.readFrom),
+        HeartbeatKey -> ("request=" + ObjectName.quote("Heartbeat"), HeartbeatRequestAndHeader.readFrom)
     )
 
   def nameForKey(key: Short): String = {
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..104a7c5 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,6 +16,8 @@
  */
 package kafka.cluster
 
+import javax.management.ObjectName
+
 import kafka.common._
 import kafka.admin.AdminUtils
 import kafka.utils._
@@ -63,7 +65,7 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
   newGauge(
-    topic + "-" + partitionId + "-UnderReplicated",
+    "UnderReplicated", "topic=%s,partitionId=%s".format(ObjectName.quote(topic), ObjectName.quote(partitionId.toString)),
     new Gauge[Int] {
       def value = {
         if (isUnderReplicated) 1 else 0
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
index 93223a9..521a06b 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -1,5 +1,7 @@
 package kafka.common
 
+import javax.management.ObjectName
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -22,5 +24,5 @@ package kafka.common
  * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
  */
 case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
-  override def toString = "%s-%s".format(clientId, brokerInfo)
+  override def toString = if (clientId == "") "%s".format(brokerInfo) else "%s,%s".format(clientId, brokerInfo)
 }
diff --git a/core/src/main/scala/kafka/common/ClientIdTopic.scala b/core/src/main/scala/kafka/common/ClientIdTopic.scala
index a11607b..e02da40 100644
--- a/core/src/main/scala/kafka/common/ClientIdTopic.scala
+++ b/core/src/main/scala/kafka/common/ClientIdTopic.scala
@@ -1,8 +1,35 @@
 package kafka.common
 
+import javax.management.ObjectName
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 /**
- * Created by wawanawna on 10/13/14.
+ * Convenience case class since (clientId, topic) pairs are used in the creation
+ * of many Stats objects.
  */
-class ClientIdTopic {
+trait ClientIdTopic {
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
+  override def toString = if (clientId == "") "topic=%s".format(ObjectName.quote(topic)) else "%s,topic=%s".format(clientId, ObjectName.quote(topic))
+}
 
+case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
+  override def toString = if (clientId == "") "allTopics=%s".format(ObjectName.quote("true")) else "%s,allTopics=%s".format(clientId, ObjectName.quote("true"))
 }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..9f2d429 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -17,11 +17,12 @@
 
 package kafka.consumer
 
+import javax.management.ObjectName
+
 import org.I0Itec.zkclient.ZkClient
 import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
 import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
-import scala.collection.Map
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
@@ -41,7 +42,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
-                                       config.clientId, config.numConsumerFetchers) {
+                                       "clientId=%s".format(ObjectName.quote(config.clientId)), config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
@@ -116,7 +117,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      "threadName=%s,%s,fetcherId=%s,sourceBrokerId=%s".format(ObjectName.quote("ConsumerFetcherThread"), consumerIdString, ObjectName.quote(fetcherId.toString), ObjectName.quote(sourceBroker.id.toString)),
       config, sourceBroker, partitionMap, this)
   }
 
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f8c1b4e..ee6139c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId + "-" + name,
+                                      clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketReceiveBufferBytes,
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index f63e6c5..7d010e2 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -20,12 +20,12 @@ package kafka.consumer
 import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdAllTopics, ClientIdAndTopic, ClientIdTopic}
 
 @threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter("MessagesPerSec", metricId.toString, "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter("BytesPerSec", metricId.toString, "bytes", TimeUnit.SECONDS)
 }
 
 /**
@@ -33,14 +33,14 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
  * @param clientId The clientId of the given consumer client.
  */
 class ConsumerTopicStats(clientId: String) extends Logging {
-  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ConsumerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 5243f41..a03fd4b 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -18,14 +18,15 @@
 package kafka.consumer
 
 import java.util.concurrent.TimeUnit
+import javax.management.ObjectName
 
 import kafka.common.ClientIdAndBroker
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
 
 class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", metricId.toString, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram("FetchResponseSize", metricId.toString)
 }
 
 /**
@@ -35,12 +36,12 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM
 class FetchRequestAndResponseStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
   private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
-  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "allBrokers=" + ObjectName.quote("true")))
 
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
   }
 }
 
@@ -56,7 +57,7 @@ object FetchRequestAndResponseStatsRegistry {
   }
 
   def removeConsumerFetchRequestAndResponseStats(clientId: String) {
-    val pattern = (clientId + "-ConsumerFetcherThread.*").r
+    val pattern = (clientId + "ConsumerFetcherThread.*").r
     val keys = globalStats.keys
     for (key <- keys) {
       pattern.findFirstIn(key) match {
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d349a30..af1333f 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -17,6 +17,8 @@
 
 package kafka.consumer
 
+import javax.management.ObjectName
+
 import kafka.api._
 import kafka.network._
 import kafka.utils._
@@ -32,11 +34,9 @@ class SimpleConsumer(val host: String,
                      val soTimeout: Int,
                      val bufferSize: Int,
                      val clientId: String) extends Logging {
-
-  ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  val brokerInfo = "host_%s-port_%s".format(host, port)
+  val brokerInfo = "brokerHost=%s,brokerPort=%s".format(ObjectName.quote(host), ObjectName.quote(port.toString))
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 0954b3c..cf5693b 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -17,6 +17,8 @@
 
 package kafka.consumer
 
+import javax.management.ObjectName
+
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils}
@@ -31,7 +33,7 @@ private[kafka] trait TopicCount {
 }
 
 case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
-  override def toString = "%s-%d".format(consumer, threadId)
+  override def toString = "%s,threadId=%s".format(consumer, ObjectName.quote(threadId.toString))
 
   def compare(that: ConsumerThreadId) = toString.compare(that.toString)
 }
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..6aab200 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.concurrent.locks.ReentrantLock
+import javax.management.ObjectName
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -104,9 +105,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
   // useful for tracking migration of consumers to store offsets in kafka
-  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "clientId=%s".format(ObjectName.quote(config.clientId)), "commits", TimeUnit.SECONDS)
+  private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "clientId=%s".format(ObjectName.quote(config.clientId)), "commits", TimeUnit.SECONDS)
+  private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", "clientId=%s".format(ObjectName.quote(config.clientId)), TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -115,11 +116,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
       => val uuid = UUID.randomUUID()
-      consumerUuid = "%s-%d-%s".format(
-        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
-        uuid.getMostSignificantBits().toHexString.substring(0,8))
+      consumerUuid = "consumerHostName=%s,timestamp=%s,uuid=%s".format(
+        ObjectName.quote(InetAddress.getLocalHost.getHostName), ObjectName.quote(System.currentTimeMillis.toString),
+        ObjectName.quote(uuid.getMostSignificantBits().toHexString.substring(0,8)))
     }
-    config.groupId + "_" + consumerUuid
+    "groupId=" + ObjectName.quote(config.groupId) + "," + consumerUuid
   }
   this.logIdent = "[" + consumerIdString + "], "
 
@@ -229,7 +230,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, "clientId=%s".format(ObjectName.quote(config.clientId)))
         (queue, stream)
       })
     ).flatten.toList
@@ -518,12 +519,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private val cond = lock.newCondition()
     
     @volatile private var allTopicsOwnedPartitionsCount = 0
-    newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
+    newGauge("OwnedPartitionsCount", "clientId=%s,groupId=%s,allTopics=%s".format(ObjectName.quote(config.clientId), ObjectName.quote(config.groupId), ObjectName.quote("true")), new Gauge[Int] {
       def value() = allTopicsOwnedPartitionsCount
     })
 
     private def ownedPartitionsCountMetricName(topic: String) =
-      "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+      "clientId=%s,groupId=%s,topic=%s".format(ObjectName.quote(config.clientId), ObjectName.quote(config.groupId), ObjectName.quote(topic))
 
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
@@ -576,7 +577,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         for(partition <- infos.keys) {
           deletePartitionOwnershipFromZK(topic, partition)
         }
-        removeMetric(ownedPartitionsCountMetricName(topic))
+        removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricName(topic))
         localTopicRegistry.remove(topic)
       }
       allTopicsOwnedPartitionsCount = 0
@@ -679,7 +680,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
-              newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
+              newGauge("OwnedPartitionsCount", ownedPartitionsCountMetricName(topic), new Gauge[Int] {
                 def value() = partitionThreadPairs.size
               })
             }
@@ -801,7 +802,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchMessageMaxBytes),
-                                                 config.clientId)
+                                                 "clientId=%s".format(ObjectName.quote(config.clientId)))
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
       checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
@@ -863,7 +864,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicThreadIdAndQueues.put(topicThreadId, q)
       debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
-        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        "FetchQueueSize", "clientId=%s,topicThreadId=%s,%s".format(ObjectName.quote(config.clientId), ObjectName.quote(topicThreadId._1), topicThreadId._2.toString),
         new Gauge[Int] {
           def value = q.size
         }
@@ -910,7 +911,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                           config.consumerTimeoutMs,
                                           keyDecoder,
                                           valueDecoder,
-                                          config.clientId)
+                                          "clientId=%s".format(ObjectName.quote(config.clientId)))
         (queue, stream)
     }).toList
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 51a5bad..4180080 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -174,14 +174,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
 
   newGauge(
-    "ActiveControllerCount",
+    "ActiveControllerCount", null,
     new Gauge[Int] {
       def value() = if (isActive) 1 else 0
     }
   )
 
   newGauge(
-    "OfflinePartitionsCount",
+    "OfflinePartitionsCount", null,
     new Gauge[Int] {
       def value(): Int = {
         inLock(controllerContext.controllerLock) {
@@ -195,7 +195,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   )
 
   newGauge(
-    "PreferredReplicaImbalanceCount",
+    "PreferredReplicaImbalanceCount", null,
     new Gauge[Int] {
       def value(): Int = {
         inLock(controllerContext.controllerLock) {
@@ -1335,6 +1335,6 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
 }
 
 object ControllerStats extends KafkaMetricsGroup {
-  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
-  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", null, "elections", TimeUnit.SECONDS)
+  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", null, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b2652dd..908e117 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -274,5 +274,5 @@ class FileMessageSet private[kafka](@volatile var file: File,
 }
 
 object LogFlushStats extends KafkaMetricsGroup {
-  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", null, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a123cdc..40c62e2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import javax.management.ObjectName
+
 import kafka.utils._
 import kafka.message._
 import kafka.common._
@@ -73,20 +75,32 @@ class Log(val dir: File,
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
-  newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value = numberOfSegments })
 
-  newGauge(name + "-" + "LogStartOffset",
-           new Gauge[Long] { def value = logStartOffset })
+  val metricName = buildMetricName(Log.parseTopicPartitionName(name))
+
+  newGauge("NumLogSegments", metricName,
+    new Gauge[Int] {
+      def value = numberOfSegments
+    })
+
+
+  newGauge("LogEndOffset", metricName,
+    new Gauge[Long] {
+      def value = logEndOffset
+    })
 
-  newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value = logEndOffset })
-           
-  newGauge(name + "-" + "Size", 
-           new Gauge[Long] {def value = size})
+
+  newGauge("Size", metricName,
+    new Gauge[Long] {
+      def value = size
+    })
+
+  private def buildMetricName(topicPartition: TopicAndPartition) = {
+    "topic=%s,partitionId=%s".format(ObjectName.quote(topicPartition.topic), ObjectName.quote(topicPartition.partition.toString))
+  }
 
   /** The name of this log */
-  def name  = dir.getName()
+  def name = dir.getName
 
   /* Load the log segments from the log files on disk */
   private def loadSegments() {
@@ -152,7 +166,7 @@ class Log(val dir: File,
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir, 
+      segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c20de4a..305ad2f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -78,7 +78,8 @@ class LogCleaner(val config: CleanerConfig,
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
                                         checkIntervalMs = 300, 
                                         throttleDown = true, 
-                                        "cleaner-io",
+                                        "CleanerIO",
+                                         null,
                                         "bytes",
                                         time = time)
   
@@ -86,12 +87,12 @@ class LogCleaner(val config: CleanerConfig,
   private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
   
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
-  newGauge("max-buffer-utilization-percent", 
+  newGauge("MaxBufferUtilizationPercent", null,
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt
            })
   /* a metric to track the recopy rate of each thread's last cleaning */
-  newGauge("cleaner-recopy-percent", 
+  newGauge("CleanerRecopyPercent", null,
            new Gauge[Int] {
              def value: Int = {
                val stats = cleaners.map(_.lastStats)
@@ -100,7 +101,7 @@ class LogCleaner(val config: CleanerConfig,
              }
            })
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  newGauge("max-clean-time-secs",
+  newGauge("MaxCleanTimeSecs", null,
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e8ced6a..8ddaeb0 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -59,7 +59,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
-  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
+  newGauge("MaxDirtyPercent", null, new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
 
   /**
    * @return the position processed for all logs.
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 2313a57..5ddcf5a 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -19,6 +19,7 @@ package kafka.metrics
 
 
 import java.util.concurrent.TimeUnit
+import javax.management.ObjectName
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, MetricName}
@@ -35,29 +36,51 @@ trait KafkaMetricsGroup extends Logging {
    * Creates a new MetricName object for gauges, meters, etc. created for this
    * metrics group.
    * @param name Descriptive name of the metric.
+   * @param mBeanName Descriptive mBeanName of the metric.
    * @return Sanitized metric name object.
    */
-  private def metricName(name: String) = {
+  private def metricName(name: String, mBeanName: String) = {
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-    new MetricName(pkg, simpleName, name)
+    if (mBeanName == null) new MetricName(pkg, simpleName, name)
+    else {
+      val nameBuilder: StringBuilder = new StringBuilder
+
+      nameBuilder.append(ObjectName.quote(pkg))
+
+      nameBuilder.append(":type=")
+
+      nameBuilder.append(ObjectName.quote(simpleName))
+
+      if (name.length > 0) {
+        nameBuilder.append(",name=")
+        nameBuilder.append(ObjectName.quote(name))
+      }
+
+      if (mBeanName.length > 0) {
+        nameBuilder.append(",")
+        nameBuilder.append(mBeanName)
+      }
+
+      new MetricName(pkg, simpleName, name, null, nameBuilder.toString())
+    }
   }
 
-  def newGauge[T](name: String, metric: Gauge[T]) =
-    Metrics.defaultRegistry().newGauge(metricName(name), metric)
+  def newGauge[T](name: String, mBeanName: String, metric: Gauge[T]) =
+    Metrics.defaultRegistry().newGauge(metricName(name, mBeanName), metric)
 
-  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
-    Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+  def newMeter(name: String, mBeanName: String, eventType: String, timeUnit: TimeUnit) =
+    Metrics.defaultRegistry().newMeter(metricName(name, mBeanName), eventType, timeUnit)
 
-  def newHistogram(name: String, biased: Boolean = true) =
-    Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+  def newHistogram(name: String, mBeanName: String, biased: Boolean = true) =
+    Metrics.defaultRegistry().newHistogram(metricName(name, mBeanName), biased)
 
-  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
-    Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+  def newTimer(name: String, mBeanName: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
+    Metrics.defaultRegistry().newTimer(metricName(name, mBeanName), durationUnit, rateUnit)
 
-  def removeMetric(name: String) =
-    Metrics.defaultRegistry().removeMetric(metricName(name))
+  def removeMetric(name: String, mBeanName: String) =
+    Metrics.defaultRegistry().removeMetric(metricName(name, mBeanName))
 
 }
 
@@ -68,70 +91,70 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
    */
   private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.consumer.ZookeeperConsumerConnector
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
 
     // kafka.consumer.ConsumerFetcherManager
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+    new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
 
     // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
 
     // kafka.consumer.ConsumerTopicStats
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
-    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
 
     // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
 
     /**
      * ProducerRequestStats <-- SyncProducer
      * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
      */
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
   private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
     // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
-    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
 
     // kafka.producer.ProducerSendThread
-    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+    new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
 
     // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
 
     // kafka.producer.ProducerRequestStats <-- SyncProducer
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
   )
 
   def removeAllConsumerMetrics(clientId: String) {
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4560d8f..f544e23 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -18,6 +18,7 @@
 package kafka.network
 
 import java.util.concurrent._
+import javax.management.ObjectName
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
@@ -115,19 +116,19 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
 
   newGauge(
-    "RequestQueueSize",
+    "RequestQueueSize", null,
     new Gauge[Int] {
       def value = requestQueue.size
     }
   )
 
-  newGauge("ResponseQueueSize", new Gauge[Int]{
+  newGauge("ResponseQueueSize", null, new Gauge[Int]{
     def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
   })
 
   for(i <- 0 until numProcessors) {
     newGauge(
-      "Processor-" + i + "-ResponseQueueSize",
+      "ResponseQueueSize", "ProcessorNum=" + ObjectName.quote(i.toString),
       new Gauge[Int] {
         def value = responseQueues(i).size()
       }
@@ -187,24 +188,24 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
 object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  val consumerFetchMetricName = "request=" + ObjectName.quote("FetchConsumer")
+  val followFetchMetricName = "request=" + ObjectName.quote("FetchFollower")
   (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val requestRate = newMeter("RequestsPerSec", name,  "requests", TimeUnit.SECONDS)
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
+  val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", name)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeMs")
+  val localTimeHist = newHistogram("LocalTimeMs", name)
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  val remoteTimeHist = newHistogram("RemoteTimeMs", name)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
+  val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", name)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+  val responseSendTimeHist = newHistogram("ResponseSendTimeMs", name)
+  val totalTimeHist = newHistogram("TotalTimeMs", name)
 }
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index cee76b3..3e9456b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic._
 import java.net._
 import java.io._
 import java.nio.channels._
+import javax.management.ObjectName
 
 import scala.collection._
 
@@ -55,7 +56,7 @@ class SocketServer(val brokerId: Int,
   val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
   /* a meter to track the average free capacity of the network processors */
-  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", null, "percent", TimeUnit.NANOSECONDS)
 
   /**
    * Start the socket server
@@ -67,7 +68,7 @@ class SocketServer(val brokerId: Int,
                                     time, 
                                     maxRequestSize, 
                                     aggregateIdleMeter,
-                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+                                    newMeter("IdlePercent", "NetworkProcessorNum=" + ObjectName.quote(i.toString), "percent", TimeUnit.NANOSECONDS),
                                     numProcessorThreads, 
                                     requestChannel,
                                     quotas,
@@ -75,7 +76,7 @@ class SocketServer(val brokerId: Int,
       Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
     }
 
-    newGauge("ResponsesBeingSent", new Gauge[Int] {
+    newGauge("ResponsesBeingSent", null, new Gauge[Int] {
       def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
     })
 
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index cd634f6..d169ceb 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -18,6 +18,7 @@ package kafka.producer
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import javax.management.ObjectName
 
 import kafka.common.QueueFullException
 import kafka.metrics._
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 1c46d72..51cd403 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -16,14 +16,16 @@
  */
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 import kafka.common.ClientIdAndBroker
 
 class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+  val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", metricId.toString, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram("ProducerRequestSize", metricId.toString)
 }
 
 /**
@@ -33,12 +35,12 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
 class ProducerRequestStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
   private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
-  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "allBrokers=" + ObjectName.quote("true")))
 
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
   def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 35e3aae..efe96db 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -16,14 +16,16 @@
  */
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 
 class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val serializationErrorRate = newMeter("SerializationErrorsPerSec", "clientId=%s".format(ObjectName.quote(clientId)), "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter("ResendsPerSec", "clientId=%s".format(ObjectName.quote(clientId)), "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter("FailedSendsPerSec", "clientId=%s".format(ObjectName.quote(clientId)), "failed sends", TimeUnit.SECONDS)
 }
 
 /**
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 9bb1419..4f174fb 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -16,17 +16,19 @@
  */
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdAndTopic, ClientIdAllTopics, ClientIdTopic}
 import kafka.utils.{Pool, threadsafe}
 import java.util.concurrent.TimeUnit
 
 
 @threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter("MessagesPerSec", metricId.toString, "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter("BytesPerSec", metricId.toString, "bytes", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter("DroppedMessagesPerSec", metricId.toString, "drops", TimeUnit.SECONDS)
 }
 
 /**
@@ -34,14 +36,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
  * @param clientId The clientId of the given producer client.
  */
 class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics("clientId=%s".format(ObjectName.quote(clientId)))) // ClientIdAllTopics (not ClientIdAndTopic) keeps aggregate stats distinct from any real topic
 
   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic("clientId=%s".format(ObjectName.quote(clientId)), topic))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..87aa3b6 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -17,6 +17,8 @@
 
 package kafka.producer
 
+import javax.management.ObjectName
+
 import kafka.api._
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
@@ -39,8 +41,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.sendBufferBytes, config.requestTimeoutMs)
-  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
-  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
+  val brokerInfo = "brokerHost=%s,brokerPort=%s".format(ObjectName.quote(config.host), ObjectName.quote(config.port.toString))
+  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats("clientId=" + ObjectName.quote(config.clientId))
 
   trace("Instantiating Scala Sync Producer")
 
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..5ae57d3 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -17,6 +17,8 @@
 
 package kafka.producer.async
 
+import javax.management.ObjectName
+
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
 import collection.mutable.ArrayBuffer
@@ -34,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize",
+  newGauge("ProducerQueueSize", "clientId=%s".format(ObjectName.quote(clientId)),
           new Gauge[Int] {
             def value = queue.size
           })
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..1581353 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   this.logIdent = "[" + name + "] "
 
   newGauge(
-    metricPrefix + "-MaxLag",
+    "MaxLag", metricPrefix,
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
       def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -46,7 +46,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   newGauge(
-    metricPrefix + "-MinFetchRate",
+    "MinFetchRate", metricPrefix,
     {
       new Gauge[Double] {
         // current min fetch rate across all fetchers/topics/partitions
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2e9532e..31d1c94 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.cluster.Broker
 import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
@@ -45,13 +47,17 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
-  private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+
+  private val brokerInfo = "%s,brokerHost=%s,brokerPort=%s".format(name, ObjectName.quote(sourceBroker.host), ObjectName.quote(sourceBroker.port.toString))
+  private val clientInfo = "clientId=%s".format(ObjectName.quote(clientId))
+  private val metricId = new ClientIdAndBroker(clientInfo, brokerInfo)
+
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientInfo + "," + name)
+
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
-          clientId(clientId).
+          clientId(clientInfo + "," + name).
           replicaId(fetcherBrokerId).
           maxWait(maxWait).
           minBytes(minBytes)
@@ -207,7 +213,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
-    metricId + "-ConsumerLag",
+    "ConsumerLag", metricId.toString,
     new Gauge[Long] {
       def value = lagVal.get
     }
@@ -230,11 +236,12 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val requestRate = newMeter("RequestsPerSec", metricId.toString, "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter("BytesPerSec", metricId.toString, "bytes", TimeUnit.SECONDS)
 }
 
 case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+  override def toString = "%s,%s,topic=%s,partitionId=%s".format(clientId, brokerInfo,
+    ObjectName.quote(topic), ObjectName.quote(partitionId.toString))
 }
 
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
index 628ef59..96e432e 100644
--- a/core/src/main/scala/kafka/server/DelayedRequestKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.common.TopicAndPartition
 
 /**
@@ -27,12 +29,12 @@ trait DelayedRequestKey {
 }
 
 object DelayedRequestKey {
-  val globalLabel = "All"
+  val globalLabel = "allTopics=" + ObjectName.quote("true")
 }
 
 case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
 
   def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
 
-  override def keyLabel = "%s-%d".format(topic, partition)
+  override def keyLabel = "topic=%s,partitionId=%s".format(ObjectName.quote(topic), ObjectName.quote(partition.toString))
 }
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
index ed13188..943f91e 100644
--- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel
 import kafka.api.FetchResponseSend
@@ -31,9 +33,9 @@ class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: Requ
   this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
 
   private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
-    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+    private val metricPrefix = "source=" + ObjectName.quote((if (forFollower) "Follower" else "Consumer"))
 
-    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+    val expiredRequestMeter = newMeter("ExpiresPerSecond", metricPrefix, "requests", TimeUnit.SECONDS)
   }
 
   private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 00bcc06..2dafaa4 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
@@ -72,7 +74,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
                               numThreads: Int) extends Logging with KafkaMetricsGroup {
 
   /* a meter to track the average free capacity of the request handlers */
-  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", null, "percent", TimeUnit.NANOSECONDS)
 
   this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
   val threads = new Array[Thread](numThreads)
@@ -94,22 +96,22 @@ class KafkaRequestHandlerPool(val brokerId: Int,
 }
 
 class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
-  val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
-  val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
-  val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec",  "bytes", TimeUnit.SECONDS)
-  val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val messagesInRate = newMeter("MessagesInPerSec", name,  "messages", TimeUnit.SECONDS)
+  val bytesInRate = newMeter("BytesInPerSec", name,  "bytes", TimeUnit.SECONDS)
+  val bytesOutRate = newMeter("BytesOutPerSec", name,  "bytes", TimeUnit.SECONDS)
+  val bytesRejectedRate = newMeter("BytesRejectedPerSec", name,  "bytes", TimeUnit.SECONDS)
+  val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", name,  "requests", TimeUnit.SECONDS)
+  val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", name,  "requests", TimeUnit.SECONDS)
 }
 
 object BrokerTopicStats extends Logging {
   private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
+  private val allTopicsStats = new BrokerTopicMetrics("allTopics=" + ObjectName.quote("true"))
 
   def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
-    stats.getAndMaybePut(topic + "-")
+    stats.getAndMaybePut("topic=%s".format(ObjectName.quote(topic)))
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3e9e91f..8241c49 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -59,7 +59,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var zkClient: ZkClient = null
 
   newGauge(
-    "BrokerState",
+    "BrokerState", null,
     new Gauge[Int] {
       def value = brokerState.currentState
     }
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 43eb2a3..0360b74 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -100,13 +100,13 @@ class OffsetManager(val config: OffsetManagerConfig,
                      period = config.offsetsRetentionCheckIntervalMs,
                      unit = TimeUnit.MILLISECONDS)
 
-  newGauge("NumOffsets",
+  newGauge("NumOffsets", null,
     new Gauge[Int] {
       def value = offsetsCache.size
     }
   )
 
-  newGauge("NumGroups",
+  newGauge("NumGroups", null,
     new Gauge[Int] {
       def value = offsetsCache.keys.map(_.group).toSet.size
     }
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
index d4a7d4a..b037507 100644
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -31,7 +31,7 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
   this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
 
   private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
-    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+    val expiredRequestMeter = newMeter("ExpiresPerSecond", keyLabel, "requests", TimeUnit.SECONDS)
   }
 
   private val producerRequestMetricsForKey = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 351dbba..325610c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -17,11 +17,13 @@
 
 package kafka.server
 
+import javax.management.ObjectName
+
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
-                                       "Replica", brokerConfig.numReplicaFetchers) {
+                                       "source=" + ObjectName.quote("Replica"), brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6879e73..9eb4ba4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -29,7 +29,7 @@ class ReplicaFetcherThread(name:String,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = name,
+                                clientId = brokerConfig.brokerId.toString,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 78b7514..0535384 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -68,7 +68,7 @@ class ReplicaManager(val config: KafkaConfig,
   var fetchRequestPurgatory: FetchRequestPurgatory = null
 
   newGauge(
-    "LeaderCount",
+    "LeaderCount", null,
     new Gauge[Int] {
       def value = {
           getLeaderPartitions().size
@@ -76,19 +76,19 @@ class ReplicaManager(val config: KafkaConfig,
     }
   )
   newGauge(
-    "PartitionCount",
+    "PartitionCount", null,
     new Gauge[Int] {
       def value = allPartitions.size
     }
   )
   newGauge(
-    "UnderReplicatedPartitions",
+    "UnderReplicatedPartitions", null,
     new Gauge[Int] {
       def value = underReplicatedPartitionCount()
     }
   )
-  val isrExpandRate = newMeter("IsrExpandsPerSec",  "expands", TimeUnit.SECONDS)
-  val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
+  val isrExpandRate = newMeter("IsrExpandsPerSec", null,  "expands", TimeUnit.SECONDS)
+  val isrShrinkRate = newMeter("IsrShrinksPerSec", null,  "shrinks", TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount(): Int = {
       getLeaderPartitions().count(_.isUnderReplicated)
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 9d76234..4d95815 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -76,14 +76,14 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
   private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
 
   newGauge(
-    "PurgatorySize",
+    "PurgatorySize", null,
     new Gauge[Int] {
       def value = watched()
     }
   )
 
   newGauge(
-    "NumDelayedRequests",
+    "NumDelayedRequests", null,
     new Gauge[Int] {
       def value = delayed()
     }
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b8698ee..3ae881f 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -194,9 +194,9 @@ object MirrorMaker extends Logging {
     // Since meter is calculated as total_recorded_value / time_window and
     // time_window is independent of the number of threads, each recorded wait
     // time should be discounted by # threads.
-    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
-    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
-    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
+    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", null, "percent", TimeUnit.NANOSECONDS)
+    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", null, "percent", TimeUnit.NANOSECONDS)
+    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size", null)
 
     def put(record: ProducerRecord) {
       // If the key of the message is empty, use round-robin to select the queue
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 7602b8d..c6451ea 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.net.URI
 import java.text.SimpleDateFormat
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
-import kafka.consumer.SimpleConsumer
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
@@ -42,6 +42,7 @@ object SimpleConsumerPerformance {
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
+    ConsumerConfig.validateClientId(config.clientId)
     val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
 
     // reset to latest or smallest offset
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b4f903b..126bab3 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -167,6 +167,7 @@ object SimpleConsumerShell extends Logging {
       System.exit(1)
     }
     if (startingOffset < 0) {
+      ConsumerConfig.validateClientId(clientId)
       val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
                                               ConsumerConfig.SocketBufferSize, clientId)
       try {
@@ -189,6 +190,7 @@ object SimpleConsumerShell extends Logging {
     val replicaString = if(replicaId > 0) "leader" else "replica"
     info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
                  .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+    ConsumerConfig.validateClientId(clientId)
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index d1a144d..644a3de 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -37,11 +37,12 @@ class Throttler(val desiredRatePerSec: Double,
                 val checkIntervalMs: Long = 100L, 
                 val throttleDown: Boolean = true,
                 metricName: String = "throttler",
+                mBeanName: String = null,
                 units: String = "entries",
                 val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   
   private val lock = new Object
-  private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
+  private val meter = newMeter(metricName, mBeanName, units, TimeUnit.SECONDS)
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
   
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 41f334d..743c61d 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -56,7 +56,7 @@ object TestOffsetManager {
     private var offset = 0L
     val numErrors = new AtomicInteger(0)
     val numCommits = new AtomicInteger(0)
-    val timer = newTimer("commit-thread", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    val timer = newTimer("commit-thread", null, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
     private val commitTimer = new KafkaTimer(timer)
     val shutdownLock = new Object
 
@@ -106,7 +106,7 @@ object TestOffsetManager {
         extends ShutdownableThread("fetch-thread")
         with KafkaMetricsGroup {
 
-    private val timer = newTimer("fetch-thread", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    private val timer = newTimer("fetch-thread", null, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
     private val fetchTimer = new KafkaTimer(timer)
 
     private val channels = mutable.Map[Int, BlockingChannel]()
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e1d8711..f565755 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -109,8 +109,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer1,threadId=\"0\""))
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -134,8 +134,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer2-0"))
+    val expected_2 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer2,threadId=\"0\""))
     assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
@@ -197,8 +197,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer1,threadId=\"0\""))
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -222,8 +222,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer2-0"))
+    val expected_2 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer2,threadId=\"0\""))
     assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
@@ -268,8 +268,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer0-0"),
-                           ("1", "group1_consumer0-0"))
+    val expected_2 = List( ("0", "groupId=\"group1\",consumer0,threadId=\"0\""),
+                           ("1", "groupId=\"group1\",consumer0,threadId=\"0\""))
     assertEquals(expected_2, actual_2)
 
     zkConsumerConnector1.shutdown
@@ -336,7 +336,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=\"group1\",consumer1,threadId=\"0\""))
     assertEquals(expected_1, actual_1)
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
