Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1420334)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -68,10 +68,7 @@
 
     val config = new ProducerConfig(props)
     val produceData = getProduceData(12)
-    val producer = new Producer[String, String](config,
-                                                mockEventHandler,
-                                                new ProducerStats(""),
-                                                new ProducerTopicStats(""))
+    val producer = new Producer[String, String](config, mockEventHandler)
     try {
       // send all 10 messages, should hit the batch size and then reach broker
       producer.send(produceData: _*)
@@ -192,9 +189,7 @@
                                                       encoder = null.asInstanceOf[Encoder[String]],
                                                       keyEncoder = new IntEncoder(),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos,
-                                                      producerStats = new ProducerStats(""),
-                                                      producerTopicStats = new ProducerTopicStats(""))
+                                                      topicPartitionInfos = topicPartitionInfos)
 
     val topic1Broker1Data = 
       ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -233,9 +228,7 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
 
     val serializedData = handler.serialize(produceData)
     val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -263,9 +256,7 @@
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with UnknownTopicOrPartitionException")
@@ -296,9 +287,7 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -345,9 +334,7 @@
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -390,14 +377,9 @@
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos,
-                                                         producerStats = new ProducerStats(""),
-                                                         producerTopicStats = new ProducerTopicStats(""))
+                                                         topicPartitionInfos = topicPartitionInfos)
 
-    val producer = new Producer[String, String](config,
-                                                handler,
-                                                new ProducerStats(""),
-                                                new ProducerTopicStats(""))
+    val producer = new Producer[String, String](config, handler)
     try {
       // send all 10 messages, should create 2 batches and 2 syncproducer calls
       producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
@@ -453,9 +435,7 @@
                                                       encoder = new StringEncoder(),
                                                       keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos,
-                                                      producerStats = new ProducerStats(""),
-                                                      producerTopicStats = new ProducerTopicStats(""))
+                                                      topicPartitionInfos = topicPartitionInfos)
     val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1420334)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -50,7 +50,7 @@
                                                            new AtomicLong(0),
                                                            new AtomicLong(0),
                                                            new AtomicInteger(0),
-                                                           new ConsumerTopicStats("")))
+                                                           ""))
 
   var fetcher: ConsumerFetcherManager = null
 
@@ -84,7 +84,9 @@
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder())
+      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+                                                                             new DefaultEncoder(),
+                                                                             new StringEncoder())
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
       producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
