diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 1a3dbd3..628b9ef 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -18,13 +18,13 @@
 package kafka.server
 
 import scala.collection._
-import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.network._
 import kafka.utils._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import java.util
+import com.yammer.metrics.core.Gauge
 
 
 /**
@@ -66,10 +66,13 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
+  private val requestCounter = new AtomicInteger(0)
+  private val CleanupInterval = 100000
+
   newGauge(
-    "NumDelayedRequests",
+    "NumRequests",
     new Gauge[Int] {
-      def getValue = expiredRequestReaper.unsatisfied.get()
+      def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
     }
   )
 
@@ -78,10 +81,22 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
 
+  def purgeSatisfied() {
+    debug("Purging satisfied requests from purgatory.")
+    expiredRequestReaper.forcePurge()
+    val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
+    debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
+  }
+
   /**
    * Add a new delayed request watching the contained keys
    */
   def watch(delayedRequest: T) {
+    if (requestCounter.getAndIncrement() >= CleanupInterval) {
+      requestCounter.set(0)
+      purgeSatisfied()
+    }
+
     for(key <- delayedRequest.keys) {
       var lst = watchersFor(key)
       lst.add(delayedRequest)
@@ -125,37 +140,29 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
    */
   private class Watchers {
 
-    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
-    private val CleanupThresholdSize = 100
-    private val CleanupThresholdPrct = 0.5
 
-    private val requests = new LinkedList[T]
+    private val requests = new util.ArrayList[T]
 
-    /* you can only change this if you have added something or marked something satisfied */
-    var liveCount = 0.0
+    def numRequests = requests.size
 
     def add(t: T) {
       synchronized {
         requests.add(t)
-        liveCount += 1
-        maybePurge()
       }
     }
 
-    private def maybePurge() {
-      if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
+    def purgeSatisfied(): Int = {
+      synchronized {
         val iter = requests.iterator()
+        var purged = 0
         while(iter.hasNext) {
           val curr = iter.next
-          if(curr.satisfied.get())
+          if(curr.satisfied.get()) {
             iter.remove()
+            purged += 1
+          }
         }
-      }
-    }
-
-    def decLiveCount() {
-      synchronized {
-        liveCount -= 1
+        purged
       }
     }
 
@@ -175,11 +182,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
             if(satisfied) {
               iter.remove()
               val updated = curr.satisfied.compareAndSet(false, true)
-              if(updated == true) {
+              if(updated == true)
                 response += curr
-                liveCount -= 1
-                expiredRequestReaper.satisfyRequest()
-              }
             }
           }
         }
@@ -193,16 +197,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
    */
   private class ExpiredRequestReaper extends Runnable with Logging {
     this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
-    private val CleanupThresholdSize = 100
-    private val CleanupThresholdPrct = 0.5
 
     private val delayed = new DelayQueue[T]
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
-    private val needsPurge = new AtomicBoolean(false)
-    /* The count of elements in the delay queue that are unsatisfied */
-    private [kafka] val unsatisfied = new AtomicInteger(0)
+
+    def numRequests = delayed.size()
 
     /** Main loop for the expiry thread */
     def run() {
@@ -214,10 +214,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
           }
         } catch {
           case ie: InterruptedException =>
-            if(needsPurge.getAndSet(false)) {
-              val purged = purgeSatisfied()
-              debug("Forced purge of " + purged + " requests from delay queue.")
-            }
+            val purged = purgeSatisfied()
+            debug("Purged %d requests from delay queue.".format(purged))
           case e: Exception =>
             error("Error in long poll expiry thread: ", e)
         }
@@ -228,13 +226,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     /** Add a request to be expired */
     def enqueue(t: T) {
       delayed.add(t)
-      unsatisfied.incrementAndGet()
-      if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
-        forcePurge()
     }
 
-    private def forcePurge() {
-      needsPurge.set(true)
+    def forcePurge() {
       expirationThread.interrupt()
     }
 
@@ -247,9 +241,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
       debug("Shut down complete.")
     }
 
-    /** Record the fact that we satisfied a request in the stats for the expiry queue */
-    def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
     /**
      * Get the next expired event
      */
@@ -258,9 +249,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
         val curr = delayed.take()
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
-          unsatisfied.getAndDecrement()
-          for(key <- curr.keys)
-            watchersFor(key).decLiveCount()
           return curr
         }
       }
@@ -284,4 +272,4 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     }
   }
 
-}
\ No newline at end of file
+}
