Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(working copy)
@@ -24,7 +24,6 @@
 import kafka.utils.{TestUtils, Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
-import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -17,7 +17,7 @@
 
 package kafka.producer
 
-import java.util.{LinkedList, Properties}
+import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
 import junit.framework.Assert._
 import org.easymock.EasyMock
@@ -185,11 +185,11 @@
 
     val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[Int,String](config,
-                                                         partitioner = intPartitioner,
-                                                         encoder = null.asInstanceOf[Encoder[String]],
-                                                         keyEncoder = new IntEncoder(),
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                      partitioner = intPartitioner,
+                                                      encoder = null.asInstanceOf[Encoder[String]],
+                                                      keyEncoder = new IntEncoder(),
+                                                      producerPool = producerPool,
+                                                      topicPartitionInfos = topicPartitionInfos)
 
     val topic1Broker1Data = 
       ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -228,8 +228,7 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos
-    )
+                                                         topicPartitionInfos = topicPartitionInfos)
 
     val serializedData = handler.serialize(produceData)
     val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -257,7 +256,7 @@
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with UnknownTopicOrPartitionException")
@@ -288,7 +287,7 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos)
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -335,7 +334,7 @@
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos)
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -378,7 +377,7 @@
                                                           encoder = new StringEncoder,
                                                           keyEncoder = new StringEncoder,
                                                           producerPool = producerPool,
-                                                          topicPartitionInfos)
+                                                          topicPartitionInfos = topicPartitionInfos)
 
     val producer = new Producer[String, String](config, handler)
     try {
@@ -435,7 +434,7 @@
                                                       encoder = new StringEncoder(),
                                                       keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos)
+                                                      topicPartitionInfos = topicPartitionInfos)
     val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -23,7 +23,6 @@
 import junit.framework.Assert._
 
 import kafka.cluster._
-import kafka.message._
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(working copy)
@@ -24,7 +24,6 @@
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
-import kafka.message.Message
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
 
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -25,7 +25,6 @@
 import kafka.utils.Utils
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer._
-import kafka.message.Message
 import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
