diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..c5d8da7 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -63,7 +63,7 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
   newGauge(
-    topic + "-" + partitionId + "-UnderReplicated",
+    "topic=%s,partitionId=%d,UnderReplicated".format(topic, partitionId),
     new Gauge[Int] {
       def value = {
         if (isUnderReplicated) 1 else 0
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
index 93223a9..0c4cf6d 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -22,5 +22,5 @@ package kafka.common
  * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
  */
 case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
-  override def toString = "%s-%s".format(clientId, brokerInfo)
+  override def toString = "clientId=%s,%s".format(clientId, brokerInfo)
 }
diff --git a/core/src/main/scala/kafka/common/ClientIdTopic.scala b/core/src/main/scala/kafka/common/ClientIdTopic.scala
index a11607b..192ce40 100644
--- a/core/src/main/scala/kafka/common/ClientIdTopic.scala
+++ b/core/src/main/scala/kafka/common/ClientIdTopic.scala
@@ -1,8 +1,33 @@
 package kafka.common
 
 /**
- * Created by wawanawna on 10/13/14.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-class ClientIdTopic {
 
+/**
+ * Convenience case class since (clientId, topic) pairs are used in the creation
+ * of many Stats objects.
+ */
+sealed trait ClientIdTopic {
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
+  override def toString = "clientId=%s,topic=%s".format(clientId, topic)
+}
+
+case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
+  override def toString = "clientId=%s,AllTopics".format(clientId)
 }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..c949cca 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
-                                       config.clientId, config.numConsumerFetchers) {
+                                       "clientId=%s".format(config.clientId), config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
@@ -116,7 +116,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      "ConsumerFetcherThread,%s,fetcherId=%d,sourceBrokerId=%d".format(consumerIdString, fetcherId, sourceBroker.id),
       config, sourceBroker, partitionMap, this)
   }
 
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f8c1b4e..5d4767c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId + "-" + name,
+                                      clientId = config.clientId + "," + name,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketReceiveBufferBytes,
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index f63e6c5..f2e009d 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -20,12 +20,12 @@ package kafka.consumer
 import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdAllTopics, ClientIdAndTopic, ClientIdTopic}
 
 @threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + ",MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + ",BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
 /**
@@ -33,14 +33,14 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
  * @param clientId The clientId of the given consumer client.
  */
 class ConsumerTopicStats(clientId: String) extends Logging {
-  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ConsumerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 5243f41..13b4969 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -24,8 +24,8 @@ import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
 
 class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+  val requestTimer = new KafkaTimer(newTimer(metricId + ",FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(metricId + ",FetchResponseSize")
 }
 
 /**
@@ -40,7 +40,7 @@ class FetchRequestAndResponseStats(clientId: String) {
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
   }
 }
 
@@ -56,7 +56,7 @@ object FetchRequestAndResponseStatsRegistry {
   }
 
   def removeConsumerFetchRequestAndResponseStats(clientId: String) {
-    val pattern = (clientId + "-ConsumerFetcherThread.*").r
+    val pattern = (clientId + ",ConsumerFetcherThread.*").r
     val keys = globalStats.keys
     for (key <- keys) {
       pattern.findFirstIn(key) match {
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d349a30..25946fd 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -32,11 +32,9 @@ class SimpleConsumer(val host: String,
                      val soTimeout: Int,
                      val bufferSize: Int,
                      val clientId: String) extends Logging {
-
-  ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  val brokerInfo = "host_%s-port_%s".format(host, port)
+  val brokerInfo = "brokerHost=%s,brokerPort=%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 0954b3c..8d250e0 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -31,7 +31,7 @@ private[kafka] trait TopicCount {
 }
 
 case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
-  override def toString = "%s-%d".format(consumer, threadId)
+  override def toString = "%s,threadId=%d".format(consumer, threadId)
 
   def compare(that: ConsumerThreadId) = toString.compare(that.toString)
 }
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbc680f..fd21c4b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
   // useful for tracking migration of consumers to store offsets in kafka
-  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
-  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  private val kafkaCommitMeter = newMeter("clientId=%s,KafkaCommitsPerSec".format(config.clientId), "commits", TimeUnit.SECONDS)
+  private val zkCommitMeter = newMeter("clientId=%s,ZooKeeperCommitsPerSec".format(config.clientId), "commits", TimeUnit.SECONDS)
+  private val rebalanceTimer = new KafkaTimer(newTimer("clientId=%s,RebalanceRateAndTime".format(config.clientId), TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -115,11 +115,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
       => val uuid = UUID.randomUUID()
-      consumerUuid = "%s-%d-%s".format(
+      consumerUuid = "consumerHostName=%s,timestamp=%d,uuid=%s".format(
         InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
         uuid.getMostSignificantBits().toHexString.substring(0,8))
     }
-    config.groupId + "_" + consumerUuid
+    "groupId=" + config.groupId + "," + consumerUuid
   }
   this.logIdent = "[" + consumerIdString + "], "
 
@@ -518,12 +518,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private val cond = lock.newCondition()
     
     @volatile private var allTopicsOwnedPartitionsCount = 0
-    newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
+    newGauge("clientId=%s,groupId=%s,AllTopics,OwnedPartitionsCount".format(config.clientId, config.groupId), new Gauge[Int] {
       def value() = allTopicsOwnedPartitionsCount
     })
 
     private def ownedPartitionsCountMetricName(topic: String) =
-      "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+      "clientId=%s,groupId=%s,topic=%s,OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
 
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
@@ -863,7 +863,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicThreadIdAndQueues.put(topicThreadId, q)
       debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
-        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        "clientId=%s,groupId=%s,topicThreadId=%s,%s,FetchQueueSize".format(config.clientId, config.groupId, topicThreadId._1, topicThreadId._2),
         new Gauge[Int] {
           def value = q.size
         }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a123cdc..4e472a8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -73,20 +73,38 @@ class Log(val dir: File,
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
-  newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value = numberOfSegments })
 
-  newGauge(name + "-" + "LogStartOffset",
-           new Gauge[Long] { def value = logStartOffset })
+  val metricName = buildMetricName(Log.parseTopicPartitionName(name))
 
-  newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value = logEndOffset })
-           
-  newGauge(name + "-" + "Size", 
-           new Gauge[Long] {def value = size})
+  newGauge(metricName + "," + "NumLogSegments",
+    new Gauge[Int] {
+      def value = numberOfSegments
+    })
+
+
+  newGauge(metricName + "," + "LogStartOffset",
+    new Gauge[Long] {
+      def value = logStartOffset
+    })
+
+
+  newGauge(metricName + "," + "LogEndOffset",
+    new Gauge[Long] {
+      def value = logEndOffset
+    })
+
+
+  newGauge(metricName + "," + "Size",
+    new Gauge[Long] {
+      def value = size
+    })
+
+  private def buildMetricName(topicPartition: TopicAndPartition) = {
+    "topic=%s,partitionId=%d".format(topicPartition.topic, topicPartition.partition)
+  }
 
   /** The name of this log */
-  def name  = dir.getName()
+  def name = dir.getName
 
   /* Load the log segments from the log files on disk */
   private def loadSegments() {
@@ -152,7 +164,7 @@ class Log(val dir: File,
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
-      segments.put(0, new LogSegment(dir = dir, 
+      segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 2313a57..119cb79 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -68,70 +68,70 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
    */
   private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
     // kafka.consumer.ZookeeperConsumerConnector
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
-    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", ",FetchQueueSize"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", ",KafkaCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", ",ZooKeeperCommitsPerSec"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", ",RebalanceRateAndTime"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", ",OwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopics,OwnedPartitionsCount"),
 
     // kafka.consumer.ConsumerFetcherManager
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
-    new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", ",MaxLag"),
+    new MetricName("kafka.consumer", "ConsumerFetcherManager", ",MinFetchRate"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+    new MetricName("kafka.server", "FetcherLagMetrics", ",ConsumerLag"),
 
     // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", ",MessagesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "AllTopics,MessagesPerSec"),
 
     // kafka.consumer.ConsumerTopicStats
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", ",BytesPerSec"),
+    new MetricName("kafka.consumer", "ConsumerTopicMetrics", "AllTopics,BytesPerSec"),
 
     // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
-    new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
-    new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+    new MetricName("kafka.server", "FetcherStats", ",BytesPerSec"),
+    new MetricName("kafka.server", "FetcherStats", ",RequestsPerSec"),
 
     // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
-    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", ",FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", ",FetchRequestRateAndTimeMs"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "AllBrokers,FetchResponseSize"),
+    new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "AllBrokers,FetchRequestRateAndTimeMs"),
 
     /**
      * ProducerRequestStats <-- SyncProducer
      * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
      */
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", ",ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", ",ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "AllBrokers,ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "AllBrokers,ProducerRequestSize")
   )
 
   private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
     // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
-    new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
-    new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", ",SerializationErrorsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", ",ResendsPerSec"),
+    new MetricName("kafka.producer", "ProducerStats", ",FailedSendsPerSec"),
 
     // kafka.producer.ProducerSendThread
-    new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+    new MetricName("kafka.producer.async", "ProducerSendThread", ",ProducerQueueSize"),
 
     // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
-    new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", ",MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", ",DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", ",BytesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "AllTopics,MessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "AllTopics,DroppedMessagesPerSec"),
+    new MetricName("kafka.producer", "ProducerTopicMetrics", "AllTopics,BytesPerSec"),
 
     // kafka.producer.ProducerRequestStats <-- SyncProducer
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
-    new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+    new MetricName("kafka.producer", "ProducerRequestMetrics", ",ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", ",ProducerRequestSize"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "AllBrokers,ProducerRequestRateAndTimeMs"),
+    new MetricName("kafka.producer", "ProducerRequestMetrics", "AllBrokers,ProducerRequestSize")
   )
 
   def removeAllConsumerMetrics(clientId: String) {
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4560d8f..7a66d34 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -127,7 +127,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
   for(i <- 0 until numProcessors) {
     newGauge(
-      "Processor-" + i + "-ResponseQueueSize",
+      "ProcessorNum=" + i + ",ResponseQueueSize",
       new Gauge[Int] {
         def value = responseQueues(i).size()
       }
@@ -187,24 +187,24 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
 object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + ",Consumer"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + ",Follower"
   (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val requestRate = newMeter(name + ",RequestsPerSec",  "requests", TimeUnit.SECONDS)
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
+  val requestQueueTimeHist = newHistogram(name + ",RequestQueueTimeMs")
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeMs")
+  val localTimeHist = newHistogram(name + ",LocalTimeMs")
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  val remoteTimeHist = newHistogram(name + ",RemoteTimeMs")
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
+  val responseQueueTimeHist = newHistogram(name + ",ResponseQueueTimeMs")
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+  val responseSendTimeHist = newHistogram(name + ",ResponseSendTimeMs")
+  val totalTimeHist = newHistogram(name + ",TotalTimeMs")
 }
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index cee76b3..d22a241 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -67,7 +67,7 @@ class SocketServer(val brokerId: Int,
                                     time, 
                                     maxRequestSize, 
                                     aggregateIdleMeter,
-                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+                                    newMeter("NetworkProcessorNum=" + i + ",IdlePercent", "percent", TimeUnit.NANOSECONDS),
                                     numProcessorThreads, 
                                     requestChannel,
                                     quotas,
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 1c46d72..757e316 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -22,8 +22,8 @@ import kafka.utils.Pool
 import kafka.common.ClientIdAndBroker
 
 class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+  val requestTimer = new KafkaTimer(newTimer(metricId + ",ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(metricId + ",ProducerRequestSize")
 }
 
 /**
@@ -38,7 +38,7 @@ class ProducerRequestStats(clientId: String) {
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
   def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 35e3aae..2036aa7 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -21,9 +21,9 @@ import java.util.concurrent.TimeUnit
 import kafka.utils.Pool
 
 class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val serializationErrorRate = newMeter("clientId=%s,SerializationErrorsPerSec".format(clientId), "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter("clientId=%s,ResendsPerSec".format(clientId), "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter("clientId=%s,FailedSendsPerSec".format(clientId), "failed sends", TimeUnit.SECONDS)
 }
 
 /**
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 9bb1419..edc77e1 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -17,16 +17,16 @@
 package kafka.producer
 
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdAndTopic, ClientIdAllTopics, ClientIdTopic}
 import kafka.utils.{Pool, threadsafe}
 import java.util.concurrent.TimeUnit
 
 
 @threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + ",MessagesPerSec", "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + ",BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(metricId + ",DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
 }
 
 /**
@@ -34,14 +34,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
  * @param clientId The clientId of the given producer client.
  */
 class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+  private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
 
   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..ba4e16d 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -39,7 +39,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.sendBufferBytes, config.requestTimeoutMs)
-  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
+  val brokerInfo = "brokerHost=%s,brokerPort=%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
   trace("Instantiating Scala Sync Producer")
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..15f6f1a 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,7 +34,7 @@ class ProducerSendThread[K,V](val threadName: String,
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize",
+  newGauge("clientId=%s,ProducerQueueSize".format(clientId),
           new Gauge[Int] {
             def value = queue.size
           })
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..f71c1eb 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   this.logIdent = "[" + name + "] "
 
   newGauge(
-    metricPrefix + "-MaxLag",
+    metricPrefix + ",MaxLag",
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
       def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -46,7 +46,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   newGauge(
-    metricPrefix + "-MinFetchRate",
+    metricPrefix + ",MinFetchRate",
     {
       new Gauge[Double] {
         // current min fetch rate across all fetchers/topics/partitions
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2e9532e..c42d2d1 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -46,7 +46,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
+  private val brokerInfo = "brokerHost=%s,brokerPort=%s".format(sourceBroker.host, sourceBroker.port)
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
@@ -207,7 +207,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
-    metricId + "-ConsumerLag",
+    metricId + ",ConsumerLag",
     new Gauge[Long] {
       def value = lagVal.get
     }
@@ -230,11 +230,11 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val requestRate = newMeter(metricId + ",RequestsPerSec", "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + ",BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
 
 case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+  override def toString = "clientId=%s,%s,topic=%s,partitionId=%d".format(clientId, brokerInfo, topic, partitionId)
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 00bcc06..360964f 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,11 +105,11 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
 object BrokerTopicStats extends Logging {
   private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
+  private val allTopicsStats = new BrokerTopicMetrics("AllTopics,")
 
   def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
-    stats.getAndMaybePut(topic + "-")
+    stats.getAndMaybePut("topic=%s,".format(topic))
   }
 }
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 7602b8d..c6451ea 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.net.URI
 import java.text.SimpleDateFormat
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
-import kafka.consumer.SimpleConsumer
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
@@ -42,6 +42,7 @@ object SimpleConsumerPerformance {
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
+    ConsumerConfig.validateClientId(config.clientId)
     val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
 
     // reset to latest or smallest offset
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b4f903b..126bab3 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -167,6 +167,7 @@ object SimpleConsumerShell extends Logging {
       System.exit(1)
     }
     if (startingOffset < 0) {
+      ConsumerConfig.validateClientId(clientId)
       val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
                                               ConsumerConfig.SocketBufferSize, clientId)
       try {
@@ -189,6 +190,7 @@ object SimpleConsumerShell extends Logging {
     val replicaString = if(replicaId > 0) "leader" else "replica"
     info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
                  .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+    ConsumerConfig.validateClientId(clientId)
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e1d8711..3152953 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -109,8 +109,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=group1,consumer1,threadId=0"),
+                           ("1", "groupId=group1,consumer1,threadId=0"))
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -134,8 +134,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer2-0"))
+    val expected_2 = List( ("0", "groupId=group1,consumer1,threadId=0"),
+                           ("1", "groupId=group1,consumer2,threadId=0"))
     assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
@@ -197,8 +197,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=group1,consumer1,threadId=0"),
+                           ("1", "groupId=group1,consumer1,threadId=0"))
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
@@ -222,8 +222,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer1-0"),
-                           ("1", "group1_consumer2-0"))
+    val expected_2 = List( ("0", "groupId=group1,consumer1,threadId=0"),
+                           ("1", "groupId=group1,consumer2,threadId=0"))
     assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
@@ -268,8 +268,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_2 = List( ("0", "group1_consumer0-0"),
-                           ("1", "group1_consumer0-0"))
+    val expected_2 = List( ("0", "groupId=group1,consumer0,threadId=0"),
+                           ("1", "groupId=group1,consumer0,threadId=0"))
     assertEquals(expected_2, actual_2)
 
     zkConsumerConnector1.shutdown
@@ -336,7 +336,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"))
+    val expected_1 = List( ("0", "groupId=group1,consumer1,threadId=0"))
     assertEquals(expected_1, actual_1)
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
