diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 66638f2..764bb02 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -50,7 +50,6 @@ class Producer[K,V](config: ProducerConfig,
       producerSendThread.start()
   }
 
-  private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
   KafkaMetricsReporter.startReporters(config.props)
@@ -81,7 +80,7 @@ class Producer[K,V](config: ProducerConfig,
   private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
       producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
-      producerTopicStats.getProducerAllTopicStats.messageRate.mark()
+      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
     }
   }
 
@@ -106,7 +105,8 @@ class Producer[K,V](config: ProducerConfig,
           }
       }
       if(!added) {
-        producerStats.droppedMessageRate.mark()
+        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
+        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
         error("Event queue is full of unsent messages, could not send event: " + message.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
       }else {
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 62ff544..e1610d3 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -24,7 +24,6 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup {
   val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
   val resendRate = newMeter(clientId + "-ResendsPerSec",  "resends", TimeUnit.SECONDS)
   val failedSendRate = newMeter(clientId + "-FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
-  val droppedMessageRate = newMeter(clientId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
 }
 
 /**
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index fd0b44e..550c5c7 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit
 class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
   val messageRate = newMeter(metricId + "-MessagesPerSec",  "messages", TimeUnit.SECONDS)
   val byteRate = newMeter(metricId + "-BytesPerSec",  "bytes", TimeUnit.SECONDS)
+  val droppedMessageRate = newMeter(metricId + "-DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
 }
 
 /**
@@ -35,9 +36,9 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
 class ProducerTopicStats(clientId: String) {
   private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
-  private val allTopicStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
+  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "All.Topics")) // to differentiate from a topic named AllTopics
 
-  def getProducerAllTopicStats(): ProducerTopicMetrics = allTopicStats
+  def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
 
   def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
     stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 5569cc2..0fd733e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -56,7 +56,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
-          producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
+          producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.messageSendMaxRetries + 1
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b39cab..6691147 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,7 +34,7 @@ class ProducerSendThread[K,V](val threadName: String,
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
 
-  newGauge(clientId + "-ProducerQueueSize-" + getId,
+  newGauge(clientId + "-ProducerQueueSize",
           new Gauge[Int] {
             def getValue = queue.size
           })
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4e6c8ea..f7fe0de 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -58,6 +58,12 @@ class ReplicaManager(val config: KafkaConfig,
     }
   )
   newGauge(
+    "PartitionCount",
+    new Gauge[Int] {
+      def getValue = allPartitions.size
+    }
+  )
+  newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
       def getValue = {
