diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 1a3dbd3..3c21432 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -73,6 +73,13 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     }
   )
 
+  newGauge(
+    "WatchersForKey",
+    new Gauge[Int] {
+      def getValue = watchersForKey.values.map(_.requests.size()).sum
+    }
+  )
+
   /* background thread expiring requests that have been waiting too long */
   private val expiredRequestReaper = new ExpiredRequestReaper
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
@@ -129,7 +136,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     private val CleanupThresholdSize = 100
     private val CleanupThresholdPrct = 0.5
 
-    private val requests = new LinkedList[T]
+    val requests = new LinkedList[T]
 
     /* you can only change this if you have added something or marked something satisfied */
     var liveCount = 0.0
@@ -211,6 +218,17 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
           val curr = pollExpired()
           curr synchronized {
             expire(curr)
+            curr.keys.foreach(key => {
+              val watchers = watchersFor(key)
+              watchers synchronized {
+                val iter = watchers.requests.iterator()
+                while(iter.hasNext) {
+                  val request = iter.next()
+                  if(request.satisfied.get())
+                    iter.remove()
+                }
+              }
+            })
           }
         } catch {
           case ie: InterruptedException =>
@@ -284,4 +302,4 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     }
   }
 
-}
\ No newline at end of file
+}
