diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 8b9487c..2690674 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -61,7 +61,7 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
   newGauge(
-    topic + "-" + partitionId + "-UnderReplicated",
+    topic + "|" + partitionId + "|UnderReplicated",
     new Gauge[Int] {
       def value = {
         if (isUnderReplicated) 1 else 0
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
index 93223a9..7b52fe6 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -22,5 +22,5 @@ package kafka.common
  * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
  */
 case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
-  override def toString = "%s-%s".format(clientId, brokerInfo)
+  override def toString = "%s|%s".format(clientId, brokerInfo)
 }
diff --git a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
index 7acf9e7..1b0a5dd 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
@@ -22,6 +22,6 @@ package kafka.common
  * of many Stats objects.
  */
 case class ClientIdAndTopic(clientId: String, topic: String) {
-  override def toString = "%s-%s".format(clientId, topic)
+  override def toString = "%s|%s".format(clientId, topic)
 }
 
diff --git a/core/src/main/scala/kafka/common/Config.scala b/core/src/main/scala/kafka/common/Config.scala
index d24fb0d..b304cc6 100644
--- a/core/src/main/scala/kafka/common/Config.scala
+++ b/core/src/main/scala/kafka/common/Config.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
 trait Config extends Logging {
 
   def validateChars(prop: String, value: String) {
-    val legalChars = "[a-zA-Z0-9\\._\\-]"
+    val legalChars = "[a-zA-Z0-9\\._\\-\\|]"
     val rgx = new Regex(legalChars + "*")
 
     rgx.findFirstIn(value) match {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..d98095e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
-        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
+        extends AbstractFetcherManager("ConsumerFetcherManager|%d".format(SystemTime.milliseconds),
                                        config.clientId, config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
@@ -116,7 +116,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ConsumerFetcherThread(
-      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      "ConsumerFetcherThread|%s|%d|%d".format(consumerIdString, fetcherId, sourceBroker.id),
       config, sourceBroker, partitionMap, this)
   }
 
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f8c1b4e..ee777d2 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
-                                      clientId = config.clientId + "-" + name,
+                                      clientId = config.clientId + "|" + name,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
                                       socketBufferSize = config.socketReceiveBufferBytes,
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index ff5f470..797a7a9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -40,7 +40,7 @@ class ConsumerTopicStats(clientId: String) extends Logging {
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
   def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "|"))
   }
 }
 
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 875eeeb..5c2f1a3 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -39,7 +39,7 @@ class FetchRequestAndResponseStats(clientId: String) {
   def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
 
   def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "|"))
   }
 }
 
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6dae149..fb339b8 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -35,7 +35,7 @@ class SimpleConsumer(val host: String,
   ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  val brokerInfo = "host_%s-port_%s".format(host, port)
+  val brokerInfo = "host_%s|port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
   private var isClosed = false
 
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index e332633..7f0be74 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -35,7 +35,7 @@ private[kafka] trait TopicCount {
       val consumerSet = new mutable.HashSet[String]
       assert(nConsumers >= 1)
       for (i <- 0 until nConsumers)
-        consumerSet += consumerIdString + "-" + i
+        consumerSet += consumerIdString + "|" + i
       consumerThreadIdsPerTopicMap.put(topic, consumerSet)
     }
     consumerThreadIdsPerTopicMap
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 703b2e2..233054f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -103,11 +103,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       => consumerUuid = consumerId
       case None // generate unique consumerId automatically
       => val uuid = UUID.randomUUID()
-      consumerUuid = "%s-%d-%s".format(
+      consumerUuid = "%s|%d|%s".format(
         InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
         uuid.getMostSignificantBits().toHexString.substring(0,8))
     }
-    config.groupId + "_" + consumerUuid
+    config.groupId + "|" + consumerUuid
   }
   this.logIdent = "[" + consumerIdString + "], "
 
@@ -692,7 +692,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicThreadIdAndQueues.put(topicThreadId, q)
       debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
-        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        config.clientId + "|" + config.groupId + "|" + topicThreadId._1 + "|" + topicThreadId._2 + "|FetchQueueSize",
         new Gauge[Int] {
           def value = q.size
         }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 21fb715..179e9f7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -211,7 +211,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   def epoch = controllerContext.epoch
 
-  def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
+  def clientId = "id_%d|host_%s|port_%d".format(config.brokerId, config.hostName, config.port)
 
   /**
    * On clean shutdown, the controller first determines the partitions that the
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 46df8d9..6d63da9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -70,15 +70,22 @@ class Log(val dir: File,
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
-  newGauge(name + "-" + "NumLogSegments",
+  val metricName = buildMetricName(dir.getName())
+
+  newGauge(metricName + "|" + "NumLogSegments",
            new Gauge[Int] { def value = numberOfSegments })
 
-  newGauge(name + "-" + "LogEndOffset",
+  newGauge(metricName + "|" + "LogEndOffset",
            new Gauge[Long] { def value = logEndOffset })
            
-  newGauge(name + "-" + "Size", 
+  newGauge(metricName + "|" + "Size",
            new Gauge[Long] {def value = size})
 
+  private def buildMetricName(name: String): String = {
+    val lastIndexOfDash = name.lastIndexOf("-")
+    if (lastIndexOfDash < 0) name else name.substring(0, lastIndexOfDash) + '|' + name.substring(lastIndexOfDash + 1)
+  }
+
   /** The name of this log */
   def name  = dir.getName()
 
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 9694220..936c3e6 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -38,7 +38,7 @@ class ProducerRequestStats(clientId: String) {
   def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
 
   def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
-    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "|"))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index ed209f4..e79afef 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -41,7 +41,7 @@ class ProducerTopicStats(clientId: String) {
   def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "|"))
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 041cfa5..f0fc9ca 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -37,7 +37,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.sendBufferBytes, config.requestTimeoutMs)
-  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
+  val brokerInfo = "host_%s|port_%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
   trace("Instantiating Scala Sync Producer")
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..26c0d07 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   this.logIdent = "[" + name + "] "
 
   newGauge(
-    metricPrefix + "-MaxLag",
+    metricPrefix + "|MaxLag",
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
       def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -46,7 +46,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   newGauge(
-    metricPrefix + "-MinFetchRate",
+    metricPrefix + "|MinFetchRate",
     {
       new Gauge[Double] {
         // current min fetch rate across all fetchers/topics/partitions
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index db7017b..cf69550 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -45,7 +45,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
+  private val brokerInfo = "host_%s|port_%s".format(sourceBroker.host, sourceBroker.port)
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
@@ -206,7 +206,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
   private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
-    metricId + "-ConsumerLag",
+    metricId + "|ConsumerLag",
     new Gauge[Long] {
       def value = lagVal.get
     }
@@ -229,11 +229,11 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
-  val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+  val requestRate = newMeter(metricId + "|RequestsPerSec", "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "|BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
 
 case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+  override def toString = "%s|%s|%s|%d".format(clientId, brokerInfo, topic, partitionId)
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 871212b..c96dfed 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -89,6 +89,6 @@ object BrokerTopicStats extends Logging {
   def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
 
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
-    stats.getAndMaybePut(topic + "-")
+    stats.getAndMaybePut(topic + "|")
   }
 }
