diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 96bd886..3661ef5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -29,6 +29,8 @@ import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
 import java.util.concurrent.atomic.AtomicInteger
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 /**
  *  Usage:
@@ -38,7 +40,8 @@ import java.util.concurrent.atomic.AtomicInteger
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
-        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1)
+        with KafkaMetricsGroup {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
@@ -47,6 +50,13 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private var leaderFinderThread: ShutdownableThread = null
   private val correlationId = new AtomicInteger(0)
 
+  newGauge(
+    "%s-%s-MaxLag".format(config.groupId, config.clientId),
+    new Gauge[Long] {
+      def value = maxLag
+    }
+  )
+
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 4269219..62f94c1 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -27,6 +27,15 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
+  protected def maxLag = {
+    // current max lag across all fetchers/topics/partitions
+    fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
+      fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
+        curMaxThread.max(fetcherLagStatsEntry._2.lag)
+      }).max(curMaxAll)
+    })
+  }
+
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
   }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 162c749..91cb2e0 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -219,7 +219,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
   private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
     stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 7f775ec..cb0f9cb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -18,10 +18,19 @@
 package kafka.server
 
 import kafka.cluster.Broker
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers)
+        with KafkaMetricsGroup {
 
+  newGauge(
+    "Replica-%d-MaxLag".format(brokerConfig.brokerId),
+    new Gauge[Long] {
+      def value = maxLag
+    }
+  )
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
   }