Index: core/src/test/scala/unit/kafka/common/TopicTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/common/TopicTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/common/TopicTest.scala	(working copy)
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.{Topic, InvalidTopicException}
+import org.junit.Test
+
+class TopicTest {
+
+  @Test
+  def testInvalidTopicNames() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\\', ',', '\0', ':', '"', '\'', ';', '*', '?', '.')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "illegal"
+    }
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        Topic.validate(invalidTopicNames(i))
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+
+    val validTopicNames = new ArrayBuffer[String]()
+    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
+    for (i <- 0 until validTopicNames.size) {
+      try {
+        Topic.validate(validTopicNames(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}
Index: core/src/test/scala/unit/kafka/common/ConfigTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/common/ConfigTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/common/ConfigTest.scala	(working copy)
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import org.junit.Test
+import kafka.common.InvalidConfigException
+import kafka.producer.ProducerConfig
+import kafka.consumer.ConsumerConfig
+
+class ConfigTest {
+
+  @Test
+  def testInvalidClientIds() {
+    val invalidClientIds = new ArrayBuffer[String]()
+    val badChars = Array('/', '\\', ',', '\0', ':', '"', '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
+    for (weirdChar <- badChars) {
+      invalidClientIds += "Is" + weirdChar + "illegal"
+    }
+
+    for (i <- 0 until invalidClientIds.size) {
+      try {
+        ProducerConfig.validateClientId(invalidClientIds(i))
+        fail("Should throw InvalidConfigException.")
+      }
+      catch {
+        case e: InvalidConfigException => "This is good."
+      }
+    }
+
+    val validClientIds = new ArrayBuffer[String]()
+    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
+    for (i <- 0 until validClientIds.size) {
+      try {
+        ProducerConfig.validateClientId(validClientIds(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+
+  @Test
+  def testInvalidGroupIds() {
+    val invalidGroupIds = new ArrayBuffer[String]()
+    val badChars = Array('/', '\\', ',', '\0', ':', '"', '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
+    for (weirdChar <- badChars) {
+      invalidGroupIds += "Is" + weirdChar + "illegal"
+    }
+
+    for (i <- 0 until invalidGroupIds.size) {
+      try {
+        ConsumerConfig.validateGroupId(invalidGroupIds(i))
+        fail("Should throw InvalidConfigException.")
+      }
+      catch {
+        case e: InvalidConfigException => "This is good."
+      }
+    }
+
+    val validGroupIds = new ArrayBuffer[String]()
+    validGroupIds += ("valid", "GROUP", "iDs", "ar6", "VaL1d", "_0-9_", "")
+    for (i <- 0 until validGroupIds.size) {
+      try {
+        ConsumerConfig.validateGroupId(validGroupIds(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}
+
Index: core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(revision 1420334)
+++ core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(working copy)
@@ -56,7 +56,7 @@
                                                            new AtomicLong(consumedOffset),
                                                            new AtomicLong(0),
                                                            new AtomicInteger(0),
-                                                           new ConsumerTopicStats("")))
+                                                           ""))
   val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
 
   override def setUp() {
@@ -80,8 +80,8 @@
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
                                                     enableShallowIterator = false,
-                                                    consumerTopicStats = new ConsumerTopicStats(""))
-    var receivedMessages = (0 until 5).map(i => iter.next.message).toList
+                                                    clientId = "")
+    val receivedMessages = (0 until 5).map(i => iter.next.message).toList
 
     assertFalse(iter.hasNext)
     assertEquals(1, queue.size) // This is only the shutdown command.
Index: core/src/test/scala/unit/kafka/utils/TopicTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TopicTest.scala	(revision 1420334)
+++ core/src/test/scala/unit/kafka/utils/TopicTest.scala	(working copy)
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidTopicException
-import org.junit.Test
-
-class TopicTest {
-
-  @Test
-  def testInvalidTopicNames() {
-    val invalidTopicNames = new ArrayBuffer[String]()
-    invalidTopicNames += ("", ".", "..")
-    var longName = "ATCG"
-    for (i <- 1 to 6)
-      longName += longName
-    invalidTopicNames += longName
-    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.')
-    for (weirdChar <- badChars) {
-      invalidTopicNames += "Is" + weirdChar + "funny"
-    }
-
-    for (i <- 0 until invalidTopicNames.size) {
-      try {
-        Topic.validate(invalidTopicNames(i))
-        fail("Should throw InvalidTopicException.")
-      }
-      catch {
-        case e: InvalidTopicException => "This is good."
-      }
-    }
-
-    val validTopicNames = new ArrayBuffer[String]()
-    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_")
-    for (i <- 0 until validTopicNames.size) {
-      try {
-        Topic.validate(validTopicNames(i))
-      }
-      catch {
-        case e: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-}
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1420334)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -24,7 +24,6 @@
 import java.util.Random
 import java.util.Properties
 import junit.framework.Assert._
-import kafka.api._
 import kafka.server._
 import kafka.producer._
 import kafka.message._
Index: core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/ClientIdTest.scala	(revision 1420334)
+++ core/src/test/scala/unit/kafka/utils/ClientIdTest.scala	(working copy)
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import junit.framework.Assert._
-import collection.mutable.ArrayBuffer
-import kafka.common.InvalidClientIdException
-import org.junit.Test
-
-class ClientIdTest {
-
-  @Test
-  def testInvalidClientIds() {
-    val invalidclientIds = new ArrayBuffer[String]()
-    var longName = "ATCG"
-    for (i <- 1 to 6)
-      longName += longName
-    invalidclientIds += longName
-    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
-    for (weirdChar <- badChars) {
-      invalidclientIds += "Is" + weirdChar + "funny"
-    }
-
-    for (i <- 0 until invalidclientIds.size) {
-      try {
-        ClientId.validate(invalidclientIds(i))
-        fail("Should throw InvalidClientIdException.")
-      }
-      catch {
-        case e: InvalidClientIdException => "This is good."
-      }
-    }
-
-    val validClientIds = new ArrayBuffer[String]()
-    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
-    for (i <- 0 until validClientIds.size) {
-      try {
-        ClientId.validate(validClientIds(i))
-      }
-      catch {
-        case e: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-}
\ No newline at end of file
Index: core/src/main/scala/kafka/producer/ProducerStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerStats.scala	(revision 0)
+++ core/src/main/scala/kafka/producer/ProducerStats.scala	(working copy)
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.producer
+
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
+import kafka.utils.Pool
+
+class ProducerStats(clientId: String) extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
+}
+
+/**
+ * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map.
+ */
+object ProducerStatsRegistry {
+  private val valueFactory = (k: String) => new ProducerStats(k)
+  private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory))
+
+  def getProducerStats(clientId: String) = {
+    statsRegistry.getAndMaybePut(clientId)
+  }
+}
Index: core/src/main/scala/kafka/producer/ProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerConfig.scala	(revision 1420334)
+++ core/src/main/scala/kafka/producer/ProducerConfig.scala	(working copy)
@@ -21,9 +21,36 @@
 import java.util.Properties
 import kafka.utils.{Utils, VerifiableProperties}
 import kafka.message.{CompressionCodec, NoCompressionCodec}
+import kafka.common.{InvalidConfigException, Config}
 
+object ProducerConfig extends Config {
+  def validate(config: ProducerConfig) {
+    validateClientId(config.clientId)
+    validateBatchSize(config.batchSize, config.queueSize)
+    validateProducerType(config.producerType)
+  }
+
+  def validateClientId(clientId: String) {
+    validateChars("clientid", clientId)
+  }
+
+  def validateBatchSize(batchSize: Int, queueSize: Int) {
+    if (batchSize > queueSize)
+      throw new InvalidConfigException("Batch size = " + batchSize + " can't be larger than queue size = " + queueSize)
+  }
+
+  def validateProducerType(producerType: String) {
+    producerType match {
+      case "sync" =>
+      case "async"=>
+      case _ => throw new InvalidConfigException("Invalid value " + producerType + " for producer.type, valid values are sync/async")
+    }
+  }
+}
+
 class ProducerConfig private (val props: VerifiableProperties)
         extends AsyncProducerConfig with SyncProducerConfigShared {
+  import ProducerConfig._
 
   def this(originalProps: Properties) {
     this(new VerifiableProperties(originalProps))
@@ -85,4 +112,6 @@
   val producerRetries = props.getInt("producer.num.retries", 3)
 
   val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+
+  validate(this)
 }
Index: core/src/main/scala/kafka/producer/ProducerTopicStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerTopicStats.scala	(revision 0)
+++ core/src/main/scala/kafka/producer/ProducerTopicStats.scala	(working copy)
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.producer
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ClientIdAndTopic
+import kafka.utils.{Pool, threadsafe}
+import java.util.concurrent.TimeUnit
+
+
+@threadsafe
+class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+}
+
+/**
+ * Tracks metrics for each topic the given producer client has produced data to.
+ * @param clientId The clientId of the given producer client.
+ */
+class ProducerTopicStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
+  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
+  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+
+  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
+
+  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
+    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
+  }
+}
+
+/**
+ * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map.
+ */
+object ProducerTopicStatsRegistry {
+  private val valueFactory = (k: String) => new ProducerTopicStats(k)
+  private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))
+
+  def getProducerTopicStats(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/producer/ProducerRequestStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerRequestStats.scala	(revision 0)
+++ core/src/main/scala/kafka/producer/ProducerRequestStats.scala	(working copy)
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.producer
+
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import java.util.concurrent.TimeUnit
+import kafka.utils.Pool
+import kafka.common.ClientIdAndBroker
+
+class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+  val requestTimer = newTimer(metricId + "-ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+  val requestSizeHist = newHistogram(metricId + "-ProducerRequestSize")
+}
+
+/**
+ * Tracks metrics of requests made by a given producer client to all brokers.
+ * @param clientId ClientId of the given producer
+ */
+class ProducerRequestStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
+  private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
+  private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+
+  def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
+
+  def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+  }
+
+  def requestTimer(brokerInfo: String) = {
+    val specificTimer = getProducerRequestStats(brokerInfo).requestTimer
+    val allBrokersTimer = allBrokersStats.requestTimer
+    new KafkaTimer(List(specificTimer, allBrokersTimer):_*)
+  }
+}
+
+/**
+ * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map.
+ */
+object ProducerRequestStatsRegistry {
+  private val valueFactory = (k: String) => new ProducerRequestStats(k)
+  private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory))
+
+  def getProducerRequestStats(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
+
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1420334)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -21,8 +21,6 @@
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 object SyncProducer {
   val RequestKey: Short = 0
@@ -39,7 +37,8 @@
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
-  val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
+  val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
+  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
   trace("Instantiating Scala Sync Producer")
 
@@ -87,9 +86,11 @@
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    producerRequestStats.requestSizeHist.update(producerRequest.sizeInBytes)
+    val requestSize = producerRequest.sizeInBytes
+    producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize)
+    producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
     var response: Receive = null
-    producerRequestStats.requestTimer.time {
+    producerRequestStats.requestTimer(brokerInfo).time {
       response = doSend(producerRequest)
     }
     ProducerResponse.readFrom(response.buffer)
@@ -150,7 +151,3 @@
   }
 }
 
-class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
-}
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1420334)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -22,20 +22,15 @@
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{QueueFullException, InvalidConfigException}
+import kafka.common.QueueFullException
 import kafka.metrics._
 
 
 class Producer[K,V](config: ProducerConfig,
-                    private val eventHandler: EventHandler[K,V],
-                    private val producerStats: ProducerStats,
-                    private val producerTopicStats: ProducerTopicStats)  // only for unit testing
+                    private val eventHandler: EventHandler[K,V])  // only for unit testing
   extends Logging {
 
   private val hasShutdown = new AtomicBoolean(false)
-  if (config.batchSize > config.queueSize)
-    throw new InvalidConfigException("Batch size can't be larger than queue size.")
-
   private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
 
   private val random = new Random
@@ -53,30 +48,20 @@
                                                        config.batchSize,
                                                        config.clientId)
       producerSendThread.start()
-    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
+  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
+  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
+
   KafkaMetricsReporter.startReporters(config.props)
 
-  def this(t: (ProducerConfig, EventHandler[K,V], ProducerStats, ProducerTopicStats)) =
-    this(t._1, t._2, t._3, t._4)
-
   def this(config: ProducerConfig) =
-    this {
-      ClientId.validate(config.clientId)
-      val producerStats = new ProducerStats(config.clientId)
-      val producerTopicStats = new ProducerTopicStats(config.clientId)
-      (config,
-       new DefaultEventHandler[K,V](config,
-                                    Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
-                                    Utils.createObject[Encoder[V]](config.serializerClass, config.props),
-                                    Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
-                                    new ProducerPool(config),
-                                    producerStats = producerStats,
-                                    producerTopicStats = producerTopicStats),
-       producerStats,
-       producerTopicStats)
-    }
+    this(config,
+         new DefaultEventHandler[K,V](config,
+                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
+                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+                                      new ProducerPool(config)))
 
   /**
    * Sends the data, partitioned by key to the topic using either the
@@ -146,28 +131,4 @@
   }
 }
 
-@threadsafe
-class ProducerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
-}
 
-class ProducerTopicStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
-
-  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
-
-  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
-  }
-}
-
-class ProducerStats(clientId: String) extends KafkaMetricsGroup {
-  val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
-  val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
-  val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
-}
-
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1420334)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -33,9 +33,7 @@
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
-                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
-                               private val producerStats: ProducerStats,
-                               private val producerTopicStats: ProducerTopicStats)
+                               private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
@@ -45,6 +43,9 @@
 
   private val lock = new Object()
 
+  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
+  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
+
   def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(working copy)
@@ -35,12 +35,13 @@
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
                              val enableShallowIterator: Boolean,
-                             val consumerTopicStats: ConsumerTopicStats)
+                             val clientId: String)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
+  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
 
   override def next(): MessageAndMetadata[K, V] = {
     val item = super.next()
Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(working copy)
@@ -19,9 +19,10 @@
 
 import java.util.Properties
 import kafka.api.OffsetRequest
-import kafka.utils.{VerifiableProperties, ZKConfig}
+import kafka.utils._
+import kafka.common.{InvalidConfigException, Config}
 
-object ConsumerConfig {
+object ConsumerConfig extends Config {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
   val FetchSize = 1024 * 1024
@@ -43,6 +44,28 @@
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
+
+  def validate(config: ConsumerConfig) {
+    validateClientId(config.clientId)
+    validateGroupId(config.groupId)
+    validateAutoOffsetReset(config.autoOffsetReset)
+  }
+
+  def validateClientId(clientId: String) {
+    validateChars("clientid", clientId)
+  }
+
+  def validateGroupId(groupId: String) {
+    validateChars("groupid", groupId)
+  }
+
+  def validateAutoOffsetReset(autoOffsetReset: String) {
+    autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString =>
+      case OffsetRequest.LargestTimeString =>
+      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
+    }
+  }
 }
 
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
@@ -109,8 +132,10 @@
   val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
 
   /**
-   * Cliient id is specified by the kafka consumer client, used to distinguish different clients
+   * Client id is specified by the kafka consumer client, used to distinguish different clients
    */
   val clientId = props.getString("clientid", groupId)
+
+  validate(this)
 }
 
Index: core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
===================================================================
--- core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala	(revision 0)
+++ core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala	(working copy)
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.utils.Pool
+import java.util.concurrent.TimeUnit
+import kafka.common.ClientIdAndBroker
+
+class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+  val requestTimer = newTimer(metricId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+  val requestSizeHist = newHistogram(metricId + "-FetchResponseSize")
+}
+
+/**
+ * Tracks metrics of the requests made by a given consumer client to all brokers, and the responses obtained from the brokers.
+ * @param clientId ClientId of the given consumer
+ */
+class FetchRequestAndResponseStats(clientId: String) {
+  private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
+  private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
+  private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "All.Brokers"))
+
+  def getFetchRequestAndResponseAllBrokerStats(): FetchRequestAndResponseMetrics = allBrokersStats
+
+  def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
+    stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo))
+  }
+
+  def requestTimer(brokerInfo: String) = {
+    val specificTimer = getFetchRequestAndResponseStats(brokerInfo).requestTimer
+    val allBrokersTimer = allBrokersStats.requestTimer
+    new KafkaTimer(List(specificTimer, allBrokersTimer):_*)
+  }
+}
+
+/**
+ * Stores the fetch request and response stats information of each consumer client in a (clientId -> FetchRequestAndResponseStats) map.
+ */
+object FetchRequestAndResponseStatsRegistry {
+  private val valueFactory = (k: String) => new FetchRequestAndResponseStats(k)
+  private val globalStats = new Pool[String, FetchRequestAndResponseStats](Some(valueFactory))
+
+  def getFetchRequestAndResponseStats(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
+
+
Index: core/src/main/scala/kafka/consumer/KafkaStream.scala
===================================================================
--- core/src/main/scala/kafka/consumer/KafkaStream.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/KafkaStream.scala	(working copy)
@@ -27,11 +27,11 @@
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
                         val enableShallowIterator: Boolean,
-                        val consumerTopicStats: ConsumerTopicStats)
+                        val clientId: String)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, consumerTopicStats)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId)
 
   /**
    *  Create an iterator over messages in the stream.
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -20,8 +20,6 @@
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.utils.ZkUtils._
 import collection.immutable
 import kafka.common.{TopicAndPartition, KafkaException}
@@ -87,10 +85,11 @@
                      val bufferSize: Int,
                      val clientId: String) extends Logging {
 
-  ClientId.validate(clientId)
+  ConsumerConfig.validateClientId(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
+  val brokerInfo = "host_%s-port_%s".format(host, port)
+  private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
 
   private def connect(): BlockingChannel = {
     close
@@ -155,12 +154,13 @@
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    fetchRequestAndResponseStats.requestTimer.time {
+    fetchRequestAndResponseStats.requestTimer(brokerInfo).time {
       response = sendRequest(request)
     }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    fetchRequestAndResponseStats.respondSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokerStats.requestSizeHist.update(fetchedSize)
     fetchResponse
   }
 
@@ -178,7 +178,3 @@
   }
 }
 
-class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
-}
Index: core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala	(working copy)
@@ -17,20 +17,25 @@
 
 package kafka.consumer
 
-import kafka.utils.{ClientIdAndTopic, Pool, threadsafe, Logging}
+import kafka.utils.{Pool, threadsafe, Logging}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ClientIdAndTopic
 
 @threadsafe
-class ConsumerTopicMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val messageRate = newMeter(clientIdTopic + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
-  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
+  val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
+/**
+ * Tracks metrics for each topic the given consumer client has consumed data from.
+ * @param clientId The clientId of the given consumer client.
+ */
 class ConsumerTopicStats(clientId: String) extends Logging {
   private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics"))
+  private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
 
   def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
 
@@ -39,3 +44,14 @@
   }
 }
 
