diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index d61f724..2a57b8d 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -76,6 +76,11 @@ case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long],
 
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
+  val DefaultCorrelationId = -1
+  val DefaultClientId = ""
+  val DefaultReplicaId = -1
+  val DefaultMaxWait = 0
+  val DefaultMinBytes = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
@@ -95,11 +100,11 @@ object FetchRequest {
 }
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
-                        correlationId: Int = 0,
-                        clientId: String = "",
-                        replicaId: Int = -1,
-                        maxWait: Int = 0,
-                        minBytes: Int = 0,
+                        correlationId: Int = FetchRequest.DefaultCorrelationId,
+                        clientId: String = FetchRequest.DefaultClientId,
+                        replicaId: Int = FetchRequest.DefaultReplicaId,
+                        maxWait: Int = FetchRequest.DefaultMaxWait,
+                        minBytes: Int = FetchRequest.DefaultMinBytes,
                         offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
 
   // ensure that a topic "X" appears in at most one OffsetDetail
@@ -138,13 +143,14 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
 }
 
+
 class FetchRequestBuilder() {
-  private var correlationId = -1
+  private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
-  private var clientId = ""
-  private var replicaId = -1
-  private var maxWait = 0
-  private var minBytes = 0
+  private var clientId = FetchRequest.DefaultClientId
+  private var replicaId = FetchRequest.DefaultReplicaId
+  private var maxWait = FetchRequest.DefaultMaxWait
+  private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b0a100..64fa32e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -66,7 +66,12 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1))
     
     // Now check any outstanding fetches this produce just unblocked
-    val satisfied = produceRequest.data.flatMap(topicData => fetchRequestPurgatory.update(topicData.topic, topicData))
+    val satisfied = new mutable.ArrayBuffer[DelayedFetch]
+    for(topicData <- produceRequest.data) {
+      for(partition <- topicData.partitionData)
+        satisfied ++= fetchRequestPurgatory.update((topicData.topic, partition.partition), topicData)
+    }
+    // send any newly unblocked responses
     for(fetchReq <- satisfied) {
        val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
        val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
@@ -137,7 +142,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     } else {
-      val keys: Seq[Any] = fetchRequest.offsetInfo.map(_.topic)
+      // create a list of (topic, partition) pairs to use as keys for this delayed request
+      val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
       val delayedFetch = new DelayedFetch(keys, request, fetchRequest, fetchRequest.maxWait, availableBytes)
       fetchRequestPurgatory.watch(delayedFetch)
     }
diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala b/core/src/main/scala/kafka/server/MessageSetSend.scala
index a48e1f9..e300ad0 100644
--- a/core/src/main/scala/kafka/server/MessageSetSend.scala
+++ b/core/src/main/scala/kafka/server/MessageSetSend.scala
@@ -68,7 +68,4 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: In
   
   def sendSize: Int = size.asInstanceOf[Int] + header.capacity
   
-  // TODO: test this works
-  def empty: Boolean = messages.sizeInBytes == 0
-  
 }
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 8a59c9e..b30a7ee 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -162,6 +162,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
               if(updated == true) {
                 response += curr
                 liveCount -= 1
+                expiredRequestReaper.satisfyRequest()
               }
             }
           }
@@ -184,8 +185,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
     private val needsPurge = new AtomicBoolean(false)
+    /* The count of elements in the delay queue that are unsatisfied */
     private val unsatisfied = new AtomicInteger(0)
     
+    /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
         try {
@@ -194,7 +197,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
         } catch {
           case ie: InterruptedException => 
             if(needsPurge.getAndSet(false)) {
-              val purged = purgeExpired()
+              val purged = purgeSatisfied()
               debug("Forced purge of " + purged + " requests from delay queue.")
             }
           case e: Exception => 
@@ -204,6 +207,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
       shutdownLatch.countDown()
     }
     
+    /** Add a request to be expired */
     def enqueue(t: T) {
       delayed.add(t)
       unsatisfied.incrementAndGet()
@@ -216,6 +220,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
       expirationThread.interrupt()
     }
     
+    /** Shut down the expiry thread. */
     def shutdown() {
       debug("Shutting down request expiry thread")
       running.set(false)
@@ -223,6 +228,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
       shutdownLatch.await()
     }
     
+    /** Record the fact that we satisfied a request in the stats for the expiry queue */
+    def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
+    
     /**
      * Get the next expired event
      */
@@ -231,6 +239,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
         val curr = delayed.take()
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
+          unsatisfied.getAndDecrement()
           for(key <- curr.keys)
             watchersFor(key).decLiveCount()
           return curr
@@ -242,7 +251,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R] {
     /**
      * Delete all expired events from the delay queue
      */
-    private def purgeExpired(): Int = {
+    private def purgeSatisfied(): Int = {
       var purged = 0
       val iter = delayed.iterator()
       while(iter.hasNext) {
