Index: core/src/main/scala/kafka/server/RequestPurgatory.scala
===================================================================
--- core/src/main/scala/kafka/server/RequestPurgatory.scala	(revision 1417098)
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala	(working copy)
@@ -73,6 +73,13 @@
     }
   )
 
+  newGauge(
+    "WatchersForKey",
+    new Gauge[Int] {
+      def getValue = watchersForKey.values.map(_.requests.size()).sum
+    }
+  )
+
   /* background thread expiring requests that have been waiting too long */
   private val expiredRequestReaper = new ExpiredRequestReaper
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
@@ -129,7 +136,7 @@
     private val CleanupThresholdSize = 100
     private val CleanupThresholdPrct = 0.5
 
-    private val requests = new LinkedList[T]
+    val requests = new LinkedList[T]
 
     /* you can only change this if you have added something or marked something satisfied */
     var liveCount = 0.0
@@ -277,6 +284,18 @@
         val curr = iter.next()
         if(curr.satisfied.get) {
           iter.remove()
+          val keys = curr.keys
+          for(key <- keys) {
+            val watchers = watchersFor(key)
+            watchers synchronized {
+              val iter = watchers.requests.iterator()
+              while(iter.hasNext) {
+                val request = iter.next()
+                if(request.satisfied.get())
+                  iter.remove()
+              }
+            }
+          }
           purged += 1
         }
       }
