Index: core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala	(revision 1407310)
+++ core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala	(working copy)
@@ -29,15 +29,17 @@
   def testKafkaTimer() {
     val clock = new ManualClock
     val testRegistry = new MetricsRegistry(clock)
-    val metric = testRegistry.newTimer(this.getClass, "TestTimer")
+    val timer1 = testRegistry.newTimer(this.getClass, "TestTimer1")
+    val timer2 = testRegistry.newTimer(this.getClass, "TestTimer2")
+    KafkaTimer.time({clock.addMillis(1000)}, timer1, timer2)
+    KafkaTimer.time({clock.addMillis(1000)}, timer2)
+    assertEquals(1, timer1.getCount())
+    assertTrue((timer1.getMax() - 1000).abs <= Double.Epsilon)
+    assertTrue((timer1.getMin() - 1000).abs <= Double.Epsilon)
 
-    val timer = new KafkaTimer(metric)
-    timer.time {
-      clock.addMillis(1000)
-    }
-    assertEquals(1, metric.getCount())
-    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
+    assertEquals(2, timer2.getCount())
+    assertTrue((timer2.getMax() - 1000).abs <= Double.Epsilon)
+    assertTrue((timer2.getMin() - 1000).abs <= Double.Epsilon)
   }
 
   private class ManualClock extends Clock {
Index: core/src/main/scala/kafka/log/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/log/FileMessageSet.scala	(revision 1407310)
+++ core/src/main/scala/kafka/log/FileMessageSet.scala	(working copy)
@@ -149,11 +149,8 @@
   /**
    * Commit all written data to the physical disk
    */
-  def flush() = {
-    LogFlushStats.logFlushTimer.time {
-      channel.force(true)
-    }
-  }
+  def flush() = channel.force(true)
+
   
   /**
    * Close this message set
@@ -183,9 +180,4 @@
     channel.position(targetSize)
     _size.set(targetSize)
   }
-  
-}
-
-object LogFlushStats extends KafkaMetricsGroup {
-  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-}
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1407310)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -27,9 +27,11 @@
 import kafka.server.BrokerTopicStat
 import kafka.message._
 import kafka.common._
-import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import java.util.concurrent.TimeUnit
 
+
 object Log {
   val LogFileSuffix = ".log"
   val IndexFileSuffix = ".index"
@@ -481,7 +483,9 @@
     lock synchronized {
       debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
           time.milliseconds)
-      segments.view.last.flush()
+      KafkaTimer.time({segments.view.last.flush()},
+                      LogFlushStats.logFlushTimer,
+                      LogFlushStats.logFlushTimerForPartitions.getOrElseUpdate(name, LogFlushStats.newTimer("LogFlushRateAndTimeMs-" + name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)))
       unflushed.set(0)
       lastflushedTime.set(time.milliseconds)
      }
Index: core/src/main/scala/kafka/log/LogSegment.scala
===================================================================
--- core/src/main/scala/kafka/log/LogSegment.scala	(revision 1407310)
+++ core/src/main/scala/kafka/log/LogSegment.scala	(working copy)
@@ -4,7 +4,12 @@
 import java.io.File
 import kafka.message._
 import kafka.utils._
+import com.yammer.metrics.core.Timer
+import java.util.concurrent.TimeUnit
+import kafka.common.TopicAndPartition
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
+
 /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
  * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each 
@@ -144,8 +149,8 @@
    * Flush this log segment to disk
    */
   def flush() {
-    messageSet.flush()
-    index.flush()
+    messageSet.flush()
+    index.flush()
   }
   
   /**
@@ -155,5 +160,11 @@
     Utils.swallow(index.close)
     Utils.swallow(messageSet.close)
   }
-  
-}
\ No newline at end of file
+}
+
+
+object LogFlushStats extends KafkaMetricsGroup {
+  val logFlushTimer = newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+  // per-partition flush timers; keys are of the "topic-partitionID" format
+  val logFlushTimerForPartitions = new collection.mutable.HashMap[String, Timer] with collection.mutable.SynchronizedMap[String, Timer]
+}
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 1407310)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(working copy)
@@ -868,5 +868,5 @@
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
-  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val leaderElectionTimer = newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
 }
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1407310)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -22,7 +22,9 @@
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.IZkChildListener
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
+import kafka.metrics.KafkaTimer
 
+
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
  * transitions to move the replica to another legal state. The different states that a replica can be in are -
@@ -232,7 +234,7 @@
   class BrokerChangeListener() extends IZkChildListener with Logging {
     this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      ControllerStat.leaderElectionTimer.time {
+      KafkaTimer.time({
         info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
         if(!isShuttingDown.get()) {
           controllerContext.controllerLock synchronized {
@@ -255,7 +257,7 @@
             }
           }
         }
-      }
+      }, ControllerStat.leaderElectionTimer)
     }
   }
 }
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1407310)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -91,9 +91,7 @@
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
     var response: Receive = null
-    ProducerRequestStat.requestTimer.time {
-      response = doSend(producerRequest)
-    }
+    KafkaTimer.time({response = doSend(producerRequest)}, ProducerRequestStat.requestTimer)
     ProducerResponse.readFrom(response.buffer)
   }
 
@@ -153,6 +151,6 @@
 }
 
 object ProducerRequestStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestTimer = newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
   val requestSizeHist = newHistogram("ProducerRequestSize")
 }
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1407310)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -102,6 +102,7 @@
       }
       if(!added) {
         AsyncProducerStats.droppedMessageRate.mark()
+        AsyncProducerStats.perTopicDroppedMessageRate.getOrElseUpdate(data.topic, AsyncProducerStats.newMeter("DroppedMessagesPerSec-" + data.topic,  "drops", TimeUnit.SECONDS)).mark()
         error("Event queue is full of unsent messages, could not send event: " + data.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
       }else {
Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(revision 1407310)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala	(working copy)
@@ -19,7 +19,10 @@
 
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Meter
 
+
 object AsyncProducerStats extends KafkaMetricsGroup {
   val droppedMessageRate = newMeter("DroppedMessagesPerSec",  "drops", TimeUnit.SECONDS)
+  val perTopicDroppedMessageRate = new collection.mutable.HashMap[String, Meter] with collection.mutable.SynchronizedMap[String, Meter]
 }
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(revision 1407310)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala	(working copy)
@@ -143,9 +143,9 @@
    */
   def fetch(request: FetchRequest): FetchResponse = {
     var response: Receive = null
-    FetchRequestAndResponseStat.requestTimer.time {
-      response = sendRequest(request)
-    }
+    KafkaTimer.time({response = sendRequest(request)},
+      FetchRequestAndResponseStat.requestTimer
+    )
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
     FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
@@ -167,6 +167,6 @@
 }
 
 object FetchRequestAndResponseStat extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestTimer = newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
   val respondSizeHist = newHistogram("FetchResponseSize")
 }
