Index: core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/ClientIdTest.scala	(revision 1415362)
+++ core/src/test/scala/unit/kafka/utils/ClientIdTest.scala	(working copy)
@@ -27,7 +27,6 @@
   @Test
   def testInvalidClientIds() {
     val invalidclientIds = new ArrayBuffer[String]()
-    invalidclientIds += (".", "..")
     var longName = "ATCG"
     for (i <- 1 to 6)
       longName += longName
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1415362)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -39,7 +39,7 @@
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
-  val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
+  val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
 
   trace("Instantiating Scala Sync Producer")
 
@@ -150,7 +150,7 @@
   }
 }
 
-class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
+class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
 }
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1415362)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -81,7 +81,7 @@
   ClientId.validate(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
+  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
 
   private def connect(): BlockingChannel = {
     close
@@ -169,7 +169,7 @@
   }
 }
 
-class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
 }
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 1415362)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(working copy)
@@ -28,7 +28,7 @@
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
+                                clientId = FetchRequest.ReplicaFetcherClientId,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 1415362)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(working copy)
@@ -42,9 +42,10 @@
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  val fetcherStats = new FetcherStats(clientId)
+  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
+  val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
   val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
-  val fetcherLagStats = new FetcherLagStats(clientId)
+  val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
 
   /* callbacks to be defined in subclass */
 
@@ -65,7 +66,7 @@
 
   override def doWork() {
     val fetchRequestuilder = new FetchRequestBuilder().
-            clientId(clientId).
+            clientId(clientId + "-" + brokerInfo).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)
