diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 4269219..9080b4e 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,13 +20,28 @@ package kafka.server
 import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.cluster.Broker
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
-abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging
+                                                                                        with KafkaMetricsGroup {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
+  newGauge(
+    "MaxFetcherLag",
+    new Gauge[Long] {
+      // current max lag across all fetchers/topics/partitions
+      def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
+        fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
+          curMaxThread.max(fetcherLagStatsEntry._2.lag)
+        }).max(curMaxAll)
+      })
+    }
+  )
+
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
   }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 162c749..91cb2e0 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -219,7 +219,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
   private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
     stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