Index: core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(working copy)
@@ -21,7 +21,6 @@
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.producer.{ProducerConfig, Producer}
-import kafka.message.Message
 import kafka.utils.TestUtils
 import kafka.serializer._
 
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.integration
 
 import kafka.api.FetchRequestBuilder
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.message.ByteBufferMessageSet
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
Index: core/src/test/scala/unit/kafka/utils/TopicTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TopicTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/utils/TopicTest.scala	(working copy)
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidTopicException
-import org.junit.Test
-
-class TopicTest {
-
-  @Test
-  def testInvalidTopicNames() {
-    val invalidTopicNames = new ArrayBuffer[String]()
-    invalidTopicNames += ("", ".", "..")
-    var longName = "ATCG"
-    for (i <- 1 to 6)
-      longName += longName
-    invalidTopicNames += longName
-    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
-    for (weirdChar <- badChars) {
-      invalidTopicNames += "Is" + weirdChar + "funny"
-    }
-
-    for (i <- 0 until invalidTopicNames.size) {
-      try {
-        Topic.validate(invalidTopicNames(i))
-        fail("Should throw InvalidTopicException.")
-      }
-      catch {
-        case e: InvalidTopicException => "This is good."
-      }
-    }
-
-    val validTopicNames = new ArrayBuffer[String]()
-    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
-    for (i <- 0 until validTopicNames.size) {
-      try {
-        Topic.validate(validTopicNames(i))
-      }
-      catch {
-        case e: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-}
Index: core/src/test/scala/unit/kafka/utils/TopicClientIdTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TopicClientIdTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/utils/TopicClientIdTest.scala	(working copy)
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.{InvalidClientIdException, InvalidTopicException}
+import org.junit.Test
+
+class TopicClientIdTest {
+
+  @Test
+  def testInvalidTopicNames() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\\', ',', '\0', ':', '"', '\'', ';', '*', '?', '.')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        Topic.validate(invalidTopicNames(i))
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+
+    val validTopicNames = new ArrayBuffer[String]()
+    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
+    for (i <- 0 until validTopicNames.size) {
+      try {
+        Topic.validate(validTopicNames(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+
+  @Test
+  def testInvalidClientIds() {
+    val invalidclientIds = new ArrayBuffer[String]()
+    invalidclientIds += (".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidclientIds += longName
+    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
+    for (weirdChar <- badChars) {
+      invalidclientIds += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidclientIds.size) {
+      try {
+        Topic.validate(invalidclientIds(i))
+        fail("Should throw InvalidClientIdException.")
+      }
+      catch {
+        case e: InvalidClientIdException => "This is good."
+      }
+    }
+
+    val validTopicNames = new ArrayBuffer[String]()
+    validTopicNames += ("valid", "CLIENTID", "nAmEs", "ar6", "VaL1d", "_0-9_")
+    for (i <- 0 until validTopicNames.size) {
+      try {
+        ClientId.validate(validTopicNames(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}
Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(working copy)
@@ -23,8 +23,6 @@
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.serializer._
-import kafka.message.Message
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -20,7 +20,7 @@
 import kafka.consumer.SimpleConsumer
 import org.junit.Test
 import junit.framework.Assert._
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.message.ByteBufferMessageSet
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1410667)
+++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -29,7 +29,7 @@
 import kafka.javaapi.producer.Producer
 import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
-import kafka.utils.{Utils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 
Index: core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
===================================================================
--- core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala	(revision 1410667)
+++ core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka
 
 import consumer._
-import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch
 
Index: core/src/test/scala/other/kafka/TestKafkaAppender.scala
===================================================================
--- core/src/test/scala/other/kafka/TestKafkaAppender.scala	(revision 1410667)
+++ core/src/test/scala/other/kafka/TestKafkaAppender.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka
 
-import message.Message
 import org.apache.log4j.PropertyConfigurator
 import kafka.utils.Logging
 import serializer.Encoder
Index: core/src/main/scala/kafka/producer/DefaultPartitioner.scala
===================================================================
--- core/src/main/scala/kafka/producer/DefaultPartitioner.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/DefaultPartitioner.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.producer
 
-import kafka.utils.Utils
 
 import kafka.utils._
 
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -27,17 +27,17 @@
 
 
 object ProducerPool{
-  def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
+  def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker, producerRequestStat: ProducerRequestStat = new ProducerRequestStat("")): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
     if(configOpt.isDefined)
       props.putAll(configOpt.get.props.props)
-    new SyncProducer(new SyncProducerConfig(props))
+    new SyncProducer(new SyncProducerConfig(props), producerRequestStat)
   }
 }
 
-class ProducerPool(val config: ProducerConfig) extends Logging {
+class ProducerPool(val config: ProducerConfig, val producerRequestStat: ProducerRequestStat = new ProducerRequestStat("")) extends Logging {
   private val syncProducers = new HashMap[Int, SyncProducer]
   private val lock = new Object()
 
@@ -53,9 +53,9 @@
       newBrokers.foreach(b => {
         if(syncProducers.contains(b.id)){
           syncProducers(b.id).close()
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b, producerRequestStat))
         } else
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b, producerRequestStat))
       })
     }
   }
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -33,7 +33,7 @@
  * Send a message set.
  */
 @threadsafe
-class SyncProducer(val config: SyncProducerConfig) extends Logging {
+class SyncProducer(val config: SyncProducerConfig, private val producerRequestStat: ProducerRequestStat = new ProducerRequestStat("")) extends Logging {
   
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
@@ -89,9 +89,9 @@
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+    producerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
     var response: Receive = null
-    ProducerRequestStat.requestTimer.time {
+    producerRequestStat.requestTimer.time {
       response = doSend(producerRequest)
     }
     ProducerResponse.readFrom(response.buffer)
@@ -152,7 +152,7 @@
   }
 }
 
-object ProducerRequestStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram("ProducerRequestSize")
+class ProducerRequestStat(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
 }
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -27,7 +27,8 @@
 
 
 class Producer[K,V](config: ProducerConfig,
-                    private val eventHandler: EventHandler[K,V]) // for testing only
+                    private val eventHandler: EventHandler[K,V], // for testing only
+                    private val producerTopicStat: ProducerTopicStat = new ProducerTopicStat(""))
 extends Logging {
   private val hasShutdown = new AtomicBoolean(false)
   if (config.batchSize > config.queueSize)
@@ -47,21 +48,32 @@
                                                        queue,
                                                        eventHandler, 
                                                        config.queueTime, 
-                                                       config.batchSize)
+                                                       config.batchSize,
+                                                       config.clientId)
       producerSendThread.start()
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
+  ClientId.validate(config.clientId)
+  val asyncProducerStats = new AsyncProducerStats(config.clientId)
+
   KafkaMetricsReporter.startReporters(config.props)
 
-  def this(config: ProducerConfig) =
+
+  def this(config: ProducerConfig, producerTopicStat: ProducerTopicStat) =
     this(config,
          new DefaultEventHandler[K,V](config,
                                       Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
                                       Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                       Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-                                      new ProducerPool(config)))
+                                      new ProducerPool(config, new ProducerRequestStat(config.clientId)),
+                                      producerStats = new ProducerStats(config.clientId),
+                                      producerTopicStat = producerTopicStat),
+         producerTopicStat)
 
+  def this(config: ProducerConfig) =
+    this(config, new ProducerTopicStat(config.clientId))
+
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
@@ -79,8 +91,8 @@
 
   private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
-      ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
-      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
+      producerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
+      producerTopicStat.getProducerAllTopicStat.messageRate.mark()
     }
   }
 
@@ -105,7 +117,7 @@
           }
       }
       if(!added) {
-        AsyncProducerStats.droppedMessageRate.mark()
+        asyncProducerStats.droppedMessageRate.mark()
         error("Event queue is full of unsent messages, could not send event: " + message.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {
@@ -131,26 +143,26 @@
 }
 
 @threadsafe
-class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
+class ProducerTopicMetrics(name: String) extends KafkaMetricsGroup {
   val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
   val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object ProducerTopicStat {
-  private val valueFactory = (k: String) => new ProducerTopicStat(k)
-  private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
-  private val allTopicStat = new ProducerTopicStat("AllTopics")
+class ProducerTopicStat(clientId: String) {
+  private val valueFactory = (k: String) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[String, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicStat = new ProducerTopicMetrics(clientId + "-AllTopics")
 
-  def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
+  def getProducerAllTopicStat(): ProducerTopicMetrics = allTopicStat
 
-  def getProducerTopicStat(topic: String): ProducerTopicStat = {
-    stats.getAndMaybePut(topic + "-")
+  def getProducerTopicStat(topic: String): ProducerTopicMetrics = {
+    stats.getAndMaybePut(clientId + "-" + topic + "-")
   }
 }
 
-object ProducerStats extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter("SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter( "ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter("FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
 }
 
Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(working copy)
@@ -20,6 +20,6 @@
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 
-object AsyncProducerStats extends KafkaMetricsGroup {
-  val droppedMessageRate = newMeter("DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
+class AsyncProducerStats(clientId: String) extends KafkaMetricsGroup {
+  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
 }
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -33,7 +33,9 @@
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+                               private val producerStats: ProducerStats = new ProducerStats(""),
+                               private val producerTopicStat: ProducerTopicStat = new ProducerTopicStat(""))
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
@@ -48,8 +50,8 @@
       serializedData.foreach{
         keyed => 
           val dataSize = keyed.message.payloadSize
-          ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
-          ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+          producerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
+          producerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
@@ -61,11 +63,11 @@
           // get topics of the outstanding produce requests and refresh metadata for those
           Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
           remainingRetries -= 1
-          ProducerStats.resendRate.mark()
+          producerStats.resendRate.mark()
         }
       }
       if(outstandingProduceRequests.size > 0) {
-        ProducerStats.failedSendRate.mark()
+        producerStats.failedSendRate.mark()
         error("Failed to send the following requests: " + outstandingProduceRequests)
         throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
@@ -111,7 +113,7 @@
           serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
         case t =>
-          ProducerStats.serializationErrorRate.mark()
+          producerStats.serializationErrorRate.mark()
           if (isSync) {
             throw t
           } else {
Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(revision 1410667)
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(working copy)
@@ -28,12 +28,13 @@
                               val queue: BlockingQueue[KeyedMessage[K,V]],
                               val handler: EventHandler[K,V],
                               val queueTime: Long,
-                              val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
+                              val batchSize: Int,
+                              val clientId: String = "") extends Thread(threadName) with Logging with KafkaMetricsGroup {
 
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge("ProducerQueueSize-" + getId,
+  newGauge(clientId + "-ProducerQueueSize-" + getId,
           new Gauge[Int] {
             def getValue = queue.size
           })
Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision 1410667)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(working copy)
@@ -34,7 +34,8 @@
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val enableShallowIterator: Boolean)
+                             val enableShallowIterator: Boolean,
+                             val consumerTopicStat: ConsumerTopicStat = new ConsumerTopicStat)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -48,8 +49,8 @@
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
-    ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
+    consumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
+    consumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
     item
   }
 
Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala	(revision 1410667)
+++ core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala	(working copy)
@@ -22,19 +22,19 @@
 import kafka.metrics.KafkaMetricsGroup
 
 @threadsafe
-class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
+class ConsumerTopicMetrics(name: String) extends KafkaMetricsGroup {
   val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
   val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object ConsumerTopicStat extends Logging {
-  private val valueFactory = (k: String) => new ConsumerTopicStat(k)
-  private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
-  private val allTopicStat = new ConsumerTopicStat("AllTopics")
+class ConsumerTopicStat(clientId: String = "") extends Logging {
+  private val valueFactory = (k: String) => new ConsumerTopicMetrics(k)
+  private val stats = new Pool[String, ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStat = new ConsumerTopicMetrics(clientId + "-AllTopics")
 
-  def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
+  def getConsumerAllTopicStat(): ConsumerTopicMetrics = allTopicStat
 
-  def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
-    stats.getAndMaybePut(topic + "-")
+  def getConsumerTopicStat(topic: String): ConsumerTopicMetrics = {
+    stats.getAndMaybePut(clientId + "-" + topic + "-")
   }
 }
Index: core/src/main/scala/kafka/consumer/KafkaStream.scala
===================================================================
--- core/src/main/scala/kafka/consumer/KafkaStream.scala	(revision 1410667)
+++ core/src/main/scala/kafka/consumer/KafkaStream.scala	(working copy)
@@ -26,11 +26,12 @@
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
-                        val enableShallowIterator: Boolean)
+                        val enableShallowIterator: Boolean,
+                        val consumerTopicStat: ConsumerTopicStat)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStat)
 
   /**
    *  Create an iterator over messages in the stream.
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1410667)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -31,12 +31,12 @@
 
 object SimpleConsumer extends Logging {
   def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
-                             isFromOrdinaryConsumer: Boolean): Long = {
+                             fetchRequestAndResponseStat: FetchRequestAndResponseStat, isFromOrdinaryConsumer: Boolean): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
-                                          ConsumerConfig.SocketBufferSize)
+                                          ConsumerConfig.SocketBufferSize, fetchRequestAndResponseStat)
       val topicAndPartition = TopicAndPartition(topic, partitionId)
       val request = if(isFromOrdinaryConsumer)
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
@@ -56,14 +56,15 @@
   }
 
   def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
-                             earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+                             earliestOrLatest: Long, fetchRequestAndResponseStat: FetchRequestAndResponseStat,
+                             isFromOrdinaryConsumer: Boolean = true): Long = {
     val cluster = getCluster(zkClient)
     val broker = cluster.getBroker(brokerId) match {
       case Some(b) => b
       case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
                                                     "getOffsetsBefore request")
     }
-    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer)
+    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, fetchRequestAndResponseStat, isFromOrdinaryConsumer)
   }
 }
 
@@ -75,7 +76,8 @@
 class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
-                     val bufferSize: Int) extends Logging {
+                     val bufferSize: Int,
+                     val fetchRequestAndResponseStat: FetchRequestAndResponseStat = new FetchRequestAndResponseStat) extends Logging {
 
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
@@ -143,12 +145,12 @@
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    FetchRequestAndResponseStat.requestTimer.time {
+    fetchRequestAndResponseStat.requestTimer.time {
       response = sendRequest(request)
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
     fetchResponse
   }
 
@@ -166,7 +168,7 @@
   }
 }
 
-object FetchRequestAndResponseStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram("FetchResponseSize")
+class FetchRequestAndResponseStat(clientId: String = "") extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
 }
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 1410667)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(working copy)
@@ -28,7 +28,8 @@
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
-                         private val fetchSize: AtomicInteger) extends Logging {
+                         private val fetchSize: AtomicInteger,
+                         private val consumerTopicStat: ConsumerTopicStat = new ConsumerTopicStat) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
@@ -58,8 +59,8 @@
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
       debug("updated fetch offset of (%s) to %d".format(this, next))
-      ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
-      ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
+      consumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
+      consumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
     }
   }
   
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1410667)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -35,7 +35,6 @@
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
-import kafka.producer.ProducerConfig
 
 
 /**
@@ -94,6 +93,10 @@
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
+  ClientId.validate(config.clientId)
+  private val consumerTopicStat = new ConsumerTopicStat(config.clientId)
+  private val fetchRequestAndResponseStat = new FetchRequestAndResponseStat(config.clientId)
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -195,7 +198,7 @@
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStat)
         (queue, stream)
       })
     ).flatten.toList
@@ -595,9 +598,9 @@
           case None =>
             config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, fetchRequestAndResponseStat)
               case OffsetRequest.LargestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, fetchRequestAndResponseStat)
               case _ =>
                 throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
             }
@@ -605,13 +608,8 @@
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
-      val partTopicInfo = new PartitionTopicInfo(topic,
-                                                 leader,
-                                                 partition,
-                                                 queue,
-                                                 consumedOffset,
-                                                 fetchedOffset,
-                                                 new AtomicInteger(config.fetchSize))
+      val partTopicInfo = new PartitionTopicInfo(topic, leader, partition, queue, consumedOffset, fetchedOffset,
+                                                 new AtomicInteger(config.fetchSize), consumerTopicStat)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
@@ -667,7 +665,7 @@
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
       newGauge(
-        config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
           def getValue = q.size
         }
@@ -714,7 +712,8 @@
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.enableShallowIterator)
+                                          config.enableShallowIterator,
+                                          consumerTopicStat)
         (queue, stream)
     }).toList
 
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision 1410667)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(working copy)
@@ -19,12 +19,10 @@
 
 import joptsimple._
 import kafka.utils._
-import kafka.producer.ProducerConfig
 import kafka.consumer._
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
-import java.util.Properties
 import scala.collection.JavaConversions._
 
 /**
@@ -124,6 +122,8 @@
                        .maxWait(maxWaitMs)
                        .minBytes(ConsumerConfig.MinFetchBytes)
 
+    val fetchRequestAndResponseStat = new FetchRequestAndResponseStat(clientId)
+
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
@@ -167,7 +167,7 @@
       System.exit(1)
     }
     if(startingOffset < 0)
-      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false)
+      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, fetchRequestAndResponseStat, false)
 
     // initializing formatter
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@@ -175,7 +175,7 @@
 
     info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
                  .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
-    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
+    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, fetchRequestAndResponseStat)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset
Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala
===================================================================
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(revision 1410667)
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(working copy)
@@ -24,7 +24,7 @@
 import kafka.consumer._
 import kafka.utils.{Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import kafka.message.{CompressionCodec, Message}
+import kafka.message.CompressionCodec
 
 object ReplayLogProducer extends Logging {
 
Index: core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
===================================================================
--- core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala	(revision 1410667)
+++ core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import org.I0Itec.zkclient.ZkClient
-import kafka.consumer.{SimpleConsumer, ConsumerConfig}
+import kafka.consumer.{FetchRequestAndResponseStat, SimpleConsumer, ConsumerConfig}
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, Utils}
@@ -65,7 +65,7 @@
 
       ZkUtils.getBrokerInfo(zkClient, broker) match {
         case Some(brokerInfo) =>
-          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, new FetchRequestAndResponseStat(config.clientId))
           val topicAndPartition = TopicAndPartition(topic, partition)
           val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
           val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
Index: core/src/main/scala/kafka/tools/MirrorMaker.scala
===================================================================
--- core/src/main/scala/kafka/tools/MirrorMaker.scala	(revision 1410667)
+++ core/src/main/scala/kafka/tools/MirrorMaker.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.tools
 
-import kafka.message.Message
 import joptsimple.OptionParser
 import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 1410667)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(working copy)
@@ -28,7 +28,7 @@
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port) ,
+                                clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 1410667)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.cluster.Broker
-import kafka.consumer.SimpleConsumer
+import kafka.consumer.{FetchRequestAndResponseStat, SimpleConsumer}
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
@@ -27,7 +27,7 @@
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, ShutdownableThread}
+import kafka.utils.{ClientId, Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 
 
@@ -38,10 +38,15 @@
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
 
+  ClientId.validate(clientId)
+
   private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val fetchMapLock = new Object
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
-  val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+  private val fetchRequestAndResponseStat = new FetchRequestAndResponseStat(clientId)
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, fetchRequestAndResponseStat)
+  val fetcherStat = new FetcherStat(clientId)
+  val fetcherMetrics = fetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+  val fetcherLagMetrics = new FetcherLagStat(clientId)
 
   val fetchRequestuilder = new FetchRequestBuilder().
           clientId(clientId).
@@ -110,7 +115,7 @@
                     case None => currentOffset.get
                   }
                   fetchMap.put(topicAndPartition, newOffset)
-                  FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+                  fetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
                   fetcherMetrics.byteRate.mark(validBytes)
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
@@ -177,25 +182,25 @@
   def lag = lagVal.get
 }
 
-object FetcherLagMetrics {
+class FetcherLagStat(clientId: String) {
   private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
   private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut( (topic, partitionId) )
+    stats.getAndMaybePut( (clientId + "-" + topic, partitionId) )
   }
 }
 
-class FetcherStat(name: String) extends KafkaMetricsGroup {
+class FetcherMetrics(name: String) extends KafkaMetricsGroup {
   val requestRate = newMeter(name + "RequestsPerSec",  "requests", TimeUnit.SECONDS)
   val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object FetcherStat {
-  private val valueFactory = (k: String) => new FetcherStat(k)
-  private val stats = new Pool[String, FetcherStat](Some(valueFactory))
+class FetcherStat(clientId: String) {
+  private val valueFactory = (k: String) => new FetcherMetrics(k)
+  private val stats = new Pool[String, FetcherMetrics](Some(valueFactory))
 
-  def getFetcherStat(name: String): FetcherStat = {
-    stats.getAndMaybePut(name)
+  def getFetcherStat(name: String): FetcherMetrics = {
+    stats.getAndMaybePut(clientId + "-" + name)
   }
 }
Index: core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(revision 1410667)
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -16,7 +16,6 @@
  */
 package kafka.javaapi.consumer
 
-import kafka.message.Message
 import kafka.serializer._
 import kafka.consumer._
 import scala.collection.JavaConversions.asList
Index: core/src/main/scala/kafka/common/InvalidClientIdException.scala
===================================================================
--- core/src/main/scala/kafka/common/InvalidClientIdException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/InvalidClientIdException.scala	(working copy)
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class InvalidClientIdException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
Index: core/src/main/scala/kafka/utils/ClientId.scala
===================================================================
--- core/src/main/scala/kafka/utils/ClientId.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/ClientId.scala	(working copy)
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidClientIdException
+import util.matching.Regex
+
+object ClientId {
+  val legalChars = "[a-zA-Z0-9_-]"
+  val maxNameLength = 200 // arbitrary
+  private val rgx = new Regex(legalChars + "+")
+
+  def validate(clientId: String) {
+    if (clientId.equals(""))
+      return
+    if (clientId.length > maxNameLength)
+      throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(clientId) match {
+      case Some(t) =>
+        if (!t.equals(clientId))
+          throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
Index: core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala	(revision 1410667)
+++ core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala	(working copy)
@@ -24,7 +24,6 @@
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.{Utils, VerifiableProperties, Logging}
 
 
Index: core/src/main/scala/kafka/serializer/Encoder.scala
===================================================================
--- core/src/main/scala/kafka/serializer/Encoder.scala	(revision 1410667)
+++ core/src/main/scala/kafka/serializer/Encoder.scala	(working copy)
@@ -18,8 +18,6 @@
 package kafka.serializer
 
 import kafka.utils.VerifiableProperties
-import kafka.message._
-import kafka.utils.Utils
 
 /**
  * An encoder is a method of turning objects into byte arrays.
Index: core/src/main/scala/kafka/api/FetchRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchRequest.scala	(revision 1410667)
+++ core/src/main/scala/kafka/api/FetchRequest.scala	(working copy)
@@ -33,7 +33,7 @@
   val CurrentVersion = 1.shortValue()
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
-  val ReplicaFetcherClientId = "replica fetcher"
+  val ReplicaFetcherClientId = "replica-fetcher"
   val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
Index: perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala	(revision 1410667)
+++ perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala	(working copy)
@@ -20,7 +20,7 @@
 import java.net.URI
 import java.text.SimpleDateFormat
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
-import kafka.consumer.SimpleConsumer
+import kafka.consumer.{FetchRequestAndResponseStat, SimpleConsumer}
 import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
@@ -42,7 +42,7 @@
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
-    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
+    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, new FetchRequestAndResponseStat(config.clientId))
 
     // reset to latest or smallest offset
     val topicAndPartition = TopicAndPartition(config.topic, config.partition)
