Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(working copy)
@@ -24,7 +24,6 @@
 import kafka.utils.{TestUtils, Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
-import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
@@ -57,7 +56,7 @@
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
     serverZk = TestUtils.createServer(config);
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
   }
 
   @After
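
The recurring mechanical change across the test files below is this fifth constructor argument: SimpleConsumer now requires a client id, and the tests opt out with an empty string. A minimal sketch of the new call shape (host, port, and "my-client" are placeholder values):

    import kafka.consumer.SimpleConsumer

    // socket timeout 1000000 ms and a 64KB buffer mirror the test values above
    val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64*1024, "my-client")
    consumer.close()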
Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -53,7 +53,7 @@
     logDir = new File(logDirPath)
     time = new MockTime()
     server = TestUtils.createServer(new KafkaConfig(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
   }
 
   @After
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -17,7 +17,7 @@
 
 package kafka.producer
 
-import java.util.{LinkedList, Properties}
+import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
 import junit.framework.Assert._
 import org.easymock.EasyMock
@@ -68,7 +68,10 @@
 
     val config = new ProducerConfig(props)
     val produceData = getProduceData(12)
-    val producer = new Producer[String, String](config, mockEventHandler)
+    val producer = new Producer[String, String](config,
+                                                mockEventHandler,
+                                                new ProducerStats(""),
+                                                new ProducerTopicStats(""))
     try {
       // send all 10 messages, should hit the batch size and then reach broker
       producer.send(produceData: _*)
@@ -118,7 +121,7 @@
 
     val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
     val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5)
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5, "")
     producerSendThread.start()
 
     for (producerData <- producerDataList)
@@ -143,7 +146,7 @@
     val queueExpirationTime = 200
     val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
     val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5)
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5, "")
     producerSendThread.start()
 
     for (producerData <- producerDataList)
@@ -185,11 +188,13 @@
 
     val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[Int,String](config,
-                                                         partitioner = intPartitioner,
-                                                         encoder = null.asInstanceOf[Encoder[String]],
-                                                         keyEncoder = new IntEncoder(),
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                      partitioner = intPartitioner,
+                                                      encoder = null.asInstanceOf[Encoder[String]],
+                                                      keyEncoder = new IntEncoder(),
+                                                      producerPool = producerPool,
+                                                      topicPartitionInfos = topicPartitionInfos,
+                                                      producerStats = new ProducerStats(""),
+                                                      producerTopicStats = new ProducerTopicStats(""))
 
     val topic1Broker1Data = 
       ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -228,8 +233,9 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos
-    )
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
 
     val serializedData = handler.serialize(produceData)
     val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -257,7 +263,9 @@
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with UnknownTopicOrPartitionException")
@@ -288,7 +296,9 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -335,7 +345,9 @@
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -373,14 +385,19 @@
 
     val msgs = TestUtils.getMsgStrings(10)
 
-    val handler = new DefaultEventHandler[String,String]( config,
-                                                          partitioner = null.asInstanceOf[Partitioner[String]],
-                                                          encoder = new StringEncoder,
-                                                          keyEncoder = new StringEncoder,
-                                                          producerPool = producerPool,
-                                                          topicPartitionInfos)
+    val handler = new DefaultEventHandler[String,String](config,
+                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         encoder = new StringEncoder,
+                                                         keyEncoder = new StringEncoder,
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
 
-    val producer = new Producer[String, String](config, handler)
+    val producer = new Producer[String, String](config,
+                                                handler,
+                                                new ProducerStats(""),
+                                                new ProducerTopicStats(""))
     try {
       // send all 10 messages, should create 2 batches and 2 syncproducer calls
       producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
@@ -435,7 +452,9 @@
                                                       encoder = new StringEncoder(),
                                                       keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos)
+                                                      topicPartitionInfos = topicPartitionInfos,
+                                                      producerStats = new ProducerStats(""),
+                                                      producerTopicStats = new ProducerTopicStats(""))
     val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -65,9 +65,9 @@
     props.put("host", "localhost")
     props.put("port", port1.toString)
 
-    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)
+    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
+    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -23,7 +23,6 @@
 import junit.framework.Assert._
 
 import kafka.cluster._
-import kafka.message._
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
@@ -50,7 +49,8 @@
                                                            queue,
                                                            new AtomicLong(0),
                                                            new AtomicLong(0),
-                                                           new AtomicInteger(0)))
+                                                           new AtomicInteger(0),
+                                                           new ConsumerTopicStats("")))
 
   var fetcher: ConsumerFetcherManager = null
 
Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(working copy)
@@ -24,7 +24,6 @@
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
-import kafka.message.Message
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
 
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -25,7 +25,6 @@
 import kafka.utils.Utils
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer._
-import kafka.message.Message
 import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
Index: core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(working copy)
@@ -21,7 +21,6 @@
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.producer.{ProducerConfig, Producer}
-import kafka.message.Message
 import kafka.utils.TestUtils
 import kafka.serializer._
 
@@ -44,10 +43,7 @@
       props.put("producer.request.required.acks", "-1")
       props.put("serializer.class", classOf[StringEncoder].getName.toString)
       producer = new Producer(new ProducerConfig(props))
-      consumer = new SimpleConsumer(host,
-                                   port,
-                                   1000000,
-                                   64*1024)
+      consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
     }
 
    override def tearDown() {
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.integration
 
 import kafka.api.FetchRequestBuilder
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.message.ByteBufferMessageSet
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
Index: core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(working copy)
@@ -55,7 +55,8 @@
                                                            queue,
                                                            new AtomicLong(consumedOffset),
                                                            new AtomicLong(0),
-                                                           new AtomicInteger(0)))
+                                                           new AtomicInteger(0),
+                                                           new ConsumerTopicStats("")))
   val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
 
   override def setUp() {
@@ -78,7 +79,8 @@
                                                     consumerConfig.consumerTimeoutMs,
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
-                                                    enableShallowIterator = false)
+                                                    enableShallowIterator = false,
+                                                    consumerTopicStats = new ConsumerTopicStats(""))
     var receivedMessages = (0 until 5).map(i => iter.next.message).toList
 
     assertFalse(iter.hasNext)
Index: core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/ClientIdTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/utils/ClientIdTest.scala	(working copy)
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.InvalidClientIdException
+import org.junit.Test
+
+class ClientIdTest {
+
+  @Test
+  def testInvalidClientIds() {
+    val invalidClientIds = new ArrayBuffer[String]()
+    invalidClientIds += (".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidClientIds += longName
+    val badChars = Array('/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
+    for (weirdChar <- badChars) {
+      invalidClientIds += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidClientIds.size) {
+      try {
+        ClientId.validate(invalidClientIds(i))
+        fail("Should throw InvalidClientIdException.")
+      }
+      catch {
+        case e: InvalidClientIdException => // expected
+      }
+    }
+
+    val validClientIds = new ArrayBuffer[String]()
+    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
+    for (i <- 0 until validClientIds.size) {
+      try {
+        ClientId.validate(validClientIds(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}
\ No newline at end of file
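
ClientId.validate itself is not part of this diff; the following is only a sketch of rules consistent with the test data above. The 200-character cap and the exact character class are assumptions inferred from the test cases (the 256-character name must fail, the empty string must pass), not the committed implementation:

    import kafka.common.InvalidClientIdException

    object ClientId {
      val maxNameLength = 200  // assumed cap: the 256-character name in the test must fail

      def validate(clientId: String) {
        if (clientId.length > maxNameLength)
          throw new InvalidClientIdException("client id " + clientId + " can't be longer than " + maxNameLength + " characters")
        // assumed legal character class; the test treats the empty string as valid
        if (!clientId.matches("[a-zA-Z0-9_-]*"))
          throw new InvalidClientIdException("client id " + clientId + " contains illegal characters")
      }
    }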
Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(working copy)
@@ -23,8 +23,6 @@
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.serializer._
-import kafka.message.Message
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -20,7 +20,7 @@
 import kafka.consumer.SimpleConsumer
 import org.junit.Test
 import junit.framework.Assert._
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.message.ByteBufferMessageSet
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
@@ -66,10 +66,7 @@
     server.startup()
 
     producer = new Producer[Int, String](new ProducerConfig(producerConfig))
-    val consumer = new SimpleConsumer(host,
-                                      port,
-                                      1000000,
-                                      64*1024)
+    val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
 
Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1414954)
+++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -29,7 +29,7 @@
 import kafka.javaapi.producer.Producer
 import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
-import kafka.utils.{Utils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 
Index: core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
===================================================================
--- core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala	(revision 1414954)
+++ core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka
 
 import consumer._
-import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch
 
Index: core/src/test/scala/other/kafka/TestKafkaAppender.scala
===================================================================
--- core/src/test/scala/other/kafka/TestKafkaAppender.scala	(revision 1414954)
+++ core/src/test/scala/other/kafka/TestKafkaAppender.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka
 
-import message.Message
 import org.apache.log4j.PropertyConfigurator
 import kafka.utils.Logging
 import serializer.Encoder
Index: core/src/main/scala/kafka/producer/DefaultPartitioner.scala
===================================================================
--- core/src/main/scala/kafka/producer/DefaultPartitioner.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/DefaultPartitioner.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.producer
 
-import kafka.utils.Utils
 
 import kafka.utils._
 
Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/ConsoleProducer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/ConsoleProducer.scala	(working copy)
@@ -20,7 +20,6 @@
 import scala.collection.JavaConversions._
 import joptsimple._
 import java.util.Properties
-import java.util.regex._
 import java.io._
 import kafka.common._
 import kafka.message._
Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(working copy)
@@ -44,7 +44,7 @@
   val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
 
   /* the client application sending the producer requests */
-  val clientId = props.getString("producer.request.client_id",SyncProducerConfig.DefaultClientId)
+  val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
 
   /*
    * The required acks of the producer requests - negative value means ack
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -26,24 +26,37 @@
 import kafka.common.UnavailableProducerException
 
 
-object ProducerPool{
-  def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
+object ProducerPool {
+  /**
+   * Used in ProducerPool to initiate a SyncProducer connection with a broker.
+   */
+  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
-    if(configOpt.isDefined)
-      props.putAll(configOpt.get.props.props)
+    props.putAll(config.props.props)
     new SyncProducer(new SyncProducerConfig(props))
   }
+
+  /**
+   * Used in ClientUtils to send TopicMetadataRequest to a broker.
+   */
+  def createSyncProducer(clientId: String, broker: Broker): SyncProducer = {
+    val props = new Properties()
+    props.put("host", broker.host)
+    props.put("port", broker.port.toString)
+    props.put("producer.request.client_id", clientId)
+    new SyncProducer(new SyncProducerConfig(props))
+  }
 }
 
 class ProducerPool(val config: ProducerConfig) extends Logging {
   private val syncProducers = new HashMap[Int, SyncProducer]
   private val lock = new Object()
 
-  def updateProducer(topicMetaDatas: Seq[TopicMetadata]) {
+  def updateProducer(topicMetadatas: Seq[TopicMetadata]) {
     val newBrokers = new collection.mutable.HashSet[Broker]
-    topicMetaDatas.foreach(tmd => {
+    topicMetadatas.foreach(tmd => {
       tmd.partitionsMetadata.foreach(pmd => {
         if(pmd.leader.isDefined)
           newBrokers+=(pmd.leader.get)
@@ -53,9 +66,9 @@
       newBrokers.foreach(b => {
         if(syncProducers.contains(b.id)){
           syncProducers(b.id).close()
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
         } else
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
       })
     }
   }
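
The split into two createSyncProducer overloads makes the call sites explicit: updateProducer always has a full ProducerConfig, while ClientUtils.fetchTopicMetadata only has a client id. A usage sketch, assuming a config: ProducerConfig and a broker: Broker are already in scope:

    // full-config path, as used from updateProducer above
    val producer = ProducerPool.createSyncProducer(config, broker)

    // metadata path, as used from ClientUtils: only the client id travels with the request
    val metadataProducer = ProducerPool.createSyncProducer(config.clientId, broker)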
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -34,7 +34,7 @@
  */
 @threadsafe
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
-  
+
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
 
@@ -42,6 +42,7 @@
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
+  val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
 
   trace("Instantiating Scala Sync Producer")
 
@@ -89,9 +90,9 @@
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+    producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
     var response: Receive = null
-    ProducerRequestStat.requestTimer.time {
+    producerRequestStats.requestTimer.time {
       response = doSend(producerRequest)
     }
     ProducerResponse.readFrom(response.buffer)
@@ -152,7 +153,7 @@
   }
 }
 
-object ProducerRequestStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram("ProducerRequestSize")
+class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
 }
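
Folding the client id and broker coordinates into the metric names means each (client, broker) pair gets its own timer and histogram instead of the single process-wide ProducerRequestStat object being removed here. The names come out as straight concatenations of the pieces above, for example:

    val stats = new ProducerRequestStats("my-client", "host_localhost-port_9092")
    // registers metrics named:
    //   my-client-host_localhost-port_9092-ProduceRequestRateAndTimeMs
    //   my-client-host_localhost-port_9092-ProducerRequestSize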
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -16,7 +16,7 @@
  */
 package kafka.producer
 
-import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
+import async.{DefaultEventHandler, ProducerSendThread, EventHandler}
 import kafka.utils._
 import java.util.Random
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
@@ -27,8 +27,11 @@
 
 
 class Producer[K,V](config: ProducerConfig,
-                    private val eventHandler: EventHandler[K,V]) // for testing only
-extends Logging {
+                    private val eventHandler: EventHandler[K,V],
+                    private val producerStats: ProducerStats,
+                    private val producerTopicStats: ProducerTopicStats)  // only for unit testing
+  extends Logging {
+
   private val hasShutdown = new AtomicBoolean(false)
   if (config.batchSize > config.queueSize)
     throw new InvalidConfigException("Batch size can't be larger than queue size.")
@@ -47,25 +50,38 @@
                                                        queue,
                                                        eventHandler, 
                                                        config.queueTime, 
-                                                       config.batchSize)
+                                                       config.batchSize,
+                                                       config.clientId)
       producerSendThread.start()
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
   KafkaMetricsReporter.startReporters(config.props)
 
+  def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
+    this(t._1, t._2, t._3, t._4)
+
   def this(config: ProducerConfig) =
-    this(config,
-         new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
-                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
-                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-                                      new ProducerPool(config)))
+    this {
+      ClientId.validate(config.clientId)
+      val producerStats = new ProducerStats(config.clientId)
+      val producerTopicStats = new ProducerTopicStats(config.clientId)
+      (config,
+       new DefaultEventHandler[K,V](config,
+                                    Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                    Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+                                    Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+                                    new ProducerPool(config),
+                                    producerStats = producerStats,
+                                    producerTopicStats = producerTopicStats),
+       producerStats,
+       producerTopicStats)
+    }
 
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
-   * @param producerData the producer data object that encapsulates the topic, key and message data
+   * @param messages the producer data object that encapsulates the topic, key and message data
    */
   def send(messages: KeyedMessage[K,V]*) {
     if (hasShutdown.get)
@@ -79,8 +95,8 @@
 
   private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
-      ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
-      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
+      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
+      producerTopicStats.getProducerAllTopicStats.messageRate.mark()
     }
   }
 
@@ -105,7 +121,7 @@
           }
       }
       if(!added) {
-        AsyncProducerStats.droppedMessageRate.mark()
+        producerStats.droppedMessageRate.mark()
         error("Event queue is full of unsent messages, could not send event: " + message.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {
@@ -131,26 +147,27 @@
 }
 
 @threadsafe
-class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
-  val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object ProducerTopicStat {
-  private val valueFactory = (k: String) => new ProducerTopicStat(k)
-  private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
-  private val allTopicStat = new ProducerTopicStat("AllTopics")
+class ProducerTopicStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
 
-  def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
+  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
 
-  def getProducerTopicStat(topic: String): ProducerTopicStat = {
-    stats.getAndMaybePut(topic + "-")
+  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
   }
 }
 
-object ProducerStats extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter("SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter( "ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter("FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
 }
 
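The tuple-taking auxiliary constructor above is a workaround for a Scala restriction: an auxiliary constructor must begin with a call to another constructor, so this(config) has nowhere to bind the shared ProducerStats/ProducerTopicStats instances before handing the same objects to both the event handler and the Producer. Packing the values into a tuple computed inside a block sidesteps that. The same pattern in miniature (Widget and its fields are purely illustrative):

    class Widget(val handler: String, val stats: String) {
      def this(t: (String, String)) = this(t._1, t._2)
      def this(id: String) = this {
        val shared = "stats-for-" + id      // computed once...
        ("handler-using-" + shared, shared) // ...and threaded to two parameters
      }
    }
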
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -68,11 +68,11 @@
 
   /**
    * It updates the cache by issuing a get topic metadata request to a random broker.
-   * @param topic the topic for which the metadata is to be fetched
+   * @param topics the topics for which the metadata is to be fetched
    */
-  def updateInfo(topics: Set[String]) = {
+  def updateInfo(topics: Set[String]) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, producerConfig.clientId, brokers)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(working copy)
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer.async
-
-import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
-
-object AsyncProducerStats extends KafkaMetricsGroup {
-  val droppedMessageRate = newMeter("DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
-}
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -33,7 +33,9 @@
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
+                               private val producerStats: ProducerStats,
+                               private val producerTopicStats: ProducerTopicStats)
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
@@ -48,8 +50,8 @@
       serializedData.foreach{
         keyed => 
           val dataSize = keyed.message.payloadSize
-          ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
-          ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+          producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+          producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
@@ -61,11 +63,11 @@
           // get topics of the outstanding produce requests and refresh metadata for those
           Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
           remainingRetries -= 1
-          ProducerStats.resendRate.mark()
+          producerStats.resendRate.mark()
         }
       }
       if(outstandingProduceRequests.size > 0) {
-        ProducerStats.failedSendRate.mark()
+        producerStats.failedSendRate.mark()
         error("Failed to send the following requests: " + outstandingProduceRequests)
         throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
@@ -111,7 +113,7 @@
           serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
         case t =>
-          ProducerStats.serializationErrorRate.mark()
+          producerStats.serializationErrorRate.mark()
           if (isSync) {
             throw t
           } else {
Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(revision 1414954)
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala	(working copy)
@@ -28,12 +28,13 @@
                               val queue: BlockingQueue[KeyedMessage[K,V]],
                               val handler: EventHandler[K,V],
                               val queueTime: Long,
-                              val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
+                              val batchSize: Int,
+                              val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
 
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge("ProducerQueueSize-" + getId,
+  newGauge(clientId + "-ProducerQueueSize-" + getId,
           new Gauge[Int] {
             def getValue = queue.size
           })
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(revision 1414954)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(working copy)
@@ -223,7 +223,7 @@
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
       case 0 =>
-        ControllerStat.offlinePartitionRate.mark()
+        ControllerStats.offlinePartitionRate.mark()
         throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " +
           "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
           replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
@@ -249,7 +249,7 @@
             // read the controller epoch
             val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
               topicAndPartition.partition).get
-            ControllerStat.offlinePartitionRate.mark()
+            ControllerStats.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
               .format(topicAndPartition) + " since Leader and isr path already exists with value " +
               "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 1414954)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(working copy)
@@ -961,7 +961,7 @@
 
 case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
 
-object ControllerStat extends KafkaMetricsGroup {
+object ControllerStats extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
   val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1414954)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -227,7 +227,7 @@
   class BrokerChangeListener() extends IZkChildListener with Logging {
     this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      ControllerStat.leaderElectionTimer.time {
+      ControllerStats.leaderElectionTimer.time {
         info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
         if(!isShuttingDown.get()) {
           controllerContext.controllerLock synchronized {
Index: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala	(revision 1414954)
+++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala	(working copy)
@@ -58,12 +58,12 @@
               .format(liveAssignedReplicasToThisPartition.mkString(",")))
             liveAssignedReplicasToThisPartition.isEmpty match {
               case true =>
-                ControllerStat.offlinePartitionRate.mark()
+                ControllerStats.offlinePartitionRate.mark()
                 throw new PartitionOfflineException(("No replica for partition " +
                   "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
                   " Assigned replicas are: [%s]".format(assignedReplicas))
               case false =>
-                ControllerStat.uncleanLeaderElectionRate.mark()
+                ControllerStats.uncleanLeaderElectionRate.mark()
                 val newLeader = liveAssignedReplicasToThisPartition.head
                 warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
                   "There's potential data loss")
@@ -78,7 +78,7 @@
           partition))
         (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
       case None =>
-        ControllerStat.offlinePartitionRate.mark()
+        ControllerStats.offlinePartitionRate.mark()
         throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
           "replicas assigned to it")
     }
Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(working copy)
@@ -34,7 +34,8 @@
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val enableShallowIterator: Boolean)
+                             val enableShallowIterator: Boolean,
+                             val consumerTopicStats: ConsumerTopicStats)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -48,8 +49,8 @@
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
-    ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
+    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
     item
   }
 
Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(working copy)
@@ -111,6 +111,6 @@
   /**
   * Client id is specified by the kafka consumer client and is used to distinguish different clients
    */
-  val clientId = props.getString("clientid", groupId)
+  val clientId = props.getString("client.id", groupId)
 }
 
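
Producer (see SyncProducerConfig above) and consumer now both read the dotted client.id key; the old clientid and producer.request.client_id spellings are gone. A sketch:

    import java.util.Properties

    val props = new Properties()
    props.put("client.id", "my-client")  // for consumers, falls back to the group id when unset
    // the old "clientid" key is no longer read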
Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala	(working copy)
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import kafka.utils.{Pool, threadsafe, Logging}
-import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
-
-@threadsafe
-class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
-  val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
-}
-
-object ConsumerTopicStat extends Logging {
-  private val valueFactory = (k: String) => new ConsumerTopicStat(k)
-  private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
-  private val allTopicStat = new ConsumerTopicStat("AllTopics")
-
-  def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
-
-  def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
-    stats.getAndMaybePut(topic + "-")
-  }
-}
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala	(working copy)
@@ -54,7 +54,7 @@
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
           val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, config.clientId, brokers).topicsMetadata
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
           topicsMetadata.foreach(
             tmd => {
Index: core/src/main/scala/kafka/consumer/KafkaStream.scala
===================================================================
--- core/src/main/scala/kafka/consumer/KafkaStream.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/KafkaStream.scala	(working copy)
@@ -26,11 +26,12 @@
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
-                        val enableShallowIterator: Boolean)
+                        val enableShallowIterator: Boolean,
+                        val consumerTopicStats: ConsumerTopicStats)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
 
   /**
    *  Create an iterator over messages in the stream.
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -31,12 +31,12 @@
 
 object SimpleConsumer extends Logging {
   def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
-                             isFromOrdinaryConsumer: Boolean): Long = {
+                             clientId: String, isFromOrdinaryConsumer: Boolean): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
-                                          ConsumerConfig.SocketBufferSize)
+                                          ConsumerConfig.SocketBufferSize, clientId)
       val topicAndPartition = TopicAndPartition(topic, partitionId)
       val request = if(isFromOrdinaryConsumer)
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
@@ -56,14 +56,14 @@
   }
 
   def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
-                             earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+                             earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = {
     val cluster = getCluster(zkClient)
     val broker = cluster.getBroker(brokerId) match {
       case Some(b) => b
       case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
                                                     "getOffsetsBefore request")
     }
-    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer)
+    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer)
   }
 }
 
@@ -75,10 +75,13 @@
 class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
-                     val bufferSize: Int) extends Logging {
+                     val bufferSize: Int,
+                     val clientId: String) extends Logging {
 
+  ClientId.validate(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
+  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
 
   private def connect(): BlockingChannel = {
     close
@@ -143,12 +146,12 @@
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    FetchRequestAndResponseStat.requestTimer.time {
+    fetchRequestAndResponseStats.requestTimer.time {
       response = sendRequest(request)
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
     fetchResponse
   }
 
@@ -166,7 +169,7 @@
   }
 }
 
-object FetchRequestAndResponseStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram("FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val responseSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
 }
Index: core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala	(revision 0)
+++ core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala	(working copy)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
+
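+/**
+ * Message and byte rate meters for a single (clientId, topic) pair.
+ */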
+@threadsafe
+class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+}
+
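+/**
+ * Lazily-populated registry of ConsumerTopicMetrics for one clientId, plus
+ * an aggregate "AllTopics" entry.
+ */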
+class ConsumerTopicStats(clientId: String) extends Logging {
+  private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+
+  def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
+
+  def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+  }
+}
+
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(working copy)
@@ -28,7 +28,8 @@
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
-                         private val fetchSize: AtomicInteger) extends Logging {
+                         private val fetchSize: AtomicInteger,
+                         private val consumerTopicStats: ConsumerTopicStats) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
@@ -58,8 +59,8 @@
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
       debug("updated fetch offset of (%s) to %d".format(this, next))
-      ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
-      ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
+      consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
+      consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
     }
   }
   
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1414954)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -35,7 +35,6 @@
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
-import kafka.producer.ProducerConfig
 
 
 /**
@@ -80,6 +79,8 @@
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
         extends ConsumerConnector with Logging with KafkaMetricsGroup {
+
+  ClientId.validate(config.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
@@ -94,6 +95,8 @@
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
+  private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -195,7 +198,7 @@
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
         (queue, stream)
       })
     ).flatten.toList
@@ -399,7 +402,7 @@
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, config.clientId, brokers).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{
@@ -595,9 +598,9 @@
           case None =>
             config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
               case OffsetRequest.LargestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
               case _ =>
                 throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
             }
@@ -611,7 +614,8 @@
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
-                                                 new AtomicInteger(config.fetchSize))
+                                                 new AtomicInteger(config.fetchSize),
+                                                 consumerTopicStats)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
@@ -667,7 +671,7 @@
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
       newGauge(
-        config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
           def getValue = q.size
         }
@@ -714,7 +718,8 @@
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.enableShallowIterator)
+                                          config.enableShallowIterator,
+                                          consumerTopicStats)
         (queue, stream)
     }).toList
 
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision 1414954)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(working copy)
@@ -19,12 +19,10 @@
 
 import joptsimple._
 import kafka.utils._
-import kafka.producer.ProducerConfig
 import kafka.consumer._
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
-import java.util.Properties
 import scala.collection.JavaConversions._
 
 /**
@@ -127,7 +125,7 @@
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), clientId, metadataTargetBrokers).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
@@ -167,7 +165,7 @@
       System.exit(1)
     }
     if(startingOffset < 0)
-      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false)
+      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false)
 
     // initializing formatter
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@@ -175,7 +173,7 @@
 
     info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
                  .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
-    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
+    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset
Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala
===================================================================
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(working copy)
@@ -24,7 +24,7 @@
 import kafka.consumer._
 import kafka.utils.{Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import kafka.message.{CompressionCodec, Message}
+import kafka.message.CompressionCodec
 
 object ReplayLogProducer extends Logging {
 
Index: core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
===================================================================
--- core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala	(revision 1414954)
+++ core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala	(working copy)
@@ -65,7 +65,7 @@
 
       ZkUtils.getBrokerInfo(zkClient, broker) match {
         case Some(brokerInfo) =>
-          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
           val topicAndPartition = TopicAndPartition(topic, partition)
           val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
           val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
Index: core/src/main/scala/kafka/tools/GetOffsetShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/GetOffsetShell.scala	(revision 1414954)
+++ core/src/main/scala/kafka/tools/GetOffsetShell.scala	(working copy)
@@ -67,7 +67,7 @@
     val partition = options.valueOf(partitionOpt).intValue
     var time = options.valueOf(timeOpt).longValue
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
-    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
+    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
     val topicAndPartition = TopicAndPartition(topic, partition)
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
     val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
Index: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
===================================================================
--- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala	(revision 1414954)
+++ core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala	(working copy)
@@ -41,7 +41,7 @@
     val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
     val consumer = brokerInfo match {
       case BrokerIpPattern(ip, port) =>
-        Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
+        Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker"))
       case _ =>
         error("Could not parse broker info %s".format(brokerInfo))
         None
Index: core/src/main/scala/kafka/tools/MirrorMaker.scala
===================================================================
--- core/src/main/scala/kafka/tools/MirrorMaker.scala	(revision 1414954)
+++ core/src/main/scala/kafka/tools/MirrorMaker.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.tools
 
-import kafka.message.Message
 import joptsimple.OptionParser
 import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision 1414954)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(working copy)
@@ -79,14 +79,14 @@
   val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
 }
 
-object BrokerTopicStat extends Logging {
+object BrokerTopicStats extends Logging {
   private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicStat = new BrokerTopicMetrics("AllTopics")
+  private val allTopicStats = new BrokerTopicMetrics("AllTopics")
 
-  def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat
+  def getBrokerAllTopicStats(): BrokerTopicMetrics = allTopicStats
 
-  def getBrokerTopicStat(topic: String): BrokerTopicMetrics = {
+  def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
     stats.getAndMaybePut(topic + "-")
   }
 }
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -23,7 +23,7 @@
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import org.I0Itec.zkclient.ZkClient
-import kafka.controller.{ControllerStat, KafkaController}
+import kafka.controller.{ControllerStats, KafkaController}
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -96,9 +96,9 @@
    *  Forces some dynamic jmx beans to be registered on server startup.
    */
   private def registerStats() {
-    BrokerTopicStat.getBrokerAllTopicStat()
-    ControllerStat.offlinePartitionRate
-    ControllerStat.uncleanLeaderElectionRate
+    BrokerTopicStats.getBrokerAllTopicStats()
+    ControllerStats.offlinePartitionRate
+    ControllerStats.uncleanLeaderElectionRate
   }
 
   /**
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 1414954)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(working copy)
@@ -28,7 +28,7 @@
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port) ,
+                                clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1414954)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -237,8 +237,8 @@
   private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
-      BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-      BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(messages.sizeInBytes)
+      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+      BrokerTopicStats.getBrokerAllTopicStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
         val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -255,8 +255,8 @@
           Runtime.getRuntime.halt(1)
           null
         case e =>
-          BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).failedProduceRequestRate.mark()
-          BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerAllTopicStats.failedProduceRequestRate.mark()
           error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic, topicAndPartition.partition), e)
           new ProduceResult(topicAndPartition, e)
        }
@@ -323,8 +323,8 @@
         val partitionData =
           try {
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
               new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
             } else {
@@ -334,8 +334,8 @@
             }
           } catch {
             case t: Throwable =>
-              BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
-              BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize), t)
               new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
                                              offset, -1L, MessageSet.Empty)
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 1414954)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(working copy)
@@ -27,7 +27,7 @@
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, ShutdownableThread}
+import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
@@ -38,12 +38,13 @@
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
-
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
-  val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
+  val fetcherStats = new FetcherStats(clientId)
+  val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
+  val fetcherLagStats = new FetcherLagStats(clientId)
 
   /* callbacks to be defined in subclass */
 
@@ -117,7 +118,7 @@
                     case None => currentOffset.get
                   }
                   partitionMap.put(topicAndPartition, newOffset)
-                  FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                   fetcherMetrics.byteRate.mark(validBytes)
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
@@ -182,10 +183,10 @@
   }
 }
 
-class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
+class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
   private[this] var lagVal = new AtomicLong(-1L)
   newGauge(
-    name._1 + "-" + name._2 + "-ConsumerLag",
+    clientIdTopicPartition + "-ConsumerLag",
     new Gauge[Long] {
       def getValue = lagVal.get
     }
@@ -198,25 +199,30 @@
   def lag = lagVal.get
 }
 
-object FetcherLagMetrics {
-  private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
-  private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(clientId: String) {
+  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+  private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
-  def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut( (topic, partitionId) )
+  def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
+    stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
   }
 }
 
-class FetcherStat(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "RequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val requestRate = newMeter(clientIdTopic + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object FetcherStat {
-  private val valueFactory = (k: String) => new FetcherStat(k)
-  private val stats = new Pool[String, FetcherStat](Some(valueFactory))
+class FetcherStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
 
-  def getFetcherStat(name: String): FetcherStat = {
-    stats.getAndMaybePut(name)
+  def getFetcherStats(name: String): FetcherMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
   }
 }
+
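+// Key for per-partition lag gauges: one FetcherLagMetrics per (clientId, topic, partition).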
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+}
Index: core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala	(revision 1414954)
+++ core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala	(working copy)
@@ -29,9 +29,11 @@
 class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
-                     val bufferSize: Int) {
-  private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+                     val bufferSize: Int,
+                     val clientId: String) {
 
+  private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
+
   /**
    *  Fetch a set of messages from a topic. This version of the fetch method
    *  takes the Scala version of a fetch request (i.e.,
Index: core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(revision 1414954)
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -16,7 +16,6 @@
  */
 package kafka.javaapi.consumer
 
-import kafka.message.Message
 import kafka.serializer._
 import kafka.consumer._
 import scala.collection.JavaConversions.asList
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1414954)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -24,7 +24,7 @@
 import kafka.utils._
 import scala.math._
 import java.text.NumberFormat
-import kafka.server.BrokerTopicStat
+import kafka.server.BrokerTopicStats
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
@@ -244,8 +244,8 @@
     if(messageSetInfo.count == 0) {
       (-1L, -1L)
     } else {
-      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
-      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
 
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)
Index: core/src/main/scala/kafka/common/InvalidClientIdException.scala
===================================================================
--- core/src/main/scala/kafka/common/InvalidClientIdException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/InvalidClientIdException.scala	(working copy)
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
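+/**
+ * Thrown by ClientId.validate when a clientId is too long or contains
+ * characters that are not legal in metric names.
+ */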
+class InvalidClientIdException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
Index: core/src/main/scala/kafka/utils/Topic.scala
===================================================================
--- core/src/main/scala/kafka/utils/Topic.scala	(revision 1414954)
+++ core/src/main/scala/kafka/utils/Topic.scala	(working copy)
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.common.InvalidTopicException
-import util.matching.Regex
-
-object Topic {
-  val legalChars = "[a-zA-Z0-9_-]"
-  val maxNameLength = 255
-  private val rgx = new Regex(legalChars + "+")
-
-  def validate(topic: String) {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name is illegal, can't be empty")
-    else if (topic.length > maxNameLength)
-      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(topic) match {
-      case Some(t) =>
-        if (!t.equals(topic))
-          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
-      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
-    }
-  }
-}
Index: core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
===================================================================
--- core/src/main/scala/kafka/utils/ClientIdAndTopic.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/ClientIdAndTopic.scala	(working copy)
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import kafka.common.InvalidClientIdException
+import util.matching.Regex
+
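+/**
+ * ClientIds end up embedded in metric names (and, via the CSV metrics
+ * reporter, in file names), hence the restricted character set and length cap.
+ */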
+object ClientId {
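+  // '.' is permitted because broker host names appear in replica fetcher clientIds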
+  val legalChars = "[a-zA-Z0-9_-]"
+  val maxNameLength = 200 // to prevent hitting filename max length limit
+  private val rgx = new Regex(legalChars + "*")
+
+  def validate(clientId: String) {
+    if (clientId.length > maxNameLength)
+      throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(clientId) match {
+      case Some(t) =>
+        if (!t.equals(clientId))
+          throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
+object Topic {
+  val legalChars = "[a-zA-Z0-9_-]"
+  val maxNameLength = 255
+  private val rgx = new Regex(legalChars + "+")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) =>
+        if (!t.equals(topic))
+          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
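+// Composite key for per-clientId, per-topic metric names; renders as "clientId-topic".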
+case class ClientIdAndTopic(clientId: String, topic: String) {
+  override def toString = "%s-%s".format(clientId, topic)
+}
Index: core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala	(revision 1414954)
+++ core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala	(working copy)
@@ -24,7 +24,6 @@
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.{Utils, VerifiableProperties, Logging}
 
 
Index: core/src/main/scala/kafka/serializer/Encoder.scala
===================================================================
--- core/src/main/scala/kafka/serializer/Encoder.scala	(revision 1414954)
+++ core/src/main/scala/kafka/serializer/Encoder.scala	(working copy)
@@ -18,8 +18,6 @@
 package kafka.serializer
 
 import kafka.utils.VerifiableProperties
-import kafka.message._
-import kafka.utils.Utils
 
 /**
  * An encoder is a method of turning objects into byte arrays.
Index: core/src/main/scala/kafka/serializer/Decoder.scala
===================================================================
--- core/src/main/scala/kafka/serializer/Decoder.scala	(revision 1414954)
+++ core/src/main/scala/kafka/serializer/Decoder.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.serializer
 
-import kafka.message._
 import kafka.utils.VerifiableProperties
 
 /**
Index: core/src/main/scala/kafka/api/FetchRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchRequest.scala	(revision 1414954)
+++ core/src/main/scala/kafka/api/FetchRequest.scala	(working copy)
@@ -33,7 +33,7 @@
   val CurrentVersion = 1.shortValue()
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
-  val ReplicaFetcherClientId = "replica fetcher"
+  val ReplicaFetcherClientId = "replica-fetcher"
   val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
Index: core/src/main/scala/kafka/client/ClientUtils.scala
===================================================================
--- core/src/main/scala/kafka/client/ClientUtils.scala	(revision 1414954)
+++ core/src/main/scala/kafka/client/ClientUtils.scala	(working copy)
@@ -12,14 +12,14 @@
  */
 object ClientUtils extends Logging{
 
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], clientId: String, brokers: Seq[Broker]): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+      val producer: SyncProducer = ProducerPool.createSyncProducer(clientId + "-FetchTopicMetadata", brokers(i))
       info("Fetching metadata for topic %s".format(topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
Index: perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala	(revision 1414954)
+++ perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala	(working copy)
@@ -42,7 +42,7 @@
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
-    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
+    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
 
     // reset to latest or smallest offset
     val topicAndPartition = TopicAndPartition(config.topic, config.partition)
Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
===================================================================
--- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(revision 1414954)
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(working copy)
@@ -109,7 +109,7 @@
         
         // read data from queue
         URI uri = _request.getURI();
-        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);
+        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize, "KafkaETLContext");
         
         // get available offset range
         _offsetRange = getOffsetRange();
Index: examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
===================================================================
--- examples/src/main/java/kafka/examples/SimpleConsumerDemo.java	(revision 1414954)
+++ examples/src/main/java/kafka/examples/SimpleConsumerDemo.java	(working copy)
@@ -59,7 +59,8 @@
     SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
                                                        KafkaProperties.kafkaServerPort,
                                                        KafkaProperties.connectionTimeOut,
-                                                       KafkaProperties.kafkaProducerBufferSize);
+                                                       KafkaProperties.kafkaProducerBufferSize,
+                                                       KafkaProperties.clientId);
 
     System.out.println("Testing single fetch");
     FetchRequest req = new FetchRequestBuilder()