+/**
+ * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map.
+ */
+object ConsumerTopicStatsRegistry {
+  private val valueFactory = (k: String) => new ConsumerTopicStats(k)
+  private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))
+
+  def getConsumerTopicStat(clientId: String) = {
+    globalStats.getAndMaybePut(clientId)
+  }
+}
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(working copy)
@@ -29,11 +29,13 @@
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
                          private val fetchSize: AtomicInteger,
-                         private val consumerTopicStats: ConsumerTopicStats) extends Logging {
+                         private val clientId: String) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
 
+  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
+
   def getConsumeOffset() = consumedOffset.get
 
   def getFetchOffset() = fetchedOffset.get
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1420334)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -20,7 +20,7 @@
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import locks.ReentrantLock
-import scala.collection._
+import collection._
 import kafka.cluster._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -35,6 +35,7 @@
 import com.yammer.metrics.core.Gauge
 import kafka.api.OffsetRequest
 import kafka.metrics._
+import scala.Some
 
 
 /**
@@ -80,7 +81,6 @@
                                                 val enableFetcher: Boolean) // for testing only
         extends ConsumerConnector with Logging with KafkaMetricsGroup {
 
-  ClientId.validate(config.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
@@ -95,8 +95,6 @@
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
-  private val consumerTopicStats = new ConsumerTopicStats(config.clientId)
-
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -198,7 +196,7 @@
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, consumerTopicStats)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId)
         (queue, stream)
       })
     ).flatten.toList
@@ -601,8 +599,6 @@
                 SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
               case OffsetRequest.LargestTimeString =>
                 SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
-              case _ =>
-                throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
             }
         }
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
@@ -615,7 +611,7 @@
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize),
-                                                 consumerTopicStats)
+                                                 config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
@@ -719,7 +715,7 @@
                                           keyDecoder, 
                                           valueDecoder, 
                                           config.enableShallowIterator,
-                                          consumerTopicStats)
+                                          config.clientId)
         (queue, stream)
     }).toList
 
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 1420334)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(working copy)
@@ -19,7 +19,7 @@
 
 import kafka.cluster.Broker
 import kafka.consumer.SimpleConsumer
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
@@ -27,7 +27,7 @@
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{ClientIdAndTopic, Pool, ShutdownableThread}
+import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
@@ -43,9 +43,10 @@
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
   private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
-  val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
+  private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+  val fetcherStats = new FetcherStats(metricId)
   val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
-  val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
+  val fetcherLagStats = new FetcherLagStats(metricId)
 
   /* callbacks to be defined in subclass */
 
