diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 96bd886..3e497b9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
-        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
+                                       config.groupId, 1) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 4269219..15b7bd3 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,13 +20,45 @@ package kafka.server
 import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.cluster.Broker
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
-abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+  extends Logging with KafkaMetricsGroup {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
+  newGauge(
+    metricPrefix + "-MaxLag",
+    new Gauge[Long] {
+      // current max lag across all fetchers/topics/partitions
+      def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
+        fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
+          curMaxThread.max(fetcherLagStatsEntry._2.lag)
+        }).max(curMaxAll)
+      })
+    }
+  )
+
+  newGauge(
+    metricPrefix + "-MinFetchRate",
+    {
+      new Gauge[Double] {
+        // current min fetch rate across all fetchers/topics/partitions
+        def value = {
+          val headRate: Double =
+            fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
+
+          fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
+            fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
+          })
+        }
+      }
+    }
+  )
+
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
   }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 48100f4..b7cbb98 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -221,7 +221,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
   private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
     stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 7f775ec..fa4e99b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -20,7 +20,8 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
+                                       "Replica-" + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