Index: core/src/main/scala/kafka/metrics/KafkaTimer.scala
===================================================================
--- core/src/main/scala/kafka/metrics/KafkaTimer.scala	(revision 1407310)
+++ core/src/main/scala/kafka/metrics/KafkaTimer.scala	(working copy)
@@ -20,20 +20,17 @@
 import com.yammer.metrics.core.Timer
 
 /**
- * A wrapper around metrics timer object that provides a convenient mechanism
- * to time code blocks. This pattern was borrowed from the metrics-scala_2.9.1
- * package.
- * @param metric The underlying timer object.
+ * Provides a convenient mechanism to time a code block against one or more timers.
  */
-class KafkaTimer(metric: Timer) {
 
-  def time[A](f: => A): A = {
-    val ctx = metric.time
+object KafkaTimer {
+  def time[A](f: => A, timers: Timer*): A = {
+    val ctxSeq = timers.map(_.time())
     try {
       f
     }
     finally {
-      ctx.stop()
+      ctxSeq.foreach(_.stop())
     }
   }
 }
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1407310)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -52,7 +52,15 @@
       def getValue = leaderPartitions.size
     }
   )
+
   newGauge(
+    "PartitionCount",
+    new Gauge[Int] {
+      def getValue = allPartitions.size
+    }
+  )
+
+  newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
       def getValue = {