@@ -66,7 +67,7 @@
 
   override def doWork() {
     val fetchRequestuilder = new FetchRequestBuilder().
-            clientId(clientId + "-" + brokerInfo).
+            clientId(clientId).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)
@@ -184,10 +185,10 @@
   }
 }
 
-class FetcherLagMetrics(clientIdTopicPartition: ClientIdTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
   private[this] var lagVal = new AtomicLong(-1L)
   newGauge(
-    clientIdTopicPartition + "-ConsumerLag",
+    metricId + "-ConsumerLag",
     new Gauge[Long] {
       def getValue = lagVal.get
     }
@@ -200,29 +201,34 @@
   def lag = lagVal.get
 }
 
-class FetcherLagStats(clientId: String) {
-  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
+class FetcherLagStats(metricId: ClientIdAndBroker) {
+  private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
+  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
-    stats.getAndMaybePut(new ClientIdTopicPartition(clientId, topic, partitionId))
+    stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
   }
 }
 
-class FetcherMetrics(clientIdTopic: ClientIdAndTopic) extends KafkaMetricsGroup {
-  val requestRate = newMeter(clientIdTopic + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
-  val byteRate = newMeter(clientIdTopic + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+class FetcherMetrics(metricId: ClientIdBrokerTopic) extends KafkaMetricsGroup {
+  val requestRate = newMeter(metricId + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-class FetcherStats(clientId: String) {
-  private val valueFactory = (k: ClientIdAndTopic) => new FetcherMetrics(k)
-  private val stats = new Pool[ClientIdAndTopic, FetcherMetrics](Some(valueFactory))
+class FetcherStats(metricId: ClientIdAndBroker) {
+  private val valueFactory = (k: ClientIdBrokerTopic) => new FetcherMetrics(k)
+  private val stats = new Pool[ClientIdBrokerTopic, FetcherMetrics](Some(valueFactory))
 
   def getFetcherStats(name: String): FetcherMetrics = {
-    stats.getAndMaybePut(new ClientIdAndTopic(clientId, name))
+    stats.getAndMaybePut(new ClientIdBrokerTopic(metricId.clientId, metricId.brokerInfo, name))
   }
 }
 
-case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
-  override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
+case class ClientIdBrokerTopic(clientId: String, brokerInfo: String, topic: String) {
+  override def toString = "%s-%s-%s".format(clientId, brokerInfo, topic)
 }
+
+case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
+  override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+}
+
Index: core/src/main/scala/kafka/common/ClientIdAndTopic.scala
===================================================================
--- core/src/main/scala/kafka/common/ClientIdAndTopic.scala	(revision 0)
+++ core/src/main/scala/kafka/common/ClientIdAndTopic.scala	(working copy)
@@ -0,0 +1,27 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (clientId, topic) pairs are used in the creation
+ * of many Stats objects.
+ */
+case class ClientIdAndTopic(clientId: String, topic: String) {
+  override def toString = "%s-%s".format(clientId, topic)
+}
+
Index: core/src/main/scala/kafka/common/Topic.scala
===================================================================
--- core/src/main/scala/kafka/common/Topic.scala	(revision 0)
+++ core/src/main/scala/kafka/common/Topic.scala	(working copy)
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import util.matching.Regex
+
+object Topic {
+  private val legalChars = "[a-zA-Z0-9_-]"
+  private val maxNameLength = 255
+  private val rgx = new Regex(legalChars + "+")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) =>
+        if (!t.equals(topic))
+          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
Index: core/src/main/scala/kafka/common/InvalidClientIdException.scala
===================================================================
--- core/src/main/scala/kafka/common/InvalidClientIdException.scala	(revision 1420334)
+++ core/src/main/scala/kafka/common/InvalidClientIdException.scala	(working copy)
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class InvalidClientIdException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
Index: core/src/main/scala/kafka/common/Config.scala
===================================================================
--- core/src/main/scala/kafka/common/Config.scala	(revision 0)
+++ core/src/main/scala/kafka/common/Config.scala	(working copy)
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import util.matching.Regex
+import kafka.utils.Logging
+
+trait Config extends Logging {
+
+  def validateChars(prop: String, value: String) {
+    val legalChars = "[a-zA-Z0-9_-]"
+    val rgx = new Regex(legalChars + "*")
+
+    rgx.findFirstIn(value) match {
+      case Some(t) =>
+        if (!t.equals(value))
+          throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+      case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
+    }
+  }
+}
+
+
Index: core/src/main/scala/kafka/common/ClientIdAndBroker.scala
===================================================================
--- core/src/main/scala/kafka/common/ClientIdAndBroker.scala	(revision 0)
+++ core/src/main/scala/kafka/common/ClientIdAndBroker.scala	(working copy)
@@ -0,0 +1,26 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (clientId, brokerInfo) pairs are used to create
+ * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
+ */
+case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
+  override def toString = "%s-%s".format(clientId, brokerInfo)
+}
Index: core/src/main/scala/kafka/admin/CreateTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(revision 1420334)
+++ core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(working copy)
@@ -21,6 +21,7 @@
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import scala.collection.mutable
+import kafka.common.Topic
 
 object CreateTopicCommand extends Logging {
 
Index: core/src/main/scala/kafka/utils/ClientIdAndTopic.scala
===================================================================
--- core/src/main/scala/kafka/utils/ClientIdAndTopic.scala	(revision 1420334)
+++ core/src/main/scala/kafka/utils/ClientIdAndTopic.scala	(working copy)
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.common.InvalidTopicException
-import kafka.common.InvalidClientIdException
-import util.matching.Regex
-
-object ClientId {
-  val legalChars = "[a-zA-Z0-9_-]"
-  val maxNameLength = 200 // to prevent hitting filename max length limit
-  private val rgx = new Regex(legalChars + "*")
-
-  def validate(clientId: String) {
-    if (clientId.length > maxNameLength)
-      throw new InvalidClientIdException("ClientId is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(clientId) match {
-      case Some(t) =>
-        if (!t.equals(clientId))
-          throw new InvalidClientIdException("ClientId " + clientId + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
-      case None => throw new InvalidClientIdException("ClientId " + clientId + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
-    }
-  }
-}
-
-object Topic {
-  val legalChars = "[a-zA-Z0-9_-]"
-  val maxNameLength = 255
-  private val rgx = new Regex(legalChars + "+")
-
-  def validate(topic: String) {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name is illegal, can't be empty")
-    else if (topic.length > maxNameLength)
-      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(topic) match {
-      case Some(t) =>
-        if (!t.equals(topic))
-          throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -")
-      case None => throw new InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, _ and -")
-    }
-  }
-}
-
-case class ClientIdAndTopic(clientId: String, topic:String) {
-  override def toString = "%s-%s".format(clientId, topic)
-}
\ No newline at end of file
Index: core/src/main/scala/kafka/metrics/KafkaTimer.scala
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaTimer.scala	(revision 1420334)
+++ core/src/main/scala/kafka/metrics/KafkaTimer.scala	(working copy)
@@ -21,19 +21,18 @@
 
 /**
  * A wrapper around metrics timer object that provides a convenient mechanism
- * to time code blocks. This pattern was borrowed from the metrics-scala_2.9.1
- * package.
- * @param metric The underlying timer object.
+ * to time code blocks for a sequence of timers.
+ * @param metrics The list of underlying timer objects.
  */
-class KafkaTimer(metric: Timer) {
+class KafkaTimer(metrics: Timer*) {
 
   def time[A](f: => A): A = {
-    val ctx = metric.time
+    val ctxSeq = metrics.map(_.time())
     try {
       f
     }
     finally {
-      ctx.stop()
+      ctxSeq.foreach(_.stop())
     }
   }
 }
